Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.axonframework.eventsourcing.snapshot.api.Snapshot;
import org.axonframework.eventsourcing.snapshot.store.SnapshotStore;
import org.axonframework.messaging.core.QualifiedName;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.jspecify.annotations.Nullable;

import java.time.Instant;
Expand Down Expand Up @@ -71,7 +72,8 @@ private ByteString makeKey(QualifiedName qn, Object identifier) {
}

@Override
public CompletableFuture<Void> store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot) {
public CompletableFuture<Void> store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot,
@Nullable ProcessingContext context) {
Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null.");
Objects.requireNonNull(identifier, "The identifier parameter must not be null.");
Objects.requireNonNull(snapshot, "The snapshot parameter must not be null.");
Expand Down Expand Up @@ -106,7 +108,8 @@ public CompletableFuture<Void> store(QualifiedName qualifiedName, Object identif
}

@Override
public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier) {
public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier,
@Nullable ProcessingContext context) {
Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null.");
Objects.requireNonNull(identifier, "The identifier parameter must not be null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public CompletableFuture<E> source(I identifier, InitializingEntityEvolver<I, E>
EventCriteria criteria = criteriaResolver.resolve(identifier, pc);
long startTime = System.currentTimeMillis();

return snapshotter.load(identifier)
return snapshotter.load(identifier, pc)
.exceptionally(e -> {
LOGGER.warn("Snapshot loading failed, falling back to full reconstruction for: {} ({})", messageType, identifier, e);

Expand Down Expand Up @@ -148,7 +148,7 @@ private CompletableFuture<E> sourceAndEvolve(

// Snapshot is made when specifically triggered by an event, or based on the statistics:
if (triggerSnapshot.get() || snapshotPolicy.shouldSnapshot(new EvolutionResult(ec, sourcingTime))) {
snapshotter.store(identifier, entity, postionRef.get());
snapshotter.store(identifier, entity, postionRef.get(), pc);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.axonframework.common.annotation.Internal;
import org.axonframework.eventsourcing.eventstore.Position;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;

import java.util.concurrent.CompletableFuture;

Expand All @@ -39,18 +40,20 @@ public interface Snapshotter<I, E> {
* or the snapshot cannot be used. It may complete exceptionally if snapshot retrieval fails.
*
* @param identifier the identifier of the entity, cannot be {@code null}
* @param context the current {@link ProcessingContext}, cannot be {@code null}
* @return a {@link CompletableFuture} that completes with the entity's snapshot or {@code null}
* @throws NullPointerException if {@code identifier} is {@code null}
*/
CompletableFuture<Snapshot> load(I identifier);
CompletableFuture<Snapshot> load(I identifier, ProcessingContext context);

/**
* Stores the given entity as a snapshot asynchronously.
*
* @param identifier the identifier of the entity, cannot be {@code null}
* @param entity the entity state, cannot be {@code null}
* @param position the position in the event stream for this entity state, cannot be {@code null}
* @param context the current {@link ProcessingContext}, cannot be {@code null}
* @throws NullPointerException if any argument is {@code null}
*/
void store(I identifier, E entity, Position position);
void store(I identifier, E entity, Position position, ProcessingContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.axonframework.eventsourcing.snapshot.api.Snapshot;
import org.axonframework.eventsourcing.snapshot.store.SnapshotStore;
import org.axonframework.messaging.core.QualifiedName;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.jspecify.annotations.Nullable;

import java.util.Map;
Expand All @@ -44,7 +45,8 @@ public class InMemorySnapshotStore implements SnapshotStore {
private final Map<QualifiedName, Map<Object, Snapshot>> entitiesByIdentifierByName = new ConcurrentHashMap<>();

@Override
public CompletableFuture<Void> store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot) {
public CompletableFuture<Void> store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot,
@Nullable ProcessingContext context) {
Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null.");
Objects.requireNonNull(identifier, "The identifier parameter must not be null.");
Objects.requireNonNull(snapshot, "The snapshot parameter must not be null.");
Expand All @@ -57,7 +59,8 @@ public CompletableFuture<Void> store(QualifiedName qualifiedName, Object identif
}

@Override
public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier) {
public CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier,
@Nullable ProcessingContext context) {
Objects.requireNonNull(qualifiedName, "The qualifiedName parameter must not be null.");
Objects.requireNonNull(identifier, "The identifier parameter must not be null.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.axonframework.common.annotation.Internal;
import org.axonframework.eventsourcing.snapshot.api.Snapshot;
import org.axonframework.messaging.core.QualifiedName;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.jspecify.annotations.Nullable;

import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -48,10 +49,12 @@ public interface SnapshotStore {
* @param qualifiedName the name of the snapshot to persist, cannot be {@code null}
* @param identifier the identifier of the snapshot to persist, cannot be {@code null}
* @param snapshot the snapshot to persist, cannot be {@code null}
* @param context the current {@link ProcessingContext}, if any
* @return a {@link CompletableFuture} that completes when the snapshot has been stored
* @throws NullPointerException if any argument is {@code null}
* @throws NullPointerException if {@code qualifiedName}, {@code identifier}, or {@code snapshot} is {@code null}
*/
CompletableFuture<Void> store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot);
CompletableFuture<Void> store(QualifiedName qualifiedName, Object identifier, Snapshot snapshot,
@Nullable ProcessingContext context);

/**
* Loads the latest snapshot for a given name and identifier.
Expand All @@ -65,8 +68,10 @@ public interface SnapshotStore {
*
* @param qualifiedName the name of the snapshot, cannot be {@code null}
* @param identifier the identifier of the snapshot, cannot be {@code null}
* @param context the current {@link ProcessingContext}, if any
* @return a {@link CompletableFuture} containing the snapshot, or containing {@code null} if no matching snapshot exists
* @throws NullPointerException if any argument is {@code null}
* @throws NullPointerException if {@code qualifiedName} or {@code identifier} is {@code null}
*/
CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier);
CompletableFuture<@Nullable Snapshot> load(QualifiedName qualifiedName, Object identifier,
@Nullable ProcessingContext context);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.axonframework.eventsourcing.snapshot.api.SnapshotPolicy;
import org.axonframework.eventsourcing.snapshot.api.Snapshotter;
import org.axonframework.messaging.core.MessageType;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.axonframework.messaging.eventhandling.GenericEventMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -93,12 +94,12 @@ public StoreBackedSnapshotter(
}

@Override
public CompletableFuture<Snapshot> load(I identifier) {
public CompletableFuture<Snapshot> load(I identifier, ProcessingContext context) {
Objects.requireNonNull(identifier, "The identifier parameter must not be null.");

Snapshot inFlightSnapshot = inFlightSnapshots.get(identifier);
CompletableFuture<Snapshot> loadedSnapshot = inFlightSnapshot == null
? store.load(type.qualifiedName(), identifier)
? store.load(type.qualifiedName(), identifier, context)
: CompletableFuture.completedFuture(inFlightSnapshot);

return loadedSnapshot
Expand All @@ -122,7 +123,8 @@ public CompletableFuture<Snapshot> load(I identifier) {
public void store(
I identifier,
E entity,
Position position
Position position,
ProcessingContext context
) {
Objects.requireNonNull(identifier, "The identifier parameter must not be null.");
Objects.requireNonNull(entity, "The entity parameter must not be null.");
Expand All @@ -131,7 +133,7 @@ public void store(
Snapshot newSnapshot = new Snapshot(position, type.version(), entity, GenericEventMessage.clock.instant(), Map.of());

inFlightSnapshots.put(identifier, newSnapshot);
store.store(type.qualifiedName(), identifier, newSnapshot).whenComplete((voidResult, ex) -> {
store.store(type.qualifiedName(), identifier, newSnapshot, context).whenComplete((voidResult, ex) -> {
// note: only remove inflight snapshot from cache if not replaced concurrently
inFlightSnapshots.remove(identifier, newSnapshot);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,18 @@ protected void shouldIgnoreSnapshotIfVersionUnsupported(@Named("TestAppender") L

QualifiedName qualifiedName = new QualifiedName(Account.class);

snapshotStore.store(
qualifiedName,
ACCOUNT_ID,
new Snapshot(
new GlobalIndexPosition(3),
"42", // unsupported version which leads to this snapshot being ignored
new Account(ACCOUNT_ID, "My Account", 262144),
Instant.now(),
Map.of()
unitOfWorkFactory.create().executeWithResult(pc ->
snapshotStore.store(
qualifiedName,
ACCOUNT_ID,
new Snapshot(
new GlobalIndexPosition(3),
"42", // unsupported version which leads to this snapshot being ignored
new Account(ACCOUNT_ID, "My Account", 262144),
Instant.now(),
Map.of()
),
pc
)
).join();

Expand Down Expand Up @@ -291,15 +294,18 @@ protected void shouldIgnoreExceptionsWhileLoadingSnapshot(@Named("TestAppender")

QualifiedName qualifiedName = new QualifiedName(Account.class);

snapshotStore.store(
qualifiedName,
ACCOUNT_ID,
new Snapshot(
new GlobalIndexPosition(3),
"0.0.1", // correct version
"Junk", // incorrect data, which leads to deserialization error
Instant.now(),
Map.of()
unitOfWorkFactory.create().executeWithResult(pc ->
snapshotStore.store(
qualifiedName,
ACCOUNT_ID,
new Snapshot(
new GlobalIndexPosition(3),
"0.0.1", // correct version
"Junk", // incorrect data, which leads to deserialization error
Instant.now(),
Map.of()
),
pc
)
).join();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.axonframework.eventsourcing.eventstore.Position;
import org.axonframework.eventsourcing.snapshot.api.Snapshot;
import org.axonframework.messaging.core.MessageType;
import org.axonframework.messaging.core.unitofwork.ProcessingContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
Expand Down Expand Up @@ -53,6 +54,7 @@ class StoreBackedSnapshotterTest {

@Mock private SnapshotStore store;
@Mock private Converter converter;
@Mock private ProcessingContext processingContext;

private StoreBackedSnapshotter<String, String> snapshotter;

Expand All @@ -65,9 +67,9 @@ void beforeEach() {
void loadingSnapshotShouldWork() {
Snapshot expectedSnapshot = new Snapshot(Position.START, "0.1", "payload", Instant.now(), Map.of("answer", "42"));

when(store.load(TYPE.qualifiedName(), "1")).thenReturn(CompletableFuture.completedFuture(expectedSnapshot));
when(store.load(eq(TYPE.qualifiedName()), eq("1"), any(ProcessingContext.class))).thenReturn(CompletableFuture.completedFuture(expectedSnapshot));

assertThat(snapshotter.load("1").join()).isEqualTo(expectedSnapshot);
assertThat(snapshotter.load("1", processingContext).join()).isEqualTo(expectedSnapshot);
}

@Test
Expand All @@ -76,25 +78,25 @@ void loadingSnapshotRequiringConversionShouldWork() {
Snapshot expectedSnapshot = rawSnapshot.payload("payload");

when(converter.convert(0xDEADBEEF, String.class)).thenReturn("payload");
when(store.load(TYPE.qualifiedName(), "1")).thenReturn(CompletableFuture.completedFuture(rawSnapshot));
when(store.load(eq(TYPE.qualifiedName()), eq("1"), any(ProcessingContext.class))).thenReturn(CompletableFuture.completedFuture(rawSnapshot));

assertThat(snapshotter.load("1").join()).isEqualTo(expectedSnapshot);
assertThat(snapshotter.load("1", processingContext).join()).isEqualTo(expectedSnapshot);
}

@Test
void whenSnapshotMissingLoadShouldReturnNull() {
assertThat(snapshotter.load("1").join()).isNull();
assertThat(snapshotter.load("1", processingContext).join()).isNull();
}

@Test
void whenSnapshotHasMismatchingVersionShouldLogMessageAndReturnNull(@Named("TestAppender") ListAppender appender) {
Snapshot snapshot = new Snapshot(Position.START, "42.0", 0xDEADBEEF, Instant.now(), Map.of("answer", "42"));

when(store.load(TYPE.qualifiedName(), "1")).thenReturn(CompletableFuture.completedFuture(snapshot));
when(store.load(eq(TYPE.qualifiedName()), eq("1"), any(ProcessingContext.class))).thenReturn(CompletableFuture.completedFuture(snapshot));

appender.clear();

assertThat(snapshotter.load("1").join()).isNull();
assertThat(snapshotter.load("1", processingContext).join()).isNull();

assertThat(appender.getEvents())
.extracting(LogEvent::getMessage)
Expand All @@ -106,19 +108,19 @@ void whenSnapshotHasMismatchingVersionShouldLogMessageAndReturnNull(@Named("Test
void successfulSnapshotStoreShouldLogNothing(@Named("TestAppender") ListAppender appender) {
appender.clear();

snapshotter.store("1", "payload", Position.START);
snapshotter.store("1", "payload", Position.START, processingContext);

assertThat(appender.getEvents()).isEmpty();
}

@Test
void failureToStoreSnapshotShouldOnlyLogWarning(@Named("TestAppender") ListAppender appender) {
when(store.store(eq(TYPE.qualifiedName()), eq("1"), any(Snapshot.class)))
when(store.store(eq(TYPE.qualifiedName()), eq("1"), any(Snapshot.class), any(ProcessingContext.class)))
.thenReturn(CompletableFuture.failedFuture(new IOException("busy")));

appender.clear();

snapshotter.store("1", "payload", Position.START);
snapshotter.store("1", "payload", Position.START, processingContext);

assertThat(appender.getEvents())
.extracting(LogEvent::getMessage)
Expand Down
Loading