diff --git a/pkg/reconciliation/steps/role/join_instances.go b/pkg/reconciliation/steps/role/join_instances.go index 75a1c30..a8f8f62 100644 --- a/pkg/reconciliation/steps/role/join_instances.go +++ b/pkg/reconciliation/steps/role/join_instances.go @@ -10,6 +10,7 @@ import ( "github.com/tarantool/tarantool-operator/pkg/utils" v1 "k8s.io/api/core/v1" apiErrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/utils/strings/slices" ) type JoinInstancesStep[ @@ -42,6 +43,8 @@ func (r *JoinInstancesStep[PhaseType, RoleType, CtxType, CtrlType]) Reconcile(ct return Error(err) } + var advertiseURIs []string + for key := range stsList.Items { sts := &stsList.Items[key] if sts.GetDeletionTimestamp() != nil { @@ -58,14 +61,16 @@ func (r *JoinInstancesStep[PhaseType, RoleType, CtxType, CtrlType]) Reconcile(ct } allJoined = false + advertiseURIs = nil - continue + break } if !utils.IsPodRunning(pod) || utils.IsPodDeleting(pod) { allJoined = false + advertiseURIs = nil - continue + break } instanceUUID, uuidErr := topologyClient.GetInstanceUUID(ctx, pod) @@ -76,54 +81,60 @@ func (r *JoinInstancesStep[PhaseType, RoleType, CtxType, CtrlType]) Reconcile(ct } if instanceUUID == "" { - leader := ctx.GetLeader() + advertiseURIs = append(advertiseURIs, ctrl.GetReplicasetsManger().GetAdvertiseURI(cluster, pod)) + } + } + } - if leader.Name != pod.Name { - switch leaderUUID, uuidErr := topologyClient.GetInstanceUUID(ctx, leader); { - case uuidErr != nil: - ctx.GetLogger().Error(uuidErr, "unable to get leader instance uuid") + if len(advertiseURIs) == 0 { + continue + } - return Error(uuidErr) - case leaderUUID == "": - allJoined = false + leader := ctx.GetLeader() - continue - } - } + if !slices.Contains(advertiseURIs, ctrl.GetReplicasetsManger().GetAdvertiseURI(cluster, leader)) { + switch leaderUUID, uuidErr := topologyClient.GetInstanceUUID(ctx, leader); { + case uuidErr != nil: + ctx.GetLogger().Error(uuidErr, "unable to get leader instance uuid") - vshard := role.GetVShardConfig() + return Error(uuidErr) + case leaderUUID == "": + allJoined = false - alias, err := role.GetReplicasetName(stsOrdinal) - if err != nil { - return Error(err) - } + continue + } + } - joinErr := topologyClient.Join( - ctx, - leader, - alias, - ctrl.GetReplicasetsManger().GetReplicasetUUID(role, stsOrdinal), - vshard.GetRoles(), - vshard.GetWeight(), - vshard.GetGroupName(), - role.IsAllRw(), - ctrl.GetReplicasetsManger().GetAdvertiseURI(cluster, pod), - ) - if joinErr != nil { - ctx.GetLogger().Error(joinErr, "unable to join instance") - - var configErr *topology.UnknownRoleError - if errors.As(joinErr, &configErr) { - ctrl.GetEventsRecorder().Event(role, NewWrongVShardRolesEvent(configErr)) - role.SetPhase(r.ConfigErrorPhase) - - return Complete() - } - - return Error(joinErr) - } - } + vshard := role.GetVShardConfig() + + alias, err := role.GetReplicasetName(stsOrdinal) + if err != nil { + return Error(err) + } + + joinErr := topologyClient.Join( + ctx, + leader, + alias, + ctrl.GetReplicasetsManger().GetReplicasetUUID(role, stsOrdinal), + vshard.GetRoles(), + vshard.GetWeight(), + vshard.GetGroupName(), + role.IsAllRw(), + advertiseURIs..., + ) + if joinErr != nil { + ctx.GetLogger().Error(joinErr, "unable to join instance") + + var configErr *topology.UnknownRoleError + if errors.As(joinErr, &configErr) { + ctrl.GetEventsRecorder().Event(role, NewWrongVShardRolesEvent(configErr)) + role.SetPhase(r.ConfigErrorPhase) + + return Complete() } + + return Error(joinErr) } } diff --git a/pkg/topology/common.go b/pkg/topology/common.go index 5cd5109..b68ef39 100644 --- a/pkg/topology/common.go +++ b/pkg/topology/common.go @@ -30,8 +30,14 @@ func (r *CommonCartridgeTopology) Join( replicasetWeight int32, replicasetVshardGroup string, replicasetIsAllRw bool, - advertiseURI string, + advertiseURIs ...string, ) error { + joinServers := make([]JoinServerParams, len(advertiseURIs)) + + for i, uri := range advertiseURIs { + joinServers[i].URI = uri + } + editTopology := EditTopologyParams{ Replicasets: []EditReplicasetParams{ { @@ -41,9 +47,7 @@ func (r *CommonCartridgeTopology) Join( Weight: replicasetWeight, VShardGroup: replicasetVshardGroup, AllRw: replicasetIsAllRw, - JoinServers: []JoinServerParams{{ - URI: advertiseURI, - }}, + JoinServers: joinServers, }, }, } diff --git a/pkg/topology/topology.go b/pkg/topology/topology.go index a5e0963..16382ae 100644 --- a/pkg/topology/topology.go +++ b/pkg/topology/topology.go @@ -19,7 +19,7 @@ type CartridgeTopology interface { replicasetWeight int32, replicasetVshardGroup string, replicasetIsAllRw bool, - advertiseURI string, + advertiseURIs ...string, ) error GetInstanceUUID(ctx context.Context, pod *v1.Pod) (string, error) diff --git a/test/mocks/topology.go b/test/mocks/topology.go index 28a75c7..14cd0ba 100644 --- a/test/mocks/topology.go +++ b/test/mocks/topology.go @@ -29,7 +29,7 @@ func (f *FakeCartridgeTopology) Join( replicasetWeight int32, replicasetVshardGroup string, replicasetIsAllRw bool, - advertiseURI string, + advertiseURI ...string, ) error { args := f.Called( ctx,