From 762aba8929c9fc351e24522a85f46432deb3bca3 Mon Sep 17 00:00:00 2001 From: hll1213181368 Date: Mon, 29 Sep 2025 19:50:32 +0800 Subject: [PATCH 1/3] Fix sourceNodeClusterInfo.MigratingSlot maybe nil when tryUpdateMigrationStatus --- controller/cluster.go | 44 +++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index 5484311..afd51a6 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -338,48 +338,72 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu ).Error("Failed to get the cluster info from the source node", zap.Error(err)) continue } - if !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) { - log.Error("Mismatch migrating slot", + + // If there is no migration information on the master node, you need to clear the migration information on the controller. + if sourceNodeClusterInfo.MigratingSlot == nil { + log.Error("Mismatch migrating slot, no migrating info on node", zap.Int("shard_index", i), - zap.String("source_migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()), + zap.String("source_node", sourceNode.Addr()), zap.String("migrating_slot", shard.MigratingSlot.String()), ) + clonedCluster.Shards[i].ClearMigrateState() + if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { + log.Error("Failed to update the migrate state by UpdateCluster method", zap.Error(err)) + return + } + c.updateCluster(clonedCluster) + continue + } + // If the migration information on the master node is inconsistent with the controller, you need to clear the migration information on the controller. + if sourceNodeClusterInfo.MigratingSlot != nil && + !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) { + log.Error("Mismatch migrating slot", + zap.Int("shard_index", i), + zap.String("source node cluster info migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()), + zap.String("controller migrating_slot", shard.MigratingSlot.String()), + ) + clonedCluster.Shards[i].ClearMigrateState() + if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { + log.Error("Failed to update the migrate state by UpdateCluster method", zap.Error(err)) + return + } + c.updateCluster(clonedCluster) continue } + if shard.TargetShardIndex < 0 || shard.TargetShardIndex >= len(clonedCluster.Shards) { log.Error("Invalid target shard index", zap.Int("index", shard.TargetShardIndex)) return } + migratingSlot := shard.MigratingSlot.String() switch sourceNodeClusterInfo.MigratingState { case "none", "start": continue case "fail": - migratingSlot := shard.MigratingSlot clonedCluster.Shards[i].ClearMigrateState() - if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { + if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { log.Error("Failed to update the cluster", zap.Error(err)) return } c.updateCluster(clonedCluster) - log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot.String())) + log.Warn("Failed to migrate the slot", zap.String("slot", migratingSlot)) case "success": clonedCluster.Shards[i].SlotRanges = store.RemoveSlotFromSlotRanges(clonedCluster.Shards[i].SlotRanges, shard.MigratingSlot.SlotRange) clonedCluster.Shards[shard.TargetShardIndex].SlotRanges = store.AddSlotToSlotRanges( clonedCluster.Shards[shard.TargetShardIndex].SlotRanges, shard.MigratingSlot.SlotRange, ) - migratedSlot := shard.MigratingSlot clonedCluster.Shards[i].ClearMigrateState() - if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { + if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { log.Error("Failed to update the cluster", zap.Error(err)) return } else { - log.Info("Migrate the slot successfully", zap.String("slot", migratedSlot.String())) + log.Info("Migrate the slot successfully", zap.String("slot", migratingSlot)) } c.updateCluster(clonedCluster) default: clonedCluster.Shards[i].ClearMigrateState() - if err := c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { + if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { log.Error("Failed to update the cluster", zap.Error(err)) return } From 2982f642e7b78f4d0027172039fd36db1336d6c4 Mon Sep 17 00:00:00 2001 From: Lele Huang Date: Fri, 3 Oct 2025 21:30:21 +0800 Subject: [PATCH 2/3] Update cluster.go merge two if judges --- controller/cluster.go | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/controller/cluster.go b/controller/cluster.go index afd51a6..f3fe449 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -339,28 +339,12 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu continue } - // If there is no migration information on the master node, you need to clear the migration information on the controller. - if sourceNodeClusterInfo.MigratingSlot == nil { - log.Error("Mismatch migrating slot, no migrating info on node", - zap.Int("shard_index", i), - zap.String("source_node", sourceNode.Addr()), - zap.String("migrating_slot", shard.MigratingSlot.String()), - ) - clonedCluster.Shards[i].ClearMigrateState() - if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { - log.Error("Failed to update the migrate state by UpdateCluster method", zap.Error(err)) - return - } - c.updateCluster(clonedCluster) - continue - } - // If the migration information on the master node is inconsistent with the controller, you need to clear the migration information on the controller. - if sourceNodeClusterInfo.MigratingSlot != nil && - !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange) { + // If there is no migration information on the source node or source node migratingslot is not eauqls shard, you need to clear the migration information on the controller. + if sourceNodeClusterInfo.MigratingSlot == nil || (sourceNodeClusterInfo.MigratingSlot != nil && + !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange)) { log.Error("Mismatch migrating slot", zap.Int("shard_index", i), - zap.String("source node cluster info migrating_slot", sourceNodeClusterInfo.MigratingSlot.String()), - zap.String("controller migrating_slot", shard.MigratingSlot.String()), + zap.String("migrating_slot", shard.MigratingSlot.String()), ) clonedCluster.Shards[i].ClearMigrateState() if err = c.clusterStore.UpdateCluster(ctx, c.namespace, clonedCluster); err != nil { From bc722751be0f8152f82c348f7a8531474f86b016 Mon Sep 17 00:00:00 2001 From: hulk Date: Sat, 4 Oct 2025 15:34:18 +0800 Subject: [PATCH 3/3] Update controller/cluster.go --- controller/cluster.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/controller/cluster.go b/controller/cluster.go index f3fe449..752ec60 100755 --- a/controller/cluster.go +++ b/controller/cluster.go @@ -339,7 +339,8 @@ func (c *ClusterChecker) tryUpdateMigrationStatus(ctx context.Context, clonedClu continue } - // If there is no migration information on the source node or source node migratingslot is not eauqls shard, you need to clear the migration information on the controller. + // If there is no migration information on the source node or the source node migration slot is not equal to the shard, + // you need to clear the migration information on the controller. if sourceNodeClusterInfo.MigratingSlot == nil || (sourceNodeClusterInfo.MigratingSlot != nil && !sourceNodeClusterInfo.MigratingSlot.Equal(shard.MigratingSlot.SlotRange)) { log.Error("Mismatch migrating slot",