diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 6b4377cef67..96d5805e6e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -28,9 +28,9 @@ import java.io.Serializable; import java.io.UncheckedIOException; import java.time.Duration; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -38,13 +38,17 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.util.CountDownTimer; -import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.Retry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,27 +72,37 @@ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { } }; - protected final Set reserved; + // The ZooKeeper lock for the process that's running this store instance + protected final ZooUtil.LockID lockID; + protected final Predicate isLockHeld; protected final Map deferred; + protected final FateIdGenerator fateIdGenerator; private final int maxDeferred; private final AtomicBoolean deferredOverflow = new AtomicBoolean(); - private final FateIdGenerator fateIdGenerator; - - // This is incremented each time a transaction was unreserved that was non new - protected final SignalCount unreservedNonNewCount = new SignalCount(); // This is incremented each time a transaction is unreserved that was runnable - protected final SignalCount unreservedRunnableCount = new SignalCount(); + private final SignalCount unreservedRunnableCount = new SignalCount(); + + // Keeps track of the number of concurrent callers to waitForStatusChange() + private final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger(0); public AbstractFateStore() { - this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + this(createDummyLockID(), null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } - public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) { + public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLockHeld) { + this(lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + } + + public AbstractFateStore(ZooUtil.LockID lockID, Predicate isLockHeld, + int maxDeferred, FateIdGenerator fateIdGenerator) { this.maxDeferred = maxDeferred; this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator); - this.reserved = new HashSet<>(); - this.deferred = new HashMap<>(); + this.deferred = Collections.synchronizedMap(new HashMap<>()); + this.lockID = Objects.requireNonNull(lockID); + // If the store is used for a Fate which runs a dead reservation cleaner, + // this should be non-null, otherwise null is fine + this.isLockHeld = isLockHeld; } public static byte[] serialize(Object o) { @@ -115,38 +129,26 @@ public static Object deserialize(byte[] ser) { } } - /** - * Attempt to reserve the fate transaction. - * - * @param fateId The FateId - * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or - * an empty Optional if the transaction was already reserved. - */ - @Override - public Optional> tryReserve(FateId fateId) { - synchronized (this) { - if (!reserved.contains(fateId)) { - return Optional.of(reserve(fateId)); - } - return Optional.empty(); - } - } - @Override public FateTxStore reserve(FateId fateId) { - synchronized (AbstractFateStore.this) { - while (reserved.contains(fateId)) { - try { - AbstractFateStore.this.wait(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException(e); - } + var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25)) + .incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(30)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); + Optional> reserveAttempt = tryReserve(fateId); + while (reserveAttempt.isEmpty()) { + Preconditions.checkState(!_getStatus(fateId).equals(TStatus.UNKNOWN), + "Attempted to reserve a tx that does not exist: " + fateId); + try { + retry.waitForNextAttempt(log, "Attempting to reserve " + fateId); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalArgumentException(e); } - - reserved.add(fateId); - return newFateTxStore(fateId, true); + reserveAttempt = tryReserve(fateId); } + retry.logCompletion(log, "Attempting to reserve " + fateId); + + return reserveAttempt.orElseThrow(); } private static final Set IN_PROGRESS_SET = Set.of(TStatus.IN_PROGRESS); @@ -168,21 +170,20 @@ public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { // first var transactions = Stream.concat(inProgress, other); transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus())) - .map(FateIdStatus::getFateId).filter(fateId -> { - synchronized (AbstractFateStore.this) { - var deferredTime = deferred.get(fateId); - if (deferredTime != null) { - if (deferredTime.isExpired()) { - deferred.remove(fateId); - } else { - return false; - } + .filter(fateIdStatus -> { + var fateId = fateIdStatus.getFateId(); + var deferredTime = deferred.get(fateId); + if (deferredTime != null) { + if (deferredTime.isExpired()) { + deferred.remove(fateId); + } else { + return false; } - return !reserved.contains(fateId); } - }).forEach(fateId -> { + return fateIdStatus.getFateReservation().isEmpty(); + }).forEach(fateIdStatus -> { seen.incrementAndGet(); - idConsumer.accept(fateId); + idConsumer.accept(fateIdStatus.getFateId()); }); } @@ -192,12 +193,10 @@ public void runnable(AtomicBoolean keepWaiting, Consumer idConsumer) { if (seen.get() == 0) { if (beforeCount == unreservedRunnableCount.getCount()) { long waitTime = 5000; - synchronized (AbstractFateStore.this) { - if (!deferred.isEmpty()) { - waitTime = deferred.values().stream() - .mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min() - .getAsLong(); - } + if (!deferred.isEmpty()) { + waitTime = deferred.values().stream() + .mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min() + .getAsLong(); } if (waitTime > 0) { @@ -230,7 +229,13 @@ public Stream list(Set statuses) { @Override public ReadOnlyFateTxStore read(FateId fateId) { - return newFateTxStore(fateId, false); + return newUnreservedFateTxStore(fateId); + } + + @Override + public Map getActiveReservations() { + return list().filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors + .toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow())); } protected boolean isRunnable(TStatus status) { @@ -261,190 +266,120 @@ public int getDeferredCount() { // This method is primarily used right now for unit testing but // if this synchronization becomes an issue we could add an atomic // counter instead to track it separately so we don't need to lock - synchronized (AbstractFateStore.this) { - return deferred.size(); - } - } - - private Optional create(FateKey fateKey) { - FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey); - - try { - create(fateId, fateKey); - } catch (IllegalStateException e) { - Pair> statusAndKey = getStatusAndKey(fateId); - TStatus status = statusAndKey.getFirst(); - Optional tFateKey = statusAndKey.getSecond(); - - // Case 1: Status is NEW so this is unseeded, we can return and allow the calling code - // to reserve/seed as long as the existing key is the same and not different as that would - // mean a collision - if (status == TStatus.NEW) { - Preconditions.checkState(tFateKey.isPresent(), "Tx Key is missing from tid %s", - fateId.getTxUUIDStr()); - Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()), - "Collision detected for tid %s", fateId.getTxUUIDStr()); - // Case 2: Status is some other state which means already in progress - // so we can just log and return empty optional - } else { - log.trace("Existing transaction {} already exists for key {} with status {}", fateId, - fateKey, status); - return Optional.empty(); - } - } - - return Optional.of(fateId); + return deferred.size(); } - @Override - public Optional> createAndReserve(FateKey fateKey) { - FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey); - final Optional> txStore; - - // First make sure we can reserve in memory the fateId, if not - // we can return an empty Optional as it is reserved and in progress - // This reverses the usual order of creation and then reservation but - // this prevents a race condition by ensuring we can reserve first. - // This will create the FateTxStore before creation but this object - // is not exposed until after creation is finished so there should not - // be any errors. - final Optional> reservedTxStore; - synchronized (this) { - reservedTxStore = tryReserve(fateId); - } - - // If present we were able to reserve so try and create - if (reservedTxStore.isPresent()) { - try { - var fateIdFromCreate = create(fateKey); - if (fateIdFromCreate.isPresent()) { - Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()), - "Transaction creation returned unexpected %s, expected %s", fateIdFromCreate, fateId); - txStore = reservedTxStore; - } else { - // We already exist in a non-new state then un-reserve and an empty - // Optional will be returned. This is expected to happen when the - // system is busy and operations are not running, and we keep seeding them - synchronized (this) { - reserved.remove(fateId); - } - txStore = Optional.empty(); - } - } catch (Exception e) { - // Clean up the reservation if the creation failed - // And then throw error - synchronized (this) { - reserved.remove(fateId); - } - if (e instanceof IllegalStateException) { - throw e; - } else { - throw new IllegalStateException(e); - } - } - } else { - // Could not reserve so return empty - log.trace("Another thread currently has transaction {} key {} reserved", fateId, fateKey); - txStore = Optional.empty(); - } - - return txStore; + protected void verifyFateKey(FateId fateId, Optional fateKeySeen, + FateKey fateKeyExpected) { + Preconditions.checkState(fateKeySeen.isPresent(), "fate key is missing from fate id " + fateId); + Preconditions.checkState(fateKeySeen.orElseThrow().equals(fateKeyExpected), + "Collision detected for fate id " + fateId); } - protected abstract void create(FateId fateId, FateKey fateKey); - - protected abstract Pair> getStatusAndKey(FateId fateId); - protected abstract Stream getTransactions(Set statuses); protected abstract TStatus _getStatus(FateId fateId); protected abstract Optional getKey(FateId fateId); - protected abstract FateTxStore newFateTxStore(FateId fateId, boolean isReserved); - - protected abstract FateInstanceType getInstanceType(); + protected abstract FateTxStore newUnreservedFateTxStore(FateId fateId); protected abstract class AbstractFateTxStoreImpl implements FateTxStore { protected final FateId fateId; - protected final boolean isReserved; + protected boolean deleted; + protected FateReservation reservation; protected TStatus observedStatus = null; - protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) { + protected AbstractFateTxStoreImpl(FateId fateId) { + this.fateId = fateId; + this.deleted = false; + this.reservation = null; + } + + protected AbstractFateTxStoreImpl(FateId fateId, FateReservation reservation) { this.fateId = fateId; - this.isReserved = isReserved; + this.deleted = false; + this.reservation = Objects.requireNonNull(reservation); + } + + protected boolean isReserved() { + return this.reservation != null; } @Override public TStatus waitForStatusChange(EnumSet expected) { - Preconditions.checkState(!isReserved, - "Attempted to wait for status change while reserved " + fateId); - while (true) { + Preconditions.checkState(!isReserved(), + "Attempted to wait for status change while reserved: " + fateId); + verifyReserved(false); - long countBefore = unreservedNonNewCount.getCount(); + int currNumCallers = concurrentStatusChangeCallers.incrementAndGet(); - TStatus status = _getStatus(fateId); - if (expected.contains(status)) { - return status; - } + try { + var retry = Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(25)) + .incrementBy(Duration.ofMillis(25)).maxWait(Duration.ofSeconds(currNumCallers)) + .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry(); + + while (true) { + + TStatus status = _getStatus(fateId); + if (expected.contains(status)) { + retry.logCompletion(log, "Waiting on status change for " + fateId + " expected:" + + expected + " status:" + status); + return status; + } - unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true); + try { + retry.waitForNextAttempt(log, "Waiting on status change for " + fateId + " expected:" + + expected + " status:" + status); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + } finally { + concurrentStatusChangeCallers.decrementAndGet(); } } @Override public void unreserve(Duration deferTime) { + Preconditions.checkState(isReserved(), + "Attempted to unreserve a transaction that was not reserved: " + fateId); if (deferTime.isNegative()) { throw new IllegalArgumentException("deferTime < 0 : " + deferTime); } - synchronized (AbstractFateStore.this) { - if (!reserved.remove(fateId)) { - throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId); - } - - // notify any threads waiting to reserve - AbstractFateStore.this.notifyAll(); - - // If deferred map has overflowed then skip adding to the deferred map - // and clear the map and set the flag. This will cause the next execution - // of runnable to process all the transactions and to not defer as we - // have a large backlog and want to make progress - if (deferTime.compareTo(Duration.ZERO) > 0 && !deferredOverflow.get()) { - if (deferred.size() >= maxDeferred) { - log.info( - "Deferred map overflowed with size {}, clearing and setting deferredOverflow to true", - deferred.size()); - deferredOverflow.set(true); - deferred.clear(); - } else { - deferred.put(fateId, CountDownTimer.startNew(deferTime)); - } + // If deferred map has overflowed then skip adding to the deferred map + // and clear the map and set the flag. This will cause the next execution + // of runnable to process all the transactions and to not defer as we + // have a large backlog and want to make progress + if (deferTime.compareTo(Duration.ZERO) > 0 && !isDeferredOverflow()) { + if (deferred.size() >= maxDeferred) { + log.info( + "Deferred map overflowed with size {}, clearing and setting deferredOverflow to true", + deferred.size()); + deferredOverflow.set(true); + deferred.clear(); + } else { + deferred.put(fateId, CountDownTimer.startNew(deferTime)); } } + unreserve(); + if (observedStatus != null && isRunnable(observedStatus)) { unreservedRunnableCount.increment(); } - - if (observedStatus != TStatus.NEW) { - unreservedNonNewCount.increment(); - } } - protected void verifyReserved(boolean isWrite) { - if (!isReserved && isWrite) { - throw new IllegalStateException("Attempted write on unreserved FATE transaction."); - } + protected abstract void unreserve(); - if (isReserved) { - synchronized (AbstractFateStore.this) { - if (!reserved.contains(fateId)) { - throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId); - } - } + protected void verifyReserved(boolean isWrite) { + if (!isReserved() && isWrite) { + throw new IllegalStateException( + "Attempted write on unreserved FATE transaction: " + fateId); } } @@ -462,12 +397,6 @@ public Optional getKey() { return AbstractFateStore.this.getKey(fateId); } - @Override - public Pair> getStatusAndKey() { - verifyReserved(false); - return AbstractFateStore.this.getStatusAndKey(fateId); - } - @Override public FateId getID() { return fateId; @@ -502,4 +431,14 @@ protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { throw new IllegalStateException("Bad node data " + txInfo); } } + + /** + * this is a temporary method used to create a dummy lock when using a FateStore outside the + * context of a Manager (one example is testing) so reservations can still be made. + * + * @return a dummy {@link ZooUtil.LockID} + */ + public static ZooUtil.LockID createDummyLockID() { + return new ZooUtil.LockID("/path", "node", 123); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index add5b7cf112..e2d4e7cbe55 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -37,6 +37,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -73,7 +75,8 @@ public class Fate { private final FateStore store; private final T environment; private final ScheduledThreadPoolExecutor fatePoolWatcher; - private final ExecutorService executor; + private final ExecutorService transactionExecutor; + private final ExecutorService deadResCleanerExecutor; private static final EnumSet FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN); @@ -323,13 +326,25 @@ protected Repo executeCall(FateId fateId, Repo op) throws Exception { return next; } + /** + * A thread that finds reservations held by dead processes and unreserves them + */ + private class DeadReservationCleaner implements Runnable { + @Override + public void run() { + if (keepRunning.get()) { + store.deleteDeadReservations(); + } + } + } + /** * Creates a Fault-tolerant executor. * * @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging */ - public Fate(T environment, FateStore store, Function,String> toLogStrFunc, - AccumuloConfiguration conf) { + public Fate(T environment, FateStore store, boolean runDeadResCleaner, + Function,String> toLogStrFunc, AccumuloConfiguration conf) { this.store = FateLogger.wrap(store, toLogStrFunc); this.environment = environment; final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf, @@ -360,7 +375,19 @@ public Fate(T environment, FateStore store, Function,String> toLogStr } } }, 3, SECONDS)); - this.executor = pool; + this.transactionExecutor = pool; + + ScheduledExecutorService deadResCleanerExecutor = null; + if (runDeadResCleaner) { + // Create a dead reservation cleaner for this store that will periodically clean up + // reservations held by dead processes, if they exist. + deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, + store.type() + "-dead-reservation-cleaner-pool"); + ScheduledFuture deadReservationCleaner = deadResCleanerExecutor + .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, SECONDS); + ThreadPools.watchCriticalScheduledTask(deadReservationCleaner); + } + this.deadResCleanerExecutor = deadResCleanerExecutor; this.workFinder = Threads.createThread("Fate work finder", new WorkFinder()); this.workFinder.start(); @@ -530,21 +557,32 @@ public Stream list(FateKey.FateKeyType type) { public void shutdown(long timeout, TimeUnit timeUnit) { if (keepRunning.compareAndSet(true, false)) { fatePoolWatcher.shutdown(); - executor.shutdown(); + transactionExecutor.shutdown(); workFinder.interrupt(); + if (deadResCleanerExecutor != null) { + deadResCleanerExecutor.shutdown(); + } } if (timeout > 0) { long start = System.nanoTime(); while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) - && (workFinder.isAlive() || !executor.isTerminated())) { + && (workFinder.isAlive() || !transactionExecutor.isTerminated() + || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) { try { - if (!executor.awaitTermination(1, SECONDS)) { + if (!transactionExecutor.awaitTermination(1, SECONDS)) { log.debug("Fate {} is waiting for worker threads to terminate", store.type()); continue; } + if (deadResCleanerExecutor != null + && !deadResCleanerExecutor.awaitTermination(1, SECONDS)) { + log.debug("Fate {} is waiting for dead reservation cleaner thread to terminate", + store.type()); + continue; + } + workFinder.join(1_000); if (workFinder.isAlive()) { log.debug("Fate {} is waiting for work finder thread to terminate", store.type()); @@ -555,15 +593,20 @@ public void shutdown(long timeout, TimeUnit timeUnit) { } } - if (workFinder.isAlive() || !executor.isTerminated()) { + if (workFinder.isAlive() || !transactionExecutor.isTerminated() + || (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated())) { log.warn( - "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} executor:{}", + "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} transactionExecutor:{} deadResCleanerExecutor:{}", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(), - workFinder.isAlive(), !executor.isTerminated()); + workFinder.isAlive(), !transactionExecutor.isTerminated(), + (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated())); } } // interrupt the background threads - executor.shutdownNow(); + transactionExecutor.shutdownNow(); + if (deadResCleanerExecutor != null) { + deadResCleanerExecutor.shutdownNow(); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 9aa7dcbbc49..6b7b68baf58 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -18,9 +18,20 @@ */ package org.apache.accumulo.core.fate; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.time.Duration; +import java.util.Arrays; +import java.util.Objects; import java.util.Optional; +import java.util.UUID; + +import org.apache.accumulo.core.fate.user.FateMutatorImpl; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.hadoop.io.DataInputBuffer; /** * Transaction Store: a place to save transactions @@ -107,11 +118,123 @@ interface FateTxStore extends ReadOnlyFateTxStore { void unreserve(Duration deferTime); } + /** + * The value stored to indicate a FATE transaction ID ({@link FateId}) has been reserved + */ + class FateReservation { + + // The LockID (provided by the Manager running the FATE which uses this store) which is used for + // identifying dead Managers, so their reservations can be deleted and picked up again since + // they can no longer be worked on. + private final ZooUtil.LockID lockID; + // The UUID generated on a reservation attempt (tryReserve()) used to uniquely identify that + // attempt. This is useful for the edge case where the reservation is sent to the server + // (Tablet Server for UserFateStore and the ZooKeeper Server for MetaFateStore), but the server + // dies before the store receives the response. It allows us to determine if the reservation + // was successful and was written by this reservation attempt (could have been successfully + // reserved by another attempt or not reserved at all, in which case, we wouldn't want to + // expose a FateTxStore). + private final UUID reservationUUID; + private final byte[] serialized; + + private FateReservation(ZooUtil.LockID lockID, UUID reservationUUID) { + this.lockID = Objects.requireNonNull(lockID); + this.reservationUUID = Objects.requireNonNull(reservationUUID); + this.serialized = serialize(lockID, reservationUUID); + } + + public static FateReservation from(ZooUtil.LockID lockID, UUID reservationUUID) { + return new FateReservation(lockID, reservationUUID); + } + + /** + * @param serializedFateRes the value present in the table for the reservation column + * @return true if the array represents a valid serialized FateReservation object, false if it + * represents an unreserved value, error otherwise + */ + public static boolean isFateReservation(byte[] serializedFateRes) { + if (Arrays.equals(serializedFateRes, FateMutatorImpl.NOT_RESERVED)) { + return false; + } + deserialize(serializedFateRes); + return true; + } + + public ZooUtil.LockID getLockID() { + return lockID; + } + + public UUID getReservationUUID() { + return reservationUUID; + } + + public byte[] getSerialized() { + return serialized; + } + + private static byte[] serialize(ZooUtil.LockID lockID, UUID reservationUUID) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeUTF(lockID.serialize("/")); + dos.writeUTF(reservationUUID.toString()); + dos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static FateReservation deserialize(byte[] serialized) { + try (DataInputBuffer buffer = new DataInputBuffer()) { + buffer.reset(serialized, serialized.length); + ZooUtil.LockID lockID = new ZooUtil.LockID("", buffer.readUTF()); + UUID reservationUUID = UUID.fromString(buffer.readUTF()); + return new FateReservation(lockID, reservationUUID); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID lockID2) { + return lockID1.serialize("/").equals(lockID2.serialize("/")); + } + + @Override + public String toString() { + return lockID.serialize("/") + ":" + reservationUUID; + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof FateReservation) { + FateReservation other = (FateReservation) obj; + return Arrays.equals(this.getSerialized(), other.getSerialized()); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(lockID, reservationUUID); + } + } + + /** + * Deletes the current reservations which were reserved by a now dead Manager. These reservations + * can no longer be worked on so their reservation should be deleted, so they can be picked up and + * worked on again. + */ + void deleteDeadReservations(); + /** * Attempt to reserve the fate transaction. * * @param fateId The FateId - * @return true if reserved by this call, false if already reserved + * @return An Optional containing the {@link FateTxStore} if the transaction was successfully + * reserved, or an empty Optional if the transaction was not able to be reserved. */ Optional> tryReserve(FateId fateId); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java index 726f5c614c0..d801167d074 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java @@ -30,18 +30,21 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Stream; +import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; -import org.apache.accumulo.core.util.Pair; import org.apache.hadoop.io.DataInputBuffer; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -51,11 +54,11 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.base.Suppliers; //TODO use zoocache? - ACCUMULO-1297 //TODO handle zookeeper being down gracefully - ACCUMULO-1297 - public class MetaFateStore extends AbstractFateStore { private static final Logger log = LoggerFactory.getLogger(MetaFateStore.class); @@ -67,15 +70,16 @@ private String getTXPath(FateId fateId) { return path + "/tx_" + fateId.getTxUUIDStr(); } - public MetaFateStore(String path, ZooReaderWriter zk) - throws KeeperException, InterruptedException { - this(path, zk, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + public MetaFateStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID, + Predicate isLockHeld) throws KeeperException, InterruptedException { + this(path, zk, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } @VisibleForTesting - public MetaFateStore(String path, ZooReaderWriter zk, int maxDeferred, - FateIdGenerator fateIdGenerator) throws KeeperException, InterruptedException { - super(maxDeferred, fateIdGenerator); + public MetaFateStore(String path, ZooReaderWriter zk, ZooUtil.LockID lockID, + Predicate isLockHeld, int maxDeferred, FateIdGenerator fateIdGenerator) + throws KeeperException, InterruptedException { + super(lockID, isLockHeld, maxDeferred, fateIdGenerator); this.path = path; this.zk = zk; @@ -92,7 +96,7 @@ public FateId create() { while (true) { try { FateId fateId = FateId.from(fateInstanceType, UUID.randomUUID()); - zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW).serialize(), + zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null).serialize(), NodeExistsPolicy.FAIL); return fateId; } catch (NodeExistsException nee) { @@ -104,19 +108,117 @@ public FateId create() { } @Override - protected void create(FateId fateId, FateKey key) { + public Optional> createAndReserve(FateKey fateKey) { + final var reservation = FateReservation.from(lockID, UUID.randomUUID()); + final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + try { - zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, key).serialize(), - NodeExistsPolicy.FAIL); - } catch (KeeperException | InterruptedException e) { + byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId), + new NodeValue(TStatus.NEW, reservation, fateKey).serialize(), currSerNodeVal -> { + // We are only returning a non-null value for the following cases: + // 1) The existing NodeValue for fateId is exactly the same as the value set for the + // node if it doesn't yet exist: + // TStatus = TStatus.NEW, FateReservation = reservation, FateKey = fateKey + // This might occur if there was a ZK server fault and the same write is running a 2nd + // time + // 2) The existing NodeValue for fateId has: + // TStatus = TStatus.NEW, no FateReservation present, FateKey = fateKey + // The fateId is NEW/unseeded and not reserved, so we can allow it to be reserved + // Note: returning null here will not change the value to null but will return null + NodeValue currNodeVal = new NodeValue(currSerNodeVal); + if (currNodeVal.status == TStatus.NEW) { + verifyFateKey(fateId, currNodeVal.fateKey, fateKey); + if (currNodeVal.isReservedBy(reservation)) { + return currSerNodeVal; + } else if (!currNodeVal.isReserved()) { + // NEW/unseeded transaction and not reserved, so we can allow it to be reserved + return new NodeValue(TStatus.NEW, reservation, fateKey).serialize(); + } else { + // NEW/unseeded transaction reserved under a different reservation + return null; + } + } else { + log.trace( + "fate id {} tstatus {} fate key {} is reserved {} " + + "has already been seeded with work (non-NEW status)", + fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null), + currNodeVal.isReserved()); + return null; + } + }); + if (nodeVal != null) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } else { + return Optional.empty(); + } + } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) { throw new IllegalStateException(e); } } @Override - protected Pair> getStatusAndKey(FateId fateId) { - final NodeValue node = getNode(fateId); - return new Pair<>(node.status, node.fateKey); + public Optional> tryReserve(FateId fateId) { + // uniquely identify this attempt to reserve the fate operation data + FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); + + try { + byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { + NodeValue currNodeVal = new NodeValue(currSerNodeVal); + // The uuid handles the case where there was a ZK server fault and the write for this thread + // went through but that was not acknowledged, and we are reading our own write for 2nd + // time. + if (!currNodeVal.isReserved() || currNodeVal.isReservedBy(reservation)) { + FateKey currFateKey = currNodeVal.fateKey.orElse(null); + // Add the FateReservation to the node to reserve + return new NodeValue(currNodeVal.status, reservation, currFateKey).serialize(); + } else { + // This will not change the value to null but will return null + return null; + } + }); + if (newSerNodeVal != null) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } else { + return Optional.empty(); + } + } catch (KeeperException.NoNodeException e) { + log.trace("Tried to reserve a transaction {} that does not exist", fateId); + return Optional.empty(); + } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void deleteDeadReservations() { + for (Map.Entry entry : getActiveReservations().entrySet()) { + FateId fateId = entry.getKey(); + FateReservation reservation = entry.getValue(); + if (isLockHeld.test(reservation.getLockID())) { + continue; + } + try { + zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { + NodeValue currNodeVal = new NodeValue(currSerNodeVal); + // Make sure the current node is still reserved and reserved with the expected reservation + // and it is dead + if (currNodeVal.isReservedBy(reservation) + && !isLockHeld.test(currNodeVal.reservation.orElseThrow().getLockID())) { + // Delete the reservation + log.trace("Deleted the dead reservation {} for fate id {}", reservation, fateId); + return new NodeValue(currNodeVal.status, null, currNodeVal.fateKey.orElse(null)) + .serialize(); + } else { + // No change + return null; + } + }); + } catch (KeeperException.NoNodeException e) { + // the node has since been deleted. Can safely ignore + } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { + throw new RuntimeException(e); + } + } } @Override @@ -126,8 +228,12 @@ public FateInstanceType type() { private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private FateTxStoreImpl(FateId fateId, boolean isReserved) { - super(fateId, isReserved); + private FateTxStoreImpl(FateId fateId) { + super(fateId); + } + + private FateTxStoreImpl(FateId fateId, FateReservation reservation) { + super(fateId, reservation); } private static final int RETRIES = 10; @@ -177,7 +283,7 @@ private String findTop(String txpath) throws KeeperException, InterruptedExcepti } } - if (max.equals("")) { + if (max.isEmpty()) { return null; } @@ -224,9 +330,22 @@ public void setStatus(TStatus status) { verifyReserved(true); try { - zk.putPersistentData(getTXPath(fateId), new NodeValue(status).serialize(), - NodeExistsPolicy.OVERWRITE); - } catch (KeeperException | InterruptedException e) { + zk.mutateExisting(getTXPath(fateId), currSerializedData -> { + NodeValue currNodeVal = new NodeValue(currSerializedData); + // Ensure the FateId is reserved in ZK, and it is reserved with the expected reservation + if (currNodeVal.isReservedBy(this.reservation)) { + FateReservation currFateReservation = currNodeVal.reservation.orElseThrow(); + FateKey currFateKey = currNodeVal.fateKey.orElse(null); + NodeValue newNodeValue = new NodeValue(status, currFateReservation, currFateKey); + return newNodeValue.serialize(); + } else { + throw new IllegalStateException("Either the FateId " + fateId + + " is not reserved in ZK, or it is but the reservation in ZK: " + + currNodeVal.reservation.orElse(null) + " differs from that in the store: " + + this.reservation); + } + }); + } catch (KeeperException | InterruptedException | AcceptableThriftTableOperationException e) { throw new IllegalStateException(e); } @@ -239,6 +358,7 @@ public void delete() { try { zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP); + this.deleted = true; } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -315,6 +435,36 @@ public List> getStack() { return dops; } } + + @Override + protected void unreserve() { + try { + if (!this.deleted) { + zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> { + NodeValue currNodeVal = new NodeValue(currSerNodeVal); + FateKey currFateKey = currNodeVal.fateKey.orElse(null); + if (currNodeVal.isReservedBy(this.reservation)) { + // Remove the FateReservation from the NodeValue to unreserve + return new NodeValue(currNodeVal.status, null, currFateKey).serialize(); + } else { + // possible this is running a 2nd time in zk server fault conditions and its first + // write went through + if (!currNodeVal.isReserved()) { + log.trace("The FATE reservation for fate id {} does not exist in ZK", fateId); + } else if (!currNodeVal.reservation.orElseThrow().equals(this.reservation)) { + log.debug( + "The FATE reservation for fate id {} in ZK differs from that in the store", + fateId); + } + return null; + } + }); + } + this.reservation = null; + } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) { + throw new IllegalStateException(e); + } + } } private Serializable getTransactionInfo(TxInfo txInfo, FateId fateId) { @@ -341,20 +491,15 @@ private NodeValue getNode(FateId fateId) { try { return new NodeValue(zk.getData(getTXPath(fateId))); } catch (NoNodeException nne) { - return new NodeValue(TStatus.UNKNOWN); + return new NodeValue(TStatus.UNKNOWN, null); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } } @Override - protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { - return new FateTxStoreImpl(fateId, isReserved); - } - - @Override - protected FateInstanceType getInstanceType() { - return fateInstanceType; + protected FateTxStore newUnreservedFateTxStore(FateId fateId) { + return new FateTxStoreImpl(fateId); } @Override @@ -363,13 +508,19 @@ protected Stream getTransactions(Set statuses) { Stream stream = zk.getChildren(path).stream().map(strTxid -> { String txUUIDStr = strTxid.split("_")[1]; FateId fateId = FateId.from(fateInstanceType, txUUIDStr); - // Memoizing for two reasons. First the status may never be requested, so in that case avoid - // the lookup. Second, if its requested multiple times the result will always be consistent. - Supplier statusSupplier = Suppliers.memoize(() -> _getStatus(fateId)); + // Memoizing for two reasons. First the status or reservation may never be requested, so + // in that case avoid the lookup. Second, if it's requested multiple times the result will + // always be consistent. + Supplier nodeSupplier = Suppliers.memoize(() -> getNode(fateId)); return new FateIdStatusBase(fateId) { @Override public TStatus getStatus() { - return statusSupplier.get(); + return nodeSupplier.get().status; + } + + @Override + public Optional getFateReservation() { + return nodeSupplier.get().reservation; } }; }); @@ -393,42 +544,63 @@ public Stream list(FateKey.FateKeyType type) { protected static class NodeValue { final TStatus status; final Optional fateKey; + final Optional reservation; - private NodeValue(byte[] serialized) { + private NodeValue(byte[] serializedData) { try (DataInputBuffer buffer = new DataInputBuffer()) { - buffer.reset(serialized, serialized.length); + buffer.reset(serializedData, serializedData.length); this.status = TStatus.valueOf(buffer.readUTF()); + this.reservation = deserializeFateReservation(buffer); this.fateKey = deserializeFateKey(buffer); } catch (IOException e) { throw new UncheckedIOException(e); } } - private NodeValue(TStatus status) { - this(status, null); + private NodeValue(TStatus status, FateReservation reservation) { + this(status, reservation, null); } - private NodeValue(TStatus status, FateKey fateKey) { + private NodeValue(TStatus status, FateReservation reservation, FateKey fateKey) { this.status = Objects.requireNonNull(status); + this.reservation = Optional.ofNullable(reservation); this.fateKey = Optional.ofNullable(fateKey); } private Optional deserializeFateKey(DataInputBuffer buffer) throws IOException { int length = buffer.readInt(); + Preconditions.checkArgument(length >= 0); if (length > 0) { return Optional.of(FateKey.deserialize(buffer.readNBytes(length))); } return Optional.empty(); } + private Optional deserializeFateReservation(DataInputBuffer buffer) + throws IOException { + int length = buffer.readInt(); + Preconditions.checkArgument(length >= 0); + if (length > 0) { + return Optional.of(FateReservation.deserialize(buffer.readNBytes(length))); + } + return Optional.empty(); + } + byte[] serialize() { try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(baos)) { dos.writeUTF(status.name()); + if (isReserved()) { + byte[] serializedFateReservation = reservation.orElseThrow().getSerialized(); + dos.writeInt(serializedFateReservation.length); + dos.write(serializedFateReservation); + } else { + dos.writeInt(0); + } if (fateKey.isPresent()) { - byte[] serialized = fateKey.orElseThrow().getSerialized(); - dos.writeInt(serialized.length); - dos.write(serialized); + byte[] serializedFateKey = fateKey.orElseThrow().getSerialized(); + dos.writeInt(serializedFateKey.length); + dos.write(serializedFateKey); } else { dos.writeInt(0); } @@ -439,5 +611,12 @@ byte[] serialize() { } } + public boolean isReserved() { + return reservation.isPresent(); + } + + public boolean isReservedBy(FateReservation reservation) { + return isReserved() && this.reservation.orElseThrow().equals(reservation); + } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index 1fd9cac06be..bbb08f1b4bf 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -29,8 +30,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.accumulo.core.util.Pair; - /** * Read only access to a Transaction Store. * @@ -98,8 +97,6 @@ interface ReadOnlyFateTxStore { Optional getKey(); - Pair> getStatusAndKey(); - /** * Wait for the status of a transaction to change * @@ -134,6 +131,8 @@ interface ReadOnlyFateTxStore { interface FateIdStatus { FateId getFateId(); + Optional getFateReservation(); + TStatus getStatus(); } @@ -156,6 +155,12 @@ interface FateIdStatus { */ Stream list(FateKey.FateKeyType type); + /** + * @return a map of the current active reservations with the keys being the transaction that is + * reserved and the value being the value stored to indicate the transaction is reserved. + */ + Map getActiveReservations(); + /** * Finds all fate ops that are (IN_PROGRESS, SUBMITTED, or FAILED_IN_PROGRESS) and unreserved. Ids * that are found are passed to the consumer. This method will block until at least one runnable diff --git a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java index ac5147d4a97..bf0a35721c7 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@ -25,7 +25,6 @@ import java.util.Optional; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; -import org.apache.accumulo.core.util.Pair; public class WrappedFateTxStore implements FateStore.FateTxStore { protected final FateStore.FateTxStore wrapped; @@ -64,11 +63,6 @@ public Optional getKey() { return wrapped.getKey(); } - @Override - public Pair> getStatusAndKey() { - return wrapped.getStatusAndKey(); - } - @Override public void setStatus(FateStore.TStatus status) { wrapped.setStatus(status); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index b57ffae68f1..8c39e897000 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -20,8 +20,10 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.user.schema.FateSchema; public interface FateMutator { @@ -31,6 +33,42 @@ public interface FateMutator { FateMutator putCreateTime(long ctime); + /** + * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will + * put the reservation if there is not already a reservation present + * + * @param reservation the reservation to attempt to put + * @return the FateMutator with this added mutation + */ + FateMutator putReservedTx(FateStore.FateReservation reservation); + + /** + * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will + * put the reservation if the column doesn't exist yet. This should only be used for + * {@link UserFateStore#createAndReserve(FateKey)} + * + * @param reservation the reservation to attempt to put + * @return the FateMutator with this added mutation + */ + FateMutator putReservedTxOnCreation(FateStore.FateReservation reservation); + + /** + * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will + * remove the given reservation if it matches what is present in the column. + * + * @param reservation the reservation to attempt to remove + * @return the FateMutator with this added mutation + */ + FateMutator putUnreserveTx(FateStore.FateReservation reservation); + + /** + * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will + * put the initial column value if it has not already been set yet + * + * @return the FateMutator with this added mutation + */ + FateMutator putInitReservationVal(); + FateMutator putName(byte[] data); FateMutator putAutoClean(byte[] data); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index c794fdc3d62..fcb0e4f1f16 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.core.fate.user; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.fate.AbstractFateStore.serialize; import static org.apache.accumulo.core.fate.user.UserFateStore.getRow; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; @@ -39,6 +40,7 @@ import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; @@ -49,6 +51,8 @@ public class FateMutatorImpl implements FateMutator { + public static final byte[] NOT_RESERVED = "".getBytes(UTF_8); + private final ClientContext context; private final String tableName; private final FateId fateId; @@ -79,6 +83,43 @@ public FateMutator putCreateTime(long ctime) { return this; } + @Override + public FateMutator putReservedTx(FateStore.FateReservation reservation) { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED); + mutation.addCondition(condition); + TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); + return this; + } + + @Override + public FateMutator putReservedTxOnCreation(FateStore.FateReservation reservation) { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + mutation.addCondition(condition); + TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); + return this; + } + + @Override + public FateMutator putUnreserveTx(FateStore.FateReservation reservation) { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()) + .setValue(reservation.getSerialized()); + mutation.addCondition(condition); + TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); + return this; + } + + @Override + public FateMutator putInitReservationVal() { + Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + mutation.addCondition(condition); + TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); + return this; + } + @Override public FateMutator putName(byte[] data) { TxInfoColumnFamily.TX_NAME_COLUMN.put(mutation, new Value(data)); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java similarity index 75% rename from core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java rename to core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java index e586a646c38..61ac19eb4b0 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/RowFateStatusFilter.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -31,11 +32,13 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.ReadOnlyFateStore; -import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.fate.user.schema.FateSchema; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.hadoop.io.Text; -public class FateStatusFilter extends Filter { +public class RowFateStatusFilter extends WholeRowIterator { private EnumSet valuesToAccept; @@ -53,9 +56,15 @@ public void init(SortedKeyValueIterator source, Map op } @Override - public boolean accept(Key k, Value v) { - var tstatus = ReadOnlyFateStore.TStatus.valueOf(v.toString()); - return valuesToAccept.contains(tstatus); + protected boolean filter(Text currentRow, List keys, List values) { + for (int i = 0; i < keys.size(); i++) { + Key key = keys.get(i); + if (FateSchema.TxColumnFamily.STATUS_COLUMN.hasColumns(key) + && valuesToAccept.contains(ReadOnlyFateStore.TStatus.valueOf(values.get(i).toString()))) { + return true; + } + } + return false; } public static void configureScanner(ScannerBase scanner, @@ -63,9 +72,11 @@ public static void configureScanner(ScannerBase scanner, // only filter when getting a subset of statuses if (!statuses.equals(ALL_STATUSES)) { String statusesStr = statuses.stream().map(Enum::name).collect(Collectors.joining(",")); - var iterSettings = new IteratorSetting(100, "statuses", FateStatusFilter.class); + var iterSettings = new IteratorSetting(100, "statuses", RowFateStatusFilter.class); iterSettings.addOption("statuses", statusesStr); scanner.addScanIterator(iterSettings); + } else { + scanner.addScanIterator(new IteratorSetting(100, WholeRowIterator.class)); } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 7fdcfd3aa0e..1d45c170f61 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -18,14 +18,18 @@ */ package org.apache.accumulo.core.fate.user; +import java.io.IOException; import java.io.Serializable; +import java.time.Duration; import java.util.List; import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.SortedMap; import java.util.UUID; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -46,10 +50,11 @@ import org.apache.accumulo.core.fate.user.schema.FateSchema.RepoColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -70,20 +75,22 @@ public class UserFateStore extends AbstractFateStore { private static final com.google.common.collect.Range REPO_RANGE = com.google.common.collect.Range.closed(1, maxRepos); - public UserFateStore(ClientContext context, String tableName) { - this(context, tableName, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); + public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, + Predicate isLockHeld) { + this(context, tableName, lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } @VisibleForTesting - public UserFateStore(ClientContext context, String tableName, int maxDeferred, - FateIdGenerator fateIdGenerator) { - super(maxDeferred, fateIdGenerator); + public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID, + Predicate isLockHeld, int maxDeferred, FateIdGenerator fateIdGenerator) { + super(lockID, isLockHeld, maxDeferred, fateIdGenerator); this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); } - public UserFateStore(ClientContext context) { - this(context, AccumuloTable.FATE.tableName()); + public UserFateStore(ClientContext context, ZooUtil.LockID lockID, + Predicate isLockHeld) { + this(context, AccumuloTable.FATE.tableName(), lockID, isLockHeld); } @Override @@ -100,7 +107,7 @@ public FateId create() { } var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW) - .putCreateTime(System.currentTimeMillis()).tryMutate(); + .putCreateTime(System.currentTimeMillis()).putInitReservationVal().tryMutate(); switch (status) { case ACCEPTED: @@ -120,35 +127,162 @@ public FateId getFateId() { } @Override - protected void create(FateId fateId, FateKey fateKey) { - final int maxAttempts = 5; - + public Optional> createAndReserve(FateKey fateKey) { + final var reservation = FateReservation.from(lockID, UUID.randomUUID()); + final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey); + Optional> txStore = Optional.empty(); + int maxAttempts = 5; + FateMutator.Status status = null; + + // Only need to retry if it is UNKNOWN for (int attempt = 0; attempt < maxAttempts; attempt++) { - - if (attempt >= 1) { - log.debug("Failed to create transaction with fateId {} and fateKey {}, trying again", - fateId, fateKey); - UtilWaitThread.sleep(100); + status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) + .putReservedTxOnCreation(reservation).putCreateTime(System.currentTimeMillis()) + .tryMutate(); + if (status != FateMutator.Status.UNKNOWN) { + break; } + UtilWaitThread.sleep(100); + } - var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) - .putCreateTime(System.currentTimeMillis()).tryMutate(); + switch (status) { + case ACCEPTED: + txStore = Optional.of(new FateTxStoreImpl(fateId, reservation)); + break; + case REJECTED: + // If the status is REJECTED, we need to check what about the mutation was REJECTED: + // 1) Possible something like the following occurred: + // the first attempt was UNKNOWN but written, the next attempt would be rejected + // We return the FateTxStore in this case. + // 2) If there is a collision with existing fate id, throw error + // 3) If the fate id is already reserved, return an empty optional + // 4) If the fate id is still NEW/unseeded and unreserved, we can try to reserve it + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(getRow(fateId)); + scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), + TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(), + TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier()); + scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + TStatus statusSeen = TStatus.UNKNOWN; + Optional fateKeySeen = Optional.empty(); + Optional reservationSeen = Optional.empty(); + + for (Entry entry : scanner) { + Text colf = entry.getKey().getColumnFamily(); + Text colq = entry.getKey().getColumnQualifier(); + Value val = entry.getValue(); + + switch (colq.toString()) { + case TxColumnFamily.STATUS: + statusSeen = TStatus.valueOf(val.toString()); + break; + case TxColumnFamily.TX_KEY: + fateKeySeen = Optional.of(FateKey.deserialize(val.get())); + break; + case TxColumnFamily.RESERVATION: + if (FateReservation.isFateReservation(val.get())) { + reservationSeen = Optional.of(FateReservation.deserialize(val.get())); + } + break; + default: + throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); + } + } - switch (status) { - case ACCEPTED: - return; - case UNKNOWN: - continue; - case REJECTED: - throw new IllegalStateException("Attempt to create transaction with fateId " + fateId - + " and fateKey " + fateKey + " was rejected"); - default: - throw new IllegalStateException("Unknown status " + status); + if (statusSeen == TStatus.NEW) { + verifyFateKey(fateId, fateKeySeen, fateKey); + // This will be the case if the mutation status is REJECTED but the mutation was written + if (reservationSeen.isPresent() && reservationSeen.orElseThrow().equals(reservation)) { + txStore = Optional.of(new FateTxStoreImpl(fateId, reservation)); + } else if (reservationSeen.isEmpty()) { + // NEW/unseeded transaction and not reserved, so we can allow it to be reserved + // we tryReserve() since another thread may have reserved it since the scan + txStore = tryReserve(fateId); + // the status was known before reserving to be NEW, + // however it could change so check after reserving to avoid race conditions. + var statusAfterReserve = + txStore.map(ReadOnlyFateTxStore::getStatus).orElse(TStatus.UNKNOWN); + if (statusAfterReserve != TStatus.NEW) { + txStore.ifPresent(txs -> txs.unreserve(Duration.ZERO)); + txStore = Optional.empty(); + } + } + } else { + log.trace( + "fate id {} tstatus {} fate key {} is reserved {} " + + "has already been seeded with work (non-NEW status)", + fateId, statusSeen, fateKeySeen.orElse(null), reservationSeen.isPresent()); + } + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + break; + default: + throw new IllegalStateException("Unknown or unexpected status " + status); + } + + return txStore; + } + + @Override + public Optional> tryReserve(FateId fateId) { + // Create a unique FateReservation for this reservation attempt + FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); + + FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate(); + if (status.equals(FateMutator.Status.ACCEPTED)) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } else if (status.equals(FateMutator.Status.UNKNOWN)) { + // If the status is UNKNOWN, this means an error occurred after the mutation was + // sent to the TabletServer, and it is unknown if the mutation was written. We + // need to check if the mutation was written and if it was written by this + // attempt at reservation. If it was written by this reservation attempt, + // we can return the FateTxStore since it was successfully reserved in this + // attempt, otherwise we return empty (was written by another reservation + // attempt or was not written at all). + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(getRow(fateId)); + scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), + TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); + FateReservation persistedRes = scanner.stream() + .filter(entry -> FateReservation.isFateReservation(entry.getValue().get())) + .map(entry -> FateReservation.deserialize(entry.getValue().get())).findFirst() + .orElse(null); + if (persistedRes != null && persistedRes.equals(reservation)) { + return Optional.of(new FateTxStoreImpl(fateId, reservation)); + } + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); } } + return Optional.empty(); + } - throw new IllegalStateException("Failed to create transaction with fateId " + fateId - + " and fateKey " + fateKey + " after " + maxAttempts + " attempts"); + @Override + public void deleteDeadReservations() { + for (Entry activeRes : getActiveReservations().entrySet()) { + FateId fateId = activeRes.getKey(); + FateReservation reservation = activeRes.getValue(); + if (!isLockHeld.test(reservation.getLockID())) { + var status = newMutator(fateId).putUnreserveTx(reservation).tryMutate(); + if (status == FateMutator.Status.ACCEPTED) { + // Technically, this should also be logged for the case where the mutation status + // is UNKNOWN, but the mutation was actually written (fate id was unreserved) + // but there is no way to tell if it was unreserved from this mutation or another + // thread simply unreserving the transaction + log.trace("Deleted the dead reservation {} for fate id {}", reservation, fateId); + } + // No need to verify the status... If it is ACCEPTED, we have successfully unreserved + // the dead transaction. If it is REJECTED, the reservation has changed (i.e., + // has been unreserved so no need to do anything, or has been unreserved and reserved + // again in which case we don't want to change it). If it is UNKNOWN, the mutation + // may or may not have been written. If it was written, we have successfully unreserved + // the dead transaction. If it was not written, the next cycle/call to + // deleteDeadReservations() will try again. + } + } } @Override @@ -156,15 +290,53 @@ protected Stream getTransactions(Set statuses) { try { Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY); scanner.setRange(new Range()); - FateStatusFilter.configureScanner(scanner, statuses); + RowFateStatusFilter.configureScanner(scanner, statuses); TxColumnFamily.STATUS_COLUMN.fetch(scanner); + TxColumnFamily.RESERVATION_COLUMN.fetch(scanner); return scanner.stream().onClose(scanner::close).map(e -> { String txUUIDStr = e.getKey().getRow().toString(); FateId fateId = FateId.from(fateInstanceType, txUUIDStr); + SortedMap rowMap; + TStatus status = TStatus.UNKNOWN; + FateReservation reservation = null; + try { + rowMap = WholeRowIterator.decodeRow(e.getKey(), e.getValue()); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + // expect status and optionally reservation + Preconditions.checkState(rowMap.size() == 1 || rowMap.size() == 2, + "Invalid row seen: %s. Expected to see one entry for the status and optionally an " + + "entry for the fate reservation", + rowMap); + for (Entry entry : rowMap.entrySet()) { + Text colf = entry.getKey().getColumnFamily(); + Text colq = entry.getKey().getColumnQualifier(); + Value val = entry.getValue(); + switch (colq.toString()) { + case TxColumnFamily.STATUS: + status = TStatus.valueOf(val.toString()); + break; + case TxColumnFamily.RESERVATION: + if (FateReservation.isFateReservation(val.get())) { + reservation = FateReservation.deserialize(val.get()); + } + break; + default: + throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); + } + } + final TStatus finalStatus = status; + final Optional finalReservation = Optional.ofNullable(reservation); return new FateIdStatusBase(fateId) { @Override public TStatus getStatus() { - return TStatus.valueOf(e.getValue().toString()); + return finalStatus; + } + + @Override + public Optional getFateReservation() { + return finalReservation; } }; }); @@ -207,42 +379,8 @@ protected Optional getKey(FateId fateId) { } @Override - protected Pair> getStatusAndKey(FateId fateId) { - return scanTx(scanner -> { - scanner.setRange(getRow(fateId)); - TxColumnFamily.STATUS_COLUMN.fetch(scanner); - TxColumnFamily.TX_KEY_COLUMN.fetch(scanner); - - TStatus status = null; - FateKey key = null; - - for (Entry entry : scanner) { - final String qual = entry.getKey().getColumnQualifierData().toString(); - switch (qual) { - case TxColumnFamily.STATUS: - status = TStatus.valueOf(entry.getValue().toString()); - break; - case TxColumnFamily.TX_KEY: - key = FateKey.deserialize(entry.getValue().get()); - break; - default: - throw new IllegalStateException("Unexpected column qualifier: " + qual); - } - } - - return new Pair<>(Optional.ofNullable(status).orElse(TStatus.UNKNOWN), - Optional.ofNullable(key)); - }); - } - - @Override - protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { - return new FateTxStoreImpl(fateId, isReserved); - } - - @Override - protected FateInstanceType getInstanceType() { - return fateInstanceType; + protected FateTxStore newUnreservedFateTxStore(FateId fateId) { + return new FateTxStoreImpl(fateId); } static Range getRow(FateId fateId) { @@ -272,8 +410,12 @@ public FateInstanceType type() { private class FateTxStoreImpl extends AbstractFateTxStoreImpl { - private FateTxStoreImpl(FateId fateId, boolean isReserved) { - super(fateId, isReserved); + private FateTxStoreImpl(FateId fateId) { + super(fateId); + } + + private FateTxStoreImpl(FateId fateId, FateReservation reservation) { + super(fateId, reservation); } @Override @@ -403,6 +545,7 @@ public void delete() { var mutator = newMutator(fateId); mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED); mutator.delete().mutate(); + this.deleted = true; } private Optional findTop() { @@ -413,6 +556,17 @@ private Optional findTop() { return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst(); }); } + + @Override + protected void unreserve() { + if (!deleted) { + FateMutator.Status status; + do { + status = newMutator(fateId).putUnreserveTx(reservation).tryMutate(); + } while (status.equals(FateMutator.Status.UNKNOWN)); + } + reservation = null; + } } static Text invertRepo(int position) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java index b7b7846d189..012e2853ff2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/schema/FateSchema.java @@ -35,6 +35,9 @@ public static class TxColumnFamily { public static final String CREATE_TIME = "ctime"; public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new Text(CREATE_TIME)); + + public static final String RESERVATION = "reservation"; + public static final ColumnFQ RESERVATION_COLUMN = new ColumnFQ(NAME, new Text(RESERVATION)); } public static class TxInfoColumnFamily { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index f1b86e32e80..5722c617236 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.logging; import java.io.Serializable; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -175,6 +176,16 @@ public Optional> createAndReserve(FateKey fateKey) { } return txStore; } + + @Override + public Map getActiveReservations() { + return store.getActiveReservations(); + } + + @Override + public void deleteDeadReservations() { + store.deleteDeadReservations(); + } }; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index bfe9251189a..ccefb8ea660 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -60,7 +60,8 @@ public Optional> createAndReserve(FateKey key) { @Override public FateTxStore reserve(FateId fateId) { if (reserved.contains(fateId)) { - throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve + // other fate stores would wait, but do not expect test to reserve + throw new IllegalStateException(); } // twice... if test change, then change this reserved.add(fateId); @@ -78,6 +79,18 @@ public Optional> tryReserve(FateId fateId) { } } + @Override + public Map getActiveReservations() { + // This method only makes sense for the FateStores that don't store their reservations in memory + throw new UnsupportedOperationException(); + } + + @Override + public void deleteDeadReservations() { + // This method only makes sense for the FateStores that don't store their reservations in memory + throw new UnsupportedOperationException(); + } + private class TestFateTxStore implements FateTxStore { private final FateId fateId; @@ -98,26 +111,28 @@ public List> getStack() { @Override public TStatus getStatus() { - return getStatusAndKey().getFirst(); - } + if (!reserved.contains(fateId)) { + throw new IllegalStateException(); + } - @Override - public Optional getKey() { - return getStatusAndKey().getSecond(); + Pair> status = statuses.get(fateId); + if (status == null) { + return TStatus.UNKNOWN; + } + return status.getFirst(); } @Override - public Pair> getStatusAndKey() { + public Optional getKey() { if (!reserved.contains(fateId)) { throw new IllegalStateException(); } Pair> status = statuses.get(fateId); if (status == null) { - return new Pair<>(TStatus.UNKNOWN, Optional.empty()); + return Optional.empty(); } - - return status; + return status.getSecond(); } @Override @@ -209,6 +224,12 @@ public FateId getFateId() { public TStatus getStatus() { return e.getValue().getFirst(); } + + @Override + public Optional getFateReservation() { + throw new UnsupportedOperationException( + "Only the 'reserved' set should be used for reservations in the test store"); + } }); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 8cef44c7455..1dd789c4d36 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import java.io.BufferedWriter; import java.io.File; @@ -912,8 +913,8 @@ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOps var zTableLocksPath = context.getServerPaths().createTableLocksPath(); String fateZkPath = zkRoot + Constants.ZFATE; ZooReaderWriter zk = context.getZooReaderWriter(); - MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk); - UserFateStore ufs = new UserFateStore<>(context); + MetaFateStore mfs = new MetaFateStore<>(fateZkPath, zk, createDummyLockID(), null); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); Map> fateStores = Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); Map> readOnlyFateStores = @@ -1106,9 +1107,9 @@ private static long printDanglingFateOperations(ServerContext context, } }; - UserFateStore ufs = new UserFateStore<>(context); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); MetaFateStore mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, - context.getZooReaderWriter()); + context.getZooReaderWriter(), createDummyLockID(), null); LoadingCache fateStatusCache = Caffeine.newBuilder() .maximumSize(100_000).expireAfterWrite(10, TimeUnit.SECONDS).build(fateId -> { if (fateId.getType() == FateInstanceType.META) { diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 78f03215005..d8874b0c6d5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -80,6 +80,7 @@ import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.lock.ServiceLock; import org.apache.accumulo.core.lock.ServiceLock.LockLossReason; @@ -1244,10 +1245,13 @@ boolean canSuspendTablets() { throw new IllegalStateException("Upgrade coordinator is unexpectedly not complete"); } try { - var metaInstance = initializeFateInstance(context, - new MetaFateStore<>(getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter())); - var userInstance = initializeFateInstance(context, - new UserFateStore<>(context, AccumuloTable.FATE.tableName())); + Predicate isLockHeld = + lock -> ServiceLock.isLockHeld(context.getZooCache(), lock); + var metaInstance = + initializeFateInstance(context, new MetaFateStore<>(getZooKeeperRoot() + Constants.ZFATE, + context.getZooReaderWriter(), managerLock.getLockID(), isLockHeld)); + var userInstance = initializeFateInstance(context, new UserFateStore<>(context, + AccumuloTable.FATE.tableName(), managerLock.getLockID(), isLockHeld)); if (!fateRefs.compareAndSet(null, Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userInstance))) { @@ -1358,7 +1362,7 @@ boolean canSuspendTablets() { protected Fate initializeFateInstance(ServerContext context, FateStore store) { final Fate fateInstance = - new Fate<>(this, store, TraceRepo::toLogString, getConfiguration()); + new Fate<>(this, store, true, TraceRepo::toLogString, getConfiguration()); var fateCleaner = new FateCleaner<>(store, Duration.ofHours(8), this::getSteadyTime); ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor() diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java index 550ee7967ea..1087cf1b9be 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.manager.metrics.fate.FateMetrics; @@ -63,7 +64,8 @@ public void registerMetrics(MeterRegistry registry) { @Override protected ReadOnlyFateStore> buildStore(ServerContext context) { try { - return new MetaFateStore<>(getFateRootPath(context), context.getZooReaderWriter()); + return new MetaFateStore<>(getFateRootPath(context), context.getZooReaderWriter(), + AbstractFateStore.createDummyLockID(), null); } catch (KeeperException ex) { throw new IllegalStateException( "FATE Metrics - Failed to create zoo store - metrics unavailable", ex); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java index b26206582de..92ac8568810 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/user/UserFateMetrics.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.manager.metrics.fate.user; +import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.manager.metrics.fate.FateMetrics; @@ -31,7 +32,7 @@ public UserFateMetrics(ServerContext context, long minimumRefreshDelay) { @Override protected ReadOnlyFateStore> buildStore(ServerContext context) { - return new UserFateStore<>(context); + return new UserFateStore<>(context, AbstractFateStore.createDummyLockID(), null); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 55df5e35547..314212693a3 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.compaction; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP1; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP2; import static org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.GROUP3; @@ -232,8 +233,8 @@ public void testExternalCompaction() throws Exception { @Test public void testCompactionCommitAndDeadDetectionRoot() throws Exception { var ctx = getCluster().getServerContext(); - FateStore metaFateStore = - new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, + ctx.getZooReaderWriter(), createDummyLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName()); @@ -251,8 +252,8 @@ public void testCompactionCommitAndDeadDetectionRoot() throws Exception { @Test public void testCompactionCommitAndDeadDetectionMeta() throws Exception { var ctx = getCluster().getServerContext(); - FateStore metaFateStore = - new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + FateStore metaFateStore = new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, + ctx.getZooReaderWriter(), createDummyLockID(), null); try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { // Metadata table by default already has 2 tablets @@ -274,7 +275,7 @@ public void testCompactionCommitAndDeadDetectionUser() throws Exception { final String tableName = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx); + UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); c.tableOperations().create(tableName, new NewTableConfiguration().withSplits(splits)); @@ -297,9 +298,10 @@ public void testCompactionCommitAndDeadDetectionAll() throws Exception { final String userTable = getUniqueNames(1)[0]; try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { - UserFateStore userFateStore = new UserFateStore<>(ctx); + UserFateStore userFateStore = new UserFateStore<>(ctx, createDummyLockID(), null); FateStore metaFateStore = - new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter()); + new MetaFateStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, ctx.getZooReaderWriter(), + createDummyLockID(), null); SortedSet splits = new TreeSet<>(); splits.add(new Text(row(MAX_DATA / 2))); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index 00847ad5f78..bd7c4a2395b 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -492,7 +492,7 @@ protected Fate initializeFate(FateStore store) { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - return new Fate<>(new TestEnv(), store, r -> r + "", config); + return new Fate<>(new TestEnv(), store, false, r -> r + "", config); } protected abstract TStatus getTxStatus(ServerContext sctx, FateId fateId); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java index 7c46f8a75ea..b4bf4146829 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateInterleavingIT.java @@ -189,7 +189,7 @@ protected Fate initializeFate(AccumuloClient client, FateStore(new FilTestEnv(client), store, r -> r + "", config); + return new Fate<>(new FilTestEnv(client), store, false, r -> r + "", config); } private static Entry toIdStep(Entry e) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index 04bff32cd24..8b52f88f97f 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.util.compaction.ExternalCompactionUtil.getCompactorAddrs; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -37,9 +38,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -60,6 +63,7 @@ import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.IteratorUtil; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; @@ -622,14 +626,16 @@ protected void testFatePrintAndSummaryCommandsWithInProgressTxns(FateStore getFateReservation() { + return Optional.empty(); + } }; } @@ -761,7 +773,7 @@ private Fate initializeFate(FateStore store) { ConfigurationCopy config = new ConfigurationCopy(); config.set(Property.GENERAL_THREADPOOL_SIZE, "2"); config.set(Property.MANAGER_FATE_THREADPOOL_SIZE, "1"); - return new Fate<>(new TestEnv(), store, Object::toString, config); + return new Fate<>(new TestEnv(), store, false, Object::toString, config); } private boolean wordIsTStatus(String word) { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java index 2ffdb59031c..938025435fb 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java @@ -28,7 +28,6 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.lang.reflect.Method; import java.time.Duration; import java.util.HashMap; import java.util.HashSet; @@ -55,7 +54,6 @@ import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.StackOverflowException; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT.TestRepo; @@ -64,23 +62,10 @@ import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; -import com.google.common.base.Throwables; import com.google.common.collect.Sets; public abstract class FateStoreIT extends SharedMiniClusterBase implements FateTestRunner { - private static final Method fsCreateByKeyMethod; - - static { - try { - // Private method, need to capture for testing - fsCreateByKeyMethod = AbstractFateStore.class.getDeclaredMethod("create", FateKey.class); - fsCreateByKeyMethod.setAccessible(true); - } catch (NoSuchMethodException e) { - throw new RuntimeException(e); - } - } - @Override protected Duration defaultTimeout() { return Duration.ofMinutes(1); @@ -376,7 +361,7 @@ protected void testCreateWithKeyInProgress(FateStore store, ServerConte // We have an existing transaction with the same key in progress // so should return an empty Optional - assertTrue(create(store, fateKey).isEmpty()); + assertTrue(store.createAndReserve(fateKey).isEmpty()); assertEquals(TStatus.IN_PROGRESS, txStore.getStatus()); } finally { txStore.setStatus(TStatus.SUCCESSFUL); @@ -415,9 +400,10 @@ protected void testCreateWithKeyCollision(FateStore store, ServerContex FateTxStore txStore = store.createAndReserve(fateKey1).orElseThrow(); try { - var e = assertThrows(IllegalStateException.class, () -> create(store, fateKey2)); + var e = assertThrows(IllegalStateException.class, () -> store.createAndReserve(fateKey2)); assertEquals( - "Collision detected for tid " + UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8)), + "Collision detected for fate id " + + FateId.from(store.type(), UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8))), e.getMessage()); assertEquals(fateKey1, txStore.getKey().orElseThrow()); } finally { @@ -438,19 +424,18 @@ protected void testCollisionWithRandomFateId(FateStore store, ServerCon new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); FateKey fateKey = FateKey.forSplit(ke); - FateId fateId = create(store, fateKey).orElseThrow(); + FateTxStore txStore = store.createAndReserve(fateKey).orElseThrow(); + FateId fateId = txStore.getID(); - // After create a fate transaction using a key we can simulate a collision with - // a random FateId by deleting the key out of Fate and calling create again to verify - // it detects the key is missing. Then we can continue and see if we can still reserve - // and use the existing transaction, which we should. + // After createAndReserve a fate transaction using a key we can simulate a collision with + // a random FateId by deleting the key out of Fate and calling createAndReserve again to + // verify it detects the key is missing. Then we can continue and see if we can still use + // the existing transaction. deleteKey(fateId, sctx); var e = assertThrows(IllegalStateException.class, () -> store.createAndReserve(fateKey)); - assertEquals("Tx Key is missing from tid " + fateId.getTxUUIDStr(), e.getMessage()); + assertEquals("fate key is missing from fate id " + fateId, e.getMessage()); - // We should still be able to reserve and continue when not using a key - // just like a normal transaction - FateTxStore txStore = store.reserve(fateId); + // We should still be able to use the existing transaction try { assertTrue(txStore.timeCreated() > 0); assertEquals(TStatus.NEW, txStore.getStatus()); @@ -468,11 +453,12 @@ public void testAbsent() throws Exception { protected void testAbsent(FateStore store, ServerContext sctx) { // Ensure both store implementations have consistent behavior when reading a fateId that does - // not exists. + // not exist. var fateId = FateId.from(store.type(), UUID.randomUUID()); var txStore = store.read(fateId); + assertTrue(store.tryReserve(fateId).isEmpty()); assertEquals(TStatus.UNKNOWN, txStore.getStatus()); assertNull(txStore.top()); assertNull(txStore.getTransactionInfo(TxInfo.TX_NAME)); @@ -480,7 +466,6 @@ protected void testAbsent(FateStore store, ServerContext sctx) { assertEquals(Optional.empty(), txStore.getKey()); assertEquals(fateId, txStore.getID()); assertEquals(List.of(), txStore.getStack()); - assertEquals(new Pair<>(TStatus.UNKNOWN, Optional.empty()), txStore.getStatusAndKey()); } @Test @@ -547,17 +532,6 @@ protected void testListFateKeys(FateStore store, ServerContext sctx) th .forEach(fateIdStatus -> store.tryReserve(fateIdStatus.getFateId()).orElseThrow().delete()); } - // create(fateKey) method is private so expose for testing to check error states - @SuppressWarnings("unchecked") - protected Optional create(FateStore store, FateKey fateKey) throws Exception { - try { - return (Optional) fsCreateByKeyMethod.invoke(store, fateKey); - } catch (Exception e) { - Exception rootCause = (Exception) Throwables.getRootCause(e); - throw rootCause; - } - } - protected abstract void deleteKey(FateId fateId, ServerContext sctx); private static class TestOperation2 extends TestRepo { diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java index 739b0cf0110..a5e75416b60 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FlakyFate.java @@ -35,7 +35,7 @@ public class FlakyFate extends Fate { public FlakyFate(T environment, FateStore store, Function,String> toLogStrFunc, AccumuloConfiguration conf) { - super(environment, store, toLogStrFunc, conf); + super(environment, store, false, toLogStrFunc, conf); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java new file mode 100644 index 00000000000..57070bacde1 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.DefaultConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.MetaFateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; + +public class MultipleStoresIT extends SharedMiniClusterBase { + + private static final Logger LOG = LoggerFactory.getLogger(MultipleStoresIT.class); + @TempDir + private static File tempDir; + private static ZooKeeperTestingServer szk = null; + private static ZooReaderWriter zk; + private static final String FATE_DIR = "/fate"; + private ClientContext client; + + @BeforeEach + public void beforeEachSetup() { + client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); + } + + @AfterEach + public void afterEachTeardown() { + client.close(); + } + + @BeforeAll + public static void beforeAllSetup() throws Exception { + SharedMiniClusterBase.startMiniCluster(); + szk = new ZooKeeperTestingServer(tempDir); + zk = szk.getZooReaderWriter(); + } + + @AfterAll + public static void afterAllTeardown() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + szk.close(); + } + + @Test + public void testReserveUnreserve() throws Exception { + testReserveUnreserve(FateInstanceType.META); + testReserveUnreserve(FateInstanceType.USER); + } + + private void testReserveUnreserve(FateInstanceType storeType) throws Exception { + // reserving/unreserving a FateId should be reflected across instances of the stores + final String tableName = getUniqueNames(1)[0]; + final int numFateIds = 500; + final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID()); + final List> reservations = new ArrayList<>(); + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final FateStore store1, store2; + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + Map activeReservations; + + if (isUserStore) { + createFateTable(client, tableName); + store1 = new UserFateStore<>(client, tableName, lock1, null); + store2 = new UserFateStore<>(client, tableName, lock2, null); + } else { + store1 = new MetaFateStore<>(FATE_DIR, zk, lock1, null); + store2 = new MetaFateStore<>(FATE_DIR, zk, lock2, null); + } + + // Create the fate ids using store1 + for (int i = 0; i < numFateIds; i++) { + assertTrue(allIds.add(store1.create())); + } + assertEquals(numFateIds, allIds.size()); + + // Reserve half the fate ids using store1 and rest using store2, after reserving a fate id in + // one, should not be able to reserve the same in the other. Should also not matter that all the + // ids were created using store1 + int count = 0; + for (FateId fateId : allIds) { + if (count % 2 == 0) { + reservations.add(store1.reserve(fateId)); + assertTrue(store2.tryReserve(fateId).isEmpty()); + } else { + reservations.add(store2.reserve(fateId)); + assertTrue(store1.tryReserve(fateId).isEmpty()); + } + count++; + } + // Try to reserve a non-existent fate id + assertTrue(store1.tryReserve(fakeFateId).isEmpty()); + assertTrue(store2.tryReserve(fakeFateId).isEmpty()); + // Both stores should return the same reserved transactions + activeReservations = store1.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); + activeReservations = store2.getActiveReservations(); + assertEquals(allIds, activeReservations.keySet()); + + // Test setting/getting the TStatus and unreserving the transactions + for (int i = 0; i < allIds.size(); i++) { + var reservation = reservations.get(i); + assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus()); + reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED); + assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, reservation.getStatus()); + reservation.delete(); + reservation.unreserve(Duration.ofMillis(0)); + // Attempt to set a status on a tx that has been unreserved (should throw exception) + assertThrows(IllegalStateException.class, + () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW)); + } + assertTrue(store1.getActiveReservations().isEmpty()); + assertTrue(store2.getActiveReservations().isEmpty()); + } + + @Test + public void testReserveNonExistentTxn() throws Exception { + testReserveNonExistentTxn(FateInstanceType.META); + testReserveNonExistentTxn(FateInstanceType.USER); + } + + private void testReserveNonExistentTxn(FateInstanceType storeType) throws Exception { + // Tests that reserve() doesn't hang indefinitely and instead throws an error + // on reserve() a non-existent transaction. + final FateStore store; + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final String tableName = getUniqueNames(1)[0]; + final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID()); + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); + + if (isUserStore) { + createFateTable(client, tableName); + store = new UserFateStore<>(client, tableName, lock, null); + } else { + store = new MetaFateStore<>(FATE_DIR, zk, lock, null); + } + + var err = assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId)); + assertTrue(err.getMessage().contains(fakeFateId.canonical())); + } + + @Test + public void testReserveReservedAndUnreserveUnreserved() throws Exception { + testReserveReservedAndUnreserveUnreserved(FateInstanceType.META); + testReserveReservedAndUnreserveUnreserved(FateInstanceType.USER); + } + + private void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeType) + throws Exception { + final String tableName = getUniqueNames(1)[0]; + final int numFateIds = 500; + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final List> reservations = new ArrayList<>(); + final FateStore store; + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); + + if (isUserStore) { + createFateTable(client, tableName); + store = new UserFateStore<>(client, tableName, lock, null); + } else { + store = new MetaFateStore<>(FATE_DIR, zk, lock, null); + } + + // Create some FateIds and ensure that they can be reserved + for (int i = 0; i < numFateIds; i++) { + FateId fateId = store.create(); + assertTrue(allIds.add(fateId)); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservations.add(reservation.orElseThrow()); + } + assertEquals(numFateIds, allIds.size()); + + // Try to reserve again, should not reserve + for (FateId fateId : allIds) { + assertTrue(store.tryReserve(fateId).isEmpty()); + } + + // Unreserve all the FateIds + for (var reservation : reservations) { + reservation.unreserve(Duration.ofMillis(0)); + } + // Try to unreserve again (should throw exception) + for (var reservation : reservations) { + assertThrows(IllegalStateException.class, () -> reservation.unreserve(Duration.ofMillis(0))); + } + } + + @Test + public void testReserveAfterUnreserveAndReserveAfterDeleted() throws Exception { + testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.META); + testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.USER); + } + + private void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType) + throws Exception { + final String tableName = getUniqueNames(1)[0]; + final int numFateIds = 500; + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final List> reservations = new ArrayList<>(); + final FateStore store; + final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); + + if (isUserStore) { + createFateTable(client, tableName); + store = new UserFateStore<>(client, tableName, lock, null); + } else { + store = new MetaFateStore<>(FATE_DIR, zk, lock, null); + } + + // Create some FateIds and ensure that they can be reserved + for (int i = 0; i < numFateIds; i++) { + FateId fateId = store.create(); + assertTrue(allIds.add(fateId)); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservations.add(reservation.orElseThrow()); + } + assertEquals(numFateIds, allIds.size()); + + // Unreserve all + for (var reservation : reservations) { + reservation.unreserve(Duration.ofMillis(0)); + } + + // Ensure they can be reserved again, and delete and unreserve this time + for (FateId fateId : allIds) { + // Verify that the tx status is still NEW after unreserving since it hasn't been deleted + assertEquals(ReadOnlyFateStore.TStatus.NEW, store.read(fateId).getStatus()); + var reservation = store.tryReserve(fateId); + assertFalse(reservation.isEmpty()); + reservation.orElseThrow().delete(); + reservation.orElseThrow().unreserve(Duration.ofMillis(0)); + } + + for (FateId fateId : allIds) { + // Verify that the tx is now unknown since it has been deleted + assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus()); + // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely + var err = assertThrows(IllegalStateException.class, () -> store.reserve(fateId)); + assertTrue(err.getMessage().contains(fateId.canonical())); + } + } + + @Test + public void testMultipleFateInstances() throws Exception { + testMultipleFateInstances(FateInstanceType.META); + testMultipleFateInstances(FateInstanceType.USER); + } + + private void testMultipleFateInstances(FateInstanceType storeType) throws Exception { + final String tableName = getUniqueNames(1)[0]; + final int numFateIds = 500; + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final FateStore store1, store2; + final SleepingTestEnv testEnv1 = new SleepingTestEnv(50); + final SleepingTestEnv testEnv2 = new SleepingTestEnv(50); + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + final Set liveLocks = new HashSet<>(); + final Predicate isLockHeld = liveLocks::contains; + + if (isUserStore) { + createFateTable(client, tableName); + store1 = new UserFateStore<>(client, tableName, lock1, isLockHeld); + store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld); + } else { + store1 = new MetaFateStore<>(FATE_DIR, zk, lock1, isLockHeld); + store2 = new MetaFateStore<>(FATE_DIR, zk, lock2, isLockHeld); + } + liveLocks.add(lock1); + liveLocks.add(lock2); + + Fate fate1 = + new Fate<>(testEnv1, store1, true, Object::toString, DefaultConfiguration.getInstance()); + Fate fate2 = + new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance()); + + for (int i = 0; i < numFateIds; i++) { + FateId fateId; + // Start half the txns using fate1, and the other half using fate2 + if (i % 2 == 0) { + fateId = fate1.startTransaction(); + fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test"); + } else { + fateId = fate2.startTransaction(); + fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test"); + } + allIds.add(fateId); + } + assertEquals(numFateIds, allIds.size()); + + // Should be able to wait for completion on any fate instance + for (FateId fateId : allIds) { + fate2.waitForCompletion(fateId); + } + // Ensure that all txns have been executed and have only been executed once + assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps)); + assertEquals(allIds, Sets.union(testEnv1.executedOps, testEnv2.executedOps)); + + fate1.shutdown(1, TimeUnit.MINUTES); + fate2.shutdown(1, TimeUnit.MINUTES); + } + + @Test + public void testDeadReservationsCleanup() throws Exception { + testDeadReservationsCleanup(FateInstanceType.META); + testDeadReservationsCleanup(FateInstanceType.USER); + } + + private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exception { + // Tests reserving some transactions, then simulating that the Manager died by creating + // a new Fate instance and store with a new LockID. The transactions which were + // reserved using the old LockID should be cleaned up by Fate's DeadReservationCleaner, + // then picked up by the new Fate/store. + + final String tableName = getUniqueNames(1)[0]; + // One transaction for each FATE worker thread + final int numFateIds = + Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue()); + final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final Set allIds = new HashSet<>(); + final FateStore store1, store2; + final LatchTestEnv testEnv1 = new LatchTestEnv(); + final LatchTestEnv testEnv2 = new LatchTestEnv(); + final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); + final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52); + final Set liveLocks = new HashSet<>(); + final Predicate isLockHeld = liveLocks::contains; + Map reservations; + + if (isUserStore) { + createFateTable(client, tableName); + store1 = new UserFateStore<>(client, tableName, lock1, isLockHeld); + } else { + store1 = new MetaFateStore<>(FATE_DIR, zk, lock1, isLockHeld); + } + liveLocks.add(lock1); + + Fate fate1 = + new Fate<>(testEnv1, store1, true, Object::toString, DefaultConfiguration.getInstance()); + + // Ensure nothing is reserved yet + assertTrue(store1.getActiveReservations().isEmpty()); + + // Create transactions + for (int i = 0; i < numFateIds; i++) { + FateId fateId; + fateId = fate1.startTransaction(); + fate1.seedTransaction("op" + i, fateId, new LatchTestRepo(), true, "test"); + allIds.add(fateId); + } + assertEquals(numFateIds, allIds.size()); + + // Wait for all the fate worker threads to start working on the transactions + Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds); + // Each fate worker will be hung up working (IN_PROGRESS) on a single transaction + + // Verify store1 has the transactions reserved and that they were reserved with lock1 + reservations = store1.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach( + res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + + if (isUserStore) { + store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld); + } else { + store2 = new MetaFateStore<>(FATE_DIR, zk, lock2, isLockHeld); + } + + // Verify store2 can see the reserved transactions even though they were reserved using + // store1 + reservations = store2.getActiveReservations(); + assertEquals(allIds, reservations.keySet()); + reservations.values().forEach( + res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + + // Simulate what would happen if the Manager using the Fate object (fate1) died. + // isLockHeld would return false for the LockId of the Manager that died (in this case, lock1) + // and true for the new Manager's lock (lock2) + liveLocks.remove(lock1); + liveLocks.add(lock2); + + // Create the new Fate/start the Fate threads (the work finder and the workers). + // Don't run another dead reservation cleaner since we already have one running from fate1. + Fate fate2 = + new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance()); + + // Wait for the "dead" reservations to be deleted and picked up again (reserved using + // fate2/store2/lock2 now). + // They are considered "dead" if they are held by lock1 in this test. We don't have to worry + // about fate1/store1/lock1 being used to reserve the transactions again since all + // the workers for fate1 are hung up + Wait.waitFor(() -> { + Map store2Reservations = store2.getActiveReservations(); + boolean allReservedWithLock2 = store2Reservations.values().stream() + .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); + return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; + }, 60_000); + + // Finish work and shutdown + testEnv1.workersLatch.countDown(); + testEnv2.workersLatch.countDown(); + fate1.shutdown(1, TimeUnit.MINUTES); + fate2.shutdown(1, TimeUnit.MINUTES); + } + + public static class SleepingTestRepo implements Repo { + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId fateId, SleepingTestEnv environment) { + return 0; + } + + @Override + public String getName() { + return null; + } + + @Override + public Repo call(FateId fateId, SleepingTestEnv environment) throws Exception { + environment.executedOps.add(fateId); + LOG.debug("Thread " + Thread.currentThread() + " in SleepingTestRepo.call() sleeping for " + + environment.sleepTimeMs + " millis"); + Thread.sleep(environment.sleepTimeMs); // Simulate some work + LOG.debug("Thread " + Thread.currentThread() + " finished SleepingTestRepo.call()"); + return null; + } + + @Override + public void undo(FateId fateId, SleepingTestEnv environment) { + + } + + @Override + public String getReturn() { + return null; + } + } + + public static class SleepingTestEnv { + public final Set executedOps = Collections.synchronizedSet(new HashSet<>()); + public final int sleepTimeMs; + + public SleepingTestEnv(int sleepTimeMs) { + this.sleepTimeMs = sleepTimeMs; + } + } + + public static class LatchTestRepo implements Repo { + private static final long serialVersionUID = 1L; + + @Override + public long isReady(FateId fateId, LatchTestEnv environment) { + return 0; + } + + @Override + public String getName() { + return null; + } + + @Override + public Repo call(FateId fateId, LatchTestEnv environment) throws Exception { + LOG.debug("Thread " + Thread.currentThread() + " in LatchTestRepo.call()"); + environment.numWorkers.incrementAndGet(); + environment.workersLatch.await(); + LOG.debug("Thread " + Thread.currentThread() + " finished LatchTestRepo.call()"); + environment.numWorkers.decrementAndGet(); + return null; + } + + @Override + public void undo(FateId fateId, LatchTestEnv environment) { + + } + + @Override + public String getReturn() { + return null; + } + } + + public static class LatchTestEnv { + public final AtomicInteger numWorkers = new AtomicInteger(0); + public final CountDownLatch workersLatch = new CountDownLatch(1); + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index b9e8c101271..a23dde06448 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.meta; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; @@ -75,8 +76,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(sctx); - testMethod.execute( - new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred, fateIdGenerator), sctx); + testMethod.execute(new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, createDummyLockID(), null, + maxDeferred, fateIdGenerator), sctx); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java index 0ba14f730e2..bfd267630f5 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.fate.meta; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; + import java.util.UUID; import org.apache.accumulo.core.Constants; @@ -39,6 +41,6 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred String path = ZK_ROOT + Constants.ZFATE; ZooReaderWriter zk = sctx.getZooReaderWriter(); zk.mkdirs(ZK_ROOT); - testMethod.execute(new MetaFateStore<>(path, zk), sctx); + testMethod.execute(new MetaFateStore<>(path, zk, createDummyLockID(), null), sctx); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java index 5dc1daf9b29..994c7af2ebe 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.fate.meta; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; + import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.MetaFateStore; @@ -32,6 +34,6 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, ServerContext sctx = getCluster().getServerContext(); String path = sctx.getZooKeeperRoot() + Constants.ZFATE; ZooReaderWriter zk = sctx.getZooReaderWriter(); - testMethod.execute(new MetaFateStore<>(path, zk), sctx); + testMethod.execute(new MetaFateStore<>(path, zk, createDummyLockID(), null), sctx); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index b75b76a55be..beb48a5304e 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.meta; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; @@ -28,11 +29,13 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Field; import java.lang.reflect.Method; +import java.util.Optional; import java.util.UUID; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -75,8 +78,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(sctx); - MetaFateStore store = - new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred, fateIdGenerator); + MetaFateStore store = new MetaFateStore<>(ZK_ROOT + Constants.ZFATE, zk, + createDummyLockID(), null, maxDeferred, fateIdGenerator); // Check that the store has no transactions before and after each test assertEquals(0, store.list().count()); @@ -89,32 +92,53 @@ protected void deleteKey(FateId fateId, ServerContext sctx) { try { // We have to use reflection since the NodeValue is internal to the store - // Grab both the constructor that uses the serialized bytes and status + // Grab both the constructors that use the serialized bytes and status, reservation Class nodeClass = Class.forName(MetaFateStore.class.getName() + "$NodeValue"); - Constructor statusCons = nodeClass.getDeclaredConstructor(TStatus.class); + Constructor statusReservationCons = + nodeClass.getDeclaredConstructor(TStatus.class, FateStore.FateReservation.class); Constructor serializedCons = nodeClass.getDeclaredConstructor(byte[].class); - statusCons.setAccessible(true); + statusReservationCons.setAccessible(true); serializedCons.setAccessible(true); - // Get the status field so it can be read and the serialize method + // Get the status and reservation fields so they can be read and get the serialize method Field nodeStatus = nodeClass.getDeclaredField("status"); + Field nodeReservation = nodeClass.getDeclaredField("reservation"); Method nodeSerialize = nodeClass.getDeclaredMethod("serialize"); nodeStatus.setAccessible(true); + nodeReservation.setAccessible(true); nodeSerialize.setAccessible(true); - // Get the existing status for the node and build a new node with an empty key + // Get the existing status and reservation for the node and build a new node with an empty key // but uses the existing tid String txPath = ZK_ROOT + Constants.ZFATE + "/tx_" + fateId.getTxUUIDStr(); Object currentNode = serializedCons.newInstance(new Object[] {zk.getData(txPath)}); TStatus currentStatus = (TStatus) nodeStatus.get(currentNode); - // replace the node with no key and just a tid and existing status - Object newNode = statusCons.newInstance(currentStatus); + Optional currentReservation = + getCurrentReservation(nodeReservation, currentNode); + // replace the node with no key and just a tid and existing status and reservation + Object newNode = + statusReservationCons.newInstance(currentStatus, currentReservation.orElse(null)); - // Replace the transaction with the same status and no key + // Replace the transaction with the same status and reservation but no key zk.putPersistentData(txPath, (byte[]) nodeSerialize.invoke(newNode), NodeExistsPolicy.OVERWRITE); } catch (Exception e) { throw new IllegalStateException(e); } } + + private Optional getCurrentReservation(Field nodeReservation, + Object currentNode) throws Exception { + Object currentResAsObject = nodeReservation.get(currentNode); + Optional currentReservation = Optional.empty(); + if (currentResAsObject instanceof Optional) { + Optional currentResAsOptional = (Optional) currentResAsObject; + if (currentResAsOptional.isPresent() + && currentResAsOptional.orElseThrow() instanceof FateStore.FateReservation) { + currentReservation = + Optional.of((FateStore.FateReservation) currentResAsOptional.orElseThrow()); + } + } + return currentReservation; + } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java index aaa9be01f16..fe16e2b014d 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java @@ -33,8 +33,11 @@ import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.user.FateMutator; import org.apache.accumulo.core.fate.user.FateMutatorImpl; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.test.fate.FateIT; import org.junit.jupiter.api.AfterAll; @@ -169,6 +172,77 @@ public void requireStatus() throws Exception { } + @Test + public void testReservations() throws Exception { + final String table = getUniqueNames(1)[0]; + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table, ntc); + + ClientContext context = (ClientContext) client; + + FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + ZooUtil.LockID lockID = new ZooUtil.LockID("/locks", "L1", 50); + FateStore.FateReservation reservation = + FateStore.FateReservation.from(lockID, UUID.randomUUID()); + FateStore.FateReservation wrongReservation = + FateStore.FateReservation.from(lockID, UUID.randomUUID()); + + // Ensure that we cannot do anything in the column until it is initialized + FateMutator.Status status = + new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + + // Initialize the column and ensure we can't do it twice + status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate(); + assertEquals(ACCEPTED, status); + status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate(); + assertEquals(REJECTED, status); + + // Ensure that reserving is the only thing we can do + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + assertEquals(ACCEPTED, status); + + // Should not be able to reserve when it is already reserved + status = + new FateMutatorImpl<>(context, table, fateId).putReservedTx(wrongReservation).tryMutate(); + assertEquals(REJECTED, status); + status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + + // Should be able to unreserve + status = new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(wrongReservation) + .tryMutate(); + assertEquals(REJECTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(ACCEPTED, status); + status = + new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); + assertEquals(REJECTED, status); + + // Verify putReservedTxOnCreation works as expected + status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) + .tryMutate(); + assertEquals(ACCEPTED, status); + status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) + .tryMutate(); + assertEquals(REJECTED, status); + status = + new FateMutatorImpl<>(context, table, fateId1).putUnreserveTx(reservation).tryMutate(); + assertEquals(ACCEPTED, status); + status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) + .tryMutate(); + assertEquals(REJECTED, status); + } + } + void logAllEntriesInTable(String tableName, AccumuloClient client) throws Exception { client.createScanner(tableName).forEach(e -> log.info(e.getKey() + " " + e.getValue())); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java index 4d6a8da1188..7f0383e6f4c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; import java.util.stream.StreamSupport; @@ -60,8 +61,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, maxDeferred, fateIdGenerator), - getCluster().getServerContext()); + testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, + fateIdGenerator), getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java index afd4d8ac5aa..83a87db975d 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateInterleavingIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; import org.apache.accumulo.core.client.Accumulo; @@ -34,8 +35,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, maxDeferred, fateIdGenerator), - getCluster().getServerContext()); + testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, + fateIdGenerator), getCluster().getServerContext()); client.tableOperations().delete(table); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java index fde93de3460..3fe3192e6ba 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java @@ -18,6 +18,8 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; + import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; import org.apache.accumulo.test.fate.FateOpsCommandsIT; @@ -26,7 +28,8 @@ public class UserFateOpsCommandsIT extends FateOpsCommandsIT { @Override public void executeTest(FateTestExecutor testMethod, int maxDeferred, AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception { - testMethod.execute(new UserFateStore<>(getCluster().getServerContext()), + testMethod.execute( + new UserFateStore<>(getCluster().getServerContext(), createDummyLockID(), null), getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java index 2a3248f7671..c967fbea5ab 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.user; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable; @@ -56,8 +57,8 @@ public void executeTest(FateTestExecutor testMethod, int maxDeferred, try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { createFateTable(client, table); - testMethod.execute(new UserFateStore<>(client, table, maxDeferred, fateIdGenerator), - getCluster().getServerContext()); + testMethod.execute(new UserFateStore<>(client, table, createDummyLockID(), null, maxDeferred, + fateIdGenerator), getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java index 760fa871491..be007a1f255 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java @@ -81,7 +81,7 @@ private static class TestUserFateStore extends UserFateStore { // use the list of fateIds to simulate collisions on fateIds public TestUserFateStore(ClientContext context, String tableName, List fateIds) { - super(context, tableName); + super(context, tableName, createDummyLockID(), null); this.fateIdIterator = fateIds.iterator(); } @@ -232,7 +232,7 @@ private void injectStatus(ClientContext client, String table, FateId fateId, TSt // Create the fate table with the exact configuration as the real Fate user instance table // including table properties and TabletAvailability - protected static void createFateTable(ClientContext client, String table) throws Exception { + public static void createFateTable(ClientContext client, String table) throws Exception { final var fateTableProps = client.tableOperations().getTableProperties(AccumuloTable.FATE.tableName()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index ad9cc33cd6b..c5e6e5eea1f 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -261,10 +262,10 @@ public void getFateStatus() { InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); - MetaFateStore mfs = - new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk); + MetaFateStore mfs = new MetaFateStore<>( + ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk, createDummyLockID(), null); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString()); - UserFateStore ufs = new UserFateStore<>(context); Map> fateStores = Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); @@ -354,8 +355,8 @@ private boolean lookupFateInZookeeper(final String tableName) throws KeeperExcep InstanceId instanceId = context.getInstanceID(); ZooReaderWriter zk = context.getZooReader().asWriter(secret); - MetaFateStore mfs = - new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk); + MetaFateStore mfs = new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, + zk, createDummyLockID(), null); var lockPath = context.getServerPaths().createTableLocksPath(tableId.toString()); AdminUtil.FateStatus fateStatus = admin.getStatus(mfs, zk, lockPath, null, null, null); @@ -384,8 +385,8 @@ private boolean lookupFateInAccumulo(final String tableName) throws KeeperExcept log.trace("tid: {}", tableId); - UserFateStore as = new UserFateStore<>(context); - AdminUtil.FateStatus fateStatus = admin.getStatus(as, null, null, null); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); + AdminUtil.FateStatus fateStatus = admin.getStatus(ufs, null, null, null); log.trace("current fates: {}", fateStatus.getTransactions().size()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 936b2312719..9172a2d7b45 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -19,6 +19,7 @@ package org.apache.accumulo.test.functional; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.fate.AbstractFateStore.createDummyLockID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -233,9 +234,9 @@ private static FateStatus getFateStatus(AccumuloCluster cluster) { AdminUtil admin = new AdminUtil<>(false); ServerContext context = cluster.getServerContext(); ZooReaderWriter zk = context.getZooReaderWriter(); - MetaFateStore mfs = - new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, zk); - UserFateStore ufs = new UserFateStore<>(context); + MetaFateStore mfs = new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE, + zk, createDummyLockID(), null); + UserFateStore ufs = new UserFateStore<>(context, createDummyLockID(), null); Map> fateStores = Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs); var lockPath = context.getServerPaths().createTableLocksPath();