From 8e562c5df018b28ab3944110fa39bedf31d5034f Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Fri, 20 Sep 2024 16:20:14 -0400 Subject: [PATCH 1/5] Several misc Fate changes - Add a toString() to FateKey - Move MetaFateStore to org.apache.accumulo.core.fate.zookeeper - Periodic clean up of dead reservations increased from every 30 seconds to every few minutes - New fate test case added to FateIT that ensures no write ops can be performed on a transaction after it has been deleted - Added new check to verifyReserved() that checks whether the transaction is deleted - Fixed UserFateStoreIT to work with new change and misc cleanup to the class --- .../accumulo/core/fate/AbstractFateStore.java | 14 +++--- .../org/apache/accumulo/core/fate/Fate.java | 2 +- .../apache/accumulo/core/fate/FateKey.java | 8 ++++ .../core/fate/user/UserFateStore.java | 18 +++---- .../fate/{ => zookeeper}/MetaFateStore.java | 30 +++++++----- .../apache/accumulo/server/util/Admin.java | 2 +- .../org/apache/accumulo/manager/Manager.java | 2 +- .../metrics/fate/meta/MetaFateMetrics.java | 2 +- .../compaction/ExternalCompaction_1_IT.java | 2 +- .../org/apache/accumulo/test/fate/FateIT.java | 32 +++++++++++++ .../accumulo/test/fate/FateOpsCommandsIT.java | 2 +- .../accumulo/test/fate/MultipleStoresIT.java | 16 +++---- .../accumulo/test/fate/meta/MetaFateIT.java | 2 +- .../fate/meta/MetaFateInterleavingIT.java | 2 +- .../test/fate/meta/MetaFateOpsCommandsIT.java | 2 +- .../test/fate/meta/MetaFateStoreFateIT.java | 2 +- .../test/fate/user/UserFateStoreIT.java | 47 ++++++------------- .../test/functional/FateConcurrencyIT.java | 2 +- .../test/functional/FunctionalTestUtils.java | 2 +- 19 files changed, 107 insertions(+), 82 deletions(-) rename core/src/main/java/org/apache/accumulo/core/fate/{ => zookeeper}/MetaFateStore.java (96%) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 96d5805e6e9..1cb47a00274 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -311,7 +311,7 @@ protected boolean isReserved() { public TStatus waitForStatusChange(EnumSet expected) { Preconditions.checkState(!isReserved(), "Attempted to wait for status change while reserved: " + fateId); - verifyReserved(false); + verifyReservedAndNotDeleted(false); int currNumCallers = concurrentStatusChangeCallers.incrementAndGet(); @@ -376,16 +376,14 @@ public void unreserve(Duration deferTime) { protected abstract void unreserve(); - protected void verifyReserved(boolean isWrite) { - if (!isReserved() && isWrite) { - throw new IllegalStateException( - "Attempted write on unreserved FATE transaction: " + fateId); - } + protected void verifyReservedAndNotDeleted(boolean isWrite) { + Preconditions.checkState(!isWrite || (isReserved() && !deleted), + "Attempted write on unreserved or deleted FATE transaction: " + fateId); } @Override public TStatus getStatus() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); var status = _getStatus(fateId); observedStatus = status; return status; @@ -393,7 +391,7 @@ public TStatus getStatus() { @Override public Optional getKey() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return AbstractFateStore.this.getKey(fateId); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index e2d4e7cbe55..0e04a1a255b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -384,7 +384,7 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, store.type() + "-dead-reservation-cleaner-pool"); ScheduledFuture deadReservationCleaner = deadResCleanerExecutor - .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 30, SECONDS); + .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 180, SECONDS); ThreadPools.watchCriticalScheduledTask(deadReservationCleaner); } this.deadResCleanerExecutor = deadResCleanerExecutor; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java index 6c1663627c7..ec41d159b01 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -168,4 +168,12 @@ private static Optional deserializeCompactionId(FateKeyTyp throw new IllegalStateException("Unexpected FateInstanceType found " + type); } } + + @Override + public String toString() { + return "[" + getClass().getSimpleName() + " FateKeyType:" + type + + (keyExtent.isPresent() ? ", KeyExtent:" + keyExtent.orElseThrow() + : ", ExternalCompactionID:" + compactionId.orElseThrow()) + + "]"; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index 1d45c170f61..2c27cd9f5f1 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -420,7 +420,7 @@ private FateTxStoreImpl(FateId fateId, FateReservation reservation) { @Override public Repo top() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return scanTx(scanner -> { scanner.setRange(getRow(fateId)); @@ -436,7 +436,7 @@ public Repo top() { @Override public List> getStack() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return scanTx(scanner -> { scanner.setRange(getRow(fateId)); @@ -451,7 +451,7 @@ public List> getStack() { @Override public Serializable getTransactionInfo(TxInfo txInfo) { - verifyReserved(false); + verifyReservedAndNotDeleted(false); try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { scanner.setRange(getRow(fateId)); @@ -487,7 +487,7 @@ public Serializable getTransactionInfo(TxInfo txInfo) { @Override public long timeCreated() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return scanTx(scanner -> { scanner.setRange(getRow(fateId)); @@ -499,7 +499,7 @@ public long timeCreated() { @Override public void push(Repo repo) throws StackOverflowException { - verifyReserved(true); + verifyReservedAndNotDeleted(true); Optional top = findTop(); @@ -514,7 +514,7 @@ public void push(Repo repo) throws StackOverflowException { @Override public void pop() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); Optional top = findTop(); top.ifPresent(t -> newMutator(fateId) @@ -523,7 +523,7 @@ public void pop() { @Override public void setStatus(TStatus status) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); newMutator(fateId).putStatus(status).mutate(); observedStatus = status; @@ -531,7 +531,7 @@ public void setStatus(TStatus status) { @Override public void setTransactionInfo(TxInfo txInfo, Serializable so) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); final byte[] serialized = serializeTxInfo(so); @@ -540,7 +540,7 @@ public void setTransactionInfo(TxInfo txInfo, Serializable so) { @Override public void delete() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); var mutator = newMutator(fateId); mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java similarity index 96% rename from core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java rename to core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java index d801167d074..07651edfc0a 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.core.fate; +package org.apache.accumulo.core.fate.zookeeper; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -40,9 +40,15 @@ import java.util.stream.Stream; import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.Fate.TxInfo; -import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; -import org.apache.accumulo.core.fate.zookeeper.ZooUtil; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.StackOverflowException; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.hadoop.io.DataInputBuffer; @@ -240,7 +246,7 @@ private FateTxStoreImpl(FateId fateId, FateReservation reservation) { @Override public Repo top() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); for (int i = 0; i < RETRIES; i++) { String txpath = getTXPath(fateId); @@ -292,7 +298,7 @@ private String findTop(String txpath) throws KeeperException, InterruptedExcepti @Override public void push(Repo repo) throws StackOverflowException { - verifyReserved(true); + verifyReservedAndNotDeleted(true); String txpath = getTXPath(fateId); try { @@ -311,7 +317,7 @@ public void push(Repo repo) throws StackOverflowException { @Override public void pop() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { String txpath = getTXPath(fateId); @@ -327,7 +333,7 @@ public void pop() { @Override public void setStatus(TStatus status) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { zk.mutateExisting(getTXPath(fateId), currSerializedData -> { @@ -354,7 +360,7 @@ public void setStatus(TStatus status) { @Override public void delete() { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { zk.recursiveDelete(getTXPath(fateId), NodeMissingPolicy.SKIP); @@ -366,7 +372,7 @@ public void delete() { @Override public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { - verifyReserved(true); + verifyReservedAndNotDeleted(true); try { zk.putPersistentData(getTXPath(fateId) + "/" + txInfo, serializeTxInfo(so), @@ -378,14 +384,14 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { @Override public Serializable getTransactionInfo(Fate.TxInfo txInfo) { - verifyReserved(false); + verifyReservedAndNotDeleted(false); return MetaFateStore.this.getTransactionInfo(txInfo, fateId); } @Override public long timeCreated() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); try { Stat stat = zk.getZooKeeper().exists(getTXPath(fateId), false); @@ -397,7 +403,7 @@ public long timeCreated() { @Override public List> getStack() { - verifyReserved(false); + verifyReservedAndNotDeleted(false); String txpath = getTXPath(fateId); outer: while (true) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 1dd789c4d36..61b333a5518 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -71,9 +71,9 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.lock.ServiceLock; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index d8874b0c6d5..4b1e67e16d0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -76,8 +76,8 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache.ZcStat; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java index 1087cf1b9be..02aa3a28f45 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/meta/MetaFateMetrics.java @@ -26,8 +26,8 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.manager.metrics.fate.FateMetrics; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.KeeperException; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java index 314212693a3..e8955e465ab 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java @@ -78,8 +78,8 @@ import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.iterators.DevNull; import org.apache.accumulo.core.iterators.Filter; import org.apache.accumulo.core.iterators.IteratorEnvironment; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index bd7c4a2395b..d36e98bdecb 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -24,8 +24,10 @@ import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; @@ -43,6 +45,7 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -479,6 +482,35 @@ protected void testRepoFails(FateStore store, ServerContext sctx) throw } } + @Test + @Timeout(30) + public void testNoWriteAfterDelete() throws Exception { + executeTest(this::testNoWriteAfterDelete); + } + + protected void testNoWriteAfterDelete(FateStore store, ServerContext sctx) + throws Exception { + final String tableName = getUniqueNames(1)[0]; + final FateId fateId = store.create(); + final Repo repo = new TestRepo("testNoWriteAfterDelete"); + + var txStore = store.reserve(fateId); + + // all write ops should be ok after reservation + assertDoesNotThrow(() -> txStore.push(repo)); + assertDoesNotThrow(() -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertDoesNotThrow(txStore::pop); + assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name")); + assertDoesNotThrow(txStore::delete); + + // test that all write ops result in an exception since the tx has been deleted + assertThrows(Exception.class, () -> txStore.push(repo)); + assertThrows(Exception.class, () -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL)); + assertThrows(Exception.class, txStore::pop); + assertThrows(Exception.class, () -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name")); + assertThrows(Exception.class, txStore::delete); + } + private void submitDeferred(Fate fate, ServerContext sctx, Set transactions) { FateId fateId = fate.startTransaction(); transactions.add(fateId); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java index 8b52f88f97f..0db83c044a4 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java @@ -59,9 +59,9 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.iterators.IteratorUtil; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index 57070bacde1..d51ab3d7ff4 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -46,10 +46,10 @@ import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.harness.SharedMiniClusterBase; @@ -111,7 +111,7 @@ private void testReserveUnreserve(FateInstanceType storeType) throws Exception { final int numFateIds = 500; final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID()); final List> reservations = new ArrayList<>(); - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set allIds = new HashSet<>(); final FateStore store1, store2; final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50); @@ -182,7 +182,7 @@ private void testReserveNonExistentTxn(FateInstanceType storeType) throws Except // Tests that reserve() doesn't hang indefinitely and instead throws an error // on reserve() a non-existent transaction. final FateStore store; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final String tableName = getUniqueNames(1)[0]; final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID()); final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50); @@ -208,7 +208,7 @@ private void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeTyp throws Exception { final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set allIds = new HashSet<>(); final List> reservations = new ArrayList<>(); final FateStore store; @@ -256,7 +256,7 @@ private void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType st throws Exception { final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set allIds = new HashSet<>(); final List> reservations = new ArrayList<>(); final FateStore store; @@ -312,7 +312,7 @@ public void testMultipleFateInstances() throws Exception { private void testMultipleFateInstances(FateInstanceType storeType) throws Exception { final String tableName = getUniqueNames(1)[0]; final int numFateIds = 500; - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set allIds = new HashSet<>(); final FateStore store1, store2; final SleepingTestEnv testEnv1 = new SleepingTestEnv(50); @@ -380,7 +380,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce // One transaction for each FATE worker thread final int numFateIds = Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue()); - final boolean isUserStore = storeType.equals(FateInstanceType.USER); + final boolean isUserStore = storeType == FateInstanceType.USER; final Set allIds = new HashSet<>(); final FateStore store1, store2; final LatchTestEnv testEnv1 = new LatchTestEnv(); @@ -458,7 +458,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce boolean allReservedWithLock2 = store2Reservations.values().stream() .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; - }, 60_000); + }, 60_000 * 4); // Finish work and shutdown testEnv1.workersLatch.countDown(); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java index a23dde06448..c5f541b5e9d 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java @@ -32,8 +32,8 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java index bfd267630f5..d306e0bfefd 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateInterleavingIT.java @@ -24,7 +24,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; -import org.apache.accumulo.core.fate.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateInterleavingIT; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java index 994c7af2ebe..c4c1e5b24a5 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java @@ -22,7 +22,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.fate.AbstractFateStore; -import org.apache.accumulo.core.fate.MetaFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateOpsCommandsIT; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java index beb48a5304e..af8b98db0f9 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java @@ -36,8 +36,8 @@ import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.ServerContext; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java index be007a1f255..9ed9a19c36c 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreIT.java @@ -23,10 +23,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.Iterator; -import java.util.List; import java.util.Set; -import java.util.UUID; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.BatchWriter; @@ -40,6 +37,7 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateStore; @@ -76,29 +74,6 @@ public static void teardown() { SharedMiniClusterBase.stopMiniCluster(); } - private static class TestUserFateStore extends UserFateStore { - private final Iterator fateIdIterator; - - // use the list of fateIds to simulate collisions on fateIds - public TestUserFateStore(ClientContext context, String tableName, List fateIds) { - super(context, tableName, createDummyLockID(), null); - this.fateIdIterator = fateIds.iterator(); - } - - @Override - public FateId getFateId() { - if (fateIdIterator.hasNext()) { - return fateIdIterator.next(); - } else { - return FateId.from(fateInstanceType, UUID.randomUUID()); - } - } - - public TStatus getStatus(FateId fateId) { - return _getStatus(fateId); - } - } - // Test that configs related to the correctness of the FATE instance user table // are initialized correctly @Test @@ -151,7 +126,7 @@ class TestStatusEnforcement { String tableName; ClientContext client; FateId fateId; - TestUserFateStore store; + UserFateStore store; FateStore.FateTxStore txStore; @BeforeEach @@ -159,9 +134,8 @@ public void setup() throws Exception { client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); tableName = getUniqueNames(1)[0]; createFateTable(client, tableName); - fateId = FateId.from(fateInstanceType, UUID.randomUUID()); - store = new TestUserFateStore(client, tableName, List.of(fateId)); - store.create(); + store = new UserFateStore<>(client, tableName, AbstractFateStore.createDummyLockID(), null); + fateId = store.create(); txStore = store.reserve(fateId); } @@ -177,7 +151,10 @@ private void testOperationWithStatuses(Runnable beforeOperation, Executable oper beforeOperation.run(); injectStatus(client, tableName, fateId, status); - assertEquals(status, store.getStatus(fateId)); + var fateIdStatus = + store.list().filter(statusEntry -> statusEntry.getFateId().equals(fateId)).findFirst() + .orElseThrow(); + assertEquals(status, fateIdStatus.getStatus()); if (!acceptableStatuses.contains(status)) { assertThrows(IllegalStateException.class, operation, "Expected operation to fail with status " + status + " but it did not"); @@ -210,8 +187,12 @@ public void pop() throws Exception { @Test public void delete() throws Exception { - testOperationWithStatuses(() -> {}, // No special setup needed for delete - txStore::delete, + testOperationWithStatuses(() -> { + // Setup for delete: Create a new txStore before each delete since delete cannot be called + // on the same txStore more than once + fateId = store.create(); + txStore = store.reserve(fateId); + }, () -> txStore.delete(), Set.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java index c5e6e5eea1f..5e5775110f9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java @@ -51,9 +51,9 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.manager.state.tables.TableState; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index 9172a2d7b45..28b08dbbf02 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -62,9 +62,9 @@ import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.AdminUtil.FateStatus; import org.apache.accumulo.core.fate.FateInstanceType; -import org.apache.accumulo.core.fate.MetaFateStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.StoredTabletFile; From 9599bd87d02a58413080cc1b241a6285857d2778 Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Mon, 23 Sep 2024 12:10:46 -0400 Subject: [PATCH 2/5] Improved runtime of testDeadReservationsCleanup - created new class FastFate which performs the dead reservation cleanup more often --- .../org/apache/accumulo/core/fate/Fate.java | 8 +++- .../apache/accumulo/test/fate/FastFate.java | 43 +++++++++++++++++++ .../accumulo/test/fate/MultipleStoresIT.java | 10 ++--- 3 files changed, 54 insertions(+), 7 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/fate/FastFate.java diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 0e04a1a255b..6d0bc6c7ad2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -383,8 +383,8 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, // reservations held by dead processes, if they exist. deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, store.type() + "-dead-reservation-cleaner-pool"); - ScheduledFuture deadReservationCleaner = deadResCleanerExecutor - .scheduleWithFixedDelay(new DeadReservationCleaner(), 3, 180, SECONDS); + ScheduledFuture deadReservationCleaner = deadResCleanerExecutor.scheduleWithFixedDelay( + new DeadReservationCleaner(), 3, getDeadResCleanupDelay(), SECONDS); ThreadPools.watchCriticalScheduledTask(deadReservationCleaner); } this.deadResCleanerExecutor = deadResCleanerExecutor; @@ -393,6 +393,10 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, this.workFinder.start(); } + protected long getDeadResCleanupDelay() { + return 180; + } + // get a transaction id back to the requester before doing any work public FateId startTransaction() { return store.create(); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java new file mode 100644 index 00000000000..ec800e2e89a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate; + +import java.util.function.Function; + +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.Repo; + +/** + * A FATE which performs the dead reservation cleanup with a much shorter delay between + */ +public class FastFate extends Fate { + public static final long delay = 15; + + public FastFate(T environment, FateStore store, boolean runDeadResCleaner, + Function,String> toLogStrFunc, AccumuloConfiguration conf) { + super(environment, store, runDeadResCleaner, toLogStrFunc, conf); + } + + @Override + protected long getDeadResCleanupDelay() { + return delay; + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index d51ab3d7ff4..762babfd730 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -399,8 +399,8 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce } liveLocks.add(lock1); - Fate fate1 = - new Fate<>(testEnv1, store1, true, Object::toString, DefaultConfiguration.getInstance()); + FastFate fate1 = new FastFate<>(testEnv1, store1, true, Object::toString, + DefaultConfiguration.getInstance()); // Ensure nothing is reserved yet assertTrue(store1.getActiveReservations().isEmpty()); @@ -445,8 +445,8 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce // Create the new Fate/start the Fate threads (the work finder and the workers). // Don't run another dead reservation cleaner since we already have one running from fate1. - Fate fate2 = - new Fate<>(testEnv2, store2, false, Object::toString, DefaultConfiguration.getInstance()); + FastFate fate2 = new FastFate<>(testEnv2, store2, false, Object::toString, + DefaultConfiguration.getInstance()); // Wait for the "dead" reservations to be deleted and picked up again (reserved using // fate2/store2/lock2 now). @@ -458,7 +458,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce boolean allReservedWithLock2 = store2Reservations.values().stream() .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; - }, 60_000 * 4); + }, FastFate.delay * 2 * 1000); // Finish work and shutdown testEnv1.workersLatch.countDown(); From f7556c92ab59567b19abd37c7c4394b9c5aab82e Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Mon, 23 Sep 2024 14:55:01 -0400 Subject: [PATCH 3/5] changed delay to Duration instead of long --- core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 6 +++--- .../main/java/org/apache/accumulo/test/fate/FastFate.java | 6 +++--- .../org/apache/accumulo/test/fate/MultipleStoresIT.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 6d0bc6c7ad2..b6860c557d8 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -384,7 +384,7 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, deadResCleanerExecutor = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, store.type() + "-dead-reservation-cleaner-pool"); ScheduledFuture deadReservationCleaner = deadResCleanerExecutor.scheduleWithFixedDelay( - new DeadReservationCleaner(), 3, getDeadResCleanupDelay(), SECONDS); + new DeadReservationCleaner(), 3, getDeadResCleanupDelay().toSeconds(), SECONDS); ThreadPools.watchCriticalScheduledTask(deadReservationCleaner); } this.deadResCleanerExecutor = deadResCleanerExecutor; @@ -393,8 +393,8 @@ public Fate(T environment, FateStore store, boolean runDeadResCleaner, this.workFinder.start(); } - protected long getDeadResCleanupDelay() { - return 180; + public Duration getDeadResCleanupDelay() { + return Duration.ofMinutes(3); } // get a transaction id back to the requester before doing any work diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java index ec800e2e89a..71b198c0ac9 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FastFate.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate; +import java.time.Duration; import java.util.function.Function; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@ -29,7 +30,6 @@ * A FATE which performs the dead reservation cleanup with a much shorter delay between */ public class FastFate extends Fate { - public static final long delay = 15; public FastFate(T environment, FateStore store, boolean runDeadResCleaner, Function,String> toLogStrFunc, AccumuloConfiguration conf) { @@ -37,7 +37,7 @@ public FastFate(T environment, FateStore store, boolean runDeadResCleaner, } @Override - protected long getDeadResCleanupDelay() { - return delay; + public Duration getDeadResCleanupDelay() { + return Duration.ofSeconds(15); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index 762babfd730..f5e537394db 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -458,7 +458,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce boolean allReservedWithLock2 = store2Reservations.values().stream() .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; - }, FastFate.delay * 2 * 1000); + }, fate1.getDeadResCleanupDelay().toMillis() * 2); // Finish work and shutdown testEnv1.workersLatch.countDown(); From 3ca39d0d57fbf5321b3ca5abc8fee1b16a52c37f Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Fri, 27 Sep 2024 12:34:01 -0400 Subject: [PATCH 4/5] FateKey toString reformatting --- .../java/org/apache/accumulo/core/fate/FateKey.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java index ec41d159b01..43c680c21e2 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -28,6 +28,8 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.hadoop.io.DataInputBuffer; public class FateKey { @@ -171,9 +173,10 @@ private static Optional deserializeCompactionId(FateKeyTyp @Override public String toString() { - return "[" + getClass().getSimpleName() + " FateKeyType:" + type - + (keyExtent.isPresent() ? ", KeyExtent:" + keyExtent.orElseThrow() - : ", ExternalCompactionID:" + compactionId.orElseThrow()) - + "]"; + var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE); + buf.append("FateKeyType", type); + keyExtent.ifPresentOrElse(keyExtent -> buf.append("KeyExtent", keyExtent), + () -> buf.append("ExternalCompactionID", compactionId.orElseThrow())); + return buf.toString(); } } From d61c531ba39695d1eadf8fce242a0157eee851cd Mon Sep 17 00:00:00 2001 From: Kevin Rathbun Date: Fri, 27 Sep 2024 12:49:01 -0400 Subject: [PATCH 5/5] formatting --- core/src/main/java/org/apache/accumulo/core/fate/FateKey.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java index 43c680c21e2..8942149a6ff 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -176,7 +176,7 @@ public String toString() { var buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE); buf.append("FateKeyType", type); keyExtent.ifPresentOrElse(keyExtent -> buf.append("KeyExtent", keyExtent), - () -> buf.append("ExternalCompactionID", compactionId.orElseThrow())); + () -> buf.append("ExternalCompactionID", compactionId.orElseThrow())); return buf.toString(); } }