From 1804706fc1f3e6cdc7c91f94648d6f3be1fef001 Mon Sep 17 00:00:00 2001 From: bbaker Date: Tue, 14 Oct 2025 11:54:03 +1100 Subject: [PATCH] Updating the code to use reentrant locks --- src/main/java/org/dataloader/DataLoader.java | 52 ++++++----- .../java/org/dataloader/DataLoaderHelper.java | 37 ++++++-- .../reactive/AbstractBatchSubscriber.java | 3 + .../reactive/BatchSubscriberImpl.java | 90 ++++++++++-------- .../reactive/MappedBatchSubscriberImpl.java | 92 +++++++++++-------- 5 files changed, 170 insertions(+), 104 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 321b58c..68e699b 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -35,6 +35,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -77,6 +79,7 @@ public class DataLoader { private final ValueCache valueCache; private final DataLoaderOptions options; private final Object batchLoadFunction; + final Lock lock; @VisibleForTesting DataLoader(@Nullable String name, Object batchLoadFunction, @Nullable DataLoaderOptions options) { @@ -93,7 +96,7 @@ public class DataLoader { this.batchLoadFunction = nonNull(batchLoadFunction); this.options = loaderOptions; this.name = name; - + this.lock = new ReentrantLock(); this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.valueCache, this.stats, clock); } @@ -261,18 +264,16 @@ public CompletableFuture> loadMany(List keys, List keyContext nonNull(keys); nonNull(keyContexts); - synchronized (this) { - List> collect = new ArrayList<>(keys.size()); - for (int i = 0; i < keys.size(); i++) { - K key = keys.get(i); - Object keyContext = null; - if (i < keyContexts.size()) { - keyContext = keyContexts.get(i); - } - collect.add(load(key, keyContext)); + List> collect = new ArrayList<>(keys.size()); + for (int i = 0; i < keys.size(); i++) { + K key = keys.get(i); + Object keyContext = null; + if (i < keyContexts.size()) { + keyContext = keyContexts.get(i); } - return CompletableFutureKit.allOf(collect); + collect.add(load(key, keyContext)); } + return CompletableFutureKit.allOf(collect); } /** @@ -292,15 +293,13 @@ public CompletableFuture> loadMany(List keys, List keyContext public CompletableFuture> loadMany(Map keysAndContexts) { nonNull(keysAndContexts); - synchronized (this) { - Map> collect = new HashMap<>(keysAndContexts.size()); - for (Map.Entry entry : keysAndContexts.entrySet()) { - K key = entry.getKey(); - Object keyContext = entry.getValue(); - collect.put(key, load(key, keyContext)); - } - return CompletableFutureKit.allOf(collect); + Map> collect = new HashMap<>(keysAndContexts.size()); + for (Map.Entry entry : keysAndContexts.entrySet()) { + K key = entry.getKey(); + Object keyContext = entry.getValue(); + collect.put(key, load(key, keyContext)); } + return CompletableFutureKit.allOf(collect); } /** @@ -376,9 +375,12 @@ public DataLoader clear(K key) { */ public DataLoader clear(K key, BiConsumer handler) { Object cacheKey = getCacheKey(key); - synchronized (this) { + try { + lock.lock(); futureCache.delete(cacheKey); valueCache.delete(key).whenComplete(handler); + } finally { + lock.unlock(); } return this; } @@ -400,9 +402,12 @@ public DataLoader clearAll() { * @return the data loader for fluent coding */ public DataLoader clearAll(BiConsumer handler) { - synchronized (this) { + try { + lock.lock(); futureCache.clear(); valueCache.clear().whenComplete(handler); + } finally { + lock.unlock(); } return this; } @@ -442,10 +447,13 @@ public DataLoader prime(K key, Exception error) { */ public DataLoader prime(K key, CompletableFuture value) { Object cacheKey = getCacheKey(key); - synchronized (this) { + try { + lock.lock(); if (!futureCache.containsKey(cacheKey)) { futureCache.set(cacheKey, value); } + } finally { + lock.unlock(); } return this; } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index f4a8f10..83389e4 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -82,6 +83,7 @@ Object getCallContext() { private final StatisticsCollector stats; private final Clock clock; private final AtomicReference lastDispatchTime; + private final Lock lock; DataLoaderHelper(DataLoader dataLoader, Object batchLoadFunction, @@ -95,6 +97,7 @@ Object getCallContext() { this.loaderOptions = loaderOptions; this.futureCache = futureCache; this.valueCache = valueCache; + this.lock = dataLoader.lock; this.loaderQueue = new ArrayList<>(); this.stats = stats; this.clock = clock; @@ -111,7 +114,8 @@ public Instant getLastDispatchTime() { } Optional> getIfPresent(K key) { - synchronized (dataLoader) { + try { + lock.lock(); boolean cachingEnabled = loaderOptions.cachingEnabled(); if (cachingEnabled) { Object cacheKey = getCacheKey(nonNull(key)); @@ -124,12 +128,16 @@ Optional> getIfPresent(K key) { } catch (Exception ignored) { } } + } finally { + lock.unlock(); } return Optional.empty(); } Optional> getIfCompleted(K key) { - synchronized (dataLoader) { + try { + lock.lock(); + Optional> cachedPromise = getIfPresent(key); if (cachedPromise.isPresent()) { CompletableFuture promise = cachedPromise.get(); @@ -137,13 +145,18 @@ Optional> getIfCompleted(K key) { return cachedPromise; } } + } finally { + lock.unlock(); } return Optional.empty(); } + @GuardedBy("lock") CompletableFuture load(K key, Object loadContext) { - synchronized (dataLoader) { + try { + lock.lock(); + boolean batchingEnabled = loaderOptions.batchingEnabled(); boolean cachingEnabled = loaderOptions.cachingEnabled(); @@ -158,6 +171,8 @@ CompletableFuture load(K key, Object loadContext) { ctx.onDispatched(); cf.whenComplete(ctx::onCompleted); return cf; + } finally { + lock.unlock(); } } @@ -173,6 +188,7 @@ Object getCacheKeyWithContext(K key, Object context) { loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; } + @GuardedBy("lock") DispatchResult dispatch() { DataLoaderInstrumentationContext> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader)); @@ -180,7 +196,9 @@ DispatchResult dispatch() { final List keys; final List callContexts; final List> queuedFutures; - synchronized (dataLoader) { + try { + lock.lock(); + int queueSize = loaderQueue.size(); if (queueSize == 0) { lastDispatchTime.set(now()); @@ -200,6 +218,8 @@ DispatchResult dispatch() { }); loaderQueue.clear(); lastDispatchTime.set(now()); + } finally { + lock.unlock(); } if (!batchingEnabled) { instrCtx.onDispatched(); @@ -334,7 +354,7 @@ private void possiblyClearCacheEntriesOnExceptions(List keys) { } } - @GuardedBy("dataLoader") + @GuardedBy("lock") private CompletableFuture loadFromCache(K key, Object loadContext, boolean batchingEnabled) { final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext); @@ -353,7 +373,7 @@ private CompletableFuture loadFromCache(K key, Object loadContext, boolean ba return loadCallFuture; } - @GuardedBy("dataLoader") + @GuardedBy("lock") private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, boolean cachingEnabled) { if (batchingEnabled) { CompletableFuture loadCallFuture = new CompletableFuture<>(); @@ -606,8 +626,11 @@ private DataLoaderInstrumentation instrumentation() { } int dispatchDepth() { - synchronized (dataLoader) { + try { + lock.lock(); return loaderQueue.size(); + } finally { + lock.unlock(); } } diff --git a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java index c2f5438..76c94b3 100644 --- a/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java +++ b/src/main/java/org/dataloader/reactive/AbstractBatchSubscriber.java @@ -10,6 +10,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import static org.dataloader.impl.Assertions.assertState; @@ -25,6 +27,7 @@ abstract class AbstractBatchSubscriber implements Subscriber { final List callContexts; final List> queuedFutures; final ReactiveSupport.HelperIntegration helperIntegration; + final Lock lock = new ReentrantLock(); List clearCacheKeys = new ArrayList<>(); List completedValues = new ArrayList<>(); diff --git a/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java index d0b8110..74a500a 100644 --- a/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java +++ b/src/main/java/org/dataloader/reactive/BatchSubscriberImpl.java @@ -29,58 +29,74 @@ class BatchSubscriberImpl extends AbstractBatchSubscriber { super(valuesFuture, keys, callContexts, queuedFutures, helperIntegration); } - // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee + // onNext may be called by multiple threads - for the time being, we use a ReentrantLock to guarantee // correctness (at the cost of speed). @Override - public synchronized void onNext(V value) { - super.onNext(value); + public void onNext(V value) { + try { + lock.lock(); - if (idx >= keys.size()) { - // hang on they have given us more values than we asked for in keys - // we cant handle this - return; - } - K key = keys.get(idx); - Object callContext = callContexts.get(idx); - CompletableFuture future = queuedFutures.get(idx); - onNextValue(key, value, callContext, List.of(future)); + super.onNext(value); + + if (idx >= keys.size()) { + // hang on they have given us more values than we asked for in keys + // we cant handle this + return; + } + K key = keys.get(idx); + Object callContext = callContexts.get(idx); + CompletableFuture future = queuedFutures.get(idx); + onNextValue(key, value, callContext, List.of(future)); - completedValues.add(value); - idx++; + completedValues.add(value); + idx++; + } finally { + lock.unlock(); + } } @Override - public synchronized void onComplete() { - super.onComplete(); - if (keys.size() != completedValues.size()) { - // we have more or less values than promised - // we will go through all the outstanding promises and mark those that - // have not finished as failed - for (CompletableFuture queuedFuture : queuedFutures) { - if (!queuedFuture.isDone()) { - queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + public void onComplete() { + try { + lock.lock(); + super.onComplete(); + if (keys.size() != completedValues.size()) { + // we have more or less values than promised + // we will go through all the outstanding promises and mark those that + // have not finished as failed + for (CompletableFuture queuedFuture : queuedFutures) { + if (!queuedFuture.isDone()) { + queuedFuture.completeExceptionally(new DataLoaderAssertionException("The size of the promised values MUST be the same size as the key list")); + } } } + possiblyClearCacheEntriesOnExceptions(); + valuesFuture.complete(completedValues); + } finally { + lock.unlock(); } - possiblyClearCacheEntriesOnExceptions(); - valuesFuture.complete(completedValues); } @Override - public synchronized void onError(Throwable ex) { - super.onError(ex); - ex = unwrapThrowable(ex); - // Set the remaining keys to the exception. - for (int i = idx; i < queuedFutures.size(); i++) { - K key = keys.get(i); - CompletableFuture future = queuedFutures.get(i); - if (!future.isDone()) { - future.completeExceptionally(ex); - // clear any cached view of this key because it failed - helperIntegration.clearCacheView(key); + public void onError(Throwable ex) { + try { + lock.lock(); + super.onError(ex); + ex = unwrapThrowable(ex); + // Set the remaining keys to the exception. + for (int i = idx; i < queuedFutures.size(); i++) { + K key = keys.get(i); + CompletableFuture future = queuedFutures.get(i); + if (!future.isDone()) { + future.completeExceptionally(ex); + // clear any cached view of this key because it failed + helperIntegration.clearCacheView(key); + } } + valuesFuture.completeExceptionally(ex); + } finally { + lock.unlock(); } - valuesFuture.completeExceptionally(ex); } } diff --git a/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java index d56efa0..3c937b0 100644 --- a/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java +++ b/src/main/java/org/dataloader/reactive/MappedBatchSubscriberImpl.java @@ -43,61 +43,77 @@ class MappedBatchSubscriberImpl extends AbstractBatchSubscriber entry) { - super.onNext(entry); - K key = entry.getKey(); - V value = entry.getValue(); + public void onNext(Map.Entry entry) { + try { + lock.lock(); + super.onNext(entry); + K key = entry.getKey(); + V value = entry.getValue(); - Object callContext = callContextByKey.get(key); - List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + Object callContext = callContextByKey.get(key); + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); - onNextValue(key, value, callContext, futures); + onNextValue(key, value, callContext, futures); - // did we have an actual key for this value - ignore it if they send us one outside the key set - if (!futures.isEmpty()) { - completedValuesByKey.put(key, value); + // did we have an actual key for this value - ignore it if they send us one outside the key set + if (!futures.isEmpty()) { + completedValuesByKey.put(key, value); + } + } finally { + lock.unlock(); } + } @Override - public synchronized void onComplete() { - super.onComplete(); + public void onComplete() { + try { + lock.lock(); + super.onComplete(); - possiblyClearCacheEntriesOnExceptions(); - List values = new ArrayList<>(keys.size()); - for (K key : keys) { - V value = completedValuesByKey.get(key); - values.add(value); + possiblyClearCacheEntriesOnExceptions(); + List values = new ArrayList<>(keys.size()); + for (K key : keys) { + V value = completedValuesByKey.get(key); + values.add(value); - List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); - for (CompletableFuture future : futures) { - if (!future.isDone()) { - // we have a future that never came back for that key - // but the publisher is done sending in data - it must be null - // e.g. for key X when found no value - future.complete(null); + List> futures = queuedFuturesByKey.getOrDefault(key, List.of()); + for (CompletableFuture future : futures) { + if (!future.isDone()) { + // we have a future that never came back for that key + // but the publisher is done sending in data - it must be null + // e.g. for key X when found no value + future.complete(null); + } } } + valuesFuture.complete(values); + } finally { + lock.unlock(); } - valuesFuture.complete(values); } @Override - public synchronized void onError(Throwable ex) { - super.onError(ex); - ex = unwrapThrowable(ex); - // Complete the futures for the remaining keys with the exception. - for (int idx = 0; idx < queuedFutures.size(); idx++) { - K key = keys.get(idx); - List> futures = queuedFuturesByKey.get(key); - if (!completedValuesByKey.containsKey(key)) { - for (CompletableFuture future : futures) { - future.completeExceptionally(ex); + public void onError(Throwable ex) { + try { + lock.lock(); + super.onError(ex); + ex = unwrapThrowable(ex); + // Complete the futures for the remaining keys with the exception. + for (int idx = 0; idx < queuedFutures.size(); idx++) { + K key = keys.get(idx); + List> futures = queuedFuturesByKey.get(key); + if (!completedValuesByKey.containsKey(key)) { + for (CompletableFuture future : futures) { + future.completeExceptionally(ex); + } + // clear any cached view of this key because they all failed + helperIntegration.clearCacheView(key); } - // clear any cached view of this key because they all failed - helperIntegration.clearCacheView(key); } + valuesFuture.completeExceptionally(ex); + } finally { + lock.unlock(); } - valuesFuture.completeExceptionally(ex); } }