From c474c3dd1368e37985653c99d33466a56a9f9160 Mon Sep 17 00:00:00 2001
From: Benedict Elliott Smith
Date: Fri, 28 Nov 2025 15:20:28 +0000
Subject: [PATCH] Fix:
 - DurabilityQueue/ShardScheduler deadlock
 - MemtableCleanerThread.Cleanup assumes its Boolean parameter is non-null,
   which is invalid if an exception has been thrown
 - AccordDurableOnFlush may be invoked while Accord is starting up, so should
   use AccordService.unsafeInstance
 - AccordCache shrink-without-lock regression
 - Clean up system_accord compaction leftovers before starting up
 - system_accord_debug.txn order
 - system_accord_debug.txn_blocked_by order
 - system_accord_debug.shard_epochs order

Improve:
 - Set DefaultProgressLog.setMode(Catchup) during Catchup
 - IdentityAccumulators only need to readLast, not readAll
 - Limit the number of static segments we compact to sstables at once
 - If there are too many static segments on startup, wait for them to be
   compacted

patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21053
---
 modules/accord                                      |  2 +-
 .../apache/cassandra/config/AccordSpec.java         |  7 ++
 .../db/virtual/AccordDebugKeyspace.java             | 11 +--
 .../apache/cassandra/journal/Compactor.java         | 17 +++-
 .../org/apache/cassandra/journal/Journal.java       | 21 +++++
 .../org/apache/cassandra/journal/Params.java        |  5 ++
 .../apache/cassandra/journal/Segments.java          | 11 +++
 .../cassandra/service/StartupChecks.java            |  6 ++
 .../cassandra/service/accord/AccordCache.java       |  3 +-
 .../service/accord/AccordCacheEntry.java            |  7 +-
 .../service/accord/AccordDurableOnFlush.java        |  2 +-
 .../service/accord/AccordJournal.java               | 21 +++--
 .../service/accord/AccordJournalTable.java          | 38 +++++++-
 .../service/accord/AccordService.java               | 86 +++++++++++--------
 .../service/accord/DebugBlockedTxns.java            |  6 ++
 .../org/apache/cassandra/tcm/Startup.java           |  2 +-
 .../utils/memory/MemtableCleanerThread.java         |  2 +-
 .../db/virtual/AccordDebugKeyspaceTest.java         | 37 ++++----
 .../apache/cassandra/journal/TestParams.java        |  6 ++
 19 files changed, 216 insertions(+), 74 deletions(-)

diff --git a/modules/accord b/modules/accord
index 973335bfe7e9..8ccce745818c 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 973335bfe7e930c6646016a4a48cff084f49b660
+Subproject commit 8ccce745818cf80c7cff82c3554e4a88e9e540db
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index 1e2379a56473..74fa27580b60 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -197,6 +197,7 @@ public enum MixedTimeSourceHandling
     public static class JournalSpec implements Params
     {
         public int segmentSize = 32 << 20;
+        public int compactMaxSegments = 32;
         public FailurePolicy failurePolicy = FailurePolicy.STOP;
         public ReplayMode replayMode = ReplayMode.ONLY_NON_DURABLE;
         public FlushMode flushMode = FlushMode.PERIODIC;
@@ -225,6 +226,12 @@ public int segmentSize()
             return segmentSize;
         }
 
+        @Override
+        public int compactMaxSegments()
+        {
+            return compactMaxSegments;
+        }
+
         @Override
         public FailurePolicy failurePolicy()
         {
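Note on the virtual-table ordering fixes in the next diff: txn_blocked_by now clusters by depth before command_store_id, so a blocking chain reads top-down regardless of which store reported each row. A hypothetical comparator expressing the same order (the Row type here is illustrative only; the field names come from the table schema):

    import java.util.Comparator;

    // Illustrative only: the row order produced by the reordered clustering key
    // PRIMARY KEY (txn_id, depth, command_store_id, blocked_by_txn_id, blocked_by_key).
    final class BlockedByOrder
    {
        record Row(int depth, int commandStoreId, String blockedByTxnId, String blockedByKey) {}

        static final Comparator<Row> ORDER =
            Comparator.comparingInt(Row::depth)               // walk the chain top-down
                      .thenComparingInt(Row::commandStoreId)  // then group by reporting store
                      .thenComparing(Row::blockedByTxnId)
                      .thenComparing(Row::blockedByKey);
    }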
int,\n" + - " txn_id text,\n" + + " txn_id 'TxnIdUtf8Type',\n" + " save_status text,\n" + " route text,\n" + " durability text,\n" + @@ -2089,7 +2089,7 @@ protected TxnBlockedByTable() " blocked_by_txn_id 'TxnIdUtf8Type',\n" + " save_status text,\n" + " execute_at text,\n" + - " PRIMARY KEY (txn_id, command_store_id, depth, blocked_by_key, blocked_by_txn_id)" + + " PRIMARY KEY (txn_id, depth, command_store_id, blocked_by_txn_id, blocked_by_key)" + ')', TxnIdUtf8Type.instance), BEST_EFFORT, ASC); } @@ -2111,7 +2111,7 @@ protected void collect(PartitionsCollector collector) DebugBlockedTxns.visit(AccordService.unsafeInstance(), txnId, maxDepth, collector.deadlineNanos(), txn -> { String keyStr = txn.blockedViaKey == null ? "" : txn.blockedViaKey.toString(); String txnIdStr = txn.txnId == null || txn.txnId.equals(txnId) ? "" : txn.txnId.toString(); - rows.add(txn.commandStoreId, txn.depth, keyStr, txnIdStr) + rows.add(txn.depth, txn.commandStoreId, txnIdStr, keyStr) .eagerCollect(columns -> { columns.add("save_status", txn.saveStatus, TO_STRING) .add("execute_at", txn.executeAt, TO_STRING); @@ -2160,8 +2160,9 @@ private ShardEpochsTable() " quorum_fast_privileged_deps int,\n" + " quorum_fast_privileged_nodeps int,\n" + " token_end 'TokenUtf8Type',\n" + - " PRIMARY KEY (table_id, token_start, epoch_start)" + - ')', UTF8Type.instance), FAIL, ASC); + " PRIMARY KEY (table_id, token_start, epoch_start))," + + " WITH CLUSTERING ORDER BY (token_start, epoch_start DESC)" + , UTF8Type.instance), FAIL, ASC); } @Override diff --git a/src/java/org/apache/cassandra/journal/Compactor.java b/src/java/org/apache/cassandra/journal/Compactor.java index 7a1b19b08862..b6749bc39072 100644 --- a/src/java/org/apache/cassandra/journal/Compactor.java +++ b/src/java/org/apache/cassandra/journal/Compactor.java @@ -18,9 +18,9 @@ package org.apache.cassandra.journal; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; -import java.util.Set; +import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -29,6 +29,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.concurrent.Shutdownable; +import org.apache.cassandra.utils.concurrent.WaitQueue; import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; @@ -40,6 +41,7 @@ public final class Compactor implements Runnable, Shutdownable private final SegmentCompactor segmentCompactor; private final ScheduledExecutorPlus executor; private Future scheduled; + public final WaitQueue compacted = WaitQueue.newWaitQueue(); Compactor(Journal journal, SegmentCompactor segmentCompactor) { @@ -73,11 +75,18 @@ public synchronized void updateCompactionPeriod(int period, TimeUnit units) @Override public void run() { - Set> toCompact = new HashSet<>(); + List> toCompact = new ArrayList<>(); journal.segments().selectStatic(toCompact); if (toCompact.isEmpty()) return; + int limit = journal.params.compactMaxSegments(); + if (toCompact.size() > limit) + { + toCompact.sort(StaticSegment::compareTo); + toCompact.subList(limit, toCompact.size()).clear(); + } + try { Collection> newSegments = segmentCompactor.compact(toCompact); @@ -88,6 +97,8 @@ public void run() journal.replaceCompactedSegments(toCompact, newSegments); for (StaticSegment segment : toCompact) segment.discard(journal); + + compacted.signalAll(); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/journal/Journal.java 
diff --git a/src/java/org/apache/cassandra/journal/Journal.java b/src/java/org/apache/cassandra/journal/Journal.java
index 3120eb0916a8..ecfca32a5282 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -222,6 +222,27 @@ public void start()
                            "Unexpected journal state after initialization", state);
         flusher.start();
         compactor.start();
+
+        final int maxSegments = 100;
+        if (segments.get().count(Segment::isStatic) > maxSegments)
+        {
+            while (true)
+            {
+                WaitQueue.Signal signal = compactor.compacted.register();
+                int count = segments.get().count(Segment::isStatic);
+                if (count <= maxSegments)
+                {
+                    signal.cancel();
+                    logger.info("Only {} static segments; continuing with startup", count);
+                    break;
+                }
+                else
+                {
+                    logger.info("Too many ({}) static segments; waiting until some are compacted before starting up", count);
+                    signal.awaitThrowUncheckedOnInterrupt();
+                }
+            }
+        }
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/journal/Params.java b/src/java/org/apache/cassandra/journal/Params.java
index 1e898ebce4c1..161165177d21 100644
--- a/src/java/org/apache/cassandra/journal/Params.java
+++ b/src/java/org/apache/cassandra/journal/Params.java
@@ -31,6 +31,11 @@ enum ReplayMode { RESET, ALL, ONLY_NON_DURABLE }
      */
     int segmentSize();
 
+    /**
+     * @return the maximum number of static segments to compact to sstables at once
+     */
+    int compactMaxSegments();
+
     /**
      * @return this journal's {@link FailurePolicy}
      */
diff --git a/src/java/org/apache/cassandra/journal/Segments.java b/src/java/org/apache/cassandra/journal/Segments.java
index bdd447ec6b32..7245029feabf 100644
--- a/src/java/org/apache/cassandra/journal/Segments.java
+++ b/src/java/org/apache/cassandra/journal/Segments.java
@@ -102,6 +102,17 @@ Iterable<Segment<K, V>> all()
         return this.segments.values();
     }
 
+    public int count(Predicate<Segment<K, V>> predicate)
+    {
+        int count = 0;
+        for (Segment<K, V> segment : segments.values())
+        {
+            if (predicate.test(segment))
+                ++count;
+        }
+        return count;
+    }
+
     /**
      * Returns segments in timestamp order. Will allocate and sort the segment collection.
      */
diff --git a/src/java/org/apache/cassandra/service/StartupChecks.java b/src/java/org/apache/cassandra/service/StartupChecks.java
index 4ec301946578..11a27d7fe89f 100644
--- a/src/java/org/apache/cassandra/service/StartupChecks.java
+++ b/src/java/org/apache/cassandra/service/StartupChecks.java
@@ -725,6 +725,12 @@ public void execute(StartupChecksOptions options) throws StartupException
         for (TableMetadata cfm : Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME))
             ColumnFamilyStore.scrubDataDirectories(cfm);
 
+        if (DatabaseDescriptor.getAccordTransactionsEnabled())
+        {
+            for (TableMetadata cfm : Schema.instance.getTablesAndViews(SchemaConstants.ACCORD_KEYSPACE_NAME))
+                ColumnFamilyStore.scrubDataDirectories(cfm);
+        }
+
         try
         {
             SystemKeyspace.checkHealth();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index fc794f6deca8..6c6eb167b837 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -259,8 +259,7 @@ private void shrinkOrEvict(Lock lock, AccordCacheEntry<?, ?> node)
         {
             //noinspection LockAcquiredButNotSafelyReleased
             lock.lock();
-            node.tryApplyShrink(cur, upd);
-            queue.addLast(node);
+            node.tryApplyShrink(cur, upd, queue);
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
index 02ee11c3db0b..26f46334d67a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCacheEntry.java
@@ -30,6 +30,7 @@ import com.google.common.primitives.Ints;
 
 import accord.utils.ArrayBuffers.BufferList;
+import accord.utils.IntrusiveLinkedList;
 import accord.utils.IntrusiveLinkedListNode;
 import accord.utils.Invariants;
 import accord.utils.async.Cancellable;
@@ -595,10 +596,14 @@ public Throwable failure()
         return ((FailedToSave)state).cause;
     }
 
-    void tryApplyShrink(Object cur, Object upd)
+    void tryApplyShrink(Object cur, Object upd, IntrusiveLinkedList<AccordCacheEntry<?, ?>> queue)
     {
+        if (references() > 0 || !isUnqueued())
+            return;
+
         if (isLoaded() && unwrap() == cur && upd != cur && upd != null)
             applyShrink(owner.parent(), cur, upd);
+        queue.addLast(this);
     }
 
     private void applyShrink(AccordCache.Type parent, Object cur, Object upd)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java b/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
index 96cf75d0cd77..7f1bb70a0424 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordDurableOnFlush.java
@@ -58,7 +58,7 @@ public void accept(TableMetadata metadata)
             notify = commandStores;
             commandStores = null;
         }
-        CommandStores commandStores = AccordService.instance().node().commandStores();
+        CommandStores commandStores = AccordService.unsafeInstance().node().commandStores();
         for (Map.Entry<Integer, RedundantBefore> e : notify.entrySet())
         {
             RedundantBefore durable = e.getValue();
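On AccordDurableOnFlush above: flushes of system_accord tables can complete while AccordService is still mid-startup, and instance() enforces the fully-started lifecycle. A hedged sketch of the distinction as this patch relies on it (not a quote of the actual implementation):

    import org.apache.cassandra.service.accord.IAccordService;

    // Hedged sketch, not the actual implementation: instance() enforces the
    // started lifecycle, unsafeInstance() does not, which is why a flush
    // callback that may run during startup must use the latter.
    final class ServiceHolder
    {
        private static volatile IAccordService instance; // set during startup

        static IAccordService instance()
        {
            IAccordService i = instance;
            if (i == null)
                throw new IllegalStateException("Accord has not started yet");
            return i;
        }

        static IAccordService unsafeInstance()
        {
            return instance; // may be null or mid-startup; callers must tolerate that
        }
    }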
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index 92c3629abe24..885bbfa96167 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -189,8 +189,9 @@ public void start(Node node)
         Invariants.require(status == Status.INITIALIZED);
         this.node = node;
         status = Status.STARTING;
-        journal.start();
+        // start table first to scrub directories before compactor starts
         journalTable.start();
+        journal.start();
     }
 
     public boolean started()
@@ -320,28 +321,28 @@ public Command.MinimalWithDeps loadMinimalWithDeps(int commandStoreId, TxnId txn
     @Override
     public RedundantBefore loadRedundantBefore(int commandStoreId)
     {
-        IdentityAccumulator<RedundantBefore> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
+        IdentityAccumulator<RedundantBefore> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
         return accumulator.get();
     }
 
     @Override
     public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
     {
-        IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId));
+        IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId));
         return accumulator.get();
     }
 
     @Override
     public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
     {
-        IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, commandStoreId));
+        IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, commandStoreId));
         return accumulator.get();
     }
 
     @Override
     public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
     {
-        IdentityAccumulator<CommandStores.RangesForEpoch> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
+        IdentityAccumulator<CommandStores.RangesForEpoch> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
         return accumulator.get();
     }
@@ -520,6 +521,16 @@ public BUILDER readAll(JournalKey key)
         return builder;
     }
 
+    public <BUILDER> BUILDER readLast(JournalKey key)
+    {
+        BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
+        builder.reset(key);
+        // TODO (expected): this can be further improved to avoid allocating lambdas
+        AccordJournalValueSerializers.FlyweightSerializer serializer = (AccordJournalValueSerializers.FlyweightSerializer) key.type.serializer;
+        journalTable.readLast(key, (in, userVersion) -> serializer.deserialize(key, builder, in, userVersion));
+        return builder;
+    }
+
     public void forEachEntry(JournalKey key, AccordJournalTable.Reader reader)
     {
         journalTable.readAll(key, reader);
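Why readLast suffices for the load* methods above: an IdentityAccumulator's merge is last-write-wins, so folding the full history just overwrites the same slot repeatedly. A toy model of that property (hypothetical class, standing in for the real accumulator):

    // Toy model: each record replaces the previous state outright, so the result
    // of folding N records equals the result of reading only the newest one.
    final class LastWriteWins<T>
    {
        private T value;

        void accumulate(T next) { value = next; } // merge == replace
        T get()                 { return value; }
    }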
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
index 17aff49f86a0..40649cc43299 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalTable.java
@@ -365,7 +365,43 @@ public void readAll(K key, RecordConsumer<K> reader)
             }
         }
     }
-
+
+    public void readLast(K key, Reader reader)
+    {
+        readLast(key, new RecordConsumerAdapter<>(reader));
+    }
+
+    public void readLast(K key, RecordConsumer<K> reader)
+    {
+        try (TableKeyIterator table = readAllFromTable(key))
+        {
+            boolean hasTableData = table.advance();
+            long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;
+
+            class JournalReader implements RecordConsumer<K>
+            {
+                boolean read;
+                @Override
+                public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
+                {
+                    if (segment > minSegment)
+                    {
+                        reader.accept(segment, position, key, buffer, userVersion);
+                        read = true;
+                    }
+                }
+            }
+
+            // First, read all journal entries newer than anything flushed into sstables
+            JournalReader journalReader = new JournalReader();
+            journal.readLast(key, journalReader);
+
+            // Then, read SSTables, if we haven't already found a record
+            if (hasTableData && !journalReader.read)
+                reader.accept(table.segment, table.offset, key, table.value, table.userVersion);
+        }
+    }
+
     // TODO (expected): why are recordColumn and versionColumn instance fields, so that this cannot be a static class?
     class TableKeyIterator implements Closeable, RecordConsumer<K>
     {
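The precedence rule readLast implements above, restated in isolation: journal segment ids grow monotonically, and rows flushed to the sstable-backed table remember which segment they came from, so the newest record is either a journal record strictly newer than that watermark, or else the sstable row itself. A stand-alone restatement (the Record type is hypothetical):

    import java.nio.ByteBuffer;

    final class LastRecord
    {
        // Hypothetical record type: just a segment id and a payload.
        record Record(long segment, ByteBuffer payload) {}

        // Journal records strictly newer than the flushed watermark win;
        // otherwise the sstable copy (possibly null) is the answer.
        static Record last(Record newestInJournal, Record newestInTable)
        {
            long watermark = newestInTable != null ? newestInTable.segment() : Long.MIN_VALUE;
            if (newestInJournal != null && newestInJournal.segment() > watermark)
                return newestInJournal;
            return newestInTable;
        }
    }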
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index cff5f26589c5..e7e5762fb11a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -38,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import accord.impl.progresslog.DefaultProgressLog;
 import accord.local.Catchup;
 import accord.topology.ActiveEpochs;
 import accord.topology.EpochReady;
@@ -131,6 +132,7 @@ import static accord.api.Journal.TopologyUpdate;
 
 import static accord.api.ProtocolModifiers.Toggles.FastExec.MAY_BYPASS_SAFESTORE;
+import static accord.impl.progresslog.DefaultProgressLog.ModeFlag.CATCH_UP;
 import static accord.local.durability.DurabilityService.SyncLocal.Self;
 import static accord.local.durability.DurabilityService.SyncRemote.All;
 import static accord.messages.SimpleReply.Ok;
@@ -475,67 +477,75 @@ void catchup()
         AccordSpec spec = DatabaseDescriptor.getAccord();
         if (!spec.catchup_on_start)
         {
-            logger.info("Not catching up with peers");
+            logger.info("Catchup disabled; continuing to startup");
             return;
         }
 
         BootstrapState bootstrapState = SystemKeyspace.getBootstrapState();
         if (bootstrapState == COMPLETED)
         {
-            long maxLatencyNanos = spec.catchup_on_start_fail_latency.toNanoseconds();
-            int attempts = 1;
-            while (true)
+            node.commandStores().forAllUnsafe(commandStore -> ((DefaultProgressLog)commandStore.unsafeProgressLog()).setMode(CATCH_UP));
+            try
             {
-                logger.info("Catching up with quorum...");
-                long start = nanoTime();
-                long failAt = start + maxLatencyNanos;
-                Future<?> f = toFuture(Catchup.catchup(node));
-                if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
+                long maxLatencyNanos = spec.catchup_on_start_fail_latency.toNanoseconds();
+                int attempts = 1;
+                while (true)
                 {
-                    if (spec.catchup_on_start_exit_on_failure)
+                    logger.info("Catching up with quorum...");
+                    long start = nanoTime();
+                    long failAt = start + maxLatencyNanos;
+                    Future<?> f = toFuture(Catchup.catchup(node));
+                    if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
                     {
-                        logger.error("Catch up exceeded maximum latency of {}ns; shutting down", maxLatencyNanos);
-                        throw new RuntimeException("Could not catch up with peers");
+                        if (spec.catchup_on_start_exit_on_failure)
+                        {
+                            logger.error("Catchup exceeded maximum latency of {}ns; shutting down", maxLatencyNanos);
+                            throw new RuntimeException("Could not catch up with peers");
+                        }
+                        logger.error("Catchup exceeded maximum latency of {}ns; continuing to startup", maxLatencyNanos);
+                        break;
                     }
-                    logger.error("Catch up exceeded maximum latency of {}ns; starting up", maxLatencyNanos);
-                    break;
-                }
 
-                Throwable failed = f.cause();
-                if (failed != null)
-                {
-                    if (spec.catchup_on_start_exit_on_failure)
-                        throw new RuntimeException("Could not catch up with peers", failed);
+                    Throwable failed = f.cause();
+                    if (failed != null)
+                    {
+                        if (spec.catchup_on_start_exit_on_failure)
+                            throw new RuntimeException("Could not catch up with peers", failed);
 
-                    logger.error("Could not catch up with peers; continuing to startup");
-                    break;
-                }
+                        logger.error("Could not catch up with peers; continuing to startup");
+                        break;
+                    }
 
-                long end = nanoTime();
-                double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
-                logger.info("Finished catching up with all quorums. {}s elapsed.", String.format("%.2f", seconds));
+                    long end = nanoTime();
+                    double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
+                    logger.info("Finished catchup with all quorums. {}s elapsed.", String.format("%.2f", seconds));
 
-                if (seconds <= spec.catchup_on_start_success_latency.toSeconds())
-                    break;
+                    if (seconds <= spec.catchup_on_start_success_latency.toSeconds())
+                        break;
 
-                if (++attempts > spec.catchup_on_start_max_attempts)
-                {
-                    if (spec.catchup_on_start_exit_on_failure)
+                    if (++attempts > spec.catchup_on_start_max_attempts)
                     {
-                        logger.error("Catch up was slow, aborting after {} attempts and shutting down", attempts);
-                        throw new RuntimeException("Could not catch up with peers");
+                        if (spec.catchup_on_start_exit_on_failure)
+                        {
+                            logger.error("Catchup was slow, aborting after {} attempts and shutting down", attempts);
+                            throw new RuntimeException("Could not catch up with peers");
+                        }
+
+                        logger.info("Catchup was slow; continuing to startup after {} attempts.", attempts - 1);
+                        break;
                     }
 
-                    logger.info("Catch up was slow; continuing to startup after {} attempts.", attempts - 1);
-                    break;
+                    logger.info("Catchup was slow, so we may be behind again; retrying");
                 }
-
-                logger.info("Catch up was slow, so we may behind again; retrying");
+            }
+            finally
+            {
+                node.commandStores().forAllUnsafe(commandStore -> ((DefaultProgressLog)commandStore.unsafeProgressLog()).unsetMode(CATCH_UP));
             }
         }
         else
         {
-            logger.info("Not catching up with quorum, as bootstrap state is {}", bootstrapState);
+            logger.info("No catchup, as bootstrap state is {}", bootstrapState);
         }
     }
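The next diff adds blocked_by_key as a final tiebreaker in Txn.compareTo, making the order total over everything a row displays. That matters beyond cosmetics: if the visitor collects into a sorted-set structure, as a comparator like this suggests, entries that compare equal are treated as duplicates and silently dropped. A hypothetical demonstration of the pitfall:

    import java.util.Comparator;
    import java.util.TreeSet;

    final class ComparatorPitfall
    {
        public static void main(String[] args)
        {
            // Hypothetical: compare only on the first field of a two-field entry.
            TreeSet<int[]> set = new TreeSet<>(Comparator.comparingInt(a -> a[0]));
            set.add(new int[]{ 1, 10 });
            set.add(new int[]{ 1, 20 }); // compares equal to the first entry: dropped
            System.out.println(set.size()); // prints 1, not 2
        }
    }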
"" : blockedViaKey.toString(); + } } final IAccordService service; diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 446c52a02eb1..c843eaa066db 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -182,7 +182,7 @@ public static void scrubDataDirectories(ClusterMetadata metadata) throws Startup for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces()) { // Skip system as we've already cleaned it - if (keyspace.name.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME)) + if (keyspace.name.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || keyspace.name.equals(SchemaConstants.ACCORD_KEYSPACE_NAME)) continue; for (TableMetadata cfm : keyspace.tables) diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java index 99929546f56e..20b320dc057a 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableCleanerThread.java @@ -92,7 +92,7 @@ private Boolean apply(Boolean res, Throwable err) final int tasks = numPendingTasks.decrementAndGet(); // if the cleaning job was scheduled (res == true) or had an error, trigger again after decrementing the tasks - if ((res || err != null) && pool.needsCleaning()) + if (((res != null && res) || err != null) && pool.needsCleaning()) wait.signal(); if (err != null) diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index 260d468af3b9..3f31cba6d806 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.BiPredicate; +import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -44,6 +45,7 @@ import accord.primitives.Status.Durability.HasOutcome; import accord.primitives.Txn; import accord.primitives.TxnId; +import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.OptionaldPositiveInt; @@ -215,6 +217,11 @@ public static void setUpClass() requireNetwork(); } + @After + public void afterTest() throws Throwable + { + } + @Test public void unknownIsEmpty() { @@ -492,7 +499,7 @@ public void completedTxn() getBlocking(accord.node().coordinate(id, txn)); filter.apply.awaitThrowUncheckedOnInterrupt(); spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, "", "", any(), "Applied"))); + row(id.toString(), 0, anyInt(), "", "", any(), "Applied"))); assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), "Applied")); assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()), row(id.toString(), "Applied")); assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null)); @@ -577,14 +584,14 @@ public void inflight() throws ExecutionException, InterruptedException filter.preAccept.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, "", "", any(), anyOf(SaveStatus.PreAccepted.name(), 
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 260d468af3b9..3f31cba6d806 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.BiPredicate;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -44,6 +45,7 @@ import accord.primitives.Status.Durability.HasOutcome;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.OptionaldPositiveInt;
@@ -215,6 +217,11 @@ public static void setUpClass()
         requireNetwork();
     }
 
+    @After
+    public void afterTest() throws Throwable
+    {
+    }
+
     @Test
     public void unknownIsEmpty()
     {
@@ -492,7 +499,7 @@ public void completedTxn()
         getBlocking(accord.node().coordinate(id, txn));
         filter.apply.awaitThrowUncheckedOnInterrupt();
         spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                                          row(id.toString(), anyInt(), 0, "", "", any(), "Applied")));
+                                          row(id.toString(), 0, anyInt(), "", "", any(), "Applied")));
         assertRows(execute(QUERY_TXN, id.toString()), row(id.toString(), "Applied"));
         assertRows(execute(QUERY_TXN_REMOTE, nodeId, id.toString()), row(id.toString(), "Applied"));
         assertRows(execute(QUERY_JOURNAL, id.toString()), row(id.toString(), "PreAccepted"), row(id.toString(), "Applying"), row(id.toString(), "Applied"), row(id.toString(), null));
@@ -577,14 +584,14 @@ public void inflight() throws ExecutionException, InterruptedException
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                       row(id.toString(), anyInt(), 0, "", "", any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(id.toString(), 0, anyInt(), "", "", any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, id.toString()),
-                       row(nodeId, id.toString(), anyInt(), 0, "", "", any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(nodeId, id.toString(), 0, anyInt(), "", "", any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
 
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                       row(id.toString(), anyInt(), 0, "", "", any(), SaveStatus.ReadyToExecute.name()));
+                       row(id.toString(), 0, anyInt(), "", "", any(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, id.toString()),
-                       row(nodeId, id.toString(), anyInt(), 0, "", "", any(), SaveStatus.ReadyToExecute.name()));
+                       row(nodeId, id.toString(), 0, anyInt(), "", "", any(), SaveStatus.ReadyToExecute.name()));
         }
         finally
         {
@@ -619,14 +626,14 @@ public void blocked() throws ExecutionException, InterruptedException
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), anyInt(), 0, "", any(), any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(first.toString(), 0, anyInt(), "", any(), any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, first.toString()),
-                       row(nodeId, first.toString(), anyInt(), 0, "", any(), any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(nodeId, first.toString(), 0, anyInt(), "", any(), any(), anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
 
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), anyInt(), 0, "", any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(first.toString(), 0, anyInt(), "", any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, first.toString()),
-                       row(nodeId, first.toString(), anyInt(), 0, "", any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(nodeId, first.toString(), 0, anyInt(), "", any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
 
             filter.reset();
@@ -643,15 +650,15 @@
                 return rs.size() == 2;
             });
             assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
-                       row(second.toString(), anyInt(), 0, "", "", anyNonNull(), SaveStatus.Stable.name()),
-                       row(second.toString(), anyInt(), 1, any(), first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(second.toString(), 0, anyInt(), "", "", anyNonNull(), SaveStatus.Stable.name()),
+                       row(second.toString(), 1, anyInt(), first.toString(), any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1", second.toString()),
-                       row(second.toString(), anyInt(), 0, any(), "", anyNonNull(), SaveStatus.Stable.name()));
+                       row(second.toString(), 0, anyInt(), "", any(), anyNonNull(), SaveStatus.Stable.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE, nodeId, second.toString()),
-                       row(nodeId, second.toString(), anyInt(), 0, "", "", anyNonNull(), SaveStatus.Stable.name()),
-                       row(nodeId, second.toString(), anyInt(), 1, any(), first.toString(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+                       row(nodeId, second.toString(), 0, anyInt(), "", "", anyNonNull(), SaveStatus.Stable.name()),
+                       row(nodeId, second.toString(), 1, anyInt(), first.toString(), any(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
             assertRows(execute(QUERY_TXN_BLOCKED_BY_REMOTE + " AND depth < 1", nodeId, second.toString()),
-                       row(nodeId, second.toString(), anyInt(), 0, any(), "", anyNonNull(), SaveStatus.Stable.name()));
+                       row(nodeId, second.toString(), 0, anyInt(), "", any(), anyNonNull(), SaveStatus.Stable.name()));
         }
         finally
         {
diff --git a/test/unit/org/apache/cassandra/journal/TestParams.java b/test/unit/org/apache/cassandra/journal/TestParams.java
index edf357a7907e..d464f00fade2 100644
--- a/test/unit/org/apache/cassandra/journal/TestParams.java
+++ b/test/unit/org/apache/cassandra/journal/TestParams.java
@@ -31,6 +31,12 @@ public int segmentSize()
         return 32 << 20;
     }
 
+    @Override
+    public int compactMaxSegments()
+    {
+        return 16;
+    }
+
     @Override
     public FailurePolicy failurePolicy()
     {