Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,15 @@
import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
import org.apache.accumulo.core.dataImpl.thrift.TSummarizerConfiguration;
import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.TFateId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.TServerInstance;
Expand Down Expand Up @@ -277,12 +280,13 @@ public void create(String tableName, NewTableConfiguration ntc)
}
}

private long beginFateOperation() throws ThriftSecurityException, TException {
private TFateId beginFateOperation(TFateInstanceType type)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was really surprised by how minimal the changes were to this class.

throws ThriftSecurityException, TException {
while (true) {
FateService.Client client = null;
try {
client = ThriftClientTypes.FATE.getConnectionWithRetry(context);
return client.beginFateOperation(TraceUtil.traceInfo(), context.rpcCreds());
return client.beginFateOperation(TraceUtil.traceInfo(), context.rpcCreds(), type);
} catch (TTransportException tte) {
log.debug("Failed to call beginFateOperation(), retrying ... ", tte);
sleepUninterruptibly(100, MILLISECONDS);
Expand All @@ -298,7 +302,7 @@ private long beginFateOperation() throws ThriftSecurityException, TException {

// This method is for retrying in the case of network failures;
// anything else it passes to the caller to deal with
private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer> args,
private void executeFateOperation(TFateId opid, FateOperation op, List<ByteBuffer> args,
Map<String,String> opts, boolean autoCleanUp)
throws ThriftSecurityException, TException, ThriftTableOperationException {
while (true) {
Expand All @@ -321,7 +325,7 @@ private void executeFateOperation(long opid, FateOperation op, List<ByteBuffer>
}
}

private String waitForFateOperation(long opid)
private String waitForFateOperation(TFateId opid)
throws ThriftSecurityException, TException, ThriftTableOperationException {
while (true) {
FateService.Client client = null;
Expand All @@ -341,7 +345,7 @@ private String waitForFateOperation(long opid)
}
}

private void finishFateOperation(long opid) throws ThriftSecurityException, TException {
private void finishFateOperation(TFateId opid) throws ThriftSecurityException, TException {
while (true) {
FateService.Client client = null;
try {
Expand Down Expand Up @@ -387,10 +391,12 @@ String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,Strin
String tableOrNamespaceName, boolean wait)
throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
Long opid = null;
TFateId opid = null;

try {
opid = beginFateOperation();
TFateInstanceType t =
FateInstanceType.fromNamespaceOrTableName(tableOrNamespaceName).toThrift();
opid = beginFateOperation(t);
executeFateOperation(opid, op, args, opts, !wait);
if (!wait) {
opid = null;
Expand Down
110 changes: 68 additions & 42 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,19 @@ public AdminUtil(boolean exitOnError) {
public static class TransactionStatus {

private final long txid;
private final FateInstanceType instanceType;
private final TStatus status;
private final String txName;
private final List<String> hlocks;
private final List<String> wlocks;
private final String top;
private final long timeCreated;

private TransactionStatus(Long tid, TStatus status, String txName, List<String> hlocks,
List<String> wlocks, String top, Long timeCreated) {
private TransactionStatus(Long tid, FateInstanceType instanceType, TStatus status,
String txName, List<String> hlocks, List<String> wlocks, String top, Long timeCreated) {

this.txid = tid;
this.instanceType = instanceType;
this.status = status;
this.txName = txName;
this.hlocks = Collections.unmodifiableList(hlocks);
Expand All @@ -103,6 +105,10 @@ public String getTxid() {
return FastFormat.toHexString(txid);
}

public FateInstanceType getInstanceType() {
return instanceType;
}

public TStatus getStatus() {
return status;
}
Expand Down Expand Up @@ -216,15 +222,16 @@ public Map<String,List<String>> getDanglingWaitingLocks() {
* method does not process lock information, if lock information is desired, use
* {@link #getStatus(ReadOnlyFateStore, ZooReader, ServiceLockPath, Set, EnumSet)}
*
* @param zs read-only zoostore
* @param fateStores read-only fate stores
* @param filterTxid filter results to include for provided transaction ids.
* @param filterStatus filter results to include only provided status types
* @return list of FATE transactions that match filter criteria
*/
public List<TransactionStatus> getTransactionStatus(ReadOnlyFateStore<T> zs, Set<Long> filterTxid,
public List<TransactionStatus> getTransactionStatus(
Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus) {

FateStatus status = getTransactionStatus(zs, filterTxid, filterStatus,
FateStatus status = getTransactionStatus(fateStores, filterTxid, filterStatus,
Collections.<Long,List<String>>emptyMap(), Collections.<Long,List<String>>emptyMap());

return status.getTransactions();
Expand All @@ -251,7 +258,26 @@ public FateStatus getStatus(ReadOnlyFateStore<T> zs, ZooReader zk,

findLocks(zk, lockPath, heldLocks, waitingLocks);

return getTransactionStatus(zs, filterTxid, filterStatus, heldLocks, waitingLocks);
return getTransactionStatus(Map.of(FateInstanceType.META, zs), filterTxid, filterStatus,
heldLocks, waitingLocks);
}

public FateStatus getStatus(ReadOnlyFateStore<T> as, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus) throws KeeperException, InterruptedException {

return getTransactionStatus(Map.of(FateInstanceType.USER, as), filterTxid, filterStatus,
new HashMap<>(), new HashMap<>());
}

public FateStatus getStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk,
ServiceLock.ServiceLockPath lockPath, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
throws KeeperException, InterruptedException {
Map<Long,List<String>> heldLocks = new HashMap<>();
Map<Long,List<String>> waitingLocks = new HashMap<>();

findLocks(zk, lockPath, heldLocks, waitingLocks);

return getTransactionStatus(fateStores, filterTxid, filterStatus, heldLocks, waitingLocks);
}

/**
Expand Down Expand Up @@ -327,7 +353,7 @@ private void findLocks(ZooReader zk, final ServiceLock.ServiceLockPath lockPath,
/**
* Returns fate status, possibly filtered
*
* @param zs read-only access to a populated transaction store.
* @param fateStores read-only access to populated transaction stores.
* @param filterTxid Optional. List of transactions to filter results - if null, all transactions
* are returned
* @param filterStatus Optional. List of status types to filter results - if null, all
Expand All @@ -336,49 +362,49 @@ private void findLocks(ZooReader zk, final ServiceLock.ServiceLockPath lockPath,
* @param waitingLocks populated list of locks held by transaction - or an empty map if none.
* @return current fate and lock status
*/
private FateStatus getTransactionStatus(ReadOnlyFateStore<T> zs, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
private FateStatus getTransactionStatus(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores,
Set<Long> filterTxid, EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
Map<Long,List<String>> waitingLocks) {
final List<TransactionStatus> statuses = new ArrayList<>();

try (Stream<Long> tids = zs.list()) {
List<TransactionStatus> statuses = new ArrayList<>();
fateStores.forEach((type, store) -> {
try (Stream<Long> tids = store.list()) {
tids.forEach(tid -> {

tids.forEach(tid -> {
ReadOnlyFateTxStore<T> txStore = store.read(tid);

ReadOnlyFateTxStore<T> txStore = zs.read(tid);
String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);

String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
List<String> hlocks = heldLocks.remove(tid);

List<String> hlocks = heldLocks.remove(tid);

if (hlocks == null) {
hlocks = Collections.emptyList();
}

List<String> wlocks = waitingLocks.remove(tid);
if (hlocks == null) {
hlocks = Collections.emptyList();
}

if (wlocks == null) {
wlocks = Collections.emptyList();
}
List<String> wlocks = waitingLocks.remove(tid);

String top = null;
ReadOnlyRepo<T> repo = txStore.top();
if (repo != null) {
top = repo.getName();
}
if (wlocks == null) {
wlocks = Collections.emptyList();
}

TStatus status = txStore.getStatus();
String top = null;
ReadOnlyRepo<T> repo = txStore.top();
if (repo != null) {
top = repo.getName();
}

long timeCreated = txStore.timeCreated();
TStatus status = txStore.getStatus();

if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses
.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
}
});
long timeCreated = txStore.timeCreated();

return new FateStatus(statuses, heldLocks, waitingLocks);
}
if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses.add(
new TransactionStatus(tid, type, status, txName, hlocks, wlocks, top, timeCreated));
}
});
}
});
return new FateStatus(statuses, heldLocks, waitingLocks);
}

private boolean includeByStatus(TStatus status, EnumSet<TStatus> filterStatus) {
Expand All @@ -389,15 +415,15 @@ private boolean includeByTxid(Long tid, Set<Long> filterTxid) {
return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid);
}

public void printAll(ReadOnlyFateStore<T> zs, ZooReader zk,
public void printAll(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk,
ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException {
print(zs, zk, tableLocksPath, new Formatter(System.out), null, null);
print(fateStores, zk, tableLocksPath, new Formatter(System.out), null, null);
}

public void print(ReadOnlyFateStore<T> zs, ZooReader zk,
public void print(Map<FateInstanceType,ReadOnlyFateStore<T>> fateStores, ZooReader zk,
ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus) throws KeeperException, InterruptedException {
FateStatus fateStatus = getStatus(zs, zk, tableLocksPath, filterTxid, filterStatus);
FateStatus fateStatus = getStatus(fateStores, zk, tableLocksPath, filterTxid, filterStatus);

for (TransactionStatus txStatus : fateStatus.getTransactions()) {
fmt.format(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.fate;

import java.util.Set;

import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
import org.apache.accumulo.core.metadata.FateTable;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;

public enum FateInstanceType {
META, USER;

private static final Set<TableId> META_TABLES =
Set.of(RootTable.ID, MetadataTable.ID, FateTable.ID);

public static FateInstanceType fromNamespaceOrTableName(String tableOrNamespaceName) {
return tableOrNamespaceName.startsWith(Namespace.ACCUMULO.name()) ? FateInstanceType.META
: FateInstanceType.USER;
}

public TFateInstanceType toThrift() {
switch (this) {
case USER:
return TFateInstanceType.USER;
case META:
return TFateInstanceType.META;
default:
throw new IllegalStateException("Unknown FateInstance type " + this);
}
}

public static FateInstanceType fromThrift(TFateInstanceType tfit) {
switch (tfit) {
case USER:
return FateInstanceType.USER;
case META:
return FateInstanceType.META;
default:
throw new IllegalStateException("Unknown type " + tfit);
}
}

public static FateInstanceType fromTableId(TableId tableId) {
return META_TABLES.contains(tableId) ? META : USER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily;
import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.metadata.FateTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
import org.apache.accumulo.core.util.FastFormat;
Expand All @@ -61,6 +62,10 @@ public AccumuloStore(ClientContext context, String tableName) {
this.tableName = Objects.requireNonNull(tableName);
}

public AccumuloStore(ClientContext context) {
this(context, FateTable.NAME);
}

@Override
public long create() {
long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.metadata;

import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.data.TableId;

public class FateTable {
public static final TableId ID = TableId.of("+fate");
public static final String NAME = Namespace.ACCUMULO.name() + ".fate";
}
Loading