From 9674ad5fd9f03b8da21fb2be525cabf989c397de Mon Sep 17 00:00:00 2001 From: Theo Emanuelsson Date: Sat, 21 Mar 2026 17:49:26 +0100 Subject: [PATCH] Propagate ProcessingContext to SnapshotStore and Snapshotter - Add @Nullable ProcessingContext parameter to SnapshotStore.store() and SnapshotStore.load(), aligned with TokenStore's signature - Add ProcessingContext parameter to Snapshotter.store() and Snapshotter.load() (non-null, internal interface always called from within a processing context) - Pass ProcessingContext through from SnapshottingSourcingHandler to StoreBackedSnapshotter to SnapshotStore - Update InMemorySnapshotStore and AxonServerSnapshotStore implementations - Update StoreBackedSnapshotterTest and StoreBackedSnapshotterTestSuite --- .../snapshot/AxonServerSnapshotStore.java | 7 +++- .../handler/SnapshottingSourcingHandler.java | 4 +- .../snapshot/api/Snapshotter.java | 7 +++- .../inmemory/InMemorySnapshotStore.java | 7 +++- .../snapshot/store/SnapshotStore.java | 13 ++++-- .../store/StoreBackedSnapshotter.java | 10 +++-- .../StoreBackedSnapshotterTestSuite.java | 42 +++++++++++-------- .../store/StoreBackedSnapshotterTest.java | 22 +++++----- 8 files changed, 68 insertions(+), 44 deletions(-) diff --git a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/snapshot/AxonServerSnapshotStore.java b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/snapshot/AxonServerSnapshotStore.java index 1d6aaa105d..5198657b86 100644 --- a/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/snapshot/AxonServerSnapshotStore.java +++ b/axon-server-connector/src/main/java/org/axonframework/axonserver/connector/snapshot/AxonServerSnapshotStore.java @@ -29,6 +29,7 @@ import org.axonframework.eventsourcing.snapshot.api.Snapshot; import org.axonframework.eventsourcing.snapshot.store.SnapshotStore; import org.axonframework.messaging.core.QualifiedName; +import org.axonframework.messaging.core.unitofwork.ProcessingContext; import org.jspecify.annotations.Nullable; import java.time.Instant; @@ -71,7 +72,8 @@ private ByteString makeKey(QualifiedName qn, Object identifier) { } @Override - public CompletableFuture store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot) { + public CompletableFuture store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot, + @Nullable ProcessingContext context) { Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null."); Objects.requireNonNull(identifier, "The identifier parameter must not be null."); Objects.requireNonNull(snapshot, "The snapshot parameter must not be null."); @@ -106,7 +108,8 @@ public CompletableFuture store(QualifiedName qualifiedName, Object identif } @Override - public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier) { + public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier, + @Nullable ProcessingContext context) { Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null."); Objects.requireNonNull(identifier, "The identifier parameter must not be null."); diff --git a/eventsourcing/src/main/java/org/axonframework/eventsourcing/handler/SnapshottingSourcingHandler.java b/eventsourcing/src/main/java/org/axonframework/eventsourcing/handler/SnapshottingSourcingHandler.java index bc8c25d8ff..6f43e249bb 100644 --- a/eventsourcing/src/main/java/org/axonframework/eventsourcing/handler/SnapshottingSourcingHandler.java +++ b/eventsourcing/src/main/java/org/axonframework/eventsourcing/handler/SnapshottingSourcingHandler.java @@ -101,7 +101,7 @@ public CompletableFuture source(I identifier, InitializingEntityEvolver EventCriteria criteria = criteriaResolver.resolve(identifier, pc); long startTime = System.currentTimeMillis(); - return snapshotter.load(identifier) + return snapshotter.load(identifier, pc) .exceptionally(e -> { LOGGER.warn("Snapshot loading failed, falling back to full reconstruction for: {} ({})", messageType, identifier, e); @@ -148,7 +148,7 @@ private CompletableFuture sourceAndEvolve( // Snapshot is made when specifically triggered by an event, or based on the statistics: if (triggerSnapshot.get() || snapshotPolicy.shouldSnapshot(new EvolutionResult(ec, sourcingTime))) { - snapshotter.store(identifier, entity, postionRef.get()); + snapshotter.store(identifier, entity, postionRef.get(), pc); } } diff --git a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/api/Snapshotter.java b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/api/Snapshotter.java index bffe6bf260..d962a26c21 100644 --- a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/api/Snapshotter.java +++ b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/api/Snapshotter.java @@ -18,6 +18,7 @@ import org.axonframework.common.annotation.Internal; import org.axonframework.eventsourcing.eventstore.Position; +import org.axonframework.messaging.core.unitofwork.ProcessingContext; import java.util.concurrent.CompletableFuture; @@ -39,10 +40,11 @@ public interface Snapshotter { * or the snapshot cannot be used. It may complete exceptionally if snapshot retrieval fails. * * @param identifier the identifier of the entity, cannot be {@code null} + * @param context the current {@link ProcessingContext}, cannot be {@code null} * @return a {@link CompletableFuture} that completes with the entity's snapshot or {@code null} * @throws NullPointerException if {@code identifier} is {@code null} */ - CompletableFuture load(I identifier); + CompletableFuture load(I identifier, ProcessingContext context); /** * Stores the given entity as a snapshot asynchronously. @@ -50,7 +52,8 @@ public interface Snapshotter { * @param identifier the identifier of the entity, cannot be {@code null} * @param entity the entity state, cannot be {@code null} * @param position the position in the event stream for this entity state, cannot be {@code null} + * @param context the current {@link ProcessingContext}, cannot be {@code null} * @throws NullPointerException if any argument is {@code null} */ - void store(I identifier, E entity, Position position); + void store(I identifier, E entity, Position position, ProcessingContext context); } \ No newline at end of file diff --git a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/inmemory/InMemorySnapshotStore.java b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/inmemory/InMemorySnapshotStore.java index 1aadec22f7..f0030fa701 100644 --- a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/inmemory/InMemorySnapshotStore.java +++ b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/inmemory/InMemorySnapshotStore.java @@ -19,6 +19,7 @@ import org.axonframework.eventsourcing.snapshot.api.Snapshot; import org.axonframework.eventsourcing.snapshot.store.SnapshotStore; import org.axonframework.messaging.core.QualifiedName; +import org.axonframework.messaging.core.unitofwork.ProcessingContext; import org.jspecify.annotations.Nullable; import java.util.Map; @@ -44,7 +45,8 @@ public class InMemorySnapshotStore implements SnapshotStore { private final Map> entitiesByIdentifierByName = new ConcurrentHashMap<>(); @Override - public CompletableFuture store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot) { + public CompletableFuture store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot, + @Nullable ProcessingContext context) { Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null."); Objects.requireNonNull(identifier, "The identifier parameter must not be null."); Objects.requireNonNull(snapshot, "The snapshot parameter must not be null."); @@ -57,7 +59,8 @@ public CompletableFuture store(QualifiedName qualifiedName, Object identif } @Override - public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier) { + public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier, + @Nullable ProcessingContext context) { Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null."); Objects.requireNonNull(identifier, "The identifier parameter must not be null."); diff --git a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/SnapshotStore.java b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/SnapshotStore.java index 9f16cadb1c..0088f824b8 100644 --- a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/SnapshotStore.java +++ b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/SnapshotStore.java @@ -19,6 +19,7 @@ import org.axonframework.common.annotation.Internal; import org.axonframework.eventsourcing.snapshot.api.Snapshot; import org.axonframework.messaging.core.QualifiedName; +import org.axonframework.messaging.core.unitofwork.ProcessingContext; import org.jspecify.annotations.Nullable; import java.util.concurrent.CompletableFuture; @@ -48,10 +49,12 @@ public interface SnapshotStore { * @param qualifiedName the name of the snapshot to persist, cannot be {@code null} * @param identifier the identifier of the snapshot to persist, cannot be {@code null} * @param snapshot the snapshot to persist, cannot be {@code null} + * @param context the current {@link ProcessingContext}, if any * @return a {@link CompletableFuture} that completes when the snapshot has been stored - * @throws NullPointerException if any argument is {@code null} + * @throws NullPointerException if {@code qualifiedName}, {@code identifier}, or {@code snapshot} is {@code null} */ - CompletableFuture store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot); + CompletableFuture store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot, + @Nullable ProcessingContext context); /** * Loads the latest snapshot for a given name and identifier. @@ -65,8 +68,10 @@ public interface SnapshotStore { * * @param qualifiedName the name of the snapshot, cannot be {@code null} * @param identifier the identifier of the snapshot, cannot be {@code null} + * @param context the current {@link ProcessingContext}, if any * @return a {@link CompletableFuture} containing the snapshot, or containing {@code null} if no matching snapshot exists - * @throws NullPointerException if any argument is {@code null} + * @throws NullPointerException if {@code qualifiedName} or {@code identifier} is {@code null} */ - CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier); + CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier, + @Nullable ProcessingContext context); } diff --git a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotter.java b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotter.java index d9da2389d1..6c8fe86bd8 100644 --- a/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotter.java +++ b/eventsourcing/src/main/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotter.java @@ -24,6 +24,7 @@ import org.axonframework.eventsourcing.snapshot.api.SnapshotPolicy; import org.axonframework.eventsourcing.snapshot.api.Snapshotter; import org.axonframework.messaging.core.MessageType; +import org.axonframework.messaging.core.unitofwork.ProcessingContext; import org.axonframework.messaging.eventhandling.GenericEventMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,12 +94,12 @@ public StoreBackedSnapshotter( } @Override - public CompletableFuture load(I identifier) { + public CompletableFuture load(I identifier, ProcessingContext context) { Objects.requireNonNull(identifier, "The identifier parameter must not be null."); Snapshot inFlightSnapshot = inFlightSnapshots.get(identifier); CompletableFuture loadedSnapshot = inFlightSnapshot == null - ? store.load(type.qualifiedName(), identifier) + ? store.load(type.qualifiedName(), identifier, context) : CompletableFuture.completedFuture(inFlightSnapshot); return loadedSnapshot @@ -122,7 +123,8 @@ public CompletableFuture load(I identifier) { public void store( I identifier, E entity, - Position position + Position position, + ProcessingContext context ) { Objects.requireNonNull(identifier, "The identifier parameter must not be null."); Objects.requireNonNull(entity, "The entity parameter must not be null."); @@ -131,7 +133,7 @@ public void store( Snapshot newSnapshot = new Snapshot(position, type.version(), entity, GenericEventMessage.clock.instant(), Map.of()); inFlightSnapshots.put(identifier, newSnapshot); - store.store(type.qualifiedName(), identifier, newSnapshot).whenComplete((voidResult, ex) -> { + store.store(type.qualifiedName(), identifier, newSnapshot, context).whenComplete((voidResult, ex) -> { // note: only remove inflight snapshot from cache if not replaced concurrently inFlightSnapshots.remove(identifier, newSnapshot); diff --git a/eventsourcing/src/test/java/org/axonframework/eventsourcing/StoreBackedSnapshotterTestSuite.java b/eventsourcing/src/test/java/org/axonframework/eventsourcing/StoreBackedSnapshotterTestSuite.java index ab9c846fdb..a464420f3b 100644 --- a/eventsourcing/src/test/java/org/axonframework/eventsourcing/StoreBackedSnapshotterTestSuite.java +++ b/eventsourcing/src/test/java/org/axonframework/eventsourcing/StoreBackedSnapshotterTestSuite.java @@ -229,15 +229,18 @@ protected void shouldIgnoreSnapshotIfVersionUnsupported(@Named("TestAppender") L QualifiedName qualifiedName = new QualifiedName(Account.class); - snapshotStore.store( - qualifiedName, - ACCOUNT_ID, - new Snapshot( - new GlobalIndexPosition(3), - "42", // unsupported version which leads to this snapshot being ignored - new Account(ACCOUNT_ID, "My Account", 262144), - Instant.now(), - Map.of() + unitOfWorkFactory.create().executeWithResult(pc -> + snapshotStore.store( + qualifiedName, + ACCOUNT_ID, + new Snapshot( + new GlobalIndexPosition(3), + "42", // unsupported version which leads to this snapshot being ignored + new Account(ACCOUNT_ID, "My Account", 262144), + Instant.now(), + Map.of() + ), + pc ) ).join(); @@ -291,15 +294,18 @@ protected void shouldIgnoreExceptionsWhileLoadingSnapshot(@Named("TestAppender") QualifiedName qualifiedName = new QualifiedName(Account.class); - snapshotStore.store( - qualifiedName, - ACCOUNT_ID, - new Snapshot( - new GlobalIndexPosition(3), - "0.0.1", // correct version - "Junk", // incorrect data, which leads to deserialization error - Instant.now(), - Map.of() + unitOfWorkFactory.create().executeWithResult(pc -> + snapshotStore.store( + qualifiedName, + ACCOUNT_ID, + new Snapshot( + new GlobalIndexPosition(3), + "0.0.1", // correct version + "Junk", // incorrect data, which leads to deserialization error + Instant.now(), + Map.of() + ), + pc ) ).join(); diff --git a/eventsourcing/src/test/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotterTest.java b/eventsourcing/src/test/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotterTest.java index a39ad17045..725297d37b 100644 --- a/eventsourcing/src/test/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotterTest.java +++ b/eventsourcing/src/test/java/org/axonframework/eventsourcing/snapshot/store/StoreBackedSnapshotterTest.java @@ -25,6 +25,7 @@ import org.axonframework.eventsourcing.eventstore.Position; import org.axonframework.eventsourcing.snapshot.api.Snapshot; import org.axonframework.messaging.core.MessageType; +import org.axonframework.messaging.core.unitofwork.ProcessingContext; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -53,6 +54,7 @@ class StoreBackedSnapshotterTest { @Mock private SnapshotStore store; @Mock private Converter converter; + @Mock private ProcessingContext processingContext; private StoreBackedSnapshotter snapshotter; @@ -65,9 +67,9 @@ void beforeEach() { void loadingSnapshotShouldWork() { Snapshot expectedSnapshot = new Snapshot(Position.START, "0.1", "payload", Instant.now(), Map.of("answer", "42")); - when(store.load(TYPE.qualifiedName(), "1")).thenReturn(CompletableFuture.completedFuture(expectedSnapshot)); + when(store.load(eq(TYPE.qualifiedName()), eq("1"), any(ProcessingContext.class))).thenReturn(CompletableFuture.completedFuture(expectedSnapshot)); - assertThat(snapshotter.load("1").join()).isEqualTo(expectedSnapshot); + assertThat(snapshotter.load("1", processingContext).join()).isEqualTo(expectedSnapshot); } @Test @@ -76,25 +78,25 @@ void loadingSnapshotRequiringConversionShouldWork() { Snapshot expectedSnapshot = rawSnapshot.payload("payload"); when(converter.convert(0xDEADBEEF, String.class)).thenReturn("payload"); - when(store.load(TYPE.qualifiedName(), "1")).thenReturn(CompletableFuture.completedFuture(rawSnapshot)); + when(store.load(eq(TYPE.qualifiedName()), eq("1"), any(ProcessingContext.class))).thenReturn(CompletableFuture.completedFuture(rawSnapshot)); - assertThat(snapshotter.load("1").join()).isEqualTo(expectedSnapshot); + assertThat(snapshotter.load("1", processingContext).join()).isEqualTo(expectedSnapshot); } @Test void whenSnapshotMissingLoadShouldReturnNull() { - assertThat(snapshotter.load("1").join()).isNull(); + assertThat(snapshotter.load("1", processingContext).join()).isNull(); } @Test void whenSnapshotHasMismatchingVersionShouldLogMessageAndReturnNull(@Named("TestAppender") ListAppender appender) { Snapshot snapshot = new Snapshot(Position.START, "42.0", 0xDEADBEEF, Instant.now(), Map.of("answer", "42")); - when(store.load(TYPE.qualifiedName(), "1")).thenReturn(CompletableFuture.completedFuture(snapshot)); + when(store.load(eq(TYPE.qualifiedName()), eq("1"), any(ProcessingContext.class))).thenReturn(CompletableFuture.completedFuture(snapshot)); appender.clear(); - assertThat(snapshotter.load("1").join()).isNull(); + assertThat(snapshotter.load("1", processingContext).join()).isNull(); assertThat(appender.getEvents()) .extracting(LogEvent::getMessage) @@ -106,19 +108,19 @@ void whenSnapshotHasMismatchingVersionShouldLogMessageAndReturnNull(@Named("Test void successfulSnapshotStoreShouldLogNothing(@Named("TestAppender") ListAppender appender) { appender.clear(); - snapshotter.store("1", "payload", Position.START); + snapshotter.store("1", "payload", Position.START, processingContext); assertThat(appender.getEvents()).isEmpty(); } @Test void failureToStoreSnapshotShouldOnlyLogWarning(@Named("TestAppender") ListAppender appender) { - when(store.store(eq(TYPE.qualifiedName()), eq("1"), any(Snapshot.class))) + when(store.store(eq(TYPE.qualifiedName()), eq("1"), any(Snapshot.class), any(ProcessingContext.class))) .thenReturn(CompletableFuture.failedFuture(new IOException("busy"))); appender.clear(); - snapshotter.store("1", "payload", Position.START); + snapshotter.store("1", "payload", Position.START, processingContext); assertThat(appender.getEvents()) .extracting(LogEvent::getMessage)