Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 54 additions & 43 deletions pkg/reconciliation/steps/role/join_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/tarantool/tarantool-operator/pkg/utils"
v1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/utils/strings/slices"
)

type JoinInstancesStep[
Expand Down Expand Up @@ -42,6 +43,8 @@ func (r *JoinInstancesStep[PhaseType, RoleType, CtxType, CtrlType]) Reconcile(ct
return Error(err)
}

var advertiseURIs []string

for key := range stsList.Items {
sts := &stsList.Items[key]
if sts.GetDeletionTimestamp() != nil {
Expand All @@ -58,14 +61,16 @@ func (r *JoinInstancesStep[PhaseType, RoleType, CtxType, CtrlType]) Reconcile(ct
}

allJoined = false
advertiseURIs = nil

continue
break
}

if !utils.IsPodRunning(pod) || utils.IsPodDeleting(pod) {
allJoined = false
advertiseURIs = nil

continue
break
}

instanceUUID, uuidErr := topologyClient.GetInstanceUUID(ctx, pod)
Expand All @@ -76,54 +81,60 @@ func (r *JoinInstancesStep[PhaseType, RoleType, CtxType, CtrlType]) Reconcile(ct
}

if instanceUUID == "" {
leader := ctx.GetLeader()
advertiseURIs = append(advertiseURIs, ctrl.GetReplicasetsManger().GetAdvertiseURI(cluster, pod))
}
}
}

if leader.Name != pod.Name {
switch leaderUUID, uuidErr := topologyClient.GetInstanceUUID(ctx, leader); {
case uuidErr != nil:
ctx.GetLogger().Error(uuidErr, "unable to get leader instance uuid")
if len(advertiseURIs) == 0 {
continue
}

return Error(uuidErr)
case leaderUUID == "":
allJoined = false
leader := ctx.GetLeader()

continue
}
}
if !slices.Contains(advertiseURIs, ctrl.GetReplicasetsManger().GetAdvertiseURI(cluster, leader)) {
switch leaderUUID, uuidErr := topologyClient.GetInstanceUUID(ctx, leader); {
case uuidErr != nil:
ctx.GetLogger().Error(uuidErr, "unable to get leader instance uuid")

vshard := role.GetVShardConfig()
return Error(uuidErr)
case leaderUUID == "":
allJoined = false

alias, err := role.GetReplicasetName(stsOrdinal)
if err != nil {
return Error(err)
}
continue
}
}

joinErr := topologyClient.Join(
ctx,
leader,
alias,
ctrl.GetReplicasetsManger().GetReplicasetUUID(role, stsOrdinal),
vshard.GetRoles(),
vshard.GetWeight(),
vshard.GetGroupName(),
role.IsAllRw(),
ctrl.GetReplicasetsManger().GetAdvertiseURI(cluster, pod),
)
if joinErr != nil {
ctx.GetLogger().Error(joinErr, "unable to join instance")

var configErr *topology.UnknownRoleError
if errors.As(joinErr, &configErr) {
ctrl.GetEventsRecorder().Event(role, NewWrongVShardRolesEvent(configErr))
role.SetPhase(r.ConfigErrorPhase)

return Complete()
}

return Error(joinErr)
}
}
vshard := role.GetVShardConfig()

alias, err := role.GetReplicasetName(stsOrdinal)
if err != nil {
return Error(err)
}

joinErr := topologyClient.Join(
ctx,
leader,
alias,
ctrl.GetReplicasetsManger().GetReplicasetUUID(role, stsOrdinal),
vshard.GetRoles(),
vshard.GetWeight(),
vshard.GetGroupName(),
role.IsAllRw(),
advertiseURIs...,
)
if joinErr != nil {
ctx.GetLogger().Error(joinErr, "unable to join instance")

var configErr *topology.UnknownRoleError
if errors.As(joinErr, &configErr) {
ctrl.GetEventsRecorder().Event(role, NewWrongVShardRolesEvent(configErr))
role.SetPhase(r.ConfigErrorPhase)

return Complete()
}

return Error(joinErr)
}
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/topology/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ func (r *CommonCartridgeTopology) Join(
replicasetWeight int32,
replicasetVshardGroup string,
replicasetIsAllRw bool,
advertiseURI string,
advertiseURIs ...string,
) error {
joinServers := make([]JoinServerParams, len(advertiseURIs))

for i, uri := range advertiseURIs {
joinServers[i].URI = uri
}

editTopology := EditTopologyParams{
Replicasets: []EditReplicasetParams{
{
Expand All @@ -41,9 +47,7 @@ func (r *CommonCartridgeTopology) Join(
Weight: replicasetWeight,
VShardGroup: replicasetVshardGroup,
AllRw: replicasetIsAllRw,
JoinServers: []JoinServerParams{{
URI: advertiseURI,
}},
JoinServers: joinServers,
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/topology/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type CartridgeTopology interface {
replicasetWeight int32,
replicasetVshardGroup string,
replicasetIsAllRw bool,
advertiseURI string,
advertiseURIs ...string,
) error

GetInstanceUUID(ctx context.Context, pod *v1.Pod) (string, error)
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (f *FakeCartridgeTopology) Join(
replicasetWeight int32,
replicasetVshardGroup string,
replicasetIsAllRw bool,
advertiseURI string,
advertiseURI ...string,
) error {
args := f.Called(
ctx,
Expand Down