Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -148,11 +150,6 @@ public long create() {
return txid;
}

@Override
public FateTxStore<T> reserve() {
return new AgeOffFateTxStore(store.reserve());
}

@Override
public FateTxStore<T> reserve(long tid) {
return new AgeOffFateTxStore(store.reserve(tid));
Expand Down Expand Up @@ -204,4 +201,9 @@ public ReadOnlyFateTxStore<T> read(long tid) {
public List<Long> list() {
return store.list();
}

@Override
public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
return store.runnable(keepWaiting);
}
}
84 changes: 82 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.accumulo.core.fate;

import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
Expand All @@ -33,9 +34,11 @@
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

Expand All @@ -48,6 +51,7 @@
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -68,25 +72,91 @@ public class Fate<T> {
private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

private final AtomicBoolean keepRunning = new AtomicBoolean(true);
private final TransferQueue<Long> workQueue;
private final Thread workFinder;

public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, RETURN_VALUE
}

/**
* A single thread that finds transactions to work on and queues them up. Do not want each worker
* thread going to the store and looking for work as it would place more load on the store.
*/
private class WorkFinder implements Runnable {

@Override
public void run() {
while (keepRunning.get()) {
try {
var iter = store.runnable(keepRunning);

while (iter.hasNext() && keepRunning.get()) {
Long txid = iter.next();
try {
while (keepRunning.get()) {
// The reason for calling transfer instead of queueing is avoid rescanning the
// storage layer and adding the same thing over and over. For example if all threads
// were busy, the queue size was 100, and there are three runnable things in the
// store. Do not want to keep scanning the store adding those same 3 runnable things
// until the queue is full.
if (workQueue.tryTransfer(txid, 100, MILLISECONDS)) {
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}
} catch (Exception e) {
if (keepRunning.get()) {
log.warn("Failure while attempting to find work for fate", e);
} else {
log.debug("Failure while attempting to find work for fate", e);
}

workQueue.clear();
}
}
}
}

private class TransactionRunner implements Runnable {

private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
while (keepRunning.get()) {
var unreservedTid = workQueue.poll(100, MILLISECONDS);

if (unreservedTid == null) {
continue;
}
var optionalopStore = store.tryReserve(unreservedTid);
if (optionalopStore.isPresent()) {
return optionalopStore;
}
}

return Optional.empty();
}

@Override
public void run() {
while (keepRunning.get()) {
long deferTime = 0;
FateTxStore<T> txStore = null;
try {
txStore = store.reserve();
var optionalopStore = reserveFateTx();
if (optionalopStore.isPresent()) {
txStore = optionalopStore.orElseThrow();
} else {
continue;
}
TStatus status = txStore.getStatus();
Repo<T> op = txStore.top();
if (status == FAILED_IN_PROGRESS) {
processFailed(txStore, op);
} else {
} else if (status == SUBMITTED || status == IN_PROGRESS) {
Repo<T> prevOp = null;
try {
deferTime = op.isReady(txStore.getID(), environment);
Expand Down Expand Up @@ -231,6 +301,7 @@ public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStr
this.environment = environment;
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
this.workQueue = new LinkedTransferQueue<>();
this.fatePoolWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.schedule(() -> {
Expand All @@ -257,6 +328,9 @@ public Fate(T environment, FateStore<T> store, Function<Repo<T>,String> toLogStr
}
}, 3, SECONDS));
this.executor = pool;

this.workFinder = Threads.createThread("Fate work finder", new WorkFinder());
this.workFinder.start();
}

// get a transaction id back to the requester before doing any work
Expand Down Expand Up @@ -399,6 +473,12 @@ public void shutdown() {
if (executor != null) {
executor.shutdown();
}
workFinder.interrupt();
try {
workFinder.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
17 changes: 5 additions & 12 deletions core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
*/
long create();

/**
* An interface that allows read/write access to the data related to a single fate operation.
*/
interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
@Override
Repo<T> top();
Expand Down Expand Up @@ -81,8 +84,8 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
* upon successful return the store now controls the referenced transaction id. caller should no
* longer interact with it.
*
* @param deferTime time in millis to keep this transaction out of the pool used in the
* {@link #reserve() reserve} method. must be non-negative.
* @param deferTime time in millis to keep this transaction from being returned by
* {@link #runnable(java.util.concurrent.atomic.AtomicBoolean)}. Must be non-negative.
*/
void unreserve(long deferTime);
}
Expand All @@ -104,14 +107,4 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
*/
FateTxStore<T> reserve(long tid);

/**
* Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS.
*
* Reserving a transaction id ensures that nothing else in-process interacting via the same
* instance will be operating on that transaction id.
*
* @return a transaction id that is safe to interact with, chosen by the store.
*/
FateTxStore<T> reserve();

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.Serializable;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Read only access to a Transaction Store.
Expand Down Expand Up @@ -121,4 +123,11 @@ interface ReadOnlyFateTxStore<T> {
* @return all outstanding transactions, including those reserved by others.
*/
List<Long> list();

/**
* @return an iterator over fate op ids that are (IN_PROGRESS or FAILED_IN_PROGRESS) and
* unreserved. This method will block until it finds something that is runnable or until
* the keepWaiting parameter is false.
*/
Iterator<Long> runnable(AtomicBoolean keepWaiting);
}
70 changes: 70 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/fate/SignalCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.fate;

import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongPredicate;

import com.google.common.base.Preconditions;

class SignalCount {
private long count = 0;

synchronized void increment() {
count++;
this.notifyAll();
}

synchronized void decrement() {
Preconditions.checkState(count > 0);
count--;
this.notifyAll();
}

synchronized long getCount() {
return count;
}

synchronized boolean waitFor(LongPredicate predicate, BooleanSupplier keepWaiting) {
return waitFor(predicate, Long.MAX_VALUE, keepWaiting);
}

synchronized boolean waitFor(LongPredicate predicate, long maxWait, BooleanSupplier keepWaiting) {
Preconditions.checkArgument(maxWait >= 0);

if (maxWait == 0) {
return predicate.test(count);
}

long start = System.nanoTime();

while (!predicate.test(count) && keepWaiting.getAsBoolean()
&& TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) < maxWait) {
try {
wait(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

return predicate.test(count);
}
}
Loading