85 changes: 85 additions & 0 deletions src/cluster.c
@@ -1089,6 +1089,10 @@ void clusterCommand(client *c) {
addReplyBulkCString(c,ni);
sdsfree(ni);
}
} else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
clusterMigrationCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
clusterSyncSlotsCommand(c);
} else if(!clusterCommandSpecial(c)) {
addReplySubcommandSyntaxError(c);
return;
@@ -2127,3 +2131,84 @@ void resetClusterStats(void) {

clusterSlotStatResetAll();
}

/* This function is called at server startup in order to initialize cluster data
* structures that are shared between the different cluster implementations. */
void clusterCommonInit(void) {
server.cluster_slot_stats = malloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));

⚠️ Bug: Use of raw malloc instead of zmalloc breaks memory tracking

The original code in server.c:main() used zmalloc() to allocate cluster_slot_stats. The new clusterCommonInit() uses raw malloc() instead. This means:

1. The allocation is invisible to Redis's INFO memory reporting and used_memory tracking.
2. Memory fragmentation metrics will be inaccurate.
3. If Redis is configured with maxmemory, this allocation won't count toward the limit.
4. It's inconsistent with every other allocation in cluster.c (all use zmalloc).

The PR context notes mention "Use malloc instead of zmalloc for allocation (consistent with pattern)", but this appears incorrect: no other allocation in cluster.c uses raw malloc.

Fix: Change malloc to zmalloc to match the original code and Redis conventions.
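
To make the tracking concern concrete, here is a minimal sketch of the general idea. It is not Redis's zmalloc: `tracked_malloc`, `tracked_free`, `tracked_bytes`, and `demo_reported_bytes` are hypothetical names invented for this illustration, and the caller passes the size to the free function only to keep the example short.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical usage counter, standing in for an allocator's
 * "used memory" statistic. This is NOT Redis's zmalloc. */
static size_t tracked_bytes = 0;

/* Allocate `size` bytes and record them in the counter. */
static void *tracked_malloc(size_t size) {
    assert(size > 0);
    void *p = malloc(size);
    if (p != NULL) tracked_bytes += size;
    return p;
}

/* Free a tracked block and subtract it from the counter. The size is
 * passed explicitly here; a real tracking allocator would look it up. */
static void tracked_free(void *p, size_t size) {
    if (p == NULL) return;
    free(p);
    tracked_bytes -= size;
}

/* Make one tracked and one raw allocation, then return what the
 * counter reported while both blocks were live. */
static size_t demo_reported_bytes(void) {
    tracked_bytes = 0;
    void *seen = tracked_malloc(1024);
    void *unseen = malloc(4096); /* raw malloc bypasses the counter */
    size_t reported = tracked_bytes;
    tracked_free(seen, 1024);
    free(unseen);
    return reported;
}
```

In this sketch, `demo_reported_bytes()` returns 1024 even though 5120 bytes were live at the time, which is the kind of under-reporting the comment above is concerned about.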

Suggested change
- server.cluster_slot_stats = malloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
+ server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));

resetClusterStats();
asmInit();
}

/* This function is called after the node startup in order to check if there
* are any slots that we have keys for, but are not assigned to us. If so,
* we delete the keys. */
void clusterDeleteKeysInUnownedSlots(void) {
if (clusterNodeIsSlave(getMyClusterNode())) return;

/* Check that all the slots we have keys for are assigned to us. Otherwise,
* delete the keys. */
for (int i = 0; i < CLUSTER_SLOTS; i++) {
/* Skip if: no keys in the slot, it's our slot, or we are importing it. */
if (!countKeysInSlot(i) ||
clusterIsMySlot(i) ||
getImportingSlotSource(i))
{
continue;
}

serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
"assigned to another node. "
"Deleting keys in the slot.", i);
/* With atomic slot migration, it is safe to drop keys from slots
* that are not owned. This will not result in data loss under the
* legacy slot migration approach either, since the importing state
* has already been persisted in node.conf. */
clusterDelKeysInSlot(i, 0);
}
}


/* This function is called after the node startup in order to verify that data
* loaded from disk is in agreement with the cluster configuration:
*
* 1) If we find keys about hash slots we have no responsibility for, the
* following happens:
* A) If no other node is in charge according to the current cluster
* configuration, we add these slots to our node.
* B) If according to our config other nodes are already in charge for
* this slots, we set the slots as IMPORTING from our point of view
* in order to justify we have those slots, and in order to make
* redis-cli aware of the issue, so that it can try to fix it.
* 2) If we find data in a DB different than DB0 we return C_ERR to
* signal the caller it should quit the server with an error message
* or take other actions.
*
* The function always returns C_OK even if it will try to correct
* the error described in "1". However if data is found in DB different
* from DB0, C_ERR is returned.
*
* The function also uses the logging facility in order to warn the user
* about desynchronizations between the data we have in memory and the
* cluster configuration. */
int verifyClusterConfigWithData(void) {
/* Return ASAP if a module disabled cluster redirections. In that case
* every master can store keys about every possible hash slot. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return C_OK;

/* If this node is a slave, don't perform the check at all as we
* completely depend on the replication stream. */
if (clusterNodeIsSlave(getMyClusterNode())) return C_OK;

/* Make sure we only have keys in DB0. */
for (int i = 1; i < server.dbnum; i++) {
if (kvstoreSize(server.db[i].keys)) return C_ERR;
}

/* Delete keys in unowned slots */
clusterDeleteKeysInUnownedSlots();
/* Take over slots that we have keys for, but are assigned to no one. */
clusterClaimUnassignedSlots();
Comment on lines +2209 to +2212

🚨 Bug: Two-pass refactor causes data loss for keys in unassigned slots

The original verifyClusterConfigWithData() used a single pass through all slots. For each slot with keys that wasn't owned by this node and wasn't being imported, it checked:

- If unassigned (NULL owner) → claim the slot (no data loss)
- If assigned to another node → delete the keys

The refactored code splits this into two sequential passes:

1. clusterDeleteKeysInUnownedSlots(): deletes keys in slots we don't own
2. clusterClaimUnassignedSlots(): claims unassigned slots that have keys

The problem: clusterIsMySlot(i) returns false for unassigned slots (since getNodeBySlot() returns NULL ≠ myself). So in step 1, keys in unassigned slots are deleted. Then in step 2, countKeysInSlot(i) returns 0 for those same slots (keys were just deleted), so they are skipped and never claimed.

Result: keys in unassigned slots are silently destroyed instead of being preserved by claiming the slot. This is a data loss regression.

Fix: Reverse the call order. Call clusterClaimUnassignedSlots() before clusterDeleteKeysInUnownedSlots(), so unassigned slots are claimed first (preserving keys), and only then are keys in slots owned by other nodes deleted.
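
The ordering problem can be reproduced with a small toy model. This is a sketch under simplifying assumptions, not the Redis code: `ToyCluster`, the `toy_*` helpers, and the four-slot layout are all hypothetical, and the importing state is left out because it does not affect the unassigned-slot case.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_SLOTS 4
enum { UNASSIGNED = 0, MYSELF = 1, OTHER = 2 };

/* Hypothetical toy model: one owner and one key count per slot. */
typedef struct {
    int owner[TOY_SLOTS];
    int keys[TOY_SLOTS];
} ToyCluster;

/* Slot 0 is ours, slot 1 is unassigned but holds keys,
 * slot 2 belongs to another node, slot 3 is unassigned and empty. */
static ToyCluster toy_initial(void) {
    ToyCluster c = {
        .owner = { MYSELF, UNASSIGNED, OTHER, UNASSIGNED },
        .keys  = { 5, 3, 2, 0 },
    };
    return c;
}

/* Mirrors the delete pass: drop keys in every slot we do not own.
 * An unassigned slot is "not ours", so its keys are dropped too. */
static void toy_delete_unowned(ToyCluster *c) {
    assert(c != NULL);
    for (int i = 0; i < TOY_SLOTS; i++) {
        if (c->keys[i] == 0 || c->owner[i] == MYSELF) continue;
        c->keys[i] = 0;
    }
}

/* Mirrors the claim pass: take ownership of unassigned slots with keys. */
static void toy_claim_unassigned(ToyCluster *c) {
    assert(c != NULL);
    for (int i = 0; i < TOY_SLOTS; i++) {
        if (c->keys[i] == 0 || c->owner[i] != UNASSIGNED) continue;
        c->owner[i] = MYSELF;
    }
}

static int toy_total_keys(const ToyCluster *c) {
    int total = 0;
    for (int i = 0; i < TOY_SLOTS; i++) total += c->keys[i];
    return total;
}

/* Order used by the refactor: delete first, then claim. */
static int keys_after_delete_then_claim(void) {
    ToyCluster c = toy_initial();
    toy_delete_unowned(&c);
    toy_claim_unassigned(&c);
    return toy_total_keys(&c);
}

/* Order proposed in the fix: claim first, then delete. */
static int keys_after_claim_then_delete(void) {
    ToyCluster c = toy_initial();
    toy_claim_unassigned(&c);
    toy_delete_unowned(&c);
    return toy_total_keys(&c);
}
```

In this model, `keys_after_delete_then_claim()` returns 5 (the 3 keys in the unassigned slot are lost and the slot is never claimed), while `keys_after_claim_then_delete()` returns 8 (only the 2 keys in the slot owned by another node are removed).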

Suggested change
- /* Delete keys in unowned slots */
- clusterDeleteKeysInUnownedSlots();
- /* Take over slots that we have keys for, but are assigned to no one. */
- clusterClaimUnassignedSlots();
+ /* Take over slots that we have keys for, but are assigned to no one. */
+ clusterClaimUnassignedSlots();
+ /* Delete keys in unowned slots */
+ clusterDeleteKeysInUnownedSlots();

return C_OK;
}
2 changes: 2 additions & 0 deletions src/cluster.h
@@ -86,8 +86,10 @@ static inline unsigned int keyHashSlot(const char *key, int keylen) {
/* functions requiring mechanism specific implementations */
void clusterInit(void);
void clusterInitLast(void);
void clusterCommonInit(void);
void clusterCron(void);
void clusterBeforeSleep(void);
void clusterClaimUnassignedSlots(void);
int verifyClusterConfigWithData(void);

int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
114 changes: 27 additions & 87 deletions src/cluster_legacy.c
@@ -1036,10 +1036,8 @@ void clusterInit(void) {
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename();
resetClusterStats();

getRandomHexChars(server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN);
asmInit();
}

void clusterInitLast(void) {
@@ -4934,9 +4932,6 @@ void clusterCron(void) {

if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();

/* Atomic slot migration cron */
asmCron();
}

/* This function is called before the event handler returns to sleep for
@@ -4978,8 +4973,6 @@ void clusterBeforeSleep(void) {
/* Broadcast a PONG to all the nodes. */
if (flags & CLUSTER_TODO_BROADCAST_PONG)
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

asmBeforeSleep();
}

void clusterDoBeforeSleep(int flags) {
@@ -5252,82 +5245,6 @@ void clusterUpdateState(void) {
}
}

/* This function is called after the node startup in order to verify that data
* loaded from disk is in agreement with the cluster configuration:
*
* 1) If we find keys about hash slots we have no responsibility for, the
* following happens:
* A) If no other node is in charge according to the current cluster
* configuration, we add these slots to our node.
* B) If according to our config other nodes are already in charge for
* this slots, we set the slots as IMPORTING from our point of view
* in order to justify we have those slots, and in order to make
* redis-cli aware of the issue, so that it can try to fix it.
* 2) If we find data in a DB different than DB0 we return C_ERR to
* signal the caller it should quit the server with an error message
* or take other actions.
*
* The function always returns C_OK even if it will try to correct
* the error described in "1". However if data is found in DB different
* from DB0, C_ERR is returned.
*
* The function also uses the logging facility in order to warn the user
* about desynchronizations between the data we have in memory and the
* cluster configuration. */
int verifyClusterConfigWithData(void) {
int j;
int update_config = 0;

/* Return ASAP if a module disabled cluster redirections. In that case
* every master can store keys about every possible hash slot. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return C_OK;

/* If this node is a slave, don't perform the check at all as we
* completely depend on the replication stream. */
if (nodeIsSlave(myself)) return C_OK;

/* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) {
if (kvstoreSize(server.db[j].keys)) return C_ERR;
}

/* Check that all the slots we see populated memory have a corresponding
* entry in the cluster table. Otherwise fix the table. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (!countKeysInSlot(j)) continue; /* No keys in this slot. */
/* Check if we are assigned to this slot or if we are importing it.
* In both cases check the next slot as the configuration makes
* sense. */
if (server.cluster->slots[j] == myself ||
server.cluster->importing_slots_from[j] != NULL) continue;

/* If we are here data and cluster config don't agree, and we have
* slot 'j' populated even if we are not importing it, nor we are
* assigned to this slot. Fix this condition. */

update_config++;
/* Case A: slot is unassigned. Take responsibility for it. */
if (server.cluster->slots[j] == NULL) {
serverLog(LL_NOTICE, "I have keys for unassigned slot %d. "
"Taking responsibility for it.",j);
clusterAddSlot(myself,j);
} else {
serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
"assigned to another node. "
"Deleting keys in the slot.", j);

/* With atomic slot migration, it is safe to drop keys from slots
* that are not owned. This will not result in data loss under the
* legacy slot migration approach either, since the importing state
* has already been persisted in node.conf. */
clusterDelKeysInSlot(j, 0);
}
}
if (update_config) clusterSaveConfigOrDie(1);
return C_OK;
}

/* Remove all the shard channel related information not owned by the current shard. */
static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
if (!kvstoreSize(server.pubsubshard_channels)) return;
@@ -5339,6 +5256,33 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
}
}

/* This function is called after the node startup in order to check if there
* are any slots that we have keys for, but are assigned to no one. If so,
* we take ownership of them. */
void clusterClaimUnassignedSlots(void) {
if (nodeIsSlave(myself)) return;

int update_config = 0;
for (int i = 0; i < CLUSTER_SLOTS; i++) {
/* Skip if: no keys, already has an owner, or we are importing it. */
if (!countKeysInSlot(i) ||
server.cluster->slots[i] != NULL ||
server.cluster->importing_slots_from[i] != NULL)
{
continue;
}

/* If we are here data and cluster config don't agree, and we have
* slot 'i' populated even if we are not importing it, nor anyone else
* is assigned to it. Fix this condition by taking ownership. */
update_config++;
serverLog(LL_NOTICE, "I have keys for unassigned slot %d. "
"Taking responsibility for it.", i);
clusterAddSlot(myself, i);
}
if (update_config) clusterSaveConfigOrDie(1);
}

/* -----------------------------------------------------------------------------
* SLAVE nodes handling
* -------------------------------------------------------------------------- */
@@ -6481,10 +6425,6 @@ int clusterCommandSpecial(client *c) {
} else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
clusterMigrationCommand(c);
} else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
clusterSyncSlotsCommand(c);
} else {
return 0;
}
13 changes: 10 additions & 3 deletions src/server.c
@@ -1654,7 +1654,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
if (server.cluster_enabled) {
asmCron();
clusterCron();
}
}

/* Run the Sentinel timer if we are in sentinel mode. */
@@ -1835,7 +1838,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
* later in this function, must be done before blockedBeforeSleep. */
if (server.cluster_enabled) clusterBeforeSleep();
if (server.cluster_enabled) {
clusterBeforeSleep();
asmBeforeSleep();
}

/* Handle blocked clients.
* must be done before flushAppendOnlyFile, in case of appendfsync=always,
@@ -7730,7 +7736,8 @@ int main(int argc, char **argv) {
redisAsciiArt();
checkTcpBacklogSettings();
if (server.cluster_enabled) {
server.cluster_slot_stats = zmalloc(CLUSTER_SLOTS*sizeof(clusterSlotStat));
/* clusterCommonInit() initializes slot-stats required by clusterInit() */
clusterCommonInit();
clusterInit();
}
if (!server.sentinel_mode) {