diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 95ef99448f5..21838f282bf 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -419,6 +419,8 @@ public void print(ReadOnlyFateStore zs, ZooReader zk, public boolean prepDelete(FateStore zs, ZooReaderWriter zk, ServiceLockPath path, String txidStr) { + // TODO do not need global lock now + // TODO need way to see what process holds a reservation if (!checkGlobalLock(zk, path)) { return false; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java deleted file mode 100644 index f61c06028ca..00000000000 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This store removes Repos, in the store it wraps, that are in a finished or new state for more - * than a configurable time period. - * - * No external time source is used. It starts tracking idle time when its created. - */ -public class AgeOffStore implements FateStore { - - public interface TimeSource { - long currentTimeMillis(); - } - - private static final Logger log = LoggerFactory.getLogger(AgeOffStore.class); - - private final FateStore store; - private Map candidates; - private long ageOffTime; - private long minTime; - private TimeSource timeSource; - - private synchronized void updateMinTime() { - minTime = Long.MAX_VALUE; - - for (Long time : candidates.values()) { - if (time < minTime) { - minTime = time; - } - } - } - - private synchronized void addCandidate(long txid) { - long time = timeSource.currentTimeMillis(); - candidates.put(txid, time); - if (time < minTime) { - minTime = time; - } - } - - private synchronized void removeCandidate(long txid) { - Long time = candidates.remove(txid); - if (time != null && time <= minTime) { - updateMinTime(); - } - } - - public void ageOff() { - HashSet oldTxs = new HashSet<>(); - - synchronized (this) { - long time = timeSource.currentTimeMillis(); - if (minTime < time && time - minTime >= ageOffTime) { - for (Entry entry : candidates.entrySet()) { - if (time - entry.getValue() >= ageOffTime) { - oldTxs.add(entry.getKey()); - } - } - - candidates.keySet().removeAll(oldTxs); - updateMinTime(); - } - } - - for (Long txid : oldTxs) { - try { - FateTxStore txStore = store.reserve(txid); - try { - switch (txStore.getStatus()) { - case NEW: - case FAILED: - case SUCCESSFUL: - txStore.delete(); - log.debug("Aged off FATE tx {}", FateTxId.formatTid(txid)); - break; - default: - break; - } - - } finally { - txStore.unreserve(0); - } - } catch (Exception e) { - log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e); - } - } - } - - public AgeOffStore(FateStore store, long ageOffTime, TimeSource timeSource) { - this.store = store; - this.ageOffTime = ageOffTime; - this.timeSource = timeSource; - candidates = new HashMap<>(); - - minTime = Long.MAX_VALUE; - - List txids = store.list(); - for (Long txid : txids) { - FateTxStore txStore = store.reserve(txid); - try { - switch (txStore.getStatus()) { - case NEW: - case FAILED: - case SUCCESSFUL: - addCandidate(txid); - break; - default: - break; - } - } finally { - txStore.unreserve(0); - } - } - } - - @Override - public long create() { - long txid = store.create(); - addCandidate(txid); - return txid; - } - - @Override - public FateTxStore reserve(long tid) { - return new AgeOffFateTxStore(store.reserve(tid)); - } - - @Override - public Optional> tryReserve(long tid) { - return store.tryReserve(tid).map(AgeOffFateTxStore::new); - } - - private class AgeOffFateTxStore extends WrappedFateTxStore { - - private AgeOffFateTxStore(FateTxStore wrapped) { - super(wrapped); - } - - @Override - public void setStatus(FateStore.TStatus status) { - super.setStatus(status); - - switch (status) { - case SUBMITTED: - case IN_PROGRESS: - case FAILED_IN_PROGRESS: - removeCandidate(getID()); - break; - case FAILED: - case SUCCESSFUL: - addCandidate(getID()); - break; - default: - break; - } - } - - @Override - public void delete() { - super.delete(); - removeCandidate(getID()); - } - } - - @Override - public ReadOnlyFateTxStore read(long tid) { - return store.read(tid); - } - - @Override - public List list() { - return store.list(); - } - - @Override - public Iterator runnable(AtomicBoolean keepWaiting) { - return store.runnable(keepWaiting); - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index a54ad734ee7..10e14db08e1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -41,6 +41,7 @@ import java.util.concurrent.TransferQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.function.Supplier; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -48,6 +49,7 @@ import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; +import org.apache.accumulo.core.manager.PartitionData; import org.apache.accumulo.core.util.ShutdownUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -72,6 +74,7 @@ public class Fate { private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); private final AtomicBoolean keepRunning = new AtomicBoolean(true); + private final Supplier partitionDataSupplier; private final TransferQueue workQueue; private final Thread workFinder; @@ -79,6 +82,22 @@ public enum TxInfo { TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE } + // TODO add a task that periodically looks for fate task reserved by dead instances, only run in + // partition zero + private class LockCleaner implements Runnable { + + @Override + public void run() { + while (keepRunning.get()) { + var partitionData = partitionDataSupplier.get(); + if (partitionData.shouldRun(PartitionData.SingletonManagerService.FATE_LOCK_CLEANUP)) { + // run cleanup + } + // sleep a long time + } + } + } + /** * A single thread that finds transactions to work on and queues them up. Do not want each worker * thread going to the store and looking for work as it would place more load on the store. @@ -89,12 +108,14 @@ private class WorkFinder implements Runnable { public void run() { while (keepRunning.get()) { try { - var iter = store.runnable(keepRunning); + PartitionData partitionData = partitionDataSupplier.get(); + var iter = store.runnable(keepRunning, partitionData); - while (iter.hasNext() && keepRunning.get()) { + while (iter.hasNext() && keepRunning.get() + && partitionData.equals(partitionDataSupplier.get())) { Long txid = iter.next(); try { - while (keepRunning.get()) { + while (keepRunning.get() && partitionData.equals(partitionDataSupplier.get())) { // The reason for calling transfer instead of queueing is avoid rescanning the // storage layer and adding the same thing over and over. For example if all threads // were busy, the queue size was 100, and there are three runnable things in the @@ -144,7 +165,7 @@ private Optional> reserveFateTx() throws InterruptedException { public void run() { while (keepRunning.get()) { long deferTime = 0; - FateTxStore txStore = null; + FateStore.FateTxStore txStore = null; try { var optionalopStore = reserveFateTx(); if (optionalopStore.isPresent()) { @@ -296,9 +317,10 @@ private void undo(long tid, Repo op) { * @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging */ public Fate(T environment, FateStore store, Function,String> toLogStrFunc, - AccumuloConfiguration conf) { + Supplier partitionDataSupplier, AccumuloConfiguration conf) { this.store = FateLogger.wrap(store, toLogStrFunc); this.environment = environment; + this.partitionDataSupplier = partitionDataSupplier; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true); this.workQueue = new LinkedTransferQueue<>(); @@ -415,6 +437,7 @@ public boolean cancel(long tid) { // resource cleanup public void delete(long tid) { + // TODO need to handle case of not existing FateTxStore txStore = store.reserve(tid); try { switch (txStore.getStatus()) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 7db5766e81b..6e0b3620018 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -21,13 +21,16 @@ import java.io.Serializable; import java.util.Optional; +import org.apache.accumulo.core.manager.PartitionData; + /** - * Transaction Store: a place to save transactions + * FatesStore : a place to store fate data for all fate operations. * - * A transaction consists of a number of operations. To use, first create a transaction id, and then - * seed the transaction with an initial operation. An executor service can then execute the - * transaction's operation, possibly pushing more operations onto the transaction as each step - * successfully completes. If a step fails, the stack can be unwound, undoing each operation. + * A fate operation consists of a number of smaller idempotent operations. To use, first create a + * transaction id, and then seed the transaction with an initial operation. An executor service can + * then execute the transaction's operation, possibly pushing more operations onto the transaction + * as each step successfully completes. If a step fails, the stack can be unwound, undoing each + * operation. */ public interface FateStore extends ReadOnlyFateStore { @@ -42,6 +45,7 @@ public interface FateStore extends ReadOnlyFateStore { * An interface that allows read/write access to the data related to a single fate operation. */ interface FateTxStore extends ReadOnlyFateTxStore { + @Override Repo top(); @@ -74,7 +78,6 @@ interface FateTxStore extends ReadOnlyFateTxStore { /** * Remove the transaction from the store. - * */ void delete(); @@ -85,7 +88,8 @@ interface FateTxStore extends ReadOnlyFateTxStore { * longer interact with it. * * @param deferTime time in millis to keep this transaction from being returned by - * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative. + * {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, PartitionData)}. Must be + * non-negative. */ void unreserve(long deferTime); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index f0140de3674..a8014298be1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.manager.PartitionData; + /** * Read only access to a Transaction Store. * @@ -35,6 +37,7 @@ public interface ReadOnlyFateStore { /** * Possible operational status codes. Serialized by name within stores. */ + // TODO rename to FateTxStatus enum TStatus { /** Unseeded transaction */ NEW, @@ -124,10 +127,12 @@ interface ReadOnlyFateTxStore { */ List list(); + // TODO need to handle partitionDataChanging, probably pass a supplier /** * @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and * unreserved. This method will block until it finds something that is runnable or until - * the keepWaiting parameter is false. + * the keepWaiting parameter is false. Also filter the transaction using the partitioning + * data so that each fate instance sees a different subset of all fate transactions. */ - Iterator runnable(AtomicBoolean keepWaiting); + Iterator runnable(AtomicBoolean keepWaiting, PartitionData partitionData); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 38071ef182e..a41dc1bfb9f 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -21,6 +21,8 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; import java.io.ByteArrayInputStream; @@ -34,18 +36,23 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; -import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.manager.PartitionData; import org.apache.accumulo.core.util.FastFormat; +import org.apache.accumulo.core.util.Retry; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -65,7 +72,7 @@ public class ZooStore implements FateStore { private static final Logger log = LoggerFactory.getLogger(ZooStore.class); private String path; private ZooReaderWriter zk; - private Set reserved; + private Map defered; // This is incremented each time a transaction was unreserved that was non new @@ -74,6 +81,43 @@ public class ZooStore implements FateStore { // This is incremented each time a transaction is unreserved that was runnable private final SignalCount unreservedRunnableCount = new SignalCount(); + // The zookeeper lock for the process that running this store instance. + private final ZooUtil.LockID lockID; + + // TODO this was hastily written code for serialization. Should it use json? It needs better + // validation of data being serialized (like validate status, lock, and uuid). + private static class NodeValue { + final TStatus status; + final String lock; + final String uuid; + + private NodeValue(byte[] serializedData) { + var fields = new String(serializedData, UTF_8).split(":", 3); + this.status = TStatus.valueOf(fields[0]); + this.lock = fields[1]; + this.uuid = fields[2]; + } + + private NodeValue(TStatus status, String lock, String uuid) { + if (lock.isEmpty() && !uuid.isEmpty() || uuid.isEmpty() && !lock.isEmpty()) { + throw new IllegalArgumentException( + "For lock and uuid expect neither is empty or both are empty. lock:'" + lock + + "' uuid:'" + uuid + "'"); + } + this.status = status; + this.lock = lock; + this.uuid = uuid; + } + + byte[] serialize() { + return (status.name() + ":" + lock + ":" + uuid).getBytes(UTF_8); + } + + public boolean isReserved() { + return !lock.isEmpty(); + } + } + private byte[] serialize(Object o) { try { @@ -111,12 +155,13 @@ private long parseTid(String txdir) { return Long.parseLong(txdir.split("_")[1], 16); } - public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, InterruptedException { + public ZooStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID) + throws KeeperException, InterruptedException { this.path = path; this.zk = zk; - this.reserved = new HashSet<>(); - this.defered = new HashMap<>(); + this.defered = Collections.synchronizedMap(new HashMap<>()); + this.lockID = lockID; zk.putPersistentData(path, new byte[0], NodeExistsPolicy.SKIP); } @@ -124,7 +169,9 @@ public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, Interru /** * For testing only */ - ZooStore() {} + ZooStore() { + lockID = null; + } @Override public long create() { @@ -132,7 +179,7 @@ public long create() { try { // looking at the code for SecureRandom, it appears to be thread safe long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; - zk.putPersistentData(getTXPath(tid), TStatus.NEW.name().getBytes(UTF_8), + zk.putPersistentData(getTXPath(tid), new NodeValue(TStatus.NEW, "", "").serialize(), NodeExistsPolicy.FAIL); return tid; } catch (NodeExistsException nee) { @@ -145,19 +192,23 @@ public long create() { @Override public FateTxStore reserve(long tid) { - synchronized (ZooStore.this) { - while (reserved.contains(tid)) { - try { - ZooStore.this.wait(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } - } + var retry = + Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS).incrementBy(25, MILLISECONDS) + .maxWait(30, SECONDS).backOffFactor(1.5).logInterval(3, MINUTES).createRetry(); - reserved.add(tid); - return new FateTxStoreImpl(tid, true); + var resFateOp = tryReserve(tid); + while (resFateOp.isEmpty()) { + try { + retry.waitForNextAttempt(log, "Attempting to reserve " + FateTxId.formatTid(tid)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalArgumentException(e); + } + resFateOp = tryReserve(tid); } + + retry.logCompletion(log, "Attempting to reserve " + FateTxId.formatTid(tid)); + return resFateOp.orElseThrow(); } /** @@ -168,24 +219,75 @@ public FateTxStore reserve(long tid) { */ @Override public Optional> tryReserve(long tid) { - synchronized (this) { - if (!reserved.contains(tid)) { - return Optional.of(reserve(tid)); + + // uniquely identify this attempt to reserve the fate operation data + var uuid = UUID.randomUUID(); + + try { + byte[] newValue = zk.mutateExisting(getTXPath(tid), currentValue -> { + var nodeVal = new NodeValue(currentValue); + // The uuid handle the case where there was a ZK server fault and the write for this thread + // went through but that was not acknowledged and we are reading our own write for 2nd time. + if (!nodeVal.isReserved() || nodeVal.uuid.equals(uuid.toString())) { + return new NodeValue(nodeVal.status, lockID.serialize(""), uuid.toString()).serialize(); + } else { + // returning null will not change the value AND the null will be returned + return null; + } + }); + + if (newValue != null) { + // clear any deferment if it exists, it may or may not be set when its unreserved + defered.remove(tid); + return Optional.of(new FateTxStoreImpl(tid, uuid)); + } else { + return Optional.empty(); } + } catch (NoNodeException e) { + log.trace("Fate op does not exists so can not reserve", e); return Optional.empty(); + } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { + throw new IllegalStateException(e); } } private class FateTxStoreImpl implements FateTxStore { private final long tid; - private final boolean isReserved; + private UUID reservationUUID; + private boolean deleted = false; private TStatus observedStatus = null; - private FateTxStoreImpl(long tid, boolean isReserved) { + private FateTxStoreImpl(long tid) { this.tid = tid; - this.isReserved = isReserved; + this.reservationUUID = null; + } + + private FateTxStoreImpl(long tid, UUID uuid) { + this.tid = tid; + this.reservationUUID = Objects.requireNonNull(uuid); + } + + private void unreserve() { + Preconditions.checkState(reservationUUID != null); + try { + if (!deleted) { + zk.mutateExisting(getTXPath(tid), currentValue -> { + var nodeVal = new NodeValue(currentValue); + if (nodeVal.uuid.equals(reservationUUID.toString())) { + return new NodeValue(nodeVal.status, "", "").serialize(); + } else { + // possible this is running a 2nd time in zk server fault conditions and its first + // write went through + return null; + } + }); + } + reservationUUID = null; + } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { + throw new IllegalStateException(e); + } } @Override @@ -195,20 +297,13 @@ public void unreserve(long deferTime) { throw new IllegalArgumentException("deferTime < 0 : " + deferTime); } - synchronized (ZooStore.this) { - if (!reserved.remove(tid)) { - throw new IllegalStateException( - "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); - } - - // notify any threads waiting to reserve - ZooStore.this.notifyAll(); - - if (deferTime > 0) { - defered.put(tid, System.currentTimeMillis() + deferTime); - } + if (deferTime > 0) { + // add to defered before actually unreserving + defered.put(tid, System.currentTimeMillis() + deferTime); } + unreserve(); + if (observedStatus != null && isRunnable(observedStatus)) { unreservedRunnableCount.increment(); } @@ -219,17 +314,9 @@ public void unreserve(long deferTime) { } private void verifyReserved(boolean isWrite) { - if (!isReserved && isWrite) { - throw new IllegalStateException("Attempted write on unreserved FATE transaction."); - } - - if (isReserved) { - synchronized (ZooStore.this) { - if (!reserved.contains(tid)) { - throw new IllegalStateException( - "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); - } - } + if (reservationUUID == null && isWrite) { + throw new IllegalStateException( + "Attempted write on unreserved FATE transaction." + FateTxId.formatTid(getID())); } } @@ -268,6 +355,7 @@ public Repo top() { } private String findTop(String txpath) throws KeeperException, InterruptedException { + verifyReserved(false); List ops = zk.getChildren(txpath); ops = new ArrayList<>(ops); @@ -322,28 +410,60 @@ public void pop() { } } + private TStatus _getStatus() { + verifyReserved(false); + try { + return new NodeValue(zk.getData(getTXPath(tid))).status; + } catch (NoNodeException nne) { + return TStatus.UNKNOWN; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + @Override public TStatus getStatus() { verifyReserved(false); - var status = _getStatus(tid); + var status = _getStatus(); observedStatus = status; - return _getStatus(tid); + return status; } @Override public TStatus waitForStatusChange(EnumSet expected) { - Preconditions.checkState(!isReserved, + Preconditions.checkState(reservationUUID == null, "Attempted to wait for status change while reserved " + FateTxId.formatTid(getID())); - while (true) { - long countBefore = unreservedNonNewCount.getCount(); + verifyReserved(false); + // TODO make the max time a function of the number of concurrent callers, as the number of + // concurrent callers increases then increase the max wait time + // TODO could support signaling within this instance for known events + // TODO made the maxWait low so this would be responsive... that may put a lot of load in the + // case there are lots of things waiting... + var retry = Retry.builder().infiniteRetries().retryAfter(25, MILLISECONDS) + .incrementBy(25, MILLISECONDS).maxWait(1, SECONDS).backOffFactor(1.5) + .logInterval(3, MINUTES).createRetry(); + + while (true) { + // TODO from merge + // long countBefore = unreservedNonNewCount.getCount(); - TStatus status = _getStatus(tid); + TStatus status = _getStatus(); if (expected.contains(status)) { + retry.logCompletion(log, "Waiting on status change for " + FateTxId.formatTid(tid) + + " expected:" + expected + " status:" + status); return status; } + // TODO from merge + // unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); - unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); + try { + retry.waitForNextAttempt(log, "Waiting on status change for " + FateTxId.formatTid(tid) + + " expected:" + expected + " status:" + status); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } } } @@ -352,9 +472,13 @@ public void setStatus(TStatus status) { verifyReserved(true); try { - zk.putPersistentData(getTXPath(tid), status.name().getBytes(UTF_8), - NodeExistsPolicy.OVERWRITE); - } catch (KeeperException | InterruptedException e) { + zk.mutateExisting(getTXPath(tid), currentValue -> { + var nodeVal = new NodeValue(currentValue); + Preconditions.checkState(nodeVal.uuid.equals(reservationUUID.toString()), + "Tried to set status for %s and it was not reserved", FateTxId.formatTid(tid)); + return new NodeValue(status, nodeVal.lock, nodeVal.uuid).serialize(); + }); + } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { throw new IllegalStateException(e); } @@ -364,9 +488,9 @@ public void setStatus(TStatus status) { @Override public void delete() { verifyReserved(true); - try { zk.recursiveDelete(getTXPath(tid), NodeMissingPolicy.SKIP); + deleted = true; } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -430,6 +554,7 @@ public long timeCreated() { @Override public long getID() { + verifyReserved(false); return tid; } @@ -487,7 +612,7 @@ private TStatus _getStatus(long tid) { @Override public ReadOnlyFateTxStore read(long tid) { - return new FateTxStoreImpl(tid, false); + return new FateTxStoreImpl(tid); } @Override @@ -510,41 +635,43 @@ private boolean isRunnable(TStatus status) { } @Override - public Iterator runnable(AtomicBoolean keepWaiting) { - - while (keepWaiting.get()) { - ArrayList runnableTids = new ArrayList<>(); - - final long beforeCount = unreservedRunnableCount.getCount(); + public Iterator runnable(AtomicBoolean keepWaiting, PartitionData partitionData) { + try { + while (keepWaiting.get()) { - try { + long beforeCount = unreservedRunnableCount.getCount(); + ArrayList runnableTids = new ArrayList<>(); List transactions = zk.getChildren(path); - for (String txidStr : transactions) { - long txid = parseTid(txidStr); - if (isRunnable(_getStatus(txid))) { - runnableTids.add(txid); + for (String txid : transactions) { + try { + var nodeVal = new NodeValue(zk.getData(path + "/" + txid)); + var tid = parseTid(txid); + if (!nodeVal.isReserved() && isRunnable(nodeVal.status) + && tid % partitionData.getTotalInstances() == partitionData.getPartition()) { + runnableTids.add(tid); + } + } catch (NoNodeException nne) { + // expected race condition that node could be deleted after getting list of children + log.trace("Skipping missing node {}", txid); } } - synchronized (this) { - runnableTids.removeIf(txid -> { - var deferedTime = defered.get(txid); - if (deferedTime != null) { - if (deferedTime < System.currentTimeMillis()) { - return true; - } else { - defered.remove(txid); - } - } + // Anything that is not runnable in this partition should be removed from the defered set + defered.keySet().retainAll(runnableTids); - if (reserved.contains(txid)) { + // Filter out any transactions that are not read to run because of deferment + runnableTids.removeIf(runnableTid -> { + var deferedTime = defered.get(runnableTid); + if (deferedTime != null) { + if (deferedTime < System.currentTimeMillis()) { return true; + } else { + defered.remove(runnableTid); } - - return false; - }); - } + } + return false; + }); if (runnableTids.isEmpty()) { if (beforeCount == unreservedRunnableCount.getCount()) { @@ -562,10 +689,9 @@ public Iterator runnable(AtomicBoolean keepWaiting) { } else { return runnableTids.iterator(); } - - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); } + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); } return List.of().iterator(); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index 47d906fedb4..8eb4e17d62c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -85,7 +85,6 @@ public LockID(String path, String node, long eid) { } public String serialize(String root) { - return path.substring(root.length()) + "/" + node + "$" + Long.toHexString(eid); } diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index d85e417650a..4ff73bd1173 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; import org.apache.accumulo.core.fate.WrappedFateTxStore; +import org.apache.accumulo.core.manager.PartitionData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +105,11 @@ public FateTxStore reserve(long tid) { return new LoggingFateTxStore<>(store.reserve(tid), toLogString); } + @Override + public long create() { + return store.create(); + } + @Override public Optional> tryReserve(long tid) { return store.tryReserve(tid).map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString)); @@ -120,18 +126,10 @@ public List list() { } @Override - public Iterator runnable(AtomicBoolean keepWaiting) { - return store.runnable(keepWaiting); + public Iterator runnable(AtomicBoolean keepWaiting, PartitionData partitionData) { + return store.runnable(keepWaiting, partitionData); } - @Override - public long create() { - long tid = store.create(); - if (storeLog.isTraceEnabled()) { - storeLog.trace("{} created fate transaction", formatTid(tid)); - } - return tid; - } }; } } diff --git a/core/src/main/java/org/apache/accumulo/core/manager/PartitionData.java b/core/src/main/java/org/apache/accumulo/core/manager/PartitionData.java new file mode 100644 index 00000000000..2f9fcfe7ce9 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/manager/PartitionData.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.manager; + +import java.util.Objects; + +import com.google.common.base.Preconditions; + +/** + * This class is used by functional services inside the manager process to determine what action + * they should take when there are multiple active manager processes. + */ +public class PartitionData { + + /** + * Functional services within the manager that should only run in a single manager process. + */ + public enum SingletonManagerService { + FATE_LOCK_CLEANUP + } + + private final int partition; + private final int totalInstances; + + public PartitionData(int partition, int totalInstances) { + Preconditions.checkArgument(partition >= 0 && partition < totalInstances, + "Incorrect partition data, partition:%s totalInstances:%s", partition, totalInstances); + this.partition = partition; + this.totalInstances = totalInstances; + } + + /** + * Each manager instance in a cluster is assigned a unique partition number in the range + * [0,totalInstances). This method returns that unique partition number. + */ + public int getPartition() { + return partition; + } + + /** + * @return the total number of manager instances in a cluster. + */ + public int getTotalInstances() { + return totalInstances; + } + + @Override + public boolean equals(Object o) { + if (o instanceof PartitionData) { + var opd = (PartitionData) o; + return partition == opd.partition && totalInstances == opd.totalInstances; + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash(partition, totalInstances); + } + + @Override + public String toString() { + return partition + " " + totalInstances; + } + + /** + * Determines if a singleton manager service should run on this partition + */ + public boolean shouldRun(SingletonManagerService service) { + // The following should spread singleton manager services evenly over multiple manager + // processes, rather than running them all on partition zero. Its important that all + // partitions come to the same decision. + return (getPartition() + service.ordinal()) % getTotalInstances() == 0; + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java deleted file mode 100644 index d2530ce1f3a..00000000000 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.HashSet; -import java.util.Set; - -import org.apache.accumulo.core.fate.AgeOffStore.TimeSource; -import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; -import org.apache.zookeeper.KeeperException; -import org.junit.jupiter.api.Test; - -public class AgeOffStoreTest { - - private static class TestTimeSource implements TimeSource { - long time = 0; - - @Override - public long currentTimeMillis() { - return time; - } - - } - - @Test - public void testBasic() throws InterruptedException, KeeperException { - - TestTimeSource tts = new TestTimeSource(); - TestStore testStore = new TestStore(); - AgeOffStore aoStore = new AgeOffStore<>(testStore, 10, tts); - - aoStore.ageOff(); - - long txid1 = aoStore.create(); - var txStore1 = aoStore.reserve(txid1); - txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(0); - - aoStore.ageOff(); - - long txid2 = aoStore.create(); - var txStore2 = aoStore.reserve(txid2); - txStore2.setStatus(TStatus.IN_PROGRESS); - txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(0); - - tts.time = 6; - - long txid3 = aoStore.create(); - var txStore3 = aoStore.reserve(txid3); - txStore3.setStatus(TStatus.IN_PROGRESS); - txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(0); - - Long txid4 = aoStore.create(); - - aoStore.ageOff(); - - assertEquals(Set.of(txid1, txid2, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(4, new HashSet<>(aoStore.list()).size()); - - tts.time = 15; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(3, new HashSet<>(aoStore.list()).size()); - - tts.time = 30; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, Set.of(aoStore.list()).size()); - } - - @Test - public void testNonEmpty() throws InterruptedException, KeeperException { - // test age off when source store starts off non empty - - TestTimeSource tts = new TestTimeSource(); - TestStore testStore = new TestStore(); - long txid1 = testStore.create(); - var txStore1 = testStore.reserve(txid1); - txStore1.setStatus(TStatus.IN_PROGRESS); - txStore1.unreserve(0); - - long txid2 = testStore.create(); - var txStore2 = testStore.reserve(txid2); - txStore2.setStatus(TStatus.IN_PROGRESS); - txStore2.setStatus(TStatus.FAILED); - txStore2.unreserve(0); - - long txid3 = testStore.create(); - var txStore3 = testStore.reserve(txid3); - txStore3.setStatus(TStatus.IN_PROGRESS); - txStore3.setStatus(TStatus.SUCCESSFUL); - txStore3.unreserve(0); - - Long txid4 = testStore.create(); - - AgeOffStore aoStore = new AgeOffStore<>(testStore, 10, tts); - - assertEquals(Set.of(txid1, txid2, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(4, new HashSet<>(aoStore.list()).size()); - - aoStore.ageOff(); - - assertEquals(Set.of(txid1, txid2, txid3, txid4), new HashSet<>(aoStore.list())); - assertEquals(4, new HashSet<>(aoStore.list()).size()); - - tts.time = 15; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, new HashSet<>(aoStore.list()).size()); - - txStore1 = aoStore.reserve(txid1); - txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); - txStore1.unreserve(0); - - tts.time = 30; - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, new HashSet<>(aoStore.list()).size()); - - txStore1 = aoStore.reserve(txid1); - txStore1.setStatus(TStatus.FAILED); - txStore1.unreserve(0); - - aoStore.ageOff(); - - assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); - assertEquals(1, new HashSet<>(aoStore.list()).size()); - - tts.time = 42; - - aoStore.ageOff(); - - assertEquals(0, new HashSet<>(aoStore.list()).size()); - } -} diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java deleted file mode 100644 index 058b0c50a4b..00000000000 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ /dev/null @@ -1,177 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Transient in memory store for transactions. - */ -public class TestStore implements FateStore { - - private long nextId = 1; - private Map statuses = new HashMap<>(); - private Set reserved = new HashSet<>(); - - @Override - public long create() { - statuses.put(nextId, TStatus.NEW); - return nextId++; - } - - @Override - public FateTxStore reserve(long tid) { - if (reserved.contains(tid)) { - throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve - } - // twice... if test change, then change this - reserved.add(tid); - return new TestFateTxStore(tid); - } - - @Override - public Optional> tryReserve(long tid) { - synchronized (this) { - if (!reserved.contains(tid)) { - reserve(tid); - return Optional.of(new TestFateTxStore(tid)); - } - return Optional.empty(); - } - } - - private class TestFateTxStore implements FateTxStore { - - private final long tid; - - TestFateTxStore(long tid) { - this.tid = tid; - } - - @Override - public Repo top() { - throw new UnsupportedOperationException(); - } - - @Override - public List> getStack() { - throw new UnsupportedOperationException(); - } - - @Override - public TStatus getStatus() { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); - } - - TStatus status = statuses.get(tid); - if (status == null) { - return TStatus.UNKNOWN; - } - return status; - } - - @Override - public TStatus waitForStatusChange(EnumSet expected) { - throw new UnsupportedOperationException(); - } - - @Override - public Serializable getTransactionInfo(Fate.TxInfo txInfo) { - throw new UnsupportedOperationException(); - } - - @Override - public long timeCreated() { - throw new UnsupportedOperationException(); - } - - @Override - public long getID() { - return tid; - } - - @Override - public void push(Repo repo) throws StackOverflowException { - throw new UnsupportedOperationException(); - } - - @Override - public void pop() { - throw new UnsupportedOperationException(); - } - - @Override - public void setStatus(TStatus status) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); - } - if (!statuses.containsKey(tid)) { - throw new IllegalStateException(); - } - statuses.put(tid, status); - } - - @Override - public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { - throw new UnsupportedOperationException(); - } - - @Override - public void delete() { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); - } - statuses.remove(tid); - } - - @Override - public void unreserve(long deferTime) { - if (!reserved.remove(tid)) { - throw new IllegalStateException(); - } - } - } - - @Override - public ReadOnlyFateTxStore read(long tid) { - throw new UnsupportedOperationException(); - } - - @Override - public List list() { - return new ArrayList<>(statuses.keySet()); - } - - @Override - public Iterator runnable(AtomicBoolean keepWaiting) { - throw new UnsupportedOperationException(); - } - -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 144013dc1eb..dcf3a4a5c62 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -761,7 +761,7 @@ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOps var zTableLocksPath = ServiceLock.path(zkRoot + Constants.ZTABLE_LOCKS); String fateZkPath = zkRoot + Constants.ZFATE; ZooReaderWriter zk = context.getZooReaderWriter(); - ZooStore zs = new ZooStore<>(fateZkPath, zk); + ZooStore zs = new ZooStore<>(fateZkPath, zk, null); // TODO if (fateOpsCommand.cancel) { cancelSubmittedFateTxs(context, fateOpsCommand.txList); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index b23e0a6bdb1..f518af069e5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySortedMap; -import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -71,8 +70,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; -import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.lock.ServiceLock; @@ -82,6 +81,7 @@ import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor; import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors; import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; +import org.apache.accumulo.core.manager.PartitionData; import org.apache.accumulo.core.manager.balancer.AssignmentParamsImpl; import org.apache.accumulo.core.manager.balancer.BalanceParamsImpl; import org.apache.accumulo.core.manager.balancer.TServerStatusImpl; @@ -257,6 +257,12 @@ public boolean stillManager() { return getManagerState() != ManagerState.STOP; } + // ELASTICITY_TODO implement this and determine behavior when manager does not have lock. Maybe it + // should return Optional, so it can return Optional.empty() when lock is not held? + public PartitionData getParitionData() { + return new PartitionData(0, 1); + } + /** * Retrieve the Fate object, blocking until it is ready. This could cause problems if Fate * operations are attempted to be used prior to the Manager being ready for them. If these @@ -1078,17 +1084,13 @@ boolean canSuspendTablets() { } try { - final AgeOffStore store = new AgeOffStore<>( - new org.apache.accumulo.core.fate.ZooStore<>(getZooKeeperRoot() + Constants.ZFATE, - context.getZooReaderWriter()), - HOURS.toMillis(8), System::currentTimeMillis); + final ZooStore store = new ZooStore<>(getZooKeeperRoot() + Constants.ZFATE, + context.getZooReaderWriter(), managerLock.getLockID()); - Fate f = new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); + Fate f = new Fate<>(this, store, TraceRepo::toLogString, this::getParitionData, + getConfiguration()); fateRef.set(f); fateReadyLatch.countDown(); - - ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() - .scheduleWithFixedDelay(store::ageOff, 63000, 63000, MILLISECONDS)); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException("Exception setting up FaTE cleanup thread", e); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 1fc7dcf4752..ae46d81ed72 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -71,7 +71,7 @@ public FateMetrics(final ServerContext context, final long minimumRefreshDelay) this.refreshDelay = Math.max(DEFAULT_MIN_REFRESH_DELAY, minimumRefreshDelay); try { - this.zooStore = new ZooStore<>(fateRootPath, context.getZooReaderWriter()); + this.zooStore = new ZooStore<>(fateRootPath, context.getZooReaderWriter(), null); // TODO } catch (KeeperException ex) { throw new IllegalStateException( "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index 2bcaad0e1bb..ce60b4e5537 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -272,8 +272,9 @@ public UpgradeStatus getStatus() { justification = "Want to immediately stop all manager threads on upgrade error") private void abortIfFateTransactions(ServerContext context) { try { + // TODO maybe have a version of zoostore w/o a lock that fails on ops that need lock final ReadOnlyFateStore fate = new ZooStore<>( - context.getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter()); + context.getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter(), null); if (!fate.list().isEmpty()) { throw new AccumuloException("Aborting upgrade because there are" + " outstanding FATE transactions from a previous Accumulo version." diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index 5091d0be0b8..4e60b72d112 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@ -44,13 +44,14 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateTxId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.manager.PartitionData; import org.apache.accumulo.manager.Manager; import org.apache.accumulo.manager.tableOps.ManagerRepo; import org.apache.accumulo.manager.tableOps.TraceRepo; @@ -143,13 +144,16 @@ public static void teardown() throws Exception { szk.close(); } + private ZooUtil.LockID createLockID() { + return new ZooUtil.LockID("S1", "N1", 1234); + } + @Test @Timeout(30) public void testTransactionStatus() throws Exception { - final ZooStore zooStore = new ZooStore(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore store = - new AgeOffStore(zooStore, 3000, System::currentTimeMillis); + final ZooStore store = + new ZooStore(ZK_ROOT + Constants.ZFATE, zk, createLockID()); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@ -161,7 +165,8 @@ public void testTransactionStatus() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = new Fate(manager, store, TraceRepo::toLogString, + () -> new PartitionData(0, 1), config); try { // Wait for the transaction runner to be scheduled. @@ -207,9 +212,8 @@ public void testTransactionStatus() throws Exception { @Test public void testCancelWhileNew() throws Exception { - final ZooStore zooStore = new ZooStore(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore store = - new AgeOffStore(zooStore, 3000, System::currentTimeMillis); + final ZooStore store = + new ZooStore(ZK_ROOT + Constants.ZFATE, zk, createLockID()); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@ -221,7 +225,8 @@ public void testCancelWhileNew() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = new Fate(manager, store, TraceRepo::toLogString, + () -> new PartitionData(0, 1), config); try { // Wait for the transaction runner to be scheduled. @@ -246,9 +251,8 @@ public void testCancelWhileNew() throws Exception { @Test public void testCancelWhileSubmittedAndRunning() throws Exception { - final ZooStore zooStore = new ZooStore(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore store = - new AgeOffStore(zooStore, 3000, System::currentTimeMillis); + final ZooStore store = + new ZooStore(ZK_ROOT + Constants.ZFATE, zk, createLockID()); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@ -260,7 +264,8 @@ public void testCancelWhileSubmittedAndRunning() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = new Fate(manager, store, TraceRepo::toLogString, + () -> new PartitionData(0, 1), config); try { // Wait for the transaction runner to be scheduled. @@ -274,13 +279,12 @@ public void testCancelWhileSubmittedAndRunning() throws Exception { assertEquals(NEW, getTxStatus(zk, txid)); fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); assertEquals(SUBMITTED, getTxStatus(zk, txid)); + callStarted.await(); // This is false because the transaction runner has reserved the FaTe // transaction. Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid)); assertFalse(fate.cancel(txid)); - callStarted.await(); finishCall.countDown(); - fate.delete(txid); } finally { fate.shutdown(); } @@ -288,9 +292,8 @@ public void testCancelWhileSubmittedAndRunning() throws Exception { @Test public void testCancelWhileInCall() throws Exception { - final ZooStore zooStore = new ZooStore(ZK_ROOT + Constants.ZFATE, zk); - final AgeOffStore store = - new AgeOffStore(zooStore, 3000, System::currentTimeMillis); + final ZooStore store = + new ZooStore(ZK_ROOT + Constants.ZFATE, zk, createLockID()); Manager manager = createMock(Manager.class); ServerContext sctx = createMock(ServerContext.class); @@ -302,7 +305,8 @@ public void testCancelWhileInCall() throws Exception { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - Fate fate = new Fate(manager, store, TraceRepo::toLogString, config); + Fate fate = new Fate(manager, store, TraceRepo::toLogString, + () -> new PartitionData(0, 1), config); try { // Wait for the transaction runner to be scheduled. @@ -341,7 +345,7 @@ private static TStatus getTxStatus(ZooReaderWriter zrw, long txid) throws KeeperException, InterruptedException { zrw.sync(ZK_ROOT); String txdir = String.format("%s%s/tx_%016x", ZK_ROOT, Constants.ZFATE, txid); - return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); + return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8).split(":")[0]); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/MultipleFateInstancesIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/MultipleFateInstancesIT.java new file mode 100644 index 00000000000..a61b5134d67 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/MultipleFateInstancesIT.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.zookeeper; + +import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.ZooStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.manager.PartitionData; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.io.TempDir; + +import com.google.common.collect.Sets; + +@Tag(ZOOKEEPER_TESTING_SERVER) +public class MultipleFateInstancesIT { + + @TempDir + private static File tempDir; + private static ZooKeeperTestingServer szk = null; + private static ZooReaderWriter zk; + + private static final String FATE_DIR = "/fate"; + + @BeforeAll + public static void setup() throws Exception { + szk = new ZooKeeperTestingServer(tempDir); + zk = szk.getZooReaderWriter(); + } + + @AfterAll + public static void teardown() throws Exception { + szk.close(); + } + + @Test + @Timeout(30) + public void testZooStores() throws Exception { + ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + final ZooStore zooStore1 = new ZooStore(FATE_DIR, zk, lock1); + final ZooStore zooStore2 = new ZooStore(FATE_DIR, zk, lock2); + + Set allIds = new HashSet<>(); + + AtomicBoolean keepRunning = new AtomicBoolean(true); + + for (int i = 0; i < 100; i++) { + assertTrue(allIds.add(zooStore1.create())); + assertTrue(allIds.add(zooStore2.create())); + } + + var pd1 = new PartitionData(0, 2); + var pd2 = new PartitionData(1, 2); + + for (var txid : allIds) { + if (txid % 2 == 0) { + var rfo = zooStore1.reserve(txid); + assertEquals(ReadOnlyFateStore.TStatus.NEW, rfo.getStatus()); + assertTrue(zooStore2.tryReserve(txid).isEmpty()); + rfo.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED); + rfo.unreserve(0); + } else { + var rfo = zooStore2.reserve(txid); + assertEquals(ReadOnlyFateStore.TStatus.NEW, rfo.getStatus()); + assertTrue(zooStore1.tryReserve(txid).isEmpty()); + rfo.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED); + rfo.unreserve(0); + } + } + + HashSet runnable1 = new HashSet<>(); + zooStore1.runnable(keepRunning, pd1).forEachRemaining(txid -> assertTrue(runnable1.add(txid))); + HashSet runnable2 = new HashSet<>(); + zooStore2.runnable(keepRunning, pd2).forEachRemaining(txid -> assertTrue(runnable2.add(txid))); + + assertFalse(runnable1.isEmpty()); + assertFalse(runnable2.isEmpty()); + assertTrue(Collections.disjoint(runnable1, runnable2)); + assertEquals(allIds, Sets.union(runnable1, runnable2)); + + for (var txid : allIds) { + var rfo = zooStore1.reserve(txid); + assertTrue(zooStore2.tryReserve(txid).isEmpty()); + rfo.delete(); + rfo.unreserve(0); + } + + for (var txid : allIds) { + assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, zooStore1.read(txid).getStatus()); + assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, zooStore2.read(txid).getStatus()); + // TODO this will block forever, should probably throw an exception + // zooStore1.reserve(txid); + + } + } + + public static class TestEnv { + public final Set executedOps = Collections.synchronizedSet(new HashSet<>()); + + } + + public static class TestRepo implements Repo { + + private static final long serialVersionUID = 1L; + + @Override + public long isReady(long tid, TestEnv environment) throws Exception { + return 0; + } + + @Override + public String getName() { + return null; + } + + @Override + public Repo call(long tid, TestEnv environment) throws Exception { + environment.executedOps.add(tid); + return null; + } + + @Override + public void undo(long tid, TestEnv environment) throws Exception { + + } + + @Override + public String getReturn() { + return null; + } + } + + @Test + public void testMultipleFateInstances() throws Exception { + ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + final ZooStore zooStore1 = new ZooStore<>(FATE_DIR, zk, lock1); + final ZooStore zooStore2 = new ZooStore<>(FATE_DIR, zk, lock2); + + TestEnv testEnv1 = new TestEnv(); + TestEnv testEnv2 = new TestEnv(); + + AtomicReference partitionData1 = new AtomicReference<>(new PartitionData(0, 2)); + AtomicReference partitionData2 = new AtomicReference<>(new PartitionData(1, 2)); + + Fate fate1 = new Fate<>(testEnv1, zooStore1, r -> "", partitionData1::get, + DefaultConfiguration.getInstance()); + Fate fate2 = new Fate<>(testEnv2, zooStore2, r -> "", partitionData2::get, + DefaultConfiguration.getInstance()); + + Set submittedIds = new HashSet<>(); + + // submit 100 operations through fate1 instance. Even though all were submitted through a single + // instance, both instances should pick them up and run. + for (int i = 0; i < 100; i++) { + var id = fate1.startTransaction(); + fate1.seedTransaction("op" + i, id, new TestRepo(), true, "test"); + submittedIds.add(id); + } + + assertEquals(100, submittedIds.size()); + + // should be able to wait for completion on any fate instance + for (var id : submittedIds) { + fate2.waitForCompletion(id); + } + + // verify all fate ops ran + assertEquals(submittedIds, Sets.union(testEnv1.executedOps, testEnv2.executedOps)); + // verify each op only ran in one fate instance + assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps)); + // verify both fate instances executed operations + assertFalse(testEnv1.executedOps.isEmpty()); + assertFalse(testEnv2.executedOps.isEmpty()); + + // create a third fate instance + ZooUtil.LockID lock3 = new ZooUtil.LockID("/locks", "L3", 54); + final ZooStore zooStore3 = new ZooStore<>(FATE_DIR, zk, lock3); + TestEnv testEnv3 = new TestEnv(); + AtomicReference partitionData3 = new AtomicReference<>(new PartitionData(2, 3)); + Fate fate3 = new Fate<>(testEnv3, zooStore3, r -> "", partitionData3::get, + DefaultConfiguration.getInstance()); + + // adjust the partition data for the existing fate instances, they should react to this change + partitionData1.set(new PartitionData(0, 3)); + partitionData2.set(new PartitionData(1, 3)); + + // clear the tracking sets from the last batch of operations executed + submittedIds.clear(); + testEnv1.executedOps.clear(); + testEnv2.executedOps.clear(); + + // execute another set of 100 operations, all three fate instances should pick these ops up + for (int i = 0; i < 100; i++) { + var id = fate3.startTransaction(); + fate3.seedTransaction("op2." + i, id, new TestRepo(), true, "test"); + submittedIds.add(id); + } + + // should be able to wait for completion on any fate instance + for (var id : submittedIds) { + fate1.waitForCompletion(id); + } + + // verify all fate ops ran + assertEquals(submittedIds, + Sets.union(Sets.union(testEnv1.executedOps, testEnv2.executedOps), testEnv3.executedOps)); + // verify each op only ran in one fate instance + assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps)); + assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv3.executedOps)); + assertTrue(Collections.disjoint(testEnv2.executedOps, testEnv3.executedOps)); + // verify all fate instances executed operations + assertFalse(testEnv1.executedOps.isEmpty()); + assertFalse(testEnv2.executedOps.isEmpty()); + assertFalse(testEnv3.executedOps.isEmpty()); + + fate1.shutdown(); + fate2.shutdown(); + fate3.shutdown(); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index 2283132c3f7..eae14b32675 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -254,7 +254,8 @@ public void getFateStatus() { InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); - ZooStore zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk); + ZooStore zs = + new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, createLockID()); var lockPath = ServiceLock.path(ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId); @@ -344,7 +345,8 @@ private boolean lookupFateInZookeeper(final String tableName) throws KeeperExcep InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); - ZooStore zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk); + ZooStore zs = + new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, createLockID()); var lockPath = ServiceLock.path(ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId); AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk, lockPath, null, null); @@ -502,4 +504,8 @@ public void multipleCompactions() { }); } + + private ZooUtil.LockID createLockID() { + return new ZooUtil.LockID("S1", "N1", 1234); + } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 175533f0c93..df09d626543 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -226,7 +226,7 @@ private static FateStatus getFateStatus(AccumuloCluster cluster) { AdminUtil admin = new AdminUtil<>(false); ServerContext context = cluster.getServerContext(); ZooReaderWriter zk = context.getZooReaderWriter(); - ZooStore zs = new ZooStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk); + ZooStore zs = new ZooStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk, null); // TODO var lockPath = ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTABLE_LOCKS); return admin.getStatus(zs, zk, lockPath, null, null); } catch (KeeperException | InterruptedException e) {