From be56696410e0cfe35d9a58ba799d8514854ac155 Mon Sep 17 00:00:00 2001 From: Kristijonas Zalys Date: Thu, 30 Oct 2025 21:53:49 -0700 Subject: [PATCH 1/4] Ensure auto-repair orphaned node cleanup happens regardless of when the node last ran repair --- CHANGES.txt | 1 + .../repair/autorepair/AutoRepair.java | 6 +- .../repair/AutoRepairOrphanCleanupTest.java | 222 ++++++++++++++++++ 3 files changed, 227 insertions(+), 2 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java diff --git a/CHANGES.txt b/CHANGES.txt index b1290e1fe867..cb825353ea9f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Ensure auto-repair orphaned node cleanup happens regardless of when the node last ran repair (CASSANDRA-20995) * Add cqlsh autocompletion for the identity mapping feature (CASSANDRA-20021) * Add DDL Guardrail enabling administrators to disallow creation/modification of keyspaces with durable_writes = false (CASSANDRA-20913) * Optimize Counter, Meter and Histogram metrics using thread local counters (CASSANDRA-20250) diff --git a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java index 0cfe2fa208a6..8ebeabca436e 100644 --- a/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java +++ b/src/java/org/apache/cassandra/repair/autorepair/AutoRepair.java @@ -188,13 +188,15 @@ public void repair(AutoRepairConfig.RepairType repairType) //consistency level to use for local query UUID myId = StorageService.instance.getHostIdForEndpoint(FBUtilities.getBroadcastAddressAndPort()); - // If it's too soon to run repair, don't bother checking if it's our turn. + // Calculate repair turn first - this also cleans up orphan nodes from auto-repair system tables + RepairTurn turn = AutoRepairUtils.myTurnToRunRepair(repairType, myId); + + // Check if it's too soon to run repair after calculating turn to ensure cleanup happens if (tooSoonToRunRepair(repairType, repairState, config, myId)) { return; } - RepairTurn turn = AutoRepairUtils.myTurnToRunRepair(repairType, myId); if (turn == MY_TURN || turn == MY_TURN_DUE_TO_PRIORITY || turn == MY_TURN_FORCE_REPAIR) { repairState.recordTurn(turn); diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java new file mode 100644 index 000000000000..d25b684a8503 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.repair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import org.apache.cassandra.Util; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.repair.autorepair.AutoRepair; +import org.apache.cassandra.repair.autorepair.AutoRepairConfig; +import org.apache.cassandra.service.AutoRepairService; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.cassandra.schema.SchemaConstants.DISTRIBUTED_KEYSPACE_NAME; +import static org.apache.cassandra.schema.SystemDistributedKeyspace.AUTO_REPAIR_HISTORY; +import static org.junit.Assert.assertEquals; + +/** + * Test that verifies orphan nodes are cleaned up from auto_repair_history even when repairs + * are skipped due to min_repair_interval constraints. + */ +public class AutoRepairOrphanCleanupTest extends TestBaseImpl +{ + private static Cluster cluster; + + @BeforeClass + public static void init() throws IOException + { + // Configure a 3-node cluster with auto_repair enabled but with a very high min_repair_interval + // This ensures that when we test, repairs will be skipped due to "too soon to repair" + cluster = Cluster.build(3) + .withTokenCount(4) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3, 4)) + .withConfig(config -> config + .set("num_tokens", 4) + .set("auto_repair", + ImmutableMap.of( + "repair_check_interval", "1s", + "repair_type_overrides", + ImmutableMap.of(AutoRepairConfig.RepairType.FULL.getConfigName(), + ImmutableMap.builder() + .put("initial_scheduler_delay", "0s") + .put("enabled", "true") + // Set very high min_repair_interval to ensure repairs are skipped + .put("min_repair_interval", "24h") + .put("allow_parallel_replica_repair", "true") + .put("repair_by_keyspace", "true") + .build()))) + .set("auto_repair.enabled", "true")) + .start(); + } + + @AfterClass + public static void tearDown() + { + cluster.close(); + } + + @Test + public void testOrphanNodeCleanupWhenRepairSkipped() + { + // Insert 3 auto-repair records for each live node and 1 for the orphan node + List liveHostIds = new ArrayList<>(); + for (int i = 1; i <= 3; i++) + { + liveHostIds.add( + cluster.get(i).callOnInstance(() -> + StorageService.instance.getHostIdForEndpoint( + FBUtilities.getBroadcastAddressAndPort()))); + } + UUID orphanHostId = UUID.randomUUID(); + + long currentTime = System.currentTimeMillis(); + // Orphan node: oldest finish time, so it is next in line to run repair + long orphanStart = currentTime - TimeUnit.HOURS.toMillis(4); // 4 hours ago + long orphanFinish = currentTime - TimeUnit.HOURS.toMillis(3); // 3 hours ago + + // Live nodes: more recent finish times + long[] liveStart = { + currentTime - TimeUnit.HOURS.toMillis(3), // 3 hours ago + currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago + currentTime - TimeUnit.HOURS.toMillis(1) // 1 hour ago + }; + long[] liveFinish = { + currentTime - TimeUnit.HOURS.toMillis(2), // 2 hours ago + currentTime - TimeUnit.HOURS.toMillis(1), // 1 hour ago + currentTime // now + }; + + // Insert live node records + for (int i = 0; i < 3; i++) + { + cluster.coordinator(1).execute(String.format( + "INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, repair_finish_ts, repair_turn) " + + "VALUES ('%s', %s, %d, %d, '%s')", + DISTRIBUTED_KEYSPACE_NAME, + AUTO_REPAIR_HISTORY, + AutoRepairConfig.RepairType.FULL, + liveHostIds.get(i), + liveStart[i], + liveFinish[i], + "NOT_MY_TURN" + ), ConsistencyLevel.QUORUM); + } + + // Insert orphan node record (should be next in line to run repair) + cluster.coordinator(1).execute(String.format( + "INSERT INTO %s.%s (repair_type, host_id, repair_start_ts, repair_finish_ts, repair_turn) " + + "VALUES ('%s', %s, %d, %d, '%s')", + DISTRIBUTED_KEYSPACE_NAME, + AUTO_REPAIR_HISTORY, + AutoRepairConfig.RepairType.FULL, + orphanHostId, + orphanStart, + orphanFinish, + "NOT_MY_TURN" + ), ConsistencyLevel.QUORUM); + + // Once the auto_repair_history table is prepared, initialize auto-repair service on all nodes + cluster.forEach(i -> i.runOnInstance(() -> { + try + { + AutoRepairService.setup(); + AutoRepair.instance.setup(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + })); + + // Wait for at least one auto-repair cycle to allow orphan cleanup to run + // (auto_repair.repair_check_interval is set to 1s, so 10s is plenty) + Util.spinAssertEquals(true, () -> { + Object[][] rows = cluster.coordinator(1).execute( + String.format("SELECT host_id FROM %s.%s WHERE repair_type = '%s'", + DISTRIBUTED_KEYSPACE_NAME, + AUTO_REPAIR_HISTORY, + AutoRepairConfig.RepairType.FULL), + ConsistencyLevel.QUORUM + ); + // The orphanHostId should not be present in the results + for (Object[] row : rows) + { + UUID hostId = (UUID) row[0]; + if (hostId.equals(orphanHostId)) + return false; + } + return true; + }, 10_000); + + // Query all records for this repair type + Object[][] rows = cluster.coordinator(1).execute( + String.format("SELECT host_id, repair_start_ts, repair_finish_ts FROM %s.%s WHERE repair_type = '%s'", + DISTRIBUTED_KEYSPACE_NAME, + AUTO_REPAIR_HISTORY, + AutoRepairConfig.RepairType.FULL), + ConsistencyLevel.QUORUM + ); + + // Check that the orphan node's record is gone, and live nodes' records remain with unchanged timestamps + Map actualLiveTimestamps = new HashMap<>(); + for (Object[] row : rows) + { + UUID hostId = (UUID) row[0]; + long startTs = ((Date) row[1]).getTime(); + long finishTs = ((Date) row[2]).getTime(); + if (hostId.equals(orphanHostId)) + { + throw new AssertionError("Orphan node record was not cleaned up"); + } + actualLiveTimestamps.put(hostId, new long[]{startTs, finishTs}); + } + + // All live nodes should still be present with their original timestamps + assertEquals("Unexpected number of live node records", 3, actualLiveTimestamps.size()); + for (int i = 0; i < 3; i++) + { + UUID hostId = liveHostIds.get(i); + long[] expected = new long[]{liveStart[i], liveFinish[i]}; + long[] actual = actualLiveTimestamps.get(hostId); + if (actual == null) + throw new AssertionError("Live node record missing for hostId: " + hostId); + assertEquals("Live node repair_start_ts changed for hostId: " + hostId, expected[0], actual[0]); + assertEquals("Live node repair_finish_ts changed for hostId: " + hostId, expected[1], actual[1]); + } + } +} \ No newline at end of file From affdcdfdec6e163c53e2bdee739b5805c835c379 Mon Sep 17 00:00:00 2001 From: Kristijonas Zalys Date: Thu, 30 Oct 2025 21:57:20 -0700 Subject: [PATCH 2/4] Update CHANGES.txt --- CHANGES.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.txt b/CHANGES.txt index cb825353ea9f..7522583cbff5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,5 @@ 5.1 - * Ensure auto-repair orphaned node cleanup happens regardless of when the node last ran repair (CASSANDRA-20995) + * Do not wait for repair interval to pass before cleaning up orphaned auto-repair data (CASSANDRA-20995) * Add cqlsh autocompletion for the identity mapping feature (CASSANDRA-20021) * Add DDL Guardrail enabling administrators to disallow creation/modification of keyspaces with durable_writes = false (CASSANDRA-20913) * Optimize Counter, Meter and Histogram metrics using thread local counters (CASSANDRA-20250) From 1ec86e7e1fc1ffd9294bd47a5b597ba1d97c4257 Mon Sep 17 00:00:00 2001 From: Kristijonas Zalys Date: Mon, 10 Nov 2025 17:24:56 -0800 Subject: [PATCH 3/4] Cleanup --- .../distributed/test/repair/AutoRepairOrphanCleanupTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java index d25b684a8503..9517bcb2e8b5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java @@ -75,7 +75,8 @@ public static void init() throws IOException ImmutableMap.builder() .put("initial_scheduler_delay", "0s") .put("enabled", "true") - // Set very high min_repair_interval to ensure repairs are skipped + // Set very high min_repair_interval + // to ensure repairs are skipped .put("min_repair_interval", "24h") .put("allow_parallel_replica_repair", "true") .put("repair_by_keyspace", "true") @@ -219,4 +220,4 @@ public void testOrphanNodeCleanupWhenRepairSkipped() assertEquals("Live node repair_finish_ts changed for hostId: " + hostId, expected[1], actual[1]); } } -} \ No newline at end of file +} From d7c1d13dbbb526f0787e2299f35b9ad2b90e2f7b Mon Sep 17 00:00:00 2001 From: Kristijonas Zalys Date: Thu, 4 Dec 2025 10:23:28 -0800 Subject: [PATCH 4/4] Address comment --- .../repair/AutoRepairOrphanCleanupTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java index 9517bcb2e8b5..92d4bdda6469 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/repair/AutoRepairOrphanCleanupTest.java @@ -151,6 +151,28 @@ public void testOrphanNodeCleanupWhenRepairSkipped() "NOT_MY_TURN" ), ConsistencyLevel.QUORUM); + // Validate that all 4 records (3 live nodes + 1 orphan) are present before starting auto-repair + Object[][] initialRows = cluster.coordinator(1).execute( + String.format("SELECT host_id FROM %s.%s WHERE repair_type = '%s'", + DISTRIBUTED_KEYSPACE_NAME, + AUTO_REPAIR_HISTORY, + AutoRepairConfig.RepairType.FULL), + ConsistencyLevel.QUORUM + ); + assertEquals("Expected 4 records (3 live nodes + 1 orphan) before auto-repair starts", 4, initialRows.length); + boolean orphanFound = false; + int liveNodesFound = 0; + for (Object[] row : initialRows) + { + UUID hostId = (UUID) row[0]; + if (hostId.equals(orphanHostId)) + orphanFound = true; + else if (liveHostIds.contains(hostId)) + liveNodesFound++; + } + assertEquals("All 3 live nodes should be present", 3, liveNodesFound); + assertEquals("Orphan node should be present", true, orphanFound); + // Once the auto_repair_history table is prepared, initialize auto-repair service on all nodes cluster.forEach(i -> i.runOnInstance(() -> { try