From 9671d60862e6a6c4b7099ba72949e47844fbeecc Mon Sep 17 00:00:00 2001 From: bbaker Date: Mon, 27 Oct 2025 09:39:59 +1100 Subject: [PATCH] Updated the documentation of the CacheMap and also some refacoring of common code --- src/main/java/org/dataloader/CacheMap.java | 25 ++++++++++---- .../java/org/dataloader/DataLoaderHelper.java | 33 ++++++++++--------- .../org/dataloader/impl/DefaultCacheMap.java | 7 ++-- .../dataloader/DataLoaderCacheMapTest.java | 32 +++++++++++++++--- 4 files changed, 68 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java index b3929af..3e6d895 100644 --- a/src/main/java/org/dataloader/CacheMap.java +++ b/src/main/java/org/dataloader/CacheMap.java @@ -28,7 +28,13 @@ * CacheMap is used by data loaders that use caching promises to values aka {@link CompletableFuture}<V>. A better name for this * class might have been FutureCache but that is history now. *

- * The default implementation used by the data loader is based on a {@link java.util.LinkedHashMap}. + * The default implementation used by the data loader is based on a {@link java.util.concurrent.ConcurrentHashMap} because + * the data loader code requires the cache to prove atomic writes especially the {@link #putIfAbsentAtomically(Object, CompletableFuture)} + * method. + *

+ * The data loader code using a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your + * own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is + * returned always. *

* This is really a cache of completed {@link CompletableFuture}<V> values in memory. It is used, when caching is enabled, to * give back the same future to any code that may call it. If you need a cache of the underlying values that is possible external to the JVM @@ -42,7 +48,7 @@ */ @PublicSpi @NullMarked -public interface CacheMap { +public interface CacheMap { /** * Creates a new cache map, using the default implementation that is based on a {@link java.util.LinkedHashMap}. @@ -84,14 +90,21 @@ static CacheMap simpleMap() { Collection> getAll(); /** - * Creates a new cache map entry with the specified key and value, or updates the value if the key already exists. + * Atomically creates a new cache map entry with the specified key and value, or updates the value if the key already exists. + *

+ * The data loader code using a Compare and Swap (CAS) algorithm to decide if a cache entry is present or not. If you write your + * own {@link CacheMap} implementation then you MUST provide atomic writes in this method to ensure that the same promise for a key is + * returned always. + *

+ * The default implementation of this method uses {@link java.util.concurrent.ConcurrentHashMap} has its implementation so these CAS + * writes work as expected. * * @param key the key to cache * @param value the value to cache * - * @return the cache map for fluent coding + * @return atomically the previous value for the key or null if the value is not present. */ - CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value); + @Nullable CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value); /** * Deletes the entry with the specified key from the cache map, if it exists. @@ -114,7 +127,7 @@ static CacheMap simpleMap() { * and intended for testing and debugging. * If a cache doesn't support it, it can throw an Exception. * - * @return + * @return the size of the cache */ int size(); } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 249c1f2..feb6184 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -49,7 +49,7 @@ @Internal class DataLoaderHelper { - static class LoaderQueueEntry { + private static class LoaderQueueEntry { final K key; final CompletableFuture value; @@ -155,11 +155,8 @@ CompletableFuture load(K key, Object loadContext) { try { CompletableFuture cachedFuture = futureCache.get(cacheKey); if (cachedFuture != null) { - // We already have a promise for this key, no need to check value cache or queue up load - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); - ctx.onDispatched(); - cachedFuture.whenComplete(ctx::onCompleted); - return cachedFuture; + // We already have a promise for this key, no need to check value cache or queue this load + return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); } } catch (Exception ignored) { } @@ -170,11 +167,8 @@ CompletableFuture load(K key, Object loadContext) { if (futureCachingEnabled) { CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); if (cachedFuture != null) { - // another thread was faster and created a matching CF ... hence this is really a cachehit and we are done - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); - ctx.onDispatched(); - cachedFuture.whenComplete(ctx::onCompleted); - return cachedFuture; + // another thread was faster and created a matching CF ... hence this is really a cache hit and we are done + return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); } } addEntryToLoaderQueue(key, loadCallFuture, loadContext); @@ -186,12 +180,9 @@ CompletableFuture load(K key, Object loadContext) { CompletableFuture cachedFuture = futureCache.putIfAbsentAtomically(cacheKey, loadCallFuture); if (cachedFuture != null) { // another thread was faster and the loader was invoked twice with the same key - // we are disregarding the resul of our dispatch call and use the already cached value + // we are disregarding the result of our dispatch call and use the already cached value // meaning this is a cache hit and we are done - stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); - ctx.onDispatched(); - cachedFuture.whenComplete(ctx::onCompleted); - return cachedFuture; + return incrementCacheHitAndReturnCF(ctx, key, loadContext, cachedFuture); } } } @@ -201,6 +192,13 @@ CompletableFuture load(K key, Object loadContext) { return loadCallFuture; } + private CompletableFuture incrementCacheHitAndReturnCF(DataLoaderInstrumentationContext ctx, K key, Object loadContext, CompletableFuture cachedFuture) { + stats.incrementCacheHitCount(new IncrementCacheHitCountStatisticsContext<>(key, loadContext)); + ctx.onDispatched(); + cachedFuture.whenComplete(ctx::onCompleted); + return cachedFuture; + } + private void addEntryToLoaderQueue(K key, CompletableFuture future, Object loadContext) { while (true) { LoaderQueueEntry prev = loaderQueue.get(); @@ -223,6 +221,7 @@ Object getCacheKeyWithContext(K key, Object context) { loaderOptions.cacheKeyFunction().get().getKeyWithContext(key, context) : key; } + @SuppressWarnings("unchecked") DispatchResult dispatch() { DataLoaderInstrumentationContext> instrCtx = ctxOrNoopCtx(instrumentation().beginDispatch(dataLoader)); @@ -232,6 +231,8 @@ DispatchResult dispatch() { while (true) { loaderQueueEntryHead = loaderQueue.get(); if (loaderQueue.compareAndSet(loaderQueueEntryHead, null)) { + // one or more threads competed and this one won. This thread holds + // the loader queue root in the local variable loaderQueueEntryHead break; } } diff --git a/src/main/java/org/dataloader/impl/DefaultCacheMap.java b/src/main/java/org/dataloader/impl/DefaultCacheMap.java index b6c811f..e8db681 100644 --- a/src/main/java/org/dataloader/impl/DefaultCacheMap.java +++ b/src/main/java/org/dataloader/impl/DefaultCacheMap.java @@ -18,6 +18,8 @@ import org.dataloader.CacheMap; import org.dataloader.annotations.Internal; +import org.jspecify.annotations.NullMarked; +import org.jspecify.annotations.Nullable; import java.util.Collection; import java.util.concurrent.CompletableFuture; @@ -32,6 +34,7 @@ * @author Arnold Schrijver */ @Internal +@NullMarked public class DefaultCacheMap implements CacheMap { private final ConcurrentHashMap> cache; @@ -56,7 +59,7 @@ public boolean containsKey(K key) { * {@inheritDoc} */ @Override - public CompletableFuture get(K key) { + public @Nullable CompletableFuture get(K key) { return cache.get(key); } @@ -72,7 +75,7 @@ public Collection> getAll() { * {@inheritDoc} */ @Override - public CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value) { + public @Nullable CompletableFuture putIfAbsentAtomically(K key, CompletableFuture value) { return cache.putIfAbsent(key, value); } diff --git a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java index df364a2..d3de4aa 100644 --- a/src/test/java/org/dataloader/DataLoaderCacheMapTest.java +++ b/src/test/java/org/dataloader/DataLoaderCacheMapTest.java @@ -14,6 +14,7 @@ /** * Tests for cacheMap functionality.. */ +@SuppressWarnings("NullableProblems") public class DataLoaderCacheMapTest { private BatchLoader keysAsValues() { @@ -24,12 +25,33 @@ private BatchLoader keysAsValues() { public void should_provide_all_futures_from_cache() { DataLoader dataLoader = newDataLoader(keysAsValues()); - dataLoader.load(1); - dataLoader.load(2); - dataLoader.load(1); + CompletableFuture cf1 = dataLoader.load(1); + CompletableFuture cf2 = dataLoader.load(2); + CompletableFuture cf3 = dataLoader.load(3); + + CacheMap cacheMap = dataLoader.getCacheMap(); + Collection> futures = cacheMap.getAll(); + assertThat(futures.size(), equalTo(3)); + + + assertThat(cacheMap.get(1), equalTo(cf1)); + assertThat(cacheMap.get(2), equalTo(cf2)); + assertThat(cacheMap.get(3), equalTo(cf3)); + assertThat(cacheMap.containsKey(1), equalTo(true)); + assertThat(cacheMap.containsKey(2), equalTo(true)); + assertThat(cacheMap.containsKey(3), equalTo(true)); + assertThat(cacheMap.containsKey(4), equalTo(false)); + + cacheMap.delete(1); + assertThat(cacheMap.containsKey(1), equalTo(false)); + assertThat(cacheMap.containsKey(2), equalTo(true)); + + cacheMap.clear(); + assertThat(cacheMap.containsKey(1), equalTo(false)); + assertThat(cacheMap.containsKey(2), equalTo(false)); + assertThat(cacheMap.containsKey(3), equalTo(false)); + assertThat(cacheMap.containsKey(4), equalTo(false)); - Collection> futures = dataLoader.getCacheMap().getAll(); - assertThat(futures.size(), equalTo(2)); } @Test