Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 67 additions & 62 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.zookeeper.FateLock;
import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
Expand Down Expand Up @@ -210,14 +212,14 @@ public Map<String,List<String>> getDanglingWaitingLocks() {
/**
* Returns a list of the FATE transactions, optionally filtered by transaction id and status. This
* method does not process lock information, if lock information is desired, use
* {@link #getStatus(ReadOnlyTStore, ZooReader, ServiceLockPath, Set, EnumSet)}
* {@link #getStatus(ReadOnlyFateStore, ZooReader, ServiceLockPath, Set, EnumSet)}
*
* @param zs read-only zoostore
* @param filterTxid filter results to include for provided transaction ids.
* @param filterStatus filter results to include only provided status types
* @return list of FATE transactions that match filter criteria
*/
public List<TransactionStatus> getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTxid,
public List<TransactionStatus> getTransactionStatus(ReadOnlyFateStore<T> zs, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus) {

FateStatus status = getTransactionStatus(zs, filterTxid, filterStatus,
Expand All @@ -239,7 +241,7 @@ public List<TransactionStatus> getTransactionStatus(ReadOnlyTStore<T> zs, Set<Lo
* @throws KeeperException if zookeeper exception occurs
* @throws InterruptedException if process is interrupted.
*/
public FateStatus getStatus(ReadOnlyTStore<T> zs, ZooReader zk,
public FateStatus getStatus(ReadOnlyFateStore<T> zs, ZooReader zk,
ServiceLock.ServiceLockPath lockPath, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
throws KeeperException, InterruptedException {
Map<Long,List<String>> heldLocks = new HashMap<>();
Expand Down Expand Up @@ -332,7 +334,7 @@ private void findLocks(ZooReader zk, final ServiceLock.ServiceLockPath lockPath,
* @param waitingLocks populated list of locks held by transaction - or an empty map if none.
* @return current fate and lock status
*/
private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTxid,
private FateStatus getTransactionStatus(ReadOnlyFateStore<T> zs, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
Map<Long,List<String>> waitingLocks) {

Expand All @@ -341,9 +343,9 @@ private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTx

for (Long tid : transactions) {

zs.reserve(tid);
ReadOnlyFateTxStore<T> txStore = zs.read(tid);

String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);

List<String> hlocks = heldLocks.remove(tid);

Expand All @@ -358,16 +360,14 @@ private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTx
}

String top = null;
ReadOnlyRepo<T> repo = zs.top(tid);
ReadOnlyRepo<T> repo = txStore.top();
if (repo != null) {
top = repo.getName();
}

TStatus status = zs.getStatus(tid);
TStatus status = txStore.getStatus();

long timeCreated = zs.timeCreated(tid);

zs.unreserve(tid, 0);
long timeCreated = txStore.timeCreated();

if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
Expand All @@ -386,14 +386,14 @@ private boolean includeByTxid(Long tid, Set<Long> filterTxid) {
return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid);
}

public void printAll(ReadOnlyTStore<T> zs, ZooReader zk,
public void printAll(ReadOnlyFateStore<T> zs, ZooReader zk,
ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException {
print(zs, zk, tableLocksPath, new Formatter(System.out), null, null);
}

public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath,
Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
throws KeeperException, InterruptedException {
public void print(ReadOnlyFateStore<T> zs, ZooReader zk,
ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus) throws KeeperException, InterruptedException {
FateStatus fateStatus = getStatus(zs, zk, tableLocksPath, filterTxid, filterStatus);

for (TransactionStatus txStatus : fateStatus.getTransactions()) {
Expand All @@ -417,7 +417,7 @@ public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPat
}
}

public boolean prepDelete(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath path,
public boolean prepDelete(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath path,
String txidStr) {
if (!checkGlobalLock(zk, path)) {
return false;
Expand All @@ -431,30 +431,32 @@ public boolean prepDelete(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath path
return false;
}
boolean state = false;
zs.reserve(txid);
TStatus ts = zs.getStatus(txid);
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
case FAILED:
case FAILED_IN_PROGRESS:
case SUCCESSFUL:
System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
zs.delete(txid);
state = true;
break;
FateTxStore<T> txStore = zs.reserve(txid);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
case FAILED:
case FAILED_IN_PROGRESS:
case SUCCESSFUL:
System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
txStore.delete();
state = true;
break;
}
} finally {
txStore.unreserve(0);
}

zs.unreserve(txid, 0);
return state;
}

public boolean prepFail(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath,
public boolean prepFail(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath,
String txidStr) {
if (!checkGlobalLock(zk, zLockManagerPath)) {
return false;
Expand All @@ -468,33 +470,36 @@ public boolean prepFail(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockM
return false;
}
boolean state = false;
zs.reserve(txid);
TStatus ts = zs.getStatus(txid);
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
state = true;
break;

case SUCCESSFUL:
System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
break;

case FAILED:
case FAILED_IN_PROGRESS:
System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
state = true;
break;
FateTxStore<T> txStore = zs.reserve(txid);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
txStore.setStatus(TStatus.FAILED_IN_PROGRESS);
state = true;
break;

case SUCCESSFUL:
System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
break;

case FAILED:
case FAILED_IN_PROGRESS:
System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
state = true;
break;
}
} finally {
txStore.unreserve(0);
}

zs.unreserve(txid, 0);
return state;
}

Expand Down
Loading