7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/config/AccordSpec.java
@@ -197,6 +197,7 @@ public enum MixedTimeSourceHandling
public static class JournalSpec implements Params
{
public int segmentSize = 32 << 20;
public int compactMaxSegments = 32;
public FailurePolicy failurePolicy = FailurePolicy.STOP;
public ReplayMode replayMode = ReplayMode.ONLY_NON_DURABLE;
public FlushMode flushMode = FlushMode.PERIODIC;
@@ -225,6 +226,12 @@ public int segmentSize()
return segmentSize;
}

@Override
public int compactMaxSegments()
{
return compactMaxSegments;
}

@Override
public FailurePolicy failurePolicy()
{
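The new knob surfaces through JournalSpec as a plain public field with a matching accessor. A minimal sketch of tuning it programmatically (assuming JournalSpec's implicit no-arg constructor; the class and field names are from this diff):

```java
import org.apache.cassandra.config.AccordSpec;

public class TuneJournalCompaction
{
    public static void main(String[] args)
    {
        // compactMaxSegments is plain public state (per this diff), so tests
        // or embedded setups can adjust it before the journal starts.
        AccordSpec.JournalSpec spec = new AccordSpec.JournalSpec();
        spec.compactMaxSegments = 8; // cap each compaction batch at 8 static segments
        System.out.println(spec.compactMaxSegments()); // 8
    }
}
```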
@@ -1601,7 +1601,7 @@ private TxnTable()
"Accord per-CommandStore Transaction State",
"CREATE TABLE %s (\n" +
" command_store_id int,\n" +
" txn_id text,\n" +
" txn_id 'TxnIdUtf8Type',\n" +
" save_status text,\n" +
" route text,\n" +
" durability text,\n" +
@@ -2089,7 +2089,7 @@ protected TxnBlockedByTable()
" blocked_by_txn_id 'TxnIdUtf8Type',\n" +
" save_status text,\n" +
" execute_at text,\n" +
" PRIMARY KEY (txn_id, command_store_id, depth, blocked_by_key, blocked_by_txn_id)" +
" PRIMARY KEY (txn_id, depth, command_store_id, blocked_by_txn_id, blocked_by_key)" +
')', TxnIdUtf8Type.instance), BEST_EFFORT, ASC);
}

@@ -2111,7 +2111,7 @@ protected void collect(PartitionsCollector collector)
DebugBlockedTxns.visit(AccordService.unsafeInstance(), txnId, maxDepth, collector.deadlineNanos(), txn -> {
String keyStr = txn.blockedViaKey == null ? "" : txn.blockedViaKey.toString();
String txnIdStr = txn.txnId == null || txn.txnId.equals(txnId) ? "" : txn.txnId.toString();
rows.add(txn.commandStoreId, txn.depth, keyStr, txnIdStr)
rows.add(txn.depth, txn.commandStoreId, txnIdStr, keyStr)
.eagerCollect(columns -> {
columns.add("save_status", txn.saveStatus, TO_STRING)
.add("execute_at", txn.executeAt, TO_STRING);
@@ -2160,8 +2160,9 @@ private ShardEpochsTable()
" quorum_fast_privileged_deps int,\n" +
" quorum_fast_privileged_nodeps int,\n" +
" token_end 'TokenUtf8Type',\n" +
" PRIMARY KEY (table_id, token_start, epoch_start)" +
')', UTF8Type.instance), FAIL, ASC);
" PRIMARY KEY (table_id, token_start, epoch_start))," +
" WITH CLUSTERING ORDER BY (token_start, epoch_start DESC)"
, UTF8Type.instance), FAIL, ASC);
}

@Override
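The reworked key keeps token_start ascending while ordering epoch_start descending, so within each table_id partition the newest epoch for a token range clusters first. Because the WITH clause is spliced onto the key definition by string concatenation, the paren/comma boundary is easy to get wrong; a tiny sketch printing the assembled tail makes it visible (only the key columns from this hunk; the class name is illustrative):

```java
public class ShardEpochsCqlTail
{
    public static void main(String[] args)
    {
        // Tail of the CREATE TABLE statement assembled by the concatenation
        // above: the table's closing paren directly precedes the WITH clause,
        // with no comma in between.
        String tail =
            "  PRIMARY KEY (table_id, token_start, epoch_start))" +
            " WITH CLUSTERING ORDER BY (token_start ASC, epoch_start DESC)";
        System.out.println(tail);
    }
}
```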
17 changes: 14 additions & 3 deletions src/java/org/apache/cassandra/journal/Compactor.java
@@ -18,9 +18,9 @@
package org.apache.cassandra.journal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@@ -29,6 +29,7 @@

import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.utils.concurrent.WaitQueue;

import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;

@@ -40,6 +41,7 @@ public final class Compactor<K, V> implements Runnable, Shutdownable
private final SegmentCompactor<K, V> segmentCompactor;
private final ScheduledExecutorPlus executor;
private Future<?> scheduled;
public final WaitQueue compacted = WaitQueue.newWaitQueue();

Compactor(Journal<K, V> journal, SegmentCompactor<K, V> segmentCompactor)
{
@@ -73,11 +75,18 @@ public synchronized void updateCompactionPeriod(int period, TimeUnit units)
@Override
public void run()
{
Set<StaticSegment<K, V>> toCompact = new HashSet<>();
List<StaticSegment<K, V>> toCompact = new ArrayList<>();
journal.segments().selectStatic(toCompact);
if (toCompact.isEmpty())
return;

int limit = journal.params.compactMaxSegments();
if (toCompact.size() > limit)
{
toCompact.sort(StaticSegment::compareTo);
toCompact.subList(limit, toCompact.size()).clear();
}

try
{
Collection<StaticSegment<K, V>> newSegments = segmentCompactor.compact(toCompact);
@@ -88,6 +97,8 @@ public void run()
journal.replaceCompactedSegments(toCompact, newSegments);
for (StaticSegment<K, V> segment : toCompact)
segment.discard(journal);

compacted.signalAll();
}
catch (IOException e)
{
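The new cap sorts the candidate segments (presumably oldest first, via StaticSegment::compareTo) and truncates the list in place with subList(...).clear(), so a single run compacts at most compactMaxSegments segments. A standalone sketch of the same truncation idiom (generic names, not part of the patch):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BoundedCompactionBatch
{
    // Sort candidates, then drop everything past the cap; subList returns a
    // view of the backing list, so clear() truncates it in place.
    static <T> void keepFirst(List<T> candidates, int limit, Comparator<? super T> order)
    {
        if (candidates.size() > limit)
        {
            candidates.sort(order);
            candidates.subList(limit, candidates.size()).clear();
        }
    }

    public static void main(String[] args)
    {
        List<Integer> segments = new ArrayList<>(List.of(5, 1, 4, 2, 3));
        keepFirst(segments, 3, Comparator.naturalOrder());
        System.out.println(segments); // [1, 2, 3]
    }
}
```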
21 changes: 21 additions & 0 deletions src/java/org/apache/cassandra/journal/Journal.java
@@ -222,6 +222,27 @@ public void start()
"Unexpected journal state after initialization", state);
flusher.start();
compactor.start();

final int maxSegments = 100;
if (segments.get().count(Segment::isStatic) > maxSegments)
{
while (true)
{
WaitQueue.Signal signal = compactor.compacted.register();
int count = segments.get().count(Segment::isStatic);
if (count <= maxSegments)
{
signal.cancel();
logger.info("Only {} static segments; continuing with startup", count);
break;
}
else
{
logger.info("Too many ({}) static segments; waiting until some compacted before starting up", count);
Contributor comment on the log message above:
nit: "some are compacted" maybe?
(please feel free to ignore)

signal.awaitThrowUncheckedOnInterrupt();
}
}
}
}

@VisibleForTesting
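The startup gate above relies on WaitQueue's register-then-recheck discipline: registering before re-reading the segment count means a compaction that completes (and signals) in between still wakes the waiter, and cancel() releases the registration when no wait turns out to be needed. A condensed sketch of the pattern (the helper name is hypothetical; the WaitQueue calls are the ones used above):

```java
import java.util.function.BooleanSupplier;

import org.apache.cassandra.utils.concurrent.WaitQueue;

final class WaitFor
{
    // Block until the condition holds, waking on each signalAll() from the
    // producer side (Compactor.run() signals `compacted` after every batch).
    static void waitFor(WaitQueue queue, BooleanSupplier condition)
    {
        while (true)
        {
            WaitQueue.Signal signal = queue.register(); // register first: no lost wakeups
            if (condition.getAsBoolean())
            {
                signal.cancel(); // condition already met; release the registration
                return;
            }
            signal.awaitThrowUncheckedOnInterrupt(); // sleep until the next signalAll()
        }
    }
}
```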
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/journal/Params.java
@@ -31,6 +31,11 @@ enum ReplayMode { RESET, ALL, ONLY_NON_DURABLE }
*/
int segmentSize();

/**
* @return the maximum number of static segments to compact to sstables at once
*/
int compactMaxSegments();

/**
* @return this journal's {@link FailurePolicy}
*/
11 changes: 11 additions & 0 deletions src/java/org/apache/cassandra/journal/Segments.java
@@ -102,6 +102,17 @@ Iterable<Segment<K, V>> all()
return this.segments.values();
}

public int count(Predicate<? super Segment<K, V>> predicate)
{
int count = 0;
for (Segment<K, V> segment : segments.values())
{
if (predicate.test(segment))
++count;
}
return count;
}

/**
* Returns segments in timestamp order. Will allocate and sort the segment collection.
*/
6 changes: 6 additions & 0 deletions src/java/org/apache/cassandra/service/StartupChecks.java
@@ -725,6 +725,12 @@ public void execute(StartupChecksOptions options) throws StartupException
for (TableMetadata cfm : Schema.instance.getTablesAndViews(SchemaConstants.SYSTEM_KEYSPACE_NAME))
ColumnFamilyStore.scrubDataDirectories(cfm);

if (DatabaseDescriptor.getAccordTransactionsEnabled())
{
for (TableMetadata cfm : Schema.instance.getTablesAndViews(SchemaConstants.ACCORD_KEYSPACE_NAME))
ColumnFamilyStore.scrubDataDirectories(cfm);
}

try
{
SystemKeyspace.checkHealth();
@@ -259,8 +259,7 @@ private <K, V> void shrinkOrEvict(Lock lock, AccordCacheEntry<K, V> node)
{
//noinspection LockAcquiredButNotSafelyReleased
lock.lock();
node.tryApplyShrink(cur, upd);
queue.addLast(node);
node.tryApplyShrink(cur, upd, queue);
}
}
}
@@ -30,6 +30,7 @@
import com.google.common.primitives.Ints;

import accord.utils.ArrayBuffers.BufferList;
import accord.utils.IntrusiveLinkedList;
import accord.utils.IntrusiveLinkedListNode;
import accord.utils.Invariants;
import accord.utils.async.Cancellable;
@@ -595,10 +596,14 @@ public Throwable failure()
return ((FailedToSave)state).cause;
}

void tryApplyShrink(Object cur, Object upd)
void tryApplyShrink(Object cur, Object upd, IntrusiveLinkedList<AccordCacheEntry<?,?>> queue)
{
if (references() > 0 || !isUnqueued())
return;

if (isLoaded() && unwrap() == cur && upd != cur && upd != null)
applyShrink(owner.parent(), cur, upd);
queue.addLast(this);
}

private void applyShrink(AccordCache.Type<K, V, ?> parent, Object cur, Object upd)
@@ -58,7 +58,7 @@ public void accept(TableMetadata metadata)
notify = commandStores;
commandStores = null;
}
CommandStores commandStores = AccordService.instance().node().commandStores();
CommandStores commandStores = AccordService.unsafeInstance().node().commandStores();
for (Map.Entry<Integer, RedundantBefore> e : notify.entrySet())
{
RedundantBefore durable = e.getValue();
21 changes: 16 additions & 5 deletions src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -189,8 +189,9 @@ public void start(Node node)
Invariants.require(status == Status.INITIALIZED);
this.node = node;
status = Status.STARTING;
journal.start();
// start table first to scrub directories before compactor starts
journalTable.start();
journal.start();
}

public boolean started()
@@ -320,28 +321,28 @@ public Command.MinimalWithDeps loadMinimalWithDeps(int commandStoreId, TxnId txn
@Override
public RedundantBefore loadRedundantBefore(int commandStoreId)
{
IdentityAccumulator<RedundantBefore> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
IdentityAccumulator<RedundantBefore> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.REDUNDANT_BEFORE, commandStoreId));
return accumulator.get();
}

@Override
public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId)
{
IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId));
IdentityAccumulator<NavigableMap<TxnId, Ranges>> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.BOOTSTRAP_BEGAN_AT, commandStoreId));
return accumulator.get();
}

@Override
public NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId)
{
IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, commandStoreId));
IdentityAccumulator<NavigableMap<Timestamp, Ranges>> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.SAFE_TO_READ, commandStoreId));
return accumulator.get();
}

@Override
public CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId)
{
IdentityAccumulator<RangesForEpoch> accumulator = readAll(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
IdentityAccumulator<RangesForEpoch> accumulator = readLast(new JournalKey(TxnId.NONE, JournalKey.Type.RANGES_FOR_EPOCH, commandStoreId));
return accumulator.get();
}

@@ -520,6 +521,16 @@ public <BUILDER extends FlyweightImage> BUILDER readAll(JournalKey key)
return builder;
}

public <BUILDER extends FlyweightImage> BUILDER readLast(JournalKey key)
{
BUILDER builder = (BUILDER) key.type.serializer.mergerFor();
builder.reset(key);
// TODO (expected): this can be further improved to avoid allocating lambdas
AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER> serializer = (AccordJournalValueSerializers.FlyweightSerializer<?, BUILDER>) key.type.serializer;
journalTable.readLast(key, (in, userVersion) -> serializer.deserialize(key, builder, in, userVersion));
return builder;
}

public void forEachEntry(JournalKey key, AccordJournalTable.Reader reader)
{
journalTable.readAll(key, reader);
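Switching these loads from readAll to readLast works because each of these images (RedundantBefore, bootstrap/safe-to-read maps, ranges for epoch) is accumulated by identity: every new write is a complete replacement, so only the most recent record matters. A toy sketch of that accumulator shape (illustrative only, not the project's IdentityAccumulator):

```java
// Each accumulated image wholesale-replaces the previous one, so replaying
// history older than the newest record is wasted work; reading just the
// last record yields the same final state.
final class LatestImage<T>
{
    private T latest;

    void accumulate(T image) { latest = image; } // each image replaces the last
    T get()                  { return latest; }
}
```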
@@ -365,7 +365,43 @@ public void readAll(K key, RecordConsumer<K> reader)
}
}
}


public void readLast(K key, Reader reader)
{
readLast(key, new RecordConsumerAdapter<>(reader));
}

public void readLast(K key, RecordConsumer<K> reader)
{
try (TableKeyIterator table = readAllFromTable(key))
{
boolean hasTableData = table.advance();
long minSegment = hasTableData ? table.segment : Long.MIN_VALUE;

class JournalReader implements RecordConsumer<K>
{
boolean read;
@Override
public void accept(long segment, int position, K key, ByteBuffer buffer, int userVersion)
{
if (segment > minSegment)
{
reader.accept(segment, position, key, buffer, userVersion);
read = true;
}
}
}

// First, read all journal entries newer than anything flushed into sstables
JournalReader journalReader = new JournalReader();
journal.readLast(key, journalReader);

// Then, read SSTables, if we haven't found a record already
if (hasTableData && !journalReader.read)
reader.accept(table.segment, table.offset, key, table.value, table.userVersion);
}
}

// TODO (expected): why are recordColumn and versionColumn instance fields, so that this cannot be a static class?
class TableKeyIterator implements Closeable, RecordConsumer<K>
{
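readLast above is a two-source "newest record wins" read: journal entries from segments newer than anything flushed to sstables take priority, and the sstable row is consulted only when no such entry exists. A generic sketch of that shape (names illustrative):

```java
import java.util.Optional;
import java.util.function.Supplier;

final class NewestWins
{
    // Consult the newer source (in-memory journal segments beyond minSegment)
    // first; fall back to the older source (sstables) only if nothing newer
    // was found.
    static <T> Optional<T> readLast(Supplier<Optional<T>> newer, Supplier<Optional<T>> older)
    {
        Optional<T> fromNewer = newer.get();
        return fromNewer.isPresent() ? fromNewer : older.get();
    }
}
```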