diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 055d721bcbe..702313f5abf 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -40,10 +41,13 @@ import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +import com.google.common.hash.HashCode; +import com.google.common.hash.Hashing; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -53,12 +57,22 @@ public abstract class AbstractFateStore implements FateStore { // Default maximum size of 100,000 transactions before deferral is stopped and // all existing transactions are processed immediately again - protected static final int DEFAULT_MAX_DEFERRED = 100_000; + public static final int DEFAULT_MAX_DEFERRED = 100_000; + + public static final FateIdGenerator DEFAULT_FATE_ID_GENERATOR = new FateIdGenerator() { + @Override + public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) { + HashCode hashCode = Hashing.murmur3_128().hashBytes(fateKey.getSerialized()); + long tid = hashCode.asLong() & 0x7fffffffffffffffL; + return FateId.from(instanceType, tid); + } + }; protected final Set reserved; protected final Map deferred; private final int maxDeferred; private final AtomicBoolean deferredOverflow = new AtomicBoolean(); + private final FateIdGenerator fateIdGenerator; // This is incremented each time a transaction was unreserved that was non new protected final SignalCount unreservedNonNewCount = new SignalCount(); @@ -67,11 +81,12 @@ public abstract class AbstractFateStore implements FateStore { protected final SignalCount unreservedRunnableCount = new SignalCount(); public AbstractFateStore() { - this(DEFAULT_MAX_DEFERRED); + this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } - public AbstractFateStore(int maxDeferred) { + public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) { this.maxDeferred = maxDeferred; + this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator); this.reserved = new HashSet<>(); this.deferred = new HashMap<>(); } @@ -239,12 +254,105 @@ public int getDeferredCount() { } } + private Optional create(FateKey fateKey) { + FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey); + + try { + create(fateId, fateKey); + } catch (IllegalStateException e) { + Pair> statusAndKey = getStatusAndKey(fateId); + TStatus status = statusAndKey.getFirst(); + Optional tFateKey = statusAndKey.getSecond(); + + // Case 1: Status is NEW so this is unseeded, we can return and allow the calling code + // to reserve/seed as long as the existing key is the same and not different as that would + // mean a collision + if (status == TStatus.NEW) { + Preconditions.checkState(tFateKey.isPresent(), "Tx Key is missing from tid %s", + fateId.getTid()); + Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()), + "Collision detected for tid %s", fateId.getTid()); + // Case 2: Status is some other state which means already in progress + // so we can just log and return empty optional + } else { + log.trace("Existing transaction {} already exists for key {} with status {}", fateId, + fateKey, status); + return Optional.empty(); + } + } + + return Optional.of(fateId); + } + + @Override + public Optional> createAndReserve(FateKey fateKey) { + FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey); + final Optional> txStore; + + // First make sure we can reserve in memory the fateId, if not + // we can return an empty Optional as it is reserved and in progress + // This reverses the usual order of creation and then reservation but + // this prevents a race condition by ensuring we can reserve first. + // This will create the FateTxStore before creation but this object + // is not exposed until after creation is finished so there should not + // be any errors. + final Optional> reservedTxStore; + synchronized (this) { + reservedTxStore = tryReserve(fateId); + } + + // If present we were able to reserve so try and create + if (reservedTxStore.isPresent()) { + try { + var fateIdFromCreate = create(fateKey); + if (fateIdFromCreate.isPresent()) { + Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()), + "Transaction creation returned unexpected %s, expected %s", fateIdFromCreate, fateId); + txStore = reservedTxStore; + } else { + // We already exist in a non-new state then un-reserve and an empty + // Optional will be returned. This is expected to happen when the + // system is busy and operations are not running, and we keep seeding them + synchronized (this) { + reserved.remove(fateId); + } + txStore = Optional.empty(); + } + } catch (Exception e) { + // Clean up the reservation if the creation failed + // And then throw error + synchronized (this) { + reserved.remove(fateId); + } + if (e instanceof IllegalStateException) { + throw e; + } else { + throw new IllegalStateException(e); + } + } + } else { + // Could not reserve so return empty + log.trace("Another thread currently has transaction {} key {} reserved", fateId, fateKey); + txStore = Optional.empty(); + } + + return txStore; + } + + protected abstract void create(FateId fateId, FateKey fateKey); + + protected abstract Pair> getStatusAndKey(FateId fateId); + protected abstract Stream getTransactions(); protected abstract TStatus _getStatus(FateId fateId); + protected abstract Optional getKey(FateId fateId); + protected abstract FateTxStore newFateTxStore(FateId fateId, boolean isReserved); + protected abstract FateInstanceType getInstanceType(); + protected abstract class AbstractFateTxStoreImpl implements FateTxStore { protected final FateId fateId; protected final boolean isReserved; @@ -337,34 +445,50 @@ public TStatus getStatus() { return status; } + @Override + public Optional getKey() { + verifyReserved(false); + return AbstractFateStore.this.getKey(fateId); + } + + @Override + public Pair> getStatusAndKey() { + verifyReserved(false); + return AbstractFateStore.this.getStatusAndKey(fateId); + } + @Override public FateId getID() { return fateId; } + } - protected byte[] serializeTxInfo(Serializable so) { - if (so instanceof String) { - return ("S " + so).getBytes(UTF_8); - } else { - byte[] sera = serialize(so); - byte[] data = new byte[sera.length + 2]; - System.arraycopy(sera, 0, data, 2, sera.length); - data[0] = 'O'; - data[1] = ' '; - return data; - } + public interface FateIdGenerator { + FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey); + } + + protected byte[] serializeTxInfo(Serializable so) { + if (so instanceof String) { + return ("S " + so).getBytes(UTF_8); + } else { + byte[] sera = serialize(so); + byte[] data = new byte[sera.length + 2]; + System.arraycopy(sera, 0, data, 2, sera.length); + data[0] = 'O'; + data[1] = ' '; + return data; } + } - protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { - if (data[0] == 'O') { - byte[] sera = new byte[data.length - 2]; - System.arraycopy(data, 2, sera, 0, sera.length); - return (Serializable) deserialize(sera); - } else if (data[0] == 'S') { - return new String(data, 2, data.length - 2, UTF_8); - } else { - throw new IllegalStateException("Bad node data " + txInfo); - } + protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) { + if (data[0] == 'O') { + byte[] sera = new byte[data.length - 2]; + System.arraycopy(data, 2, sera, 0, sera.length); + return (Serializable) deserialize(sera); + } else if (data[0] == 'S') { + return new String(data, 2, data.length - 2, UTF_8); + } else { + throw new IllegalStateException("Bad node data " + txInfo); } } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java new file mode 100644 index 00000000000..2b4238d087e --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateKey.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Objects; +import java.util.Optional; + +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.hadoop.io.DataInputBuffer; + +public class FateKey { + + private final FateKeyType type; + private final Optional keyExtent; + private final Optional compactionId; + private final byte[] serialized; + + private FateKey(FateKeyType type, KeyExtent keyExtent) { + this.type = Objects.requireNonNull(type); + this.keyExtent = Optional.of(keyExtent); + this.compactionId = Optional.empty(); + this.serialized = serialize(type, keyExtent); + } + + private FateKey(FateKeyType type, ExternalCompactionId compactionId) { + this.type = Objects.requireNonNull(type); + this.keyExtent = Optional.empty(); + this.compactionId = Optional.of(compactionId); + this.serialized = serialize(type, compactionId); + } + + private FateKey(byte[] serialized) { + try (DataInputBuffer buffer = new DataInputBuffer()) { + buffer.reset(serialized, serialized.length); + this.type = FateKeyType.valueOf(buffer.readUTF()); + this.keyExtent = deserializeKeyExtent(type, buffer); + this.compactionId = deserializeCompactionId(type, buffer); + this.serialized = serialized; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public FateKeyType getType() { + return type; + } + + public Optional getKeyExtent() { + return keyExtent; + } + + public Optional getCompactionId() { + return compactionId; + } + + public byte[] getSerialized() { + return serialized; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FateKey fateKey = (FateKey) o; + return Arrays.equals(serialized, fateKey.serialized); + } + + @Override + public int hashCode() { + return Arrays.hashCode(serialized); + } + + public static FateKey deserialize(byte[] serialized) { + return new FateKey(serialized); + } + + public static FateKey forSplit(KeyExtent extent) { + return new FateKey(FateKeyType.SPLIT, extent); + } + + public static FateKey forCompactionCommit(ExternalCompactionId compactionId) { + return new FateKey(FateKeyType.COMPACTION_COMMIT, compactionId); + } + + public enum FateKeyType { + SPLIT, COMPACTION_COMMIT + } + + private static byte[] serialize(FateKeyType type, KeyExtent ke) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeUTF(type.toString()); + ke.writeTo(dos); + dos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static byte[] serialize(FateKeyType type, ExternalCompactionId compactionId) { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeUTF(type.toString()); + dos.writeUTF(compactionId.canonical()); + dos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private static Optional deserializeKeyExtent(FateKeyType type, DataInputBuffer buffer) + throws IOException { + switch (type) { + case SPLIT: + return Optional.of(KeyExtent.readFrom(buffer)); + case COMPACTION_COMMIT: + return Optional.empty(); + default: + throw new IllegalStateException("Unexpected FateInstanceType found " + type); + } + } + + private static Optional deserializeCompactionId(FateKeyType type, + DataInputBuffer buffer) throws IOException { + switch (type) { + case SPLIT: + return Optional.empty(); + case COMPACTION_COMMIT: + return Optional.of(ExternalCompactionId.of(buffer.readUTF())); + default: + throw new IllegalStateException("Unexpected FateInstanceType found " + type); + } + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java index d8495906e3a..088e5025229 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -39,6 +39,21 @@ public interface FateStore extends ReadOnlyFateStore { */ FateId create(); + /** + * Creates and reserves a transaction using the given key. If something is already running for the + * given key, then Optional.empty() will be returned. When this returns a non-empty id, it will be + * in the new state. + * + *

+ * In the case where a process dies in the middle of a call to this. If later, another call is + * made with the same key and its in the new state then the FateId for that key will be returned. + *

+ * + * @throws IllegalStateException when there is an unexpected collision. This can occur if two key + * hash to the same FateId or if a random FateId already exists. + */ + Optional> createAndReserve(FateKey fateKey); + /** * An interface that allows read/write access to the data related to a single fate operation. */ diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index 6be4e76506b..deb79413c92 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -21,10 +21,13 @@ import java.io.Serializable; import java.util.EnumSet; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.stream.Stream; +import org.apache.accumulo.core.util.Pair; + /** * Read only access to a Transaction Store. * @@ -87,6 +90,10 @@ interface ReadOnlyFateTxStore { */ TStatus getStatus(); + Optional getKey(); + + Pair> getStatusAndKey(); + /** * Wait for the status of a transaction to change * diff --git a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java index de103c7902b..031a3ece02b 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@ -21,8 +21,12 @@ import java.io.Serializable; import java.util.EnumSet; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.util.Pair; + public class WrappedFateTxStore implements FateStore.FateTxStore { protected final FateStore.FateTxStore wrapped; @@ -55,6 +59,16 @@ public FateStore.TStatus getStatus() { return wrapped.getStatus(); } + @Override + public Optional getKey() { + return wrapped.getKey(); + } + + @Override + public Pair> getStatusAndKey() { + return wrapped.getStatusAndKey(); + } + @Override public void setStatus(FateStore.TStatus status) { wrapped.setStatus(status); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 99c47a5624d..d0ef9600546 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -19,20 +19,28 @@ package org.apache.accumulo.core.fate; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.accumulo.core.util.LazySingletons.RANDOM; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.io.Serializable; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.function.Supplier; import java.util.stream.Stream; +import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.core.util.Pair; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -40,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Suppliers; //TODO use zoocache? - ACCUMULO-1297 @@ -57,12 +66,13 @@ private String getTXPath(FateId fateId) { } public ZooStore(String path, ZooReaderWriter zk) throws KeeperException, InterruptedException { - this(path, zk, DEFAULT_MAX_DEFERRED); + this(path, zk, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } - public ZooStore(String path, ZooReaderWriter zk, int maxDeferred) + @VisibleForTesting + public ZooStore(String path, ZooReaderWriter zk, int maxDeferred, FateIdGenerator fateIdGenerator) throws KeeperException, InterruptedException { - super(maxDeferred); + super(maxDeferred, fateIdGenerator); this.path = path; this.zk = zk; @@ -81,7 +91,7 @@ public FateId create() { // looking at the code for SecureRandom, it appears to be thread safe long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; FateId fateId = FateId.from(fateInstanceType, tid); - zk.putPersistentData(getTXPath(fateId), TStatus.NEW.name().getBytes(UTF_8), + zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW).serialize(), NodeExistsPolicy.FAIL); return fateId; } catch (NodeExistsException nee) { @@ -92,6 +102,22 @@ public FateId create() { } } + @Override + protected void create(FateId fateId, FateKey key) { + try { + zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, key).serialize(), + NodeExistsPolicy.FAIL); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + + @Override + protected Pair> getStatusAndKey(FateId fateId) { + final NodeValue node = getNode(fateId); + return new Pair<>(node.status, node.fateKey); + } + private class FateTxStoreImpl extends AbstractFateTxStoreImpl { private FateTxStoreImpl(FateId fateId, boolean isReserved) { @@ -192,7 +218,7 @@ public void setStatus(TStatus status) { verifyReserved(true); try { - zk.putPersistentData(getTXPath(fateId), status.name().getBytes(UTF_8), + zk.putPersistentData(getTXPath(fateId), new NodeValue(status).serialize(), NodeExistsPolicy.OVERWRITE); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); @@ -228,13 +254,7 @@ public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { public Serializable getTransactionInfo(Fate.TxInfo txInfo) { verifyReserved(false); - try { - return deserializeTxInfo(txInfo, zk.getData(getTXPath(fateId) + "/" + txInfo)); - } catch (NoNodeException nne) { - return null; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } + return ZooStore.this.getTransactionInfo(txInfo, fateId); } @Override @@ -291,12 +311,31 @@ public List> getStack() { } } + private Serializable getTransactionInfo(TxInfo txInfo, FateId fateId) { + try { + return deserializeTxInfo(txInfo, zk.getData(getTXPath(fateId) + "/" + txInfo)); + } catch (NoNodeException nne) { + return null; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + @Override protected TStatus _getStatus(FateId fateId) { + return getNode(fateId).status; + } + + @Override + protected Optional getKey(FateId fateId) { + return getNode(fateId).fateKey; + } + + private NodeValue getNode(FateId fateId) { try { - return TStatus.valueOf(new String(zk.getData(getTXPath(fateId)), UTF_8)); + return new NodeValue(zk.getData(getTXPath(fateId))); } catch (NoNodeException nne) { - return TStatus.UNKNOWN; + return new NodeValue(TStatus.UNKNOWN); } catch (KeeperException | InterruptedException e) { throw new IllegalStateException(e); } @@ -307,6 +346,11 @@ protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { return new FateTxStoreImpl(fateId, isReserved); } + @Override + protected FateInstanceType getInstanceType() { + return fateInstanceType; + } + @Override protected Stream getTransactions() { try { @@ -328,4 +372,54 @@ public TStatus getStatus() { } } + protected static class NodeValue { + final TStatus status; + final Optional fateKey; + + private NodeValue(byte[] serialized) { + try (DataInputBuffer buffer = new DataInputBuffer()) { + buffer.reset(serialized, serialized.length); + this.status = TStatus.valueOf(buffer.readUTF()); + this.fateKey = deserializeFateKey(buffer); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private NodeValue(TStatus status) { + this(status, null); + } + + private NodeValue(TStatus status, FateKey fateKey) { + this.status = Objects.requireNonNull(status); + this.fateKey = Optional.ofNullable(fateKey); + } + + private Optional deserializeFateKey(DataInputBuffer buffer) throws IOException { + int length = buffer.readInt(); + if (length > 0) { + return Optional.of(FateKey.deserialize(buffer.readNBytes(length))); + } + return Optional.empty(); + } + + byte[] serialize() { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + dos.writeUTF(status.name()); + if (fateKey.isPresent()) { + byte[] serialized = fateKey.orElseThrow().getSerialized(); + dos.writeInt(serialized.length); + dos.write(serialized); + } else { + dos.writeInt(0); + } + dos.close(); + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 9ae596bb83e..328560b1506 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -22,6 +22,7 @@ import java.io.Serializable; import java.util.List; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.function.Function; @@ -31,11 +32,14 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; @@ -45,11 +49,13 @@ import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; public class AccumuloStore extends AbstractFateStore { @@ -65,11 +71,13 @@ public class AccumuloStore extends AbstractFateStore { com.google.common.collect.Range.closed(1, maxRepos); public AccumuloStore(ClientContext context, String tableName) { - this(context, tableName, DEFAULT_MAX_DEFERRED); + this(context, tableName, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR); } - public AccumuloStore(ClientContext context, String tableName, int maxDeferred) { - super(maxDeferred); + @VisibleForTesting + public AccumuloStore(ClientContext context, String tableName, int maxDeferred, + FateIdGenerator fateIdGenerator) { + super(maxDeferred, fateIdGenerator); this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); } @@ -112,6 +120,38 @@ public FateId getFateId() { return FateId.from(fateInstanceType, tid); } + @Override + protected void create(FateId fateId, FateKey fateKey) { + final int maxAttempts = 5; + + for (int attempt = 0; attempt < maxAttempts; attempt++) { + + if (attempt >= 1) { + log.debug("Failed to create transaction with fateId {} and fateKey {}, trying again", + fateId, fateKey); + UtilWaitThread.sleep(100); + } + + var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey) + .putCreateTime(System.currentTimeMillis()).tryMutate(); + + switch (status) { + case ACCEPTED: + return; + case UNKNOWN: + continue; + case REJECTED: + throw new IllegalStateException("Attempt to create transaction with fateId " + fateId + + " and fateKey " + fateKey + " was rejected"); + default: + throw new IllegalStateException("Unknown status " + status); + } + } + + throw new IllegalStateException("Failed to create transaction with fateId " + fateId + + " and fateKey " + fateKey + " after " + maxAttempts + " attempts"); + } + @Override protected Stream getTransactions() { try { @@ -143,13 +183,60 @@ protected TStatus _getStatus(FateId fateId) { }); } + @Override + protected Optional getKey(FateId fateId) { + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + TxColumnFamily.TX_KEY_COLUMN.fetch(scanner); + return scanner.stream().map(e -> FateKey.deserialize(e.getValue().get())).findFirst(); + }); + } + + @Override + protected Pair> getStatusAndKey(FateId fateId) { + return scanTx(scanner -> { + scanner.setRange(getRow(fateId)); + TxColumnFamily.STATUS_COLUMN.fetch(scanner); + TxColumnFamily.TX_KEY_COLUMN.fetch(scanner); + + TStatus status = null; + FateKey key = null; + + for (Entry entry : scanner) { + final String qual = entry.getKey().getColumnQualifierData().toString(); + switch (qual) { + case TxColumnFamily.STATUS: + status = TStatus.valueOf(entry.getValue().toString()); + break; + case TxColumnFamily.TX_KEY: + key = FateKey.deserialize(entry.getValue().get()); + break; + default: + throw new IllegalStateException("Unexpected column qualifier: " + qual); + } + } + + return new Pair<>(Optional.ofNullable(status).orElse(TStatus.UNKNOWN), + Optional.ofNullable(key)); + }); + } + @Override protected FateTxStore newFateTxStore(FateId fateId, boolean isReserved) { return new FateTxStoreImpl(fateId, isReserved); } + @Override + protected FateInstanceType getInstanceType() { + return fateInstanceType; + } + static Range getRow(FateId fateId) { - return new Range("tx_" + fateId.getHexTid()); + return new Range(getRowId(fateId)); + } + + public static String getRowId(FateId fateId) { + return "tx_" + fateId.getHexTid(); } private FateMutatorImpl newMutator(FateId fateId) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java index 22497006db5..b538a9f665d 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutator.java @@ -19,6 +19,7 @@ package org.apache.accumulo.core.fate.accumulo; import org.apache.accumulo.core.fate.Fate; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; @@ -26,6 +27,8 @@ public interface FateMutator { FateMutator putStatus(TStatus status); + FateMutator putKey(FateKey fateKey); + FateMutator putCreateTime(long ctime); FateMutator putName(byte[] data); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java index c373190487d..342bfc6e83c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/FateMutatorImpl.java @@ -20,6 +20,7 @@ import static org.apache.accumulo.core.fate.AbstractFateStore.serialize; import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.getRow; +import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.getRowId; import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.invertRepo; import java.util.Objects; @@ -37,6 +38,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; @@ -56,7 +58,7 @@ public FateMutatorImpl(ClientContext context, String tableName, FateId fateId) { this.context = Objects.requireNonNull(context); this.tableName = Objects.requireNonNull(tableName); this.fateId = fateId; - this.mutation = new ConditionalMutation(new Text("tx_" + fateId.getHexTid())); + this.mutation = new ConditionalMutation(new Text(getRowId(fateId))); } @Override @@ -65,6 +67,12 @@ public FateMutator putStatus(TStatus status) { return this; } + @Override + public FateMutator putKey(FateKey fateKey) { + TxColumnFamily.TX_KEY_COLUMN.put(mutation, new Value(fateKey.getSerialized())); + return this; + } + @Override public FateMutator putCreateTime(long ctime) { TxColumnFamily.CREATE_TIME_COLUMN.put(mutation, new Value(Long.toString(ctime))); diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java index 7e4e639a7ce..73664f7a1b4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/schema/FateSchema.java @@ -30,6 +30,9 @@ public static class TxColumnFamily { public static final String STATUS = "status"; public static final ColumnFQ STATUS_COLUMN = new ColumnFQ(NAME, new Text(STATUS)); + public static final String TX_KEY = "txkey"; + public static final ColumnFQ TX_KEY_COLUMN = new ColumnFQ(NAME, new Text(TX_KEY)); + public static final String CREATE_TIME = "ctime"; public static final ColumnFQ CREATE_TIME_COLUMN = new ColumnFQ(NAME, new Text(CREATE_TIME)); } diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index fe525bf37d3..0879fbaea85 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore; @@ -141,6 +142,22 @@ public int getDeferredCount() { public boolean isDeferredOverflow() { return store.isDeferredOverflow(); } + + @Override + public Optional> createAndReserve(FateKey fateKey) { + Optional> txStore = store.createAndReserve(fateKey); + if (storeLog.isTraceEnabled()) { + if (txStore.isPresent()) { + storeLog.trace("{} created and reserved fate transaction using key : {}", + txStore.orElseThrow().getID(), fateKey); + } else { + storeLog.trace( + "fate transaction was not created using key : {}, existing transaction exists", + fateKey); + } + } + return txStore; + } }; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 3c81318c54a..18089848df1 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -32,25 +32,35 @@ import java.util.function.Consumer; import java.util.stream.Stream; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.util.Pair; + /** * Transient in memory store for transactions. */ public class TestStore implements FateStore { private long nextId = 1; - private Map statuses = new HashMap<>(); - private Set reserved = new HashSet<>(); - + private final Map>> statuses = new HashMap<>(); + private final Map> txInfos = new HashMap<>(); + private final Set reserved = new HashSet<>(); private static final FateInstanceType fateInstanceType = FateInstanceType.USER; - private Map> txInfos = new HashMap<>(); @Override public FateId create() { FateId fateId = FateId.from(fateInstanceType, nextId++); - statuses.put(fateId, TStatus.NEW); + statuses.put(fateId, new Pair<>(TStatus.NEW, Optional.empty())); return fateId; } + @Override + public Optional> createAndReserve(FateKey key) { + throw new UnsupportedOperationException(); + } + @Override public FateTxStore reserve(FateId fateId) { if (reserved.contains(fateId)) { @@ -92,14 +102,25 @@ public List> getStack() { @Override public TStatus getStatus() { + return getStatusAndKey().getFirst(); + } + + @Override + public Optional getKey() { + return getStatusAndKey().getSecond(); + } + + @Override + public Pair> getStatusAndKey() { if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - TStatus status = statuses.get(fateId); + Pair> status = statuses.get(fateId); if (status == null) { - return TStatus.UNKNOWN; + return new Pair<>(TStatus.UNKNOWN, Optional.empty()); } + return status; } @@ -143,10 +164,11 @@ public void setStatus(TStatus status) { if (!reserved.contains(fateId)) { throw new IllegalStateException(); } - if (!statuses.containsKey(fateId)) { + Pair> currentStatus = statuses.get(fateId); + if (currentStatus == null) { throw new IllegalStateException(); } - statuses.put(fateId, status); + statuses.put(fateId, new Pair<>(status, currentStatus.getSecond())); } @Override @@ -189,7 +211,7 @@ public FateId getFateId() { @Override public TStatus getStatus() { - return e.getValue(); + return e.getValue().getFirst(); } }); } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index 79cbdae89d4..a373a58c730 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.conf.ConfigurationCopy; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateStore; @@ -43,23 +44,20 @@ import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; import org.apache.accumulo.test.util.Wait; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class FateIT extends SharedMiniClusterBase implements FateTestRunner { +public abstract class FateIT extends SharedMiniClusterBase implements FateTestRunner { private static final Logger LOG = LoggerFactory.getLogger(FateIT.class); private static CountDownLatch callStarted; private static CountDownLatch finishCall; - public static class TestEnv { - - } - public static class TestRepo implements Repo { private static final long serialVersionUID = 1L; @@ -289,7 +287,7 @@ public void testDeferredOverflow() throws Exception { // Set a maximum deferred map size of 10 transactions so that when the 11th // is seen the Fate store should clear the deferred map and mark // the flag as overflow so that all the deferred transactions will be run - executeTest(this::testDeferredOverflow, 10); + executeTest(this::testDeferredOverflow, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); } protected void testDeferredOverflow(FateStore store, ServerContext sctx) diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java index b87702df910..244f7991116 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateTestRunner.java @@ -18,20 +18,27 @@ */ package org.apache.accumulo.test.fate; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.test.fate.FateIT.TestEnv; +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; -public interface FateTestRunner { +public interface FateTestRunner { - void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception; + void executeTest(FateTestExecutor testMethod, int maxDeferred, FateIdGenerator fateIdGenerator) + throws Exception; - default void executeTest(FateTestExecutor testMethod) throws Exception { - executeTest(testMethod, 100_000); + default void executeTest(FateTestExecutor testMethod) throws Exception { + executeTest(testMethod, AbstractFateStore.DEFAULT_MAX_DEFERRED, + AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); } - interface FateTestExecutor { - void execute(FateStore store, ServerContext sctx) throws Exception; + interface FateTestExecutor { + void execute(FateStore store, ServerContext sctx) throws Exception; } + class TestEnv { + + } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java index 0da99ada403..3091f8c88fd 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloFateIT.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; @@ -51,12 +52,13 @@ public static void teardown() { } @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception { + public void executeTest(FateTestExecutor testMethod, int maxDeferred, + FateIdGenerator fateIdGenerator) throws Exception { table = getUniqueNames(1)[0]; try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { client.tableOperations().create(table); - testMethod.execute(new AccumuloStore<>(client, table, maxDeferred), + testMethod.execute(new AccumuloStore<>(client, table, maxDeferred, fateIdGenerator), getCluster().getServerContext()); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreFateIT.java index 7ef3e575deb..da0b62e14d8 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreFateIT.java @@ -18,10 +18,20 @@ */ package org.apache.accumulo.test.fate.accumulo; +import static org.apache.accumulo.core.fate.accumulo.AccumuloStore.getRowId; + import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.server.ServerContext; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -38,13 +48,26 @@ public static void teardown() { } @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception { - String table = getUniqueNames(1)[0]; + public void executeTest(FateTestExecutor testMethod, int maxDeferred, + FateIdGenerator fateIdGenerator) throws Exception { + String table = getUniqueNames(1)[0] + "fatestore"; try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { client.tableOperations().create(table); - testMethod.execute(new AccumuloStore<>(client, table, maxDeferred), + testMethod.execute(new AccumuloStore<>(client, table, maxDeferred, fateIdGenerator), getCluster().getServerContext()); } } + + @Override + protected void deleteKey(FateId fateId, ServerContext context) { + String table = getUniqueNames(1)[0] + "fatestore"; + try (BatchWriter bw = context.createBatchWriter(table)) { + Mutation mut = new Mutation(getRowId(fateId)); + TxColumnFamily.TX_KEY_COLUMN.putDelete(mut); + bw.addMutation(mut); + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new IllegalStateException(e); + } + } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java index 3c41fd0705f..181a21b9c64 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java @@ -21,35 +21,60 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.lang.reflect.Method; import java.time.Duration; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.AbstractFateStore; import org.apache.accumulo.core.fate.Fate.TxInfo; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ReadOnlyRepo; import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.test.fate.FateIT.TestEnv; import org.apache.accumulo.test.fate.FateIT.TestRepo; import org.apache.accumulo.test.fate.FateTestRunner; +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv; import org.apache.accumulo.test.util.Wait; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.Test; -public abstract class FateStoreIT extends SharedMiniClusterBase implements FateTestRunner { +import com.google.common.base.Throwables; + +public abstract class FateStoreIT extends SharedMiniClusterBase implements FateTestRunner { + + private static final Method fsCreateByKeyMethod; + + static { + try { + // Private method, need to capture for testing + fsCreateByKeyMethod = AbstractFateStore.class.getDeclaredMethod("create", FateKey.class); + fsCreateByKeyMethod.setAccessible(true); + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } @Override protected Duration defaultTimeout() { @@ -70,6 +95,7 @@ protected void testReadWrite(FateStore store, ServerContext sctx) FateId fateId = store.create(); FateTxStore txStore = store.reserve(fateId); assertTrue(txStore.timeCreated() > 0); + assertFalse(txStore.getKey().isPresent()); assertEquals(1, store.list().count()); // Push a test FATE op and verify we can read it back @@ -141,7 +167,7 @@ protected void testReadWriteTxInfo(FateStore store, ServerContext sctx) @Test public void testDeferredOverflow() throws Exception { - executeTest(this::testDeferredOverflow, 10); + executeTest(this::testDeferredOverflow, 10, AbstractFateStore.DEFAULT_FATE_ID_GENERATOR); } protected void testDeferredOverflow(FateStore store, ServerContext sctx) @@ -217,10 +243,191 @@ protected void testDeferredOverflow(FateStore store, ServerContext sctx } finally { executor.shutdownNow(); // Cleanup so we don't interfere with other tests - store.list().forEach(fateIdStatus -> store.reserve(fateIdStatus.getFateId()).delete()); + // All stores should already be unreserved + store.list().forEach( + fateIdStatus -> store.tryReserve(fateIdStatus.getFateId()).orElseThrow().delete()); + } + } + + @Test + public void testCreateWithKey() throws Exception { + executeTest(this::testCreateWithKey); + } + + protected void testCreateWithKey(FateStore store, ServerContext sctx) { + KeyExtent ke1 = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + + long existing = store.list().count(); + FateKey fateKey1 = FateKey.forSplit(ke1); + FateKey fateKey2 = + FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID())); + + FateTxStore txStore1 = store.createAndReserve(fateKey1).orElseThrow(); + FateTxStore txStore2 = store.createAndReserve(fateKey2).orElseThrow(); + + assertNotEquals(txStore1.getID(), txStore2.getID()); + + try { + assertTrue(txStore1.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore1.getStatus()); + assertEquals(fateKey1, txStore1.getKey().orElseThrow()); + + assertTrue(txStore2.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore2.getStatus()); + assertEquals(fateKey2, txStore2.getKey().orElseThrow()); + + assertEquals(existing + 2, store.list().count()); + } finally { + txStore1.delete(); + txStore2.delete(); + txStore1.unreserve(0, TimeUnit.SECONDS); + txStore2.unreserve(0, TimeUnit.SECONDS); + } + } + + @Test + public void testCreateWithKeyDuplicate() throws Exception { + executeTest(this::testCreateWithKeyDuplicate); + } + + protected void testCreateWithKeyDuplicate(FateStore store, ServerContext sctx) { + KeyExtent ke = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + + // Creating with the same key should be fine if the status is NEW + // A second call to createAndReserve() should just return an empty optional + // since it's already in reserved and in progress + FateKey fateKey = FateKey.forSplit(ke); + FateTxStore txStore = store.createAndReserve(fateKey).orElseThrow(); + + // second call is empty + assertTrue(store.createAndReserve(fateKey).isEmpty()); + + try { + assertTrue(txStore.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore.getStatus()); + assertEquals(fateKey, txStore.getKey().orElseThrow()); + assertEquals(1, store.list().count()); + } finally { + txStore.delete(); + txStore.unreserve(0, TimeUnit.SECONDS); } } + @Test + public void testCreateWithKeyInProgress() throws Exception { + executeTest(this::testCreateWithKeyInProgress); + } + + protected void testCreateWithKeyInProgress(FateStore store, ServerContext sctx) + throws Exception { + KeyExtent ke = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + FateKey fateKey = FateKey.forSplit(ke); + + FateTxStore txStore = store.createAndReserve(fateKey).orElseThrow(); + ; + try { + assertTrue(txStore.timeCreated() > 0); + txStore.setStatus(TStatus.IN_PROGRESS); + + // We have an existing transaction with the same key in progress + // so should return an empty Optional + assertTrue(create(store, fateKey).isEmpty()); + assertEquals(TStatus.IN_PROGRESS, txStore.getStatus()); + } finally { + txStore.delete(); + txStore.unreserve(0, TimeUnit.SECONDS); + } + + try { + // After deletion, make sure we can create again with the same key + txStore = store.createAndReserve(fateKey).orElseThrow(); + assertTrue(txStore.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore.getStatus()); + } finally { + txStore.delete(); + txStore.unreserve(0, TimeUnit.SECONDS); + } + + } + + @Test + public void testCreateWithKeyCollision() throws Exception { + // Replace the default hasing algorithm with one that always returns the same tid so + // we can check duplicate detection with different keys + executeTest(this::testCreateWithKeyCollision, AbstractFateStore.DEFAULT_MAX_DEFERRED, + (instanceType, fateKey) -> FateId.from(instanceType, 1000)); + } + + protected void testCreateWithKeyCollision(FateStore store, ServerContext sctx) { + String[] tables = getUniqueNames(2); + KeyExtent ke1 = new KeyExtent(TableId.of(tables[0]), new Text("zzz"), new Text("aaa")); + KeyExtent ke2 = new KeyExtent(TableId.of(tables[1]), new Text("ddd"), new Text("bbb")); + + FateKey fateKey1 = FateKey.forSplit(ke1); + FateKey fateKey2 = FateKey.forSplit(ke2); + + FateTxStore txStore = store.createAndReserve(fateKey1).orElseThrow(); + try { + var e = assertThrows(IllegalStateException.class, () -> create(store, fateKey2)); + assertEquals("Collision detected for tid 1000", e.getMessage()); + assertEquals(fateKey1, txStore.getKey().orElseThrow()); + } finally { + txStore.delete(); + txStore.unreserve(0, TimeUnit.SECONDS); + } + + } + + @Test + public void testCollisionWithRandomFateId() throws Exception { + executeTest(this::testCollisionWithRandomFateId); + } + + protected void testCollisionWithRandomFateId(FateStore store, ServerContext sctx) + throws Exception { + KeyExtent ke = + new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new Text("aaa")); + + FateKey fateKey = FateKey.forSplit(ke); + FateId fateId = create(store, fateKey).orElseThrow(); + + // After create a fate transaction using a key we can simulate a collision with + // a random FateId by deleting the key out of Fate and calling create again to verify + // it detects the key is missing. Then we can continue and see if we can still reserve + // and use the existing transaction, which we should. + deleteKey(fateId, sctx); + var e = assertThrows(IllegalStateException.class, () -> store.createAndReserve(fateKey)); + assertEquals("Tx Key is missing from tid " + fateId.getTid(), e.getMessage()); + + // We should still be able to reserve and continue when not using a key + // just like a normal transaction + FateTxStore txStore = store.reserve(fateId); + try { + assertTrue(txStore.timeCreated() > 0); + assertEquals(TStatus.NEW, txStore.getStatus()); + } finally { + txStore.delete(); + txStore.unreserve(0, TimeUnit.SECONDS); + } + + } + + // create(fateKey) method is private so expose for testing to check error states + @SuppressWarnings("unchecked") + protected Optional create(FateStore store, FateKey fateKey) throws Exception { + try { + return (Optional) fsCreateByKeyMethod.invoke(store, fateKey); + } catch (Exception e) { + Exception rootCause = (Exception) Throwables.getRootCause(e); + throw rootCause; + } + } + + protected abstract void deleteKey(FateId fateId, ServerContext sctx); + private static class TestOperation2 extends TestRepo { private static final long serialVersionUID = 1L; diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooStoreFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooStoreFateIT.java index 04530e317fa..c679e0d54f5 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooStoreFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZooStoreFateIT.java @@ -24,11 +24,18 @@ import static org.easymock.EasyMock.replay; import java.io.File; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.UUID; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.accumulo.FateStoreIT; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; @@ -61,12 +68,48 @@ public static void teardown() throws Exception { } @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception { + public void executeTest(FateTestExecutor testMethod, int maxDeferred, + FateIdGenerator fateIdGenerator) throws Exception { ServerContext sctx = createMock(ServerContext.class); expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(sctx); - testMethod.execute(new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred), sctx); + testMethod.execute(new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred, fateIdGenerator), + sctx); + } + + @Override + protected void deleteKey(FateId fateId, ServerContext sctx) { + try { + // We have to use reflection since the NodeValue is internal to the store + + // Grab both the constructor that uses the serialized bytes and status + Class nodeClass = Class.forName(ZooStore.class.getName() + "$NodeValue"); + Constructor statusCons = nodeClass.getDeclaredConstructor(TStatus.class); + Constructor serializedCons = nodeClass.getDeclaredConstructor(byte[].class); + statusCons.setAccessible(true); + serializedCons.setAccessible(true); + + // Get the status field so it can be read and the serialize method + Field nodeStatus = nodeClass.getDeclaredField("status"); + Method nodeSerialize = nodeClass.getDeclaredMethod("serialize"); + nodeStatus.setAccessible(true); + nodeSerialize.setAccessible(true); + + // Get the existing status for the node and build a new node with an empty key + // but uses the existing tid + String txPath = ZK_ROOT + Constants.ZFATE + "/tx_" + fateId.getHexTid(); + Object currentNode = serializedCons.newInstance(new Object[] {zk.getData(txPath)}); + TStatus currentStatus = (TStatus) nodeStatus.get(currentNode); + // replace the node with no key and just a tid and existing status + Object newNode = statusCons.newInstance(currentStatus); + + // Replace the transaction with the same status and no key + zk.putPersistentData(txPath, (byte[]) nodeSerialize.invoke(newNode), + NodeExistsPolicy.OVERWRITE); + } catch (Exception e) { + throw new IllegalStateException(e); + } } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java index bd58df57009..c8688a89519 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/ZookeeperFateIT.java @@ -18,16 +18,18 @@ */ package org.apache.accumulo.test.fate.zookeeper; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.UUID; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.ZooStore; @@ -35,6 +37,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.test.fate.FateIT; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -65,13 +68,15 @@ public static void teardown() throws Exception { } @Override - public void executeTest(FateTestExecutor testMethod, int maxDeferred) throws Exception { + public void executeTest(FateTestExecutor testMethod, int maxDeferred, + FateIdGenerator fateIdGenerator) throws Exception { ServerContext sctx = createMock(ServerContext.class); expect(sctx.getZooKeeperRoot()).andReturn(ZK_ROOT).anyTimes(); expect(sctx.getZooReaderWriter()).andReturn(zk).anyTimes(); replay(sctx); - testMethod.execute(new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred), sctx); + testMethod.execute(new ZooStore<>(ZK_ROOT + Constants.ZFATE, zk, maxDeferred, fateIdGenerator), + sctx); } @Override @@ -91,11 +96,17 @@ private static TStatus getTxStatus(ZooReaderWriter zrw, FateId fateId) throws KeeperException, InterruptedException { zrw.sync(ZK_ROOT); String txdir = String.format("%s%s/tx_%s", ZK_ROOT, Constants.ZFATE, fateId.getHexTid()); - try { - return TStatus.valueOf(new String(zrw.getData(txdir), UTF_8)); + + try (DataInputBuffer buffer = new DataInputBuffer()) { + var serialized = zrw.getData(txdir); + buffer.reset(serialized, serialized.length); + return TStatus.valueOf(buffer.readUTF()); + } catch (IOException e) { + throw new UncheckedIOException(e); } catch (KeeperException.NoNodeException e) { return TStatus.UNKNOWN; } + } }