Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 148 additions & 24 deletions core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,10 +41,13 @@
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

Expand All @@ -53,12 +57,22 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {

// Default maximum size of 100,000 transactions before deferral is stopped and
// all existing transactions are processed immediately again
protected static final int DEFAULT_MAX_DEFERRED = 100_000;
public static final int DEFAULT_MAX_DEFERRED = 100_000;

/**
 * Default {@link FateIdGenerator}. The transaction id is computed only from the bytes returned
 * by {@code fateKey.getSerialized()}, hashed with 128-bit Murmur3; the instance type is passed
 * through unchanged to {@code FateId.from}.
 */
public static final FateIdGenerator DEFAULT_FATE_ID_GENERATOR = (instanceType, fateKey) -> {
  HashCode keyHash = Hashing.murmur3_128().hashBytes(fateKey.getSerialized());
  // Masking with Long.MAX_VALUE (0x7fffffffffffffffL) clears the sign bit so the tid is never
  // negative
  return FateId.from(instanceType, keyHash.asLong() & Long.MAX_VALUE);
};

protected final Set<FateId> reserved;
protected final Map<FateId,Long> deferred;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
private final FateIdGenerator fateIdGenerator;

// This is incremented each time a transaction was unreserved that was non new
protected final SignalCount unreservedNonNewCount = new SignalCount();
Expand All @@ -67,11 +81,12 @@ public abstract class AbstractFateStore<T> implements FateStore<T> {
protected final SignalCount unreservedRunnableCount = new SignalCount();

public AbstractFateStore() {
this(DEFAULT_MAX_DEFERRED);
this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}

public AbstractFateStore(int maxDeferred) {
public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
this.maxDeferred = maxDeferred;
this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
this.reserved = new HashSet<>();
this.deferred = new HashMap<>();
}
Expand Down Expand Up @@ -239,12 +254,105 @@ public int getDeferredCount() {
}
}

/**
 * Attempts to create a transaction whose id is derived from {@code fateKey}.
 *
 * @param fateKey key used to derive the id and stored with the new transaction
 * @return the derived id when the transaction was created, or when it already exists in the
 *         {@code NEW} state with an equal key; empty when it already exists in any other state
 * @throws IllegalStateException if an existing {@code NEW} transaction has no key, or has a
 *         different key (an id collision)
 */
private Optional<FateId> create(FateKey fateKey) {
  final FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);

  try {
    create(fateId, fateKey);
  } catch (IllegalStateException e) {
    // The store refused the create; inspect what is currently recorded under this id
    final Pair<TStatus,Optional<FateKey>> existing = getStatusAndKey(fateId);
    final TStatus existingStatus = existing.getFirst();
    final Optional<FateKey> existingKey = existing.getSecond();

    // Anything other than NEW means the transaction is already in progress, so there is
    // nothing for the caller to reserve or seed
    if (existingStatus != TStatus.NEW) {
      log.trace("Existing transaction {} already exists for key {} with status {}", fateId,
          fateKey, existingStatus);
      return Optional.empty();
    }

    // NEW means unseeded: the caller may go on to reserve/seed it, but only if the stored
    // key is the one we were asked about, otherwise two keys hashed to the same id
    Preconditions.checkState(existingKey.isPresent(), "Tx Key is missing from tid %s",
        fateId.getTid());
    Preconditions.checkState(fateKey.equals(existingKey.orElseThrow()),
        "Collision detected for tid %s", fateId.getTid());
  }

  return Optional.of(fateId);
}

@Override
public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
final Optional<FateTxStore<T>> txStore;

// First make sure we can reserve in memory the fateId, if not
// we can return an empty Optional as it is reserved and in progress
// This reverses the usual order of creation and then reservation but
// this prevents a race condition by ensuring we can reserve first.
// This will create the FateTxStore before creation but this object
// is not exposed until after creation is finished so there should not
// be any errors.
final Optional<FateTxStore<T>> reservedTxStore;
synchronized (this) {
reservedTxStore = tryReserve(fateId);
}

// If present we were able to reserve so try and create
if (reservedTxStore.isPresent()) {
try {
var fateIdFromCreate = create(fateKey);
if (fateIdFromCreate.isPresent()) {
Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()),
"Transaction creation returned unexpected %s, expected %s", fateIdFromCreate, fateId);
txStore = reservedTxStore;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider keeping the return value of create and validating that it matches the expected fateId.

Suggested change
txStore = reservedTxStore;
Precondition.checkState(fateId.equals(fateIdFromCreate));
txStore = reservedTxStore;

} else {
// We already exist in a non-new state then un-reserve and an empty
// Optional will be returned. This is expected to happen when the
// system is busy and operations are not running, and we keep seeding them
synchronized (this) {
reserved.remove(fateId);
}
txStore = Optional.empty();
}
} catch (Exception e) {
// Clean up the reservation if the creation failed
// And then throw error
synchronized (this) {
reserved.remove(fateId);
}
if (e instanceof IllegalStateException) {
throw e;
} else {
throw new IllegalStateException(e);
}
}
} else {
// Could not reserve so return empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote the following after seeing another log statement in this PR. I thought it would be good to log something here, as is done elsewhere. These trace statements will be useful for tracking down bugs.

Suggested change
// Could not reserve so return empty
// Could not reserve so return empty
log.trace("Another thread currently has transaction {} key {} reserved", fateId,
fateKey);

log.trace("Another thread currently has transaction {} key {} reserved", fateId, fateKey);
txStore = Optional.empty();
}

return txStore;
}

/**
 * Creates the stored entry for {@code fateId} associated with {@code fateKey}. The keyed
 * {@code create(FateKey)} path in this class catches an {@link IllegalStateException} thrown
 * from here and then consults {@link #getStatusAndKey(FateId)} to decide how to proceed.
 */
protected abstract void create(FateId fateId, FateKey fateKey);

/**
 * Returns the status recorded for {@code fateId} together with its key, if one is recorded.
 */
protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId);

/**
 * Returns a stream of id/status entries. NOTE(review): which transactions are included and in
 * what order is not visible in this section; confirm against the implementations.
 */
protected abstract Stream<FateIdStatus> getTransactions();

/**
 * Returns the status for {@code fateId}. NOTE(review): how this relates to the public status
 * accessors is not visible in this section.
 */
protected abstract TStatus _getStatus(FateId fateId);

/**
 * Returns the key recorded for {@code fateId}, if any. Used by the transaction store's
 * {@code getKey()}.
 */
protected abstract Optional<FateKey> getKey(FateId fateId);

/**
 * Builds the {@link FateTxStore} for {@code fateId}. NOTE(review): {@code isReserved}
 * presumably tells the new object whether the caller holds the reservation; confirm.
 */
protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);

/**
 * Returns the instance type handed to the {@code FateIdGenerator} when an id is derived from
 * a key.
 */
protected abstract FateInstanceType getInstanceType();

protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
protected final FateId fateId;
protected final boolean isReserved;
Expand Down Expand Up @@ -337,34 +445,50 @@ public TStatus getStatus() {
return status;
}

/**
 * Returns the {@link FateKey} recorded for this transaction, if any, by delegating to the
 * enclosing store's {@code getKey(fateId)}.
 */
@Override
public Optional<FateKey> getKey() {
// verifyReserved is defined outside this section; NOTE(review): confirm what passing false
// permits
verifyReserved(false);
return AbstractFateStore.this.getKey(fateId);
}

/**
 * Returns this transaction's status paired with its optional key by delegating to the
 * enclosing store's {@code getStatusAndKey(fateId)}.
 */
@Override
public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
// verifyReserved is defined outside this section; NOTE(review): confirm what passing false
// permits
verifyReserved(false);
return AbstractFateStore.this.getStatusAndKey(fateId);
}

/**
 * Returns the id this transaction store was constructed for. Unlike the neighbouring
 * accessors, no reservation check is made.
 */
@Override
public FateId getID() {
return fateId;
}
}

protected byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
/**
 * Strategy for deriving a {@link FateId} from an instance type and a {@link FateKey}. The
 * store calls it whenever a transaction is created from a key. It has a single abstract method,
 * so it can be supplied as a lambda or method reference.
 */
@FunctionalInterface
public interface FateIdGenerator {
  /**
   * Derives the id to use for {@code fateKey}.
   *
   * @param instanceType instance type of the store requesting the id
   * @param fateKey key the transaction is being created for
   * @return the id for the given type and key
   */
  FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
}

/**
 * Encodes a transaction-info value with a two-byte type prefix.
 * <p>
 * A {@link String} becomes {@code "S "} followed by its UTF-8 text. Any other value becomes
 * {@code 'O'}, a space, and then the bytes produced by {@code serialize(so)}.
 *
 * @param so value to encode
 * @return the prefixed byte representation
 */
protected byte[] serializeTxInfo(Serializable so) {
  if (so instanceof String) {
    return ("S " + so).getBytes(UTF_8);
  }

  final byte[] payload = serialize(so);
  final byte[] framed = new byte[payload.length + 2];
  framed[0] = 'O';
  framed[1] = ' ';
  // The payload starts right after the two-byte prefix
  System.arraycopy(payload, 0, framed, 2, payload.length);
  return framed;
}

protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
/**
 * Decodes a value that carries the two-byte type prefix written by {@code serializeTxInfo}.
 * <p>
 * A leading {@code 'O'} means the remainder is passed to {@code deserialize}; a leading
 * {@code 'S'} means the remainder is UTF-8 text returned as a {@link String}.
 *
 * @param txInfo the info field being read; only used in the error message
 * @param data the stored bytes, including the two-byte prefix
 * @return the decoded value
 * @throws IllegalStateException if {@code data} is null, shorter than the two-byte prefix, or
 *         starts with an unknown type marker
 */
protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
  // Every encoded value has a marker byte plus a separator byte. Reject anything shorter up
  // front so malformed data is reported the same way as an unknown marker, rather than as an
  // index or negative-array-size error
  if (data == null || data.length < 2) {
    throw new IllegalStateException("Bad node data " + txInfo);
  }

  if (data[0] == 'O') {
    byte[] sera = new byte[data.length - 2];
    System.arraycopy(data, 2, sera, 0, sera.length);
    return (Serializable) deserialize(sera);
  } else if (data[0] == 'S') {
    return new String(data, 2, data.length - 2, UTF_8);
  } else {
    throw new IllegalStateException("Bad node data " + txInfo);
  }
}
}
Loading