Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 30 additions & 22 deletions src/main/java/org/dataloader/DataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

Expand Down Expand Up @@ -77,6 +79,7 @@ public class DataLoader<K, V extends @Nullable Object> {
private final ValueCache<K, V> valueCache;
private final DataLoaderOptions options;
private final Object batchLoadFunction;
final Lock lock;

@VisibleForTesting
DataLoader(@Nullable String name, Object batchLoadFunction, @Nullable DataLoaderOptions options) {
Expand All @@ -93,7 +96,7 @@ public class DataLoader<K, V extends @Nullable Object> {
this.batchLoadFunction = nonNull(batchLoadFunction);
this.options = loaderOptions;
this.name = name;

this.lock = new ReentrantLock();
this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.valueCache, this.stats, clock);
}

Expand Down Expand Up @@ -261,18 +264,16 @@ public CompletableFuture<List<V>> loadMany(List<K> keys, List<Object> keyContext
nonNull(keys);
nonNull(keyContexts);

synchronized (this) {
List<CompletableFuture<V>> collect = new ArrayList<>(keys.size());
for (int i = 0; i < keys.size(); i++) {
K key = keys.get(i);
Object keyContext = null;
if (i < keyContexts.size()) {
keyContext = keyContexts.get(i);
}
collect.add(load(key, keyContext));
List<CompletableFuture<V>> collect = new ArrayList<>(keys.size());
for (int i = 0; i < keys.size(); i++) {
K key = keys.get(i);
Object keyContext = null;
if (i < keyContexts.size()) {
keyContext = keyContexts.get(i);
}
return CompletableFutureKit.allOf(collect);
collect.add(load(key, keyContext));
}
return CompletableFutureKit.allOf(collect);
}

/**
Expand All @@ -292,15 +293,13 @@ public CompletableFuture<List<V>> loadMany(List<K> keys, List<Object> keyContext
public CompletableFuture<Map<K, V>> loadMany(Map<K, ?> keysAndContexts) {
nonNull(keysAndContexts);

synchronized (this) {
Map<K, CompletableFuture<V>> collect = new HashMap<>(keysAndContexts.size());
for (Map.Entry<K, ?> entry : keysAndContexts.entrySet()) {
K key = entry.getKey();
Object keyContext = entry.getValue();
collect.put(key, load(key, keyContext));
}
return CompletableFutureKit.allOf(collect);
Map<K, CompletableFuture<V>> collect = new HashMap<>(keysAndContexts.size());
for (Map.Entry<K, ?> entry : keysAndContexts.entrySet()) {
K key = entry.getKey();
Object keyContext = entry.getValue();
collect.put(key, load(key, keyContext));
}
return CompletableFutureKit.allOf(collect);
}

/**
Expand Down Expand Up @@ -376,9 +375,12 @@ public DataLoader<K, V> clear(K key) {
*/
public DataLoader<K, V> clear(K key, BiConsumer<Void, Throwable> handler) {
Object cacheKey = getCacheKey(key);
synchronized (this) {
try {
lock.lock();
futureCache.delete(cacheKey);
valueCache.delete(key).whenComplete(handler);
} finally {
lock.unlock();
}
return this;
}
Expand All @@ -400,9 +402,12 @@ public DataLoader<K, V> clearAll() {
* @return the data loader for fluent coding
*/
public DataLoader<K, V> clearAll(BiConsumer<Void, Throwable> handler) {
synchronized (this) {
try {
lock.lock();
futureCache.clear();
valueCache.clear().whenComplete(handler);
} finally {
lock.unlock();
}
return this;
}
Expand Down Expand Up @@ -442,10 +447,13 @@ public DataLoader<K, V> prime(K key, Exception error) {
*/
public DataLoader<K, V> prime(K key, CompletableFuture<V> value) {
Object cacheKey = getCacheKey(key);
synchronized (this) {
try {
lock.lock();
if (!futureCache.containsKey(cacheKey)) {
futureCache.set(cacheKey, value);
}
} finally {
lock.unlock();
}
return this;
}
Expand Down
37 changes: 30 additions & 7 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
Expand Down Expand Up @@ -82,6 +83,7 @@ Object getCallContext() {
private final StatisticsCollector stats;
private final Clock clock;
private final AtomicReference<Instant> lastDispatchTime;
private final Lock lock;

DataLoaderHelper(DataLoader<K, V> dataLoader,
Object batchLoadFunction,
Expand All @@ -95,6 +97,7 @@ Object getCallContext() {
this.loaderOptions = loaderOptions;
this.futureCache = futureCache;
this.valueCache = valueCache;
this.lock = dataLoader.lock;
this.loaderQueue = new ArrayList<>();
this.stats = stats;
this.clock = clock;
Expand All @@ -111,7 +114,8 @@ public Instant getLastDispatchTime() {
}

Optional<CompletableFuture<V>> getIfPresent(K key) {
synchronized (dataLoader) {
try {
lock.lock();
boolean cachingEnabled = loaderOptions.cachingEnabled();
if (cachingEnabled) {
Object cacheKey = getCacheKey(nonNull(key));
Expand All @@ -124,26 +128,35 @@ Optional<CompletableFuture<V>> getIfPresent(K key) {
} catch (Exception ignored) {
}
}
} finally {
lock.unlock();
}
return Optional.empty();
}

Optional<CompletableFuture<V>> getIfCompleted(K key) {
synchronized (dataLoader) {
try {
lock.lock();

Optional<CompletableFuture<V>> cachedPromise = getIfPresent(key);
if (cachedPromise.isPresent()) {
CompletableFuture<V> promise = cachedPromise.get();
if (promise.isDone()) {
return cachedPromise;
}
}
} finally {
lock.unlock();
}
return Optional.empty();
}


@GuardedBy("lock")
CompletableFuture<V> load(K key, Object loadContext) {
synchronized (dataLoader) {
try {
lock.lock();

boolean batchingEnabled = loaderOptions.batchingEnabled();
boolean cachingEnabled = loaderOptions.cachingEnabled();

Expand All @@ -158,6 +171,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
ctx.onDispatched();
cf.whenComplete(ctx::onCompleted);
return cf;
} finally {
lock.unlock();
}
}

Expand All @@ -173,14 +188,17 @@ Object getCacheKeyWithContext(K key, Object context) {
loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key;
}

@GuardedBy("lock")
DispatchResult<V> dispatch() {
DataLoaderInstrumentationContext<DispatchResult<?>> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader));

boolean batchingEnabled = loaderOptions.batchingEnabled();
final List<K> keys;
final List<Object> callContexts;
final List<CompletableFuture<V>> queuedFutures;
synchronized (dataLoader) {
try {
lock.lock();

int queueSize = loaderQueue.size();
if (queueSize == 0) {
lastDispatchTime.set(now());
Expand All @@ -200,6 +218,8 @@ DispatchResult<V> dispatch() {
});
loaderQueue.clear();
lastDispatchTime.set(now());
} finally {
lock.unlock();
}
if (!batchingEnabled) {
instrCtx.onDispatched();
Expand Down Expand Up @@ -334,7 +354,7 @@ private void possiblyClearCacheEntriesOnExceptions(List<K> keys) {
}
}

@GuardedBy("dataLoader")
@GuardedBy("lock")
private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean batchingEnabled) {
final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext);

Expand All @@ -353,7 +373,7 @@ private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean ba
return loadCallFuture;
}

@GuardedBy("dataLoader")
@GuardedBy("lock")
private CompletableFuture<V> queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, boolean cachingEnabled) {
if (batchingEnabled) {
CompletableFuture<V> loadCallFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -606,8 +626,11 @@ private DataLoaderInstrumentation instrumentation() {
}

int dispatchDepth() {
synchronized (dataLoader) {
try {
lock.lock();
return loaderQueue.size();
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.dataloader.impl.Assertions.assertState;

Expand All @@ -25,6 +27,7 @@ abstract class AbstractBatchSubscriber<K, V, T> implements Subscriber<T> {
final List<Object> callContexts;
final List<CompletableFuture<V>> queuedFutures;
final ReactiveSupport.HelperIntegration<K> helperIntegration;
final Lock lock = new ReentrantLock();

List<K> clearCacheKeys = new ArrayList<>();
List<V> completedValues = new ArrayList<>();
Expand Down
90 changes: 53 additions & 37 deletions src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,58 +29,74 @@ class BatchSubscriberImpl<K, V> extends AbstractBatchSubscriber<K, V, V> {
super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration);
}

// onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
// onNext may be called by multiple threads - for the time being, we use a ReentrantLock to guarantee
// correctness (at the cost of speed).
@Override
public synchronized void onNext(V value) {
super.onNext(value);
public void onNext(V value) {
try {
lock.lock();

if (idx >= keys.size()) {
// hang on they have given us more values than we asked for in keys
// we cant handle this
return;
}
K key = keys.get(idx);
Object callContext = callContexts.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
onNextValue(key, value, callContext, List.of(future));
super.onNext(value);

if (idx >= keys.size()) {
// hang on they have given us more values than we asked for in keys
// we cant handle this
return;
}
K key = keys.get(idx);
Object callContext = callContexts.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
onNextValue(key, value, callContext, List.of(future));

completedValues.add(value);
idx++;
completedValues.add(value);
idx++;
} finally {
lock.unlock();
}
}


@Override
public synchronized void onComplete() {
super.onComplete();
if (keys.size() != completedValues.size()) {
// we have more or less values than promised
// we will go through all the outstanding promises and mark those that
// have not finished as failed
for (CompletableFuture<V> queuedFuture : queuedFutures) {
if (!queuedFuture.isDone()) {
queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
public void onComplete() {
try {
lock.lock();
super.onComplete();
if (keys.size() != completedValues.size()) {
// we have more or less values than promised
// we will go through all the outstanding promises and mark those that
// have not finished as failed
for (CompletableFuture<V> queuedFuture : queuedFutures) {
if (!queuedFuture.isDone()) {
queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list"));
}
}
}
possiblyClearCacheEntriesOnExceptions();
valuesFuture.complete(completedValues);
} finally {
lock.unlock();
}
possiblyClearCacheEntriesOnExceptions();
valuesFuture.complete(completedValues);
}

@Override
public synchronized void onError(Throwable ex) {
super.onError(ex);
ex = unwrapThrowable(ex);
// Set the remaining keys to the exception.
for (int i = idx; i < queuedFutures.size(); i++) {
K key = keys.get(i);
CompletableFuture<V> future = queuedFutures.get(i);
if (!future.isDone()) {
future.completeExceptionally(ex);
// clear any cached view of this key because it failed
helperIntegration.clearCacheView(key);
public void onError(Throwable ex) {
try {
lock.lock();
super.onError(ex);
ex = unwrapThrowable(ex);
// Set the remaining keys to the exception.
for (int i = idx; i < queuedFutures.size(); i++) {
K key = keys.get(i);
CompletableFuture<V> future = queuedFutures.get(i);
if (!future.isDone()) {
future.completeExceptionally(ex);
// clear any cached view of this key because it failed
helperIntegration.clearCacheView(key);
}
}
valuesFuture.completeExceptionally(ex);
} finally {
lock.unlock();
}
valuesFuture.completeExceptionally(ex);
}
}
Loading