diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index 6b7b68baf58..ae193d8df81 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -29,7 +29,6 @@ import java.util.Optional; import java.util.UUID; -import org.apache.accumulo.core.fate.user.FateMutatorImpl; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.hadoop.io.DataInputBuffer; @@ -147,19 +146,6 @@ public static FateReservation from(ZooUtil.LockID lockID, UUID reservationUUID) return new FateReservation(lockID, reservationUUID); } - /** - * @param serializedFateRes the value present in the table for the reservation column - * @return true if the array represents a valid serialized FateReservation object, false if it - * represents an unreserved value, error otherwise - */ - public static boolean isFateReservation(byte[] serializedFateRes) { - if (Arrays.equals(serializedFateRes, FateMutatorImpl.NOT_RESERVED)) { - return false; - } - deserialize(serializedFateRes); - return true; - } - public ZooUtil.LockID getLockID() { return lockID; } @@ -195,10 +181,6 @@ public static FateReservation deserialize(byte[] serialized) { } } - public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID lockID2) { - return lockID1.serialize("/").equals(lockID2.serialize("/")); - } - @Override public String toString() { return lockID.serialize("/") + ":" + reservationUUID; diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java index 8c39e897000..d199a7463e4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java @@ -44,31 +44,13 @@ public interface FateMutator { /** * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * put the reservation if the column doesn't exist yet. This should only be used for - * {@link UserFateStore#createAndReserve(FateKey)} - * - * @param reservation the reservation to attempt to put - * @return the FateMutator with this added mutation - */ - FateMutator putReservedTxOnCreation(FateStore.FateReservation reservation); - - /** - * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * remove the given reservation if it matches what is present in the column. + * delete the column if the column value matches the given reservation * * @param reservation the reservation to attempt to remove * @return the FateMutator with this added mutation */ FateMutator putUnreserveTx(FateStore.FateReservation reservation); - /** - * Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will - * put the initial column value if it has not already been set yet - * - * @return the FateMutator with this added mutation - */ - FateMutator putInitReservationVal(); - FateMutator putName(byte[] data); FateMutator putAutoClean(byte[] data); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java index fcb0e4f1f16..5d99a8df3a3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java @@ -18,7 +18,6 @@ */ package org.apache.accumulo.core.fate.user; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.core.fate.AbstractFateStore.serialize; import static org.apache.accumulo.core.fate.user.UserFateStore.getRow; import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId; @@ -51,8 +50,6 @@ public class FateMutatorImpl implements FateMutator { - public static final byte[] NOT_RESERVED = "".getBytes(UTF_8); - private final ClientContext context; private final String tableName; private final FateId fateId; @@ -85,15 +82,6 @@ public FateMutator putCreateTime(long ctime) { @Override public FateMutator putReservedTx(FateStore.FateReservation reservation) { - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED); - mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized())); - return this; - } - - @Override - public FateMutator putReservedTxOnCreation(FateStore.FateReservation reservation) { Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); mutation.addCondition(condition); @@ -107,16 +95,7 @@ public FateMutator putUnreserveTx(FateStore.FateReservation reservation) { TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()) .setValue(reservation.getSerialized()); mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); - return this; - } - - @Override - public FateMutator putInitReservationVal() { - Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), - TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); - mutation.addCondition(condition); - TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED)); + TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation); return this; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java index e1cb4d6405f..7446d1fafe3 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java @@ -107,7 +107,7 @@ public FateId create() { } var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW) - .putCreateTime(System.currentTimeMillis()).putInitReservationVal().tryMutate(); + .putCreateTime(System.currentTimeMillis()).tryMutate(); switch (status) { case ACCEPTED: @@ -137,8 +137,7 @@ public Optional> createAndReserve(FateKey fateKey) { // Only need to retry if it is UNKNOWN for (int attempt = 0; attempt < maxAttempts; attempt++) { status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) - .putReservedTxOnCreation(reservation).putCreateTime(System.currentTimeMillis()) - .tryMutate(); + .putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate(); if (status != FateMutator.Status.UNKNOWN) { break; } @@ -182,9 +181,7 @@ public Optional> createAndReserve(FateKey fateKey) { fateKeySeen = Optional.of(FateKey.deserialize(val.get())); break; case TxColumnFamily.RESERVATION: - if (FateReservation.isFateReservation(val.get())) { - reservationSeen = Optional.of(FateReservation.deserialize(val.get())); - } + reservationSeen = Optional.of(FateReservation.deserialize(val.get())); break; default: throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); @@ -231,7 +228,9 @@ public Optional> tryReserve(FateId fateId) { // Create a unique FateReservation for this reservation attempt FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID()); - FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate(); + // requiring any status prevents creating an entry if the fate id doesn't exist + FateMutator.Status status = + newMutator(fateId).requireStatus(TStatus.values()).putReservedTx(reservation).tryMutate(); if (status.equals(FateMutator.Status.ACCEPTED)) { return Optional.of(new FateTxStoreImpl(fateId, reservation)); } else if (status.equals(FateMutator.Status.UNKNOWN)) { @@ -246,10 +245,9 @@ public Optional> tryReserve(FateId fateId) { scanner.setRange(getRow(fateId)); scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(), TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()); - FateReservation persistedRes = scanner.stream() - .filter(entry -> FateReservation.isFateReservation(entry.getValue().get())) - .map(entry -> FateReservation.deserialize(entry.getValue().get())).findFirst() - .orElse(null); + FateReservation persistedRes = + scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get())) + .findFirst().orElse(null); if (persistedRes != null && persistedRes.equals(reservation)) { return Optional.of(new FateTxStoreImpl(fateId, reservation)); } @@ -318,9 +316,7 @@ protected Stream getTransactions(EnumSet statuses) { status = TStatus.valueOf(val.toString()); break; case TxColumnFamily.RESERVATION: - if (FateReservation.isFateReservation(val.get())) { - reservation = FateReservation.deserialize(val.get()); - } + reservation = FateReservation.deserialize(val.get()); break; default: throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java index 51002ee5743..263f17e440c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.data.InstanceId; @@ -92,6 +93,24 @@ public String serialize(String root) { public String toString() { return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid); } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj instanceof LockID) { + LockID other = (LockID) obj; + return this.path.equals(other.path) && this.node.equals(other.node) + && this.eid == other.eid; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(path, node, eid); + } } // Need to use Collections.unmodifiableList() instead of List.of() or List.copyOf(), because diff --git a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java index f5e537394db..edd6a538597 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java @@ -421,8 +421,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce // Verify store1 has the transactions reserved and that they were reserved with lock1 reservations = store1.getActiveReservations(); assertEquals(allIds, reservations.keySet()); - reservations.values().forEach( - res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); if (isUserStore) { store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld); @@ -434,8 +433,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce // store1 reservations = store2.getActiveReservations(); assertEquals(allIds, reservations.keySet()); - reservations.values().forEach( - res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID()))); + reservations.values().forEach(res -> assertEquals(lock1, res.getLockID())); // Simulate what would happen if the Manager using the Fate object (fate1) died. // isLockHeld would return false for the LockId of the Manager that died (in this case, lock1) @@ -455,8 +453,8 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce // the workers for fate1 are hung up Wait.waitFor(() -> { Map store2Reservations = store2.getActiveReservations(); - boolean allReservedWithLock2 = store2Reservations.values().stream() - .allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2)); + boolean allReservedWithLock2 = + store2Reservations.values().stream().allMatch(entry -> entry.getLockID().equals(lock2)); return store2Reservations.keySet().equals(allIds) && allReservedWithLock2; }, fate1.getDeadResCleanupDelay().toMillis() * 2); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java index fe16e2b014d..1d80a5fb9b6 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java @@ -181,29 +181,14 @@ public void testReservations() throws Exception { ClientContext context = (ClientContext) client; FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); - FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID()); ZooUtil.LockID lockID = new ZooUtil.LockID("/locks", "L1", 50); FateStore.FateReservation reservation = FateStore.FateReservation.from(lockID, UUID.randomUUID()); FateStore.FateReservation wrongReservation = FateStore.FateReservation.from(lockID, UUID.randomUUID()); - // Ensure that we cannot do anything in the column until it is initialized - FateMutator.Status status = - new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); - assertEquals(REJECTED, status); - status = - new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); - assertEquals(REJECTED, status); - - // Initialize the column and ensure we can't do it twice - status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate(); - assertEquals(ACCEPTED, status); - status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate(); - assertEquals(REJECTED, status); - // Ensure that reserving is the only thing we can do - status = + FateMutator.Status status = new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); assertEquals(REJECTED, status); status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate(); @@ -226,20 +211,6 @@ public void testReservations() throws Exception { status = new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate(); assertEquals(REJECTED, status); - - // Verify putReservedTxOnCreation works as expected - status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) - .tryMutate(); - assertEquals(ACCEPTED, status); - status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) - .tryMutate(); - assertEquals(REJECTED, status); - status = - new FateMutatorImpl<>(context, table, fateId1).putUnreserveTx(reservation).tryMutate(); - assertEquals(ACCEPTED, status); - status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation) - .tryMutate(); - assertEquals(REJECTED, status); } }