Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 0 additions & 18 deletions core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Optional;
import java.util.UUID;

import org.apache.accumulo.core.fate.user.FateMutatorImpl;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.hadoop.io.DataInputBuffer;

Expand Down Expand Up @@ -147,19 +146,6 @@ public static FateReservation from(ZooUtil.LockID lockID, UUID reservationUUID)
return new FateReservation(lockID, reservationUUID);
}

/**
* @param serializedFateRes the value present in the table for the reservation column
* @return true if the array represents a valid serialized FateReservation object, false if it
* represents an unreserved value, error otherwise
*/
public static boolean isFateReservation(byte[] serializedFateRes) {
if (Arrays.equals(serializedFateRes, FateMutatorImpl.NOT_RESERVED)) {
return false;
}
deserialize(serializedFateRes);
return true;
}

public ZooUtil.LockID getLockID() {
return lockID;
}
Expand Down Expand Up @@ -195,10 +181,6 @@ public static FateReservation deserialize(byte[] serialized) {
}
}

public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID lockID2) {
return lockID1.serialize("/").equals(lockID2.serialize("/"));
}

@Override
public String toString() {
return lockID.serialize("/") + ":" + reservationUUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,31 +44,13 @@ public interface FateMutator<T> {

/**
* Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
* put the reservation if the column doesn't exist yet. This should only be used for
* {@link UserFateStore#createAndReserve(FateKey)}
*
* @param reservation the reservation to attempt to put
* @return the FateMutator with this added mutation
*/
FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation reservation);

/**
* Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
* remove the given reservation if it matches what is present in the column.
* delete the column if the column value matches the given reservation
*
* @param reservation the reservation to attempt to remove
* @return the FateMutator with this added mutation
*/
FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation);

/**
* Add a conditional mutation to {@link FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
* put the initial column value if it has not already been set yet
*
* @return the FateMutator with this added mutation
*/
FateMutator<T> putInitReservationVal();

FateMutator<T> putName(byte[] data);

FateMutator<T> putAutoClean(byte[] data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.fate.user;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.fate.AbstractFateStore.serialize;
import static org.apache.accumulo.core.fate.user.UserFateStore.getRow;
import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId;
Expand Down Expand Up @@ -51,8 +50,6 @@

public class FateMutatorImpl<T> implements FateMutator<T> {

public static final byte[] NOT_RESERVED = "".getBytes(UTF_8);

private final ClientContext context;
private final String tableName;
private final FateId fateId;
Expand Down Expand Up @@ -85,15 +82,6 @@ public FateMutator<T> putCreateTime(long ctime) {

@Override
public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED);
mutation.addCondition(condition);
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(reservation.getSerialized()));
return this;
}

@Override
public FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation reservation) {
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
mutation.addCondition(condition);
Expand All @@ -107,16 +95,7 @@ public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
.setValue(reservation.getSerialized());
mutation.addCondition(condition);
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED));
return this;
}

@Override
public FateMutator<T> putInitReservationVal() {
Condition condition = new Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
mutation.addCondition(condition);
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED));
TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public FateId create() {
}

var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW)
.putCreateTime(System.currentTimeMillis()).putInitReservationVal().tryMutate();
.putCreateTime(System.currentTimeMillis()).tryMutate();

switch (status) {
case ACCEPTED:
Expand Down Expand Up @@ -137,8 +137,7 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
// Only need to retry if it is UNKNOWN
for (int attempt = 0; attempt < maxAttempts; attempt++) {
status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
.putReservedTxOnCreation(reservation).putCreateTime(System.currentTimeMillis())
.tryMutate();
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
if (status != FateMutator.Status.UNKNOWN) {
break;
}
Expand Down Expand Up @@ -182,9 +181,7 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
break;
case TxColumnFamily.RESERVATION:
if (FateReservation.isFateReservation(val.get())) {
reservationSeen = Optional.of(FateReservation.deserialize(val.get()));
}
reservationSeen = Optional.of(FateReservation.deserialize(val.get()));
break;
default:
throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq);
Expand Down Expand Up @@ -231,7 +228,9 @@ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
// Create a unique FateReservation for this reservation attempt
FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());

FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate();
// requiring any status prevents creating an entry if the fate id doesn't exist
FateMutator.Status status =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice comment here. Interesting, I suppose the status check was not needed previously because it was assumed the reservation column always existed if the fate operation existed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, before this, putReservedTx only worked if the reservation column had a NOT_RESERVED value, which was only set on creation and unreservation.

newMutator(fateId).requireStatus(TStatus.values()).putReservedTx(reservation).tryMutate();
if (status.equals(FateMutator.Status.ACCEPTED)) {
return Optional.of(new FateTxStoreImpl(fateId, reservation));
} else if (status.equals(FateMutator.Status.UNKNOWN)) {
Expand All @@ -246,10 +245,9 @@ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
scanner.setRange(getRow(fateId));
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
FateReservation persistedRes = scanner.stream()
.filter(entry -> FateReservation.isFateReservation(entry.getValue().get()))
.map(entry -> FateReservation.deserialize(entry.getValue().get())).findFirst()
.orElse(null);
FateReservation persistedRes =
scanner.stream().map(entry -> FateReservation.deserialize(entry.getValue().get()))
.findFirst().orElse(null);
if (persistedRes != null && persistedRes.equals(reservation)) {
return Optional.of(new FateTxStoreImpl(fateId, reservation));
}
Expand Down Expand Up @@ -318,9 +316,7 @@ protected Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses) {
status = TStatus.valueOf(val.toString());
break;
case TxColumnFamily.RESERVATION:
if (FateReservation.isFateReservation(val.get())) {
reservation = FateReservation.deserialize(val.get());
}
reservation = FateReservation.deserialize(val.get());
break;
default:
throw new IllegalStateException("Unexpected column seen: " + colf + ":" + colq);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.data.InstanceId;
Expand Down Expand Up @@ -92,6 +93,24 @@ public String serialize(String root) {
public String toString() {
return " path = " + path + " node = " + node + " eid = " + Long.toHexString(eid);
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj instanceof LockID) {
LockID other = (LockID) obj;
return this.path.equals(other.path) && this.node.equals(other.node)
&& this.eid == other.eid;
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(path, node, eid);
}
}

// Need to use Collections.unmodifiableList() instead of List.of() or List.copyOf(), because
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce
// Verify store1 has the transactions reserved and that they were reserved with lock1
reservations = store1.getActiveReservations();
assertEquals(allIds, reservations.keySet());
reservations.values().forEach(
res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID())));
reservations.values().forEach(res -> assertEquals(lock1, res.getLockID()));

if (isUserStore) {
store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld);
Expand All @@ -434,8 +433,7 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce
// store1
reservations = store2.getActiveReservations();
assertEquals(allIds, reservations.keySet());
reservations.values().forEach(
res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1, res.getLockID())));
reservations.values().forEach(res -> assertEquals(lock1, res.getLockID()));

// Simulate what would happen if the Manager using the Fate object (fate1) died.
// isLockHeld would return false for the LockId of the Manager that died (in this case, lock1)
Expand All @@ -455,8 +453,8 @@ private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exce
// the workers for fate1 are hung up
Wait.waitFor(() -> {
Map<FateId,FateStore.FateReservation> store2Reservations = store2.getActiveReservations();
boolean allReservedWithLock2 = store2Reservations.values().stream()
.allMatch(entry -> FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2));
boolean allReservedWithLock2 =
store2Reservations.values().stream().allMatch(entry -> entry.getLockID().equals(lock2));
return store2Reservations.keySet().equals(allIds) && allReservedWithLock2;
}, fate1.getDeadResCleanupDelay().toMillis() * 2);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,29 +181,14 @@ public void testReservations() throws Exception {
ClientContext context = (ClientContext) client;

FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
ZooUtil.LockID lockID = new ZooUtil.LockID("/locks", "L1", 50);
FateStore.FateReservation reservation =
FateStore.FateReservation.from(lockID, UUID.randomUUID());
FateStore.FateReservation wrongReservation =
FateStore.FateReservation.from(lockID, UUID.randomUUID());

// Ensure that we cannot do anything in the column until it is initialized
FateMutator.Status status =
new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate();
assertEquals(REJECTED, status);
status =
new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate();
assertEquals(REJECTED, status);

// Initialize the column and ensure we can't do it twice
status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate();
assertEquals(ACCEPTED, status);
status = new FateMutatorImpl<>(context, table, fateId).putInitReservationVal().tryMutate();
assertEquals(REJECTED, status);

// Ensure that reserving is the only thing we can do
status =
FateMutator.Status status =
new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate();
assertEquals(REJECTED, status);
status = new FateMutatorImpl<>(context, table, fateId).putReservedTx(reservation).tryMutate();
Expand All @@ -226,20 +211,6 @@ public void testReservations() throws Exception {
status =
new FateMutatorImpl<>(context, table, fateId).putUnreserveTx(reservation).tryMutate();
assertEquals(REJECTED, status);

// Verify putReservedTxOnCreation works as expected
status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation)
.tryMutate();
assertEquals(ACCEPTED, status);
status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation)
.tryMutate();
assertEquals(REJECTED, status);
status =
new FateMutatorImpl<>(context, table, fateId1).putUnreserveTx(reservation).tryMutate();
assertEquals(ACCEPTED, status);
status = new FateMutatorImpl<>(context, table, fateId1).putReservedTxOnCreation(reservation)
.tryMutate();
assertEquals(REJECTED, status);
}
}

Expand Down