From 6f297a06936f1419389611a4504773d0f59cf9b9 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Wed, 6 Dec 2023 22:13:21 -0500 Subject: [PATCH 1/9] modifies FATE to use a single thread to find work This change modifies FATE to use a single thread to find work. It also cleans up some of the signaling between threads in FATE and fixes a synchronization bug in FATE that was introduced in #4017. The bug introduced in #4017 is that some things are synchronizing on the wrong object because a new inner class was introduced. These changes were pulled from #3964 and cleaned up and improved. --- .../accumulo/core/fate/AgeOffStore.java | 12 +- .../org/apache/accumulo/core/fate/Fate.java | 125 +++++++- .../apache/accumulo/core/fate/FateStore.java | 17 +- .../accumulo/core/fate/ReadOnlyFateStore.java | 9 + .../apache/accumulo/core/fate/ZooStore.java | 279 ++++++++---------- .../accumulo/core/logging/FateLogger.java | 12 +- .../apache/accumulo/core/fate/TestStore.java | 12 +- 7 files changed, 287 insertions(+), 179 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index c8be589aeff..f61c06028ca 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -20,10 +20,12 @@ import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -148,11 +150,6 @@ public long create() { return txid; } - @Override - public FateTxStore reserve() { - return new AgeOffFateTxStore(store.reserve()); - } - @Override public FateTxStore reserve(long tid) { return new AgeOffFateTxStore(store.reserve(tid)); @@ -204,4 +201,9 @@ public ReadOnlyFateTxStore read(long tid) { public List 
list() { return store.list(); } + + @Override + public Iterator runnable(AtomicBoolean keepWaiting) { + return store.runnable(keepWaiting); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index a7ad8ce2437..fffcad7e110 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.fate; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; @@ -32,6 +33,8 @@ import java.util.EnumSet; import java.util.Optional; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -48,10 +51,13 @@ import org.apache.accumulo.core.util.ShutdownUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; +import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TApplicationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + /** * Fault tolerant executor */ @@ -68,25 +74,128 @@ public class Fate { private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); private final AtomicBoolean keepRunning = new AtomicBoolean(true); + private final BlockingQueue workQueue; + private final SingalCount idleWorkerCount = new SingalCount(); + private final Thread workFinder; public enum TxInfo { TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE } + private class 
SingalCount { + long count; + + synchronized void increment() { + count++; + this.notifyAll(); + } + + synchronized void decrement() { + Preconditions.checkState(count > 0); + count--; + this.notifyAll(); + } + + synchronized void waitTillNonZero() { + while (count == 0 && keepRunning.get()) { + try { + wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + } + + } + + /** + * A single thread that finds transactions to work on and queues them up. Do not want each worker + * thread going to the store and looking for work as it would place more load on the store. + */ + private class WorkFinder implements Runnable { + + @Override + public void run() { + + try { + + while (keepRunning.get()) { + + while (!workQueue.isEmpty() && keepRunning.get()) { + // wait till there is at least one worker that is looking for work and the queue is + // empty + idleWorkerCount.waitTillNonZero(); + } + + var iter = store.runnable(keepRunning); + + while (iter.hasNext() && keepRunning.get()) { + Long txid = iter.next(); + try { + while (keepRunning.get()) { + if (workQueue.offer(txid, 100, MILLISECONDS)) { + break; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + } + } catch (Exception e) { + if (keepRunning.get()) { + log.warn("Failure while attempting to find work for fate", e); + } else { + log.debug("Failure while attempting to find work for fate", e); + } + + workQueue.clear(); + } + } + } + private class TransactionRunner implements Runnable { + private Optional> reserveFateTx() throws InterruptedException { + idleWorkerCount.increment(); + try { + while (keepRunning.get()) { + var unreservedTid = workQueue.poll(100, MILLISECONDS); + + if (unreservedTid == null) { + continue; + } + var optionalopStore = store.tryReserve(unreservedTid); + if (optionalopStore.isPresent()) { + return optionalopStore; + } + } + + return 
Optional.empty(); + } finally { + idleWorkerCount.decrement(); + } + } + @Override public void run() { while (keepRunning.get()) { long deferTime = 0; FateTxStore txStore = null; try { - txStore = store.reserve(); + var optionalopStore = reserveFateTx(); + if (optionalopStore.isPresent()) { + txStore = optionalopStore.orElseThrow(); + } else { + continue; + } TStatus status = txStore.getStatus(); Repo op = txStore.top(); if (status == FAILED_IN_PROGRESS) { processFailed(txStore, op); - } else { + } else if (status == SUBMITTED || status == IN_PROGRESS) { Repo prevOp = null; try { deferTime = op.isReady(txStore.getID(), environment); @@ -231,6 +340,9 @@ public Fate(T environment, FateStore store, Function,String> toLogStr this.environment = environment; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); + // TODO this queue does not resize when config changes + this.workQueue = + new ArrayBlockingQueue<>(conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) * 4); this.fatePoolWatcher = ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> { @@ -257,6 +369,9 @@ public Fate(T environment, FateStore store, Function,String> toLogStr } }, 3, SECONDS)); this.executor = pool; + + this.workFinder = Threads.createThread("Fate work finder", new WorkFinder()); + this.workFinder.start(); } // get a transaction id back to the requester before doing any work @@ -399,6 +514,12 @@ public void shutdown() { if (executor != null) { executor.shutdown(); } + workFinder.interrupt(); + try { + workFinder.join(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 834a2fa6e5b..b2d8101f142 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -38,6 +38,9 @@ public interface FateStore extends ReadOnlyFateStore { */ long create(); + /** + * An interface that allows read/write access to the data related to a single fate operation. + */ interface FateTxStore extends ReadOnlyFateTxStore { @Override Repo top(); @@ -81,8 +84,8 @@ interface FateTxStore extends ReadOnlyFateTxStore { * upon successful return the store now controls the referenced transaction id. caller should no * longer interact with it. * - * @param deferTime time in millis to keep this transaction out of the pool used in the - * {@link #reserve() reserve} method. must be non-negative. + * @param deferTime time in millis to keep this transaction from being returned by + * {@link #runnable()}. Must be non-negative. */ void unreserve(long deferTime); } @@ -104,14 +107,4 @@ interface FateTxStore extends ReadOnlyFateTxStore { */ FateTxStore reserve(long tid); - /** - * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS. - * - * Reserving a transaction id ensures that nothing else in-process interacting via the same - * instance will be operating on that transaction id. - * - * @return a transaction id that is safe to interact with, chosen by the store. - */ - FateTxStore reserve(); - } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index 4e06ab0f9e4..f0140de3674 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -20,7 +20,9 @@ import java.io.Serializable; import java.util.EnumSet; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; /** * Read only access to a Transaction Store. 
@@ -121,4 +123,11 @@ interface ReadOnlyFateTxStore { * @return all outstanding transactions, including those reserved by others. */ List list(); + + /** + * @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and + * unreserved. This method will block until it finds something that is runnable or until + * the keepWaiting parameter is false. + */ + Iterator runnable(AtomicBoolean keepWaiting); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 683f17d9585..857dd471d00 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -35,10 +35,12 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; @@ -51,6 +53,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; //TODO use zoocache? 
- ACCUMULO-1297 @@ -61,11 +65,14 @@ public class ZooStore implements FateStore { private static final Logger log = LoggerFactory.getLogger(ZooStore.class); private String path; private ZooReaderWriter zk; - private String lastReserved = ""; private Set reserved; private Map defered; - private long statusChangeEvents = 0; - private int reservationsWaiting = 0; + + // The key of this map is transaction id and the value is a count of threads waiting on a change + // for that transaction id + private Map waitingForChange; + + private long unreservedRunnableCount = 0; private byte[] serialize(Object o) { @@ -110,6 +117,7 @@ public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, Interru this.zk = zk; this.reserved = new HashSet<>(); this.defered = new HashMap<>(); + this.waitingForChange = new HashMap<>(); zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } @@ -136,108 +144,15 @@ public long create() { } } - @Override - public FateTxStore reserve() { - try { - while (true) { - - long events; - synchronized (this) { - events = statusChangeEvents; - } - - List txdirs = new ArrayList<>(zk.getChildren(path)); - Collections.sort(txdirs); - - synchronized (this) { - if (!txdirs.isEmpty() && txdirs.get(txdirs.size() - 1).compareTo(lastReserved) <= 0) { - lastReserved = ""; - } - } - - for (String txdir : txdirs) { - long tid = parseTid(txdir); - - synchronized (this) { - // this check makes reserve pick up where it left off, so that it cycles through all as - // it is repeatedly called.... failing to do so can lead to - // starvation where fate ops that sort higher and hold a lock are never reserved. 
- if (txdir.compareTo(lastReserved) <= 0) { - continue; - } - - if (defered.containsKey(tid)) { - if (defered.get(tid) < System.currentTimeMillis()) { - defered.remove(tid); - } else { - continue; - } - } - if (reserved.contains(tid)) { - continue; - } else { - reserved.add(tid); - lastReserved = txdir; - } - } - - // have reserved id, status should not change - - try { - TStatus status = TStatus.valueOf(new String(zk.getData(path + "/" + txdir), UTF_8)); - if (status == TStatus.SUBMITTED || status == TStatus.IN_PROGRESS - || status == TStatus.FAILED_IN_PROGRESS) { - return new FateTxStoreImpl(tid, true); - } else { - unreserve(tid); - } - } catch (NoNodeException nne) { - // node deleted after we got the list of children, its ok - unreserve(tid); - } catch (KeeperException | InterruptedException | RuntimeException e) { - unreserve(tid); - throw e; - } - } - - synchronized (this) { - // suppress lgtm alert - synchronized variable is not always true - if (events == statusChangeEvents) { // lgtm [java/constant-comparison] - if (defered.isEmpty()) { - this.wait(5000); - } else { - Long minTime = Collections.min(defered.values()); - long waitTime = minTime - System.currentTimeMillis(); - if (waitTime > 0) { - this.wait(Math.min(waitTime, 5000)); - } - } - } - } - } - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - @Override public FateTxStore reserve(long tid) { synchronized (this) { - reservationsWaiting++; - try { - while (reserved.contains(tid)) { - try { - this.wait(1000); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - - reserved.add(tid); - return new FateTxStoreImpl(tid, true); - } finally { - reservationsWaiting--; + while (reserved.contains(tid)) { + waitForChange(tid, 100); } + + reserved.add(tid); + return new FateTxStoreImpl(tid, true); } } @@ -257,27 +172,13 @@ public Optional> tryReserve(long tid) { } } - private void unreserve(long tid) { - synchronized (this) { - 
if (!reserved.remove(tid)) { - throw new IllegalStateException( - "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); - } - - // do not want this unreserve to unesc wake up threads in reserve()... this leads to infinite - // loop when tx is stuck in NEW... - // only do this when something external has called reserve(tid)... - if (reservationsWaiting > 0) { - this.notifyAll(); - } - } - } - private class FateTxStoreImpl implements FateTxStore { private final long tid; private final boolean isReserved; + private boolean observedRunnableStatus = false; + private FateTxStoreImpl(long tid, boolean isReserved) { this.tid = tid; this.isReserved = isReserved; @@ -290,7 +191,7 @@ public void unreserve(long deferTime) { throw new IllegalArgumentException("deferTime < 0 : " + deferTime); } - synchronized (this) { + synchronized (ZooStore.this) { if (!reserved.remove(tid)) { throw new IllegalStateException( "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); @@ -300,9 +201,14 @@ public void unreserve(long deferTime) { defered.put(tid, System.currentTimeMillis() + deferTime); } - this.notifyAll(); - } + if (observedRunnableStatus) { + unreservedRunnableCount++; + } + if (waitingForChange.containsKey(tid) || observedRunnableStatus) { + ZooStore.this.notifyAll(); + } + } } private void verifyReserved(boolean isWrite) { @@ -311,7 +217,7 @@ private void verifyReserved(boolean isWrite) { } if (isReserved) { - synchronized (this) { + synchronized (ZooStore.this) { if (!reserved.contains(tid)) { throw new IllegalStateException( "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); @@ -409,45 +315,25 @@ public void pop() { } } - private TStatus _getStatus(long tid) { - try { - return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); - } catch (NoNodeException nne) { - return TStatus.UNKNOWN; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - @Override 
public TStatus getStatus() { verifyReserved(false); + var status = _getStatus(tid); + observedRunnableStatus = isRunnable(status); return _getStatus(tid); } @Override public TStatus waitForStatusChange(EnumSet expected) { + Preconditions.checkState(!isReserved, + "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); while (true) { - long events; - synchronized (this) { - events = statusChangeEvents; - } - TStatus status = _getStatus(tid); if (expected.contains(status)) { return status; } - synchronized (this) { - // suppress lgtm alert - synchronized variable is not always true - if (events == statusChangeEvents) { // lgtm [java/constant-comparison] - try { - this.wait(5000); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } - } - } + waitForChange(tid, 5000); } } @@ -462,10 +348,7 @@ public void setStatus(TStatus status) { throw new IllegalStateException(e); } - synchronized (this) { - statusChangeEvents++; - } - + observedRunnableStatus = isRunnable(status); } @Override @@ -582,6 +465,33 @@ public List> getStack() { } } + private void waitForChange(long tid, long timeout) { + synchronized (ZooStore.this) { + waitingForChange.compute(tid, (k, v) -> (v == null) ? 1 : v + 1); + try { + ZooStore.this.wait(timeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } finally { + waitingForChange.compute(tid, (k, v) -> { + Preconditions.checkState(v != null && v > 0); + return (v == 1) ? 
null : v - 1; + }); + } + } + } + + private TStatus _getStatus(long tid) { + try { + return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); + } catch (NoNodeException nne) { + return TStatus.UNKNOWN; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + @Override public ReadOnlyFateTxStore read(long tid) { return new FateTxStoreImpl(tid, false); @@ -600,4 +510,73 @@ public List list() { throw new IllegalStateException(e); } } + + private boolean isRunnable(TStatus status) { + return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS + || status == TStatus.SUBMITTED; + } + + @Override + public Iterator runnable(AtomicBoolean keepWaiting) { + + while (keepWaiting.get()) { + ArrayList runnableTids = new ArrayList<>(); + + long events; + synchronized (this) { + events = unreservedRunnableCount; + } + + try { + + List transactions = zk.getChildren(path); + for (String txidStr : transactions) { + long txid = parseTid(txidStr); + if (isRunnable(_getStatus(txid))) { + runnableTids.add(txid); + } + } + + synchronized (this) { + runnableTids.removeIf(txid -> { + var deferedTime = defered.get(txid); + if (deferedTime != null) { + if (deferedTime < System.currentTimeMillis()) { + return true; + } else { + defered.remove(txid); + } + } + + if (reserved.contains(txid)) { + return true; + } + + return false; + }); + + if (runnableTids.isEmpty()) { + // suppress lgtm alert - synchronized variable is not always true + if (events == unreservedRunnableCount) {// lgtm [java/constant-comparison] + if (defered.isEmpty()) { + this.wait(5000); + } else { + Long minTime = Collections.min(defered.values()); + long waitTime = minTime - System.currentTimeMillis(); + if (waitTime > 0) { + this.wait(Math.min(waitTime, 5000)); + } + } + } + } else { + return runnableTids.iterator(); + } + } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + + return 
List.of().iterator(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index ce8dda313b5..d85e417650a 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -21,8 +21,10 @@ import static org.apache.accumulo.core.fate.FateTxId.formatTid; import java.io.Serializable; +import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import org.apache.accumulo.core.fate.Fate; @@ -97,11 +99,6 @@ public static FateStore wrap(FateStore store, Function,String> // only logging operations that change the persisted data, not operations that only read data return new FateStore<>() { - @Override - public FateTxStore reserve() { - return new LoggingFateTxStore<>(store.reserve(), toLogString); - } - @Override public FateTxStore reserve(long tid) { return new LoggingFateTxStore<>(store.reserve(tid), toLogString); @@ -122,6 +119,11 @@ public List list() { return store.list(); } + @Override + public Iterator runnable(AtomicBoolean keepWaiting) { + return store.runnable(keepWaiting); + } + @Override public long create() { long tid = store.create(); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 5bfd60d2bd8..058b0c50a4b 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -23,10 +23,12 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; /** * Transient in memory store for transactions. 
@@ -53,11 +55,6 @@ public FateTxStore reserve(long tid) { return new TestFateTxStore(tid); } - @Override - public FateTxStore reserve() { - throw new UnsupportedOperationException(); - } - @Override public Optional> tryReserve(long tid) { synchronized (this) { @@ -172,4 +169,9 @@ public List list() { return new ArrayList<>(statuses.keySet()); } + @Override + public Iterator runnable(AtomicBoolean keepWaiting) { + throw new UnsupportedOperationException(); + } + } From 76362767f2aed1287db2aceffc8c6e8f8e7dac53 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 7 Dec 2023 17:14:28 -0500 Subject: [PATCH 2/9] fix javadoc --- .../org/apache/accumulo/core/fate/FateStore.java | 2 +- .../org/apache/accumulo/test/ComprehensiveIT.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index b2d8101f142..7db5766e81b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -85,7 +85,7 @@ interface FateTxStore extends ReadOnlyFateTxStore { * longer interact with it. * * @param deferTime time in millis to keep this transaction from being returned by - * {@link #runnable()}. Must be non-negative. + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative. 
*/ void unreserve(long deferTime); } diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java index fb7215216b0..c9a05c178f4 100644 --- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java @@ -148,7 +148,10 @@ public void testBulkImport() throws Exception { public void testMergeAndSplit() throws Exception { String table = getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + long t1 = System.currentTimeMillis(); client.tableOperations().create(table); + long t2 = System.currentTimeMillis(); + System.out.println("create time " + (t2 - t1)); final SortedMap expectedData = generateKeys(0, 300); @@ -158,23 +161,35 @@ public void testMergeAndSplit() throws Exception { // test adding splits to a table var splits = new TreeSet<>(List.of(new Text(row(75)), new Text(row(150)))); + t1 = System.currentTimeMillis(); client.tableOperations().addSplits(table, splits); + t2 = System.currentTimeMillis(); + System.out.println("split time " + (t2 - t1)); assertEquals(splits, new TreeSet<>(client.tableOperations().listSplits(table))); // adding splits should not change data verifyData(client, table, AUTHORIZATIONS, expectedData); + t1 = System.currentTimeMillis(); // test merging splits away client.tableOperations().merge(table, null, null); + t2 = System.currentTimeMillis(); + System.out.println("merge time " + (t2 - t1)); assertEquals(Set.of(), new TreeSet<>(client.tableOperations().listSplits(table))); // merging should not change data verifyData(client, table, AUTHORIZATIONS, expectedData); splits = new TreeSet<>(List.of(new Text(row(33)), new Text(row(66)))); + t1 = System.currentTimeMillis(); client.tableOperations().addSplits(table, splits); + t2 = System.currentTimeMillis(); + System.out.println("split time " + (t2 - t1)); assertEquals(splits, new 
TreeSet<>(client.tableOperations().listSplits(table))); verifyData(client, table, AUTHORIZATIONS, expectedData); + t1 = System.currentTimeMillis(); client.tableOperations().merge(table, null, null); + t2 = System.currentTimeMillis(); + System.out.println("merge time " + (t2 - t1)); assertEquals(Set.of(), new TreeSet<>(client.tableOperations().listSplits(table))); verifyData(client, table, AUTHORIZATIONS, expectedData); } From a1bd38b316dadb278b7dc7c8d6f2ec06db045107 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Thu, 7 Dec 2023 17:21:44 -0500 Subject: [PATCH 3/9] remove test changed and adjust wait time --- .../org/apache/accumulo/core/fate/ZooStore.java | 2 +- .../org/apache/accumulo/test/ComprehensiveIT.java | 15 --------------- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 857dd471d00..063137e57f7 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -333,7 +333,7 @@ public TStatus waitForStatusChange(EnumSet expected) { return status; } - waitForChange(tid, 5000); + waitForChange(tid, 1000); } } diff --git a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java index c9a05c178f4..fb7215216b0 100644 --- a/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ComprehensiveIT.java @@ -148,10 +148,7 @@ public void testBulkImport() throws Exception { public void testMergeAndSplit() throws Exception { String table = getUniqueNames(1)[0]; try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - long t1 = System.currentTimeMillis(); client.tableOperations().create(table); - long t2 = System.currentTimeMillis(); - System.out.println("create time " + (t2 - t1)); 
final SortedMap expectedData = generateKeys(0, 300); @@ -161,35 +158,23 @@ public void testMergeAndSplit() throws Exception { // test adding splits to a table var splits = new TreeSet<>(List.of(new Text(row(75)), new Text(row(150)))); - t1 = System.currentTimeMillis(); client.tableOperations().addSplits(table, splits); - t2 = System.currentTimeMillis(); - System.out.println("split time " + (t2 - t1)); assertEquals(splits, new TreeSet<>(client.tableOperations().listSplits(table))); // adding splits should not change data verifyData(client, table, AUTHORIZATIONS, expectedData); - t1 = System.currentTimeMillis(); // test merging splits away client.tableOperations().merge(table, null, null); - t2 = System.currentTimeMillis(); - System.out.println("merge time " + (t2 - t1)); assertEquals(Set.of(), new TreeSet<>(client.tableOperations().listSplits(table))); // merging should not change data verifyData(client, table, AUTHORIZATIONS, expectedData); splits = new TreeSet<>(List.of(new Text(row(33)), new Text(row(66)))); - t1 = System.currentTimeMillis(); client.tableOperations().addSplits(table, splits); - t2 = System.currentTimeMillis(); - System.out.println("split time " + (t2 - t1)); assertEquals(splits, new TreeSet<>(client.tableOperations().listSplits(table))); verifyData(client, table, AUTHORIZATIONS, expectedData); - t1 = System.currentTimeMillis(); client.tableOperations().merge(table, null, null); - t2 = System.currentTimeMillis(); - System.out.println("merge time " + (t2 - t1)); assertEquals(Set.of(), new TreeSet<>(client.tableOperations().listSplits(table))); verifyData(client, table, AUTHORIZATIONS, expectedData); } From 66837e7c8010c0db551439a8a0d7f643f2cb90cc Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 8 Dec 2023 08:45:23 -0500 Subject: [PATCH 4/9] code review update --- core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index fffcad7e110..99b7f00edb1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -75,14 +75,14 @@ public class Fate { private final AtomicBoolean keepRunning = new AtomicBoolean(true); private final BlockingQueue workQueue; - private final SingalCount idleWorkerCount = new SingalCount(); + private final SignalCount idleWorkerCount = new SignalCount(); private final Thread workFinder; public enum TxInfo { TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE } - private class SingalCount { + private class SignalCount { long count; synchronized void increment() { From f67c53fd2108a007afdc903a7e9d3b3dce06f57c Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 8 Dec 2023 09:59:48 -0500 Subject: [PATCH 5/9] removes some usages of wait and notify --- .../org/apache/accumulo/core/fate/Fate.java | 31 +----- .../accumulo/core/fate/SignalCount.java | 70 ++++++++++++++ .../apache/accumulo/core/fate/ZooStore.java | 95 +++++++++---------- 3 files changed, 114 insertions(+), 82 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 99b7f00edb1..566c991a1f2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -56,8 +56,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - /** * Fault tolerant executor */ @@ -82,33 +80,6 @@ public enum TxInfo { TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE } - private class SignalCount { - long count; - - synchronized void increment() { - count++; - this.notifyAll(); - } - - synchronized void decrement() { - 
Preconditions.checkState(count > 0); - count--; - this.notifyAll(); - } - - synchronized void waitTillNonZero() { - while (count == 0 && keepRunning.get()) { - try { - wait(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } - } - } - - } - /** * A single thread that finds transactions to work on and queues them up. Do not want each worker * thread going to the store and looking for work as it would place more load on the store. @@ -125,7 +96,7 @@ public void run() { while (!workQueue.isEmpty() && keepRunning.get()) { // wait till there is at least one worker that is looking for work and the queue is // empty - idleWorkerCount.waitTillNonZero(); + idleWorkerCount.waitFor(count -> count != 0, keepRunning::get); } var iter = store.runnable(keepRunning); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java b/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java new file mode 100644 index 00000000000..4bad48a6afa --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.fate; + +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; +import java.util.function.LongPredicate; + +import com.google.common.base.Preconditions; + +class SignalCount { + private long count = 0; + + synchronized void increment() { + count++; + this.notifyAll(); + } + + synchronized void decrement() { + Preconditions.checkState(count > 0); + count--; + this.notifyAll(); + } + + synchronized long getCount() { + return count; + } + + synchronized boolean waitFor(LongPredicate predicate, BooleanSupplier keepWaiting) { + return waitFor(predicate, Long.MAX_VALUE, keepWaiting); + } + + synchronized boolean waitFor(LongPredicate predicate, long maxWait, BooleanSupplier keepWaiting) { + Preconditions.checkArgument(maxWait >= 0); + + if (maxWait == 0) { + return predicate.test(count); + } + + long start = System.nanoTime(); + + while (!predicate.test(count) && keepWaiting.getAsBoolean() + && TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) < maxWait) { + try { + wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + return predicate.test(count); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 063137e57f7..38071ef182e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -68,11 +68,11 @@ public class ZooStore implements FateStore { private Set reserved; private Map defered; - // The key of this map is transaction id and the value is a count of threads waiting on a change - // for that transaction id - private Map waitingForChange; + // This is incremented each time a transaction was unreserved that was non new + private final SignalCount unreservedNonNewCount = new SignalCount(); - private long unreservedRunnableCount = 
0; + // This is incremented each time a transaction is unreserved that was runnable + private final SignalCount unreservedRunnableCount = new SignalCount(); private byte[] serialize(Object o) { @@ -117,7 +117,6 @@ public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, Interru this.zk = zk; this.reserved = new HashSet<>(); this.defered = new HashMap<>(); - this.waitingForChange = new HashMap<>(); zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } @@ -146,9 +145,14 @@ public long create() { @Override public FateTxStore reserve(long tid) { - synchronized (this) { + synchronized (ZooStore.this) { while (reserved.contains(tid)) { - waitForChange(tid, 100); + try { + ZooStore.this.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } } reserved.add(tid); @@ -177,7 +181,7 @@ private class FateTxStoreImpl implements FateTxStore { private final long tid; private final boolean isReserved; - private boolean observedRunnableStatus = false; + private TStatus observedStatus = null; private FateTxStoreImpl(long tid, boolean isReserved) { this.tid = tid; @@ -197,17 +201,20 @@ public void unreserve(long deferTime) { "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); } + // notify any threads waiting to reserve + ZooStore.this.notifyAll(); + if (deferTime > 0) { defered.put(tid, System.currentTimeMillis() + deferTime); } + } - if (observedRunnableStatus) { - unreservedRunnableCount++; - } + if (observedStatus != null && isRunnable(observedStatus)) { + unreservedRunnableCount.increment(); + } - if (waitingForChange.containsKey(tid) || observedRunnableStatus) { - ZooStore.this.notifyAll(); - } + if (observedStatus != TStatus.NEW) { + unreservedNonNewCount.increment(); } } @@ -319,7 +326,7 @@ public void pop() { public TStatus getStatus() { verifyReserved(false); var status = _getStatus(tid); - observedRunnableStatus = isRunnable(status); + observedStatus = 
status; return _getStatus(tid); } @@ -328,12 +335,15 @@ public TStatus waitForStatusChange(EnumSet expected) { Preconditions.checkState(!isReserved, "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); while (true) { + + long countBefore = unreservedNonNewCount.getCount(); + TStatus status = _getStatus(tid); if (expected.contains(status)) { return status; } - waitForChange(tid, 1000); + unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); } } @@ -348,7 +358,7 @@ public void setStatus(TStatus status) { throw new IllegalStateException(e); } - observedRunnableStatus = isRunnable(status); + observedStatus = status; } @Override @@ -465,23 +475,6 @@ public List> getStack() { } } - private void waitForChange(long tid, long timeout) { - synchronized (ZooStore.this) { - waitingForChange.compute(tid, (k, v) -> (v == null) ? 1 : v + 1); - try { - ZooStore.this.wait(timeout); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } finally { - waitingForChange.compute(tid, (k, v) -> { - Preconditions.checkState(v != null && v > 0); - return (v == 1) ? 
null : v - 1; - }); - } - } - } - private TStatus _getStatus(long tid) { try { return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); @@ -522,10 +515,7 @@ public Iterator runnable(AtomicBoolean keepWaiting) { while (keepWaiting.get()) { ArrayList runnableTids = new ArrayList<>(); - long events; - synchronized (this) { - events = unreservedRunnableCount; - } + final long beforeCount = unreservedRunnableCount.getCount(); try { @@ -554,24 +544,25 @@ public Iterator runnable(AtomicBoolean keepWaiting) { return false; }); + } - if (runnableTids.isEmpty()) { - // suppress lgtm alert - synchronized variable is not always true - if (events == unreservedRunnableCount) {// lgtm [java/constant-comparison] - if (defered.isEmpty()) { - this.wait(5000); - } else { - Long minTime = Collections.min(defered.values()); - long waitTime = minTime - System.currentTimeMillis(); - if (waitTime > 0) { - this.wait(Math.min(waitTime, 5000)); - } - } + if (runnableTids.isEmpty()) { + if (beforeCount == unreservedRunnableCount.getCount()) { + long waitTime = 5000; + if (!defered.isEmpty()) { + Long minTime = Collections.min(defered.values()); + waitTime = minTime - System.currentTimeMillis(); + } + + if (waitTime > 0) { + unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime, + keepWaiting::get); } - } else { - return runnableTids.iterator(); } + } else { + return runnableTids.iterator(); } + } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } From abd608652e69b084ecd9b41b24a5a8c1e72f2c27 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 8 Dec 2023 10:36:39 -0500 Subject: [PATCH 6/9] code review update --- .../org/apache/accumulo/core/fate/Fate.java | 44 +++++++------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 566c991a1f2..0b3dd5e7078 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -33,12 +33,12 @@ import java.util.EnumSet; import java.util.Optional; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -72,7 +72,7 @@ public class Fate { private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); private final AtomicBoolean keepRunning = new AtomicBoolean(true); - private final BlockingQueue workQueue; + private final TransferQueue workQueue; private final SignalCount idleWorkerCount = new SignalCount(); private final Thread workFinder; @@ -92,20 +92,13 @@ public void run() { try { while (keepRunning.get()) { - - while (!workQueue.isEmpty() && keepRunning.get()) { - // wait till there is at least one worker that is looking for work and the queue is - // empty - idleWorkerCount.waitFor(count -> count != 0, keepRunning::get); - } - var iter = store.runnable(keepRunning); while (iter.hasNext() && keepRunning.get()) { Long txid = iter.next(); try { while (keepRunning.get()) { - if (workQueue.offer(txid, 100, MILLISECONDS)) { + if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) { break; } } @@ -130,24 +123,19 @@ public void run() { private class TransactionRunner implements Runnable { private Optional> reserveFateTx() throws InterruptedException { - idleWorkerCount.increment(); - try { - while (keepRunning.get()) { - var unreservedTid = workQueue.poll(100, MILLISECONDS); + while (keepRunning.get()) { + var unreservedTid = workQueue.poll(100, MILLISECONDS); - if 
(unreservedTid == null) { - continue; - } - var optionalopStore = store.tryReserve(unreservedTid); - if (optionalopStore.isPresent()) { - return optionalopStore; - } + if (unreservedTid == null) { + continue; + } + var optionalopStore = store.tryReserve(unreservedTid); + if (optionalopStore.isPresent()) { + return optionalopStore; } - - return Optional.empty(); - } finally { - idleWorkerCount.decrement(); } + + return Optional.empty(); } @Override @@ -311,9 +299,7 @@ public Fate(T environment, FateStore store, Function,String> toLogStr this.environment = environment; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); - // TODO this queue does not resize when config changes - this.workQueue = - new ArrayBlockingQueue<>(conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE) * 4); + this.workQueue = new LinkedTransferQueue<>(); this.fatePoolWatcher = ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf); ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> { From d4f0061485b114d245038b9d9dc8e51b2d547909 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 8 Dec 2023 10:55:47 -0500 Subject: [PATCH 7/9] moved try catch inside loop --- .../org/apache/accumulo/core/fate/Fate.java | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 0b3dd5e7078..8abc86085a9 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -88,10 +88,8 @@ private class WorkFinder implements Runnable { @Override public void run() { - - try { - - while (keepRunning.get()) { + while (keepRunning.get()) { + try { var iter = store.runnable(keepRunning); while (iter.hasNext() && keepRunning.get()) { @@ -107,15 +105,15 @@ public void 
run() { throw new IllegalStateException(e); } } - } - } catch (Exception e) { - if (keepRunning.get()) { - log.warn("Failure while attempting to find work for fate", e); - } else { - log.debug("Failure while attempting to find work for fate", e); - } + } catch (Exception e) { + if (keepRunning.get()) { + log.warn("Failure while attempting to find work for fate", e); + } else { + log.debug("Failure while attempting to find work for fate", e); + } - workQueue.clear(); + workQueue.clear(); + } } } } From 23248e70b5267ec846f50008c1a6c8f69de31964 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 8 Dec 2023 11:25:51 -0500 Subject: [PATCH 8/9] removes unused variable --- core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 8abc86085a9..7e4223e2eaa 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -73,7 +73,6 @@ public class Fate { private final AtomicBoolean keepRunning = new AtomicBoolean(true); private final TransferQueue workQueue; - private final SignalCount idleWorkerCount = new SignalCount(); private final Thread workFinder; public enum TxInfo { From 6a1cddc8809d45d2896a7db841c77bcdc59cc66c Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 8 Dec 2023 12:09:16 -0500 Subject: [PATCH 9/9] added comment about transfer vs queuing --- core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 7e4223e2eaa..a54ad734ee7 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -95,6 +95,11 @@ public void run() { Long txid = iter.next(); try 
{ while (keepRunning.get()) { + // The reason for calling transfer instead of queueing is to avoid rescanning the + // storage layer and adding the same thing over and over. For example, if all threads + // were busy, the queue size was 100, and there are three runnable things in the + // store. Do not want to keep scanning the store adding those same 3 runnable things + // until the queue is full. if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) { break; }