Skip to content

Commit 5788855

Browse files
meistermeierodrotbohm
authored andcommitted
GH-1337 - Add Neo4j support for revamped event publication registry.
Signed-off-by: Gerrit Meier <meistermeier@gmail.com>
1 parent f8c5f36 commit 5788855

File tree

4 files changed

+283
-19
lines changed

4 files changed

+283
-19
lines changed

spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublication.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.UUID;
2020

2121
import org.jspecify.annotations.Nullable;
22+
import org.springframework.modulith.events.EventPublication.Status;
2223

2324
/**
2425
* The event publication entity definition.
@@ -33,17 +34,24 @@ class Neo4jEventPublication {
3334
public final String listenerId;
3435
public final Object event;
3536
public final String eventHash;
37+
public final int completionAttempts;
3638

3739
public @Nullable Instant completionDate;
40+
public @Nullable Instant lastResubmissionDate;
41+
public Status status;
3842

3943
public Neo4jEventPublication(UUID identifier, Instant publicationDate, String listenerId, Object event,
40-
String eventHash, @Nullable Instant completionDate) {
44+
String eventHash, @Nullable Instant completionDate, Status status, int completionAttempts,
45+
@Nullable Instant lastResubmissionDate) {
4146

4247
this.identifier = identifier;
4348
this.publicationDate = publicationDate;
4449
this.listenerId = listenerId;
4550
this.event = event;
4651
this.eventHash = eventHash;
4752
this.completionDate = completionDate;
53+
this.status = status;
54+
this.lastResubmissionDate = lastResubmissionDate;
55+
this.completionAttempts = completionAttempts;
4856
}
4957
}

spring-modulith-events/spring-modulith-events-neo4j/src/main/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepository.java

Lines changed: 161 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.time.ZoneOffset;
2222
import java.util.ArrayList;
2323
import java.util.Collection;
24+
import java.util.HashMap;
2425
import java.util.List;
2526
import java.util.Map;
2627
import java.util.Objects;
@@ -41,6 +42,7 @@
4142
import org.neo4j.driver.types.TypeSystem;
4243
import org.springframework.data.neo4j.core.Neo4jClient;
4344
import org.springframework.data.util.Lazy;
45+
import org.springframework.modulith.events.EventPublication.Status;
4446
import org.springframework.modulith.events.core.EventPublicationRepository;
4547
import org.springframework.modulith.events.core.EventSerializer;
4648
import org.springframework.modulith.events.core.PublicationTargetIdentifier;
@@ -68,9 +70,15 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
6870
private static final String LISTENER_ID = "listenerId";
6971
private static final String PUBLICATION_DATE = "publicationDate";
7072
private static final String COMPLETION_DATE = "completionDate";
73+
private static final String STATUS = "status";
74+
private static final String COMPLETION_ATTEMPTS = "completionAttempts";
75+
private static final String LAST_RESUBMISSION_DATE = "lastResubmissionDate";
76+
77+
// return references
78+
private static final String STATUS_COUNT = "statusCount";
7179

7280
private static final Collection<String> ALL_PROPERTIES = List.of(ID, EVENT_SERIALIZED, EVENT_HASH, EVENT_TYPE,
73-
LISTENER_ID, PUBLICATION_DATE, COMPLETION_DATE);
81+
LISTENER_ID, PUBLICATION_DATE, COMPLETION_DATE, STATUS, COMPLETION_ATTEMPTS, LAST_RESUBMISSION_DATE);
7482

7583
private static final Node EVENT_PUBLICATION_NODE = node("Neo4jEventPublication")
7684
.named("neo4jEventPublication");
@@ -121,7 +129,7 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
121129
.set(EVENT_PUBLICATION_NODE.property(EVENT_TYPE).to(parameter(EVENT_TYPE)))
122130
.set(EVENT_PUBLICATION_NODE.property(LISTENER_ID).to(parameter(LISTENER_ID)))
123131
.set(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).to(parameter(PUBLICATION_DATE)))
124-
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
132+
.set(EVENT_PUBLICATION_NODE.property(STATUS).to(parameter(STATUS)))
125133
.build();
126134

127135
private static final Statement COMPLETE_STATEMENT = match(EVENT_PUBLICATION_NODE)
@@ -131,24 +139,48 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
131139
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).to(parameter(COMPLETION_DATE)))
132140
.build();
133141

134-
private static final Lazy<Statement> COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = Lazy
135-
.of(() -> applyProperties(match(EVENT_PUBLICATION_NODE)
136-
.where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID)))
142+
private static final Statement FIND_BY_STATUS_STATEMENT = match(EVENT_PUBLICATION_NODE)
143+
.where(EVENT_PUBLICATION_NODE.property(STATUS).eq(parameter(STATUS)))
144+
.returning(EVENT_PUBLICATION_NODE)
145+
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
146+
.build();
147+
148+
private static final Statement COUNT_BY_STATUS_STATEMENT = match(EVENT_PUBLICATION_NODE)
149+
.where(EVENT_PUBLICATION_NODE.property(STATUS).eq(parameter(STATUS)))
150+
.returning(count(EVENT_PUBLICATION_NODE).as(STATUS_COUNT))
151+
.build();
152+
153+
private static final Statement UPDATE_STATUS_STATEMENT = match(EVENT_PUBLICATION_NODE)
154+
.where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID)))
155+
.and(EVENT_PUBLICATION_NODE.property(STATUS).ne(parameter(STATUS)))
156+
.set(EVENT_PUBLICATION_NODE.property(STATUS).to(parameter(STATUS)))
157+
.build();
158+
159+
private static final Statement RESUBMIT_STATEMENT = match(EVENT_PUBLICATION_NODE)
160+
.where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID)))
161+
.and(EVENT_PUBLICATION_NODE.property(STATUS).ne(literalOf(Status.RESUBMITTED.name())))
162+
.set(EVENT_PUBLICATION_NODE.property(STATUS).to(parameter(STATUS)))
163+
.set(EVENT_PUBLICATION_NODE.property(COMPLETION_ATTEMPTS)
164+
.to(EVENT_PUBLICATION_NODE.property(COMPLETION_ATTEMPTS).add(literalOf(1))))
165+
.set(EVENT_PUBLICATION_NODE.property(LAST_RESUBMISSION_DATE).to(parameter(LAST_RESUBMISSION_DATE)))
166+
.build();
167+
168+
private static final Lazy<Statement> COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = Lazy.of(
169+
() -> applyProperties(match(EVENT_PUBLICATION_NODE).where(EVENT_PUBLICATION_NODE.property(ID).eq(parameter(ID)))
137170
.and(not(exists(match(EVENT_PUBLICATION_ARCHIVE_NODE)
138171
.where(EVENT_PUBLICATION_ARCHIVE_NODE.property(ID).eq(parameter(ID)))
139172
.returning(literalTrue()).build())))
140173
.with(EVENT_PUBLICATION_NODE)));
141174

142175
private static final Lazy<Statement> COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = Lazy
143-
.of(() -> applyProperties(
144-
match(EVENT_PUBLICATION_NODE)
145-
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
146-
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
147-
.and(not(exists(match(EVENT_PUBLICATION_ARCHIVE_NODE)
148-
.where(EVENT_PUBLICATION_ARCHIVE_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
149-
.and(EVENT_PUBLICATION_ARCHIVE_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
150-
.returning(literalTrue()).build())))
151-
.with(EVENT_PUBLICATION_NODE)));
176+
.of(() -> applyProperties(match(EVENT_PUBLICATION_NODE)
177+
.where(EVENT_PUBLICATION_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
178+
.and(EVENT_PUBLICATION_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
179+
.and(not(exists(match(EVENT_PUBLICATION_ARCHIVE_NODE)
180+
.where(EVENT_PUBLICATION_ARCHIVE_NODE.property(EVENT_HASH).eq(parameter(EVENT_HASH)))
181+
.and(EVENT_PUBLICATION_ARCHIVE_NODE.property(LISTENER_ID).eq(parameter(LISTENER_ID)))
182+
.returning(literalTrue()).build())))
183+
.with(EVENT_PUBLICATION_NODE)));
152184

153185
private static Statement applyProperties(OrderableOngoingReadingAndWithWithoutWhere source) {
154186

@@ -238,7 +270,7 @@ public TargetEventPublication create(TargetEventPublication publication) {
238270
EVENT_TYPE, eventType,
239271
LISTENER_ID, listenerId,
240272
PUBLICATION_DATE, Values.value(publicationDate.atOffset(ZoneOffset.UTC)),
241-
COMPLETION_DATE, Values.NULL))
273+
STATUS, publication.getStatus().name()))
242274
.run();
243275

244276
return publication;
@@ -396,7 +428,9 @@ public void deletePublications(List<UUID> identifiers) {
396428
@Override
397429
@Transactional
398430
public void deleteCompletedPublications() {
399-
neo4jClient.query(renderer.render(deleteCompletedStatement)).run();
431+
432+
neo4jClient.query(renderer.render(deleteCompletedStatement))
433+
.run();
400434
}
401435

402436
/*
@@ -412,6 +446,110 @@ public void deleteCompletedPublicationsBefore(Instant instant) {
412446
.run();
413447
}
414448

449+
/*
450+
* (non-Javadoc)
451+
* @see org.springframework.modulith.events.core.EventPublicationRepository#countByStatus(org.springframework.modulith.events.EventPublication.Status)
452+
*/
453+
@Override
454+
public int countByStatus(Status status) {
455+
456+
return neo4jClient.query(renderer.render(COUNT_BY_STATUS_STATEMENT))
457+
.bind(status.name()).to(STATUS)
458+
.fetchAs(Integer.class)
459+
.one()
460+
.get();
461+
}
462+
463+
/*
464+
* (non-Javadoc)
465+
* @see org.springframework.modulith.events.core.EventPublicationRepository#findByStatus(org.springframework.modulith.events.EventPublication.Status)
466+
*/
467+
@Override
468+
public List<TargetEventPublication> findByStatus(Status status) {
469+
470+
return List.copyOf(neo4jClient.query(renderer.render(FIND_BY_STATUS_STATEMENT))
471+
.bind(status.name()).to(STATUS)
472+
.fetchAs(TargetEventPublication.class)
473+
.mappedBy(status == Status.COMPLETED ? completeMapping() : incompleteMapping())
474+
.all());
475+
}
476+
477+
@Override
478+
public List<TargetEventPublication> findFailedPublications(FailedCriteria criteria) {
479+
480+
var parameters = new HashMap<String, Object>();
481+
482+
// in place CypherDSL usage because of conditional
483+
var match = match(EVENT_PUBLICATION_NODE)
484+
.where(EVENT_PUBLICATION_NODE.property(STATUS).eq(literalOf(Status.FAILED.name())))
485+
.or(EVENT_PUBLICATION_NODE.property(STATUS).isNull()
486+
.and(EVENT_PUBLICATION_NODE.property(COMPLETION_DATE).isNull()));
487+
488+
var instant = criteria.getPublicationDateReference();
489+
490+
if (instant != null) {
491+
492+
match = match.and(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE).lt(parameter(PUBLICATION_DATE)));
493+
494+
parameters.put(PUBLICATION_DATE, Values.value(instant.atOffset(ZoneOffset.UTC)));
495+
}
496+
497+
var limit = criteria.getMaxItemsToRead();
498+
var builder = match.returning(EVENT_PUBLICATION_NODE)
499+
.orderBy(EVENT_PUBLICATION_NODE.property(PUBLICATION_DATE))
500+
.ascending();
501+
502+
var statement = limit != -1 ? builder.limit(limit) : builder;
503+
504+
return List.copyOf(neo4jClient.query(renderer.render(statement.build()))
505+
.bindAll(parameters)
506+
.fetchAs(TargetEventPublication.class)
507+
.mappedBy(incompleteMapping())
508+
.all());
509+
}
510+
511+
/*
512+
* (non-Javadoc)
513+
* @see org.springframework.modulith.events.core.EventPublicationRepository#markFailed(java.util.UUID)
514+
*/
515+
@Override
516+
public void markFailed(UUID identifier) {
517+
518+
neo4jClient.query(renderer.render(UPDATE_STATUS_STATEMENT))
519+
.bind(Values.value(identifier.toString())).to(ID)
520+
.bind(Status.FAILED.name()).to(STATUS)
521+
.run();
522+
}
523+
524+
/*
525+
* (non-Javadoc)
526+
* @see org.springframework.modulith.events.core.EventPublicationRepository#markProcessing(java.util.UUID)
527+
*/
528+
@Override
529+
public void markProcessing(UUID identifier) {
530+
531+
neo4jClient.query(renderer.render(UPDATE_STATUS_STATEMENT))
532+
.bind(Values.value(identifier.toString())).to(ID)
533+
.bind(Status.PROCESSING.name()).to(STATUS)
534+
.run();
535+
}
536+
537+
/*
538+
* (non-Javadoc)
539+
* @see org.springframework.modulith.events.core.EventPublicationRepository#markResubmitted(java.util.UUID, java.time.Instant)
540+
*/
541+
@Override
542+
public boolean markResubmitted(UUID identifier, Instant resubmissionDate) {
543+
544+
var update = neo4jClient.query(renderer.render(RESUBMIT_STATEMENT))
545+
.bind(Values.value(identifier.toString())).to(ID)
546+
.bind(Status.RESUBMITTED.name()).to(STATUS)
547+
.bind(Values.value(resubmissionDate.atOffset(ZoneOffset.UTC))).to(LAST_RESUBMISSION_DATE)
548+
.run();
549+
550+
return update.counters().propertiesSet() > 1;
551+
}
552+
415553
private BiFunction<TypeSystem, org.neo4j.driver.Record, TargetEventPublication> incompleteMapping() {
416554
return (typeSystem, driverRecord) -> mapRecordToPublication(typeSystem, driverRecord, EVENT_PUBLICATION_NODE);
417555
}
@@ -431,12 +569,17 @@ private Neo4jEventPublicationAdapter mapRecordToPublication(TypeSystem typeSyste
431569
var eventHash = publicationNode.get(EVENT_HASH).asString();
432570
var eventType = publicationNode.get(EVENT_TYPE).asString();
433571
var completionDate = publicationNode.get(COMPLETION_DATE);
572+
var status = publicationNode.get(STATUS).asString();
573+
var completionAttempts = publicationNode.get(COMPLETION_ATTEMPTS);
574+
var lastResubmissionDate = publicationNode.get(LAST_RESUBMISSION_DATE);
434575

435576
try {
436577

437578
var event = eventSerializer.deserialize(eventSerialized, Class.forName(eventType));
438-
var publication = new Neo4jEventPublication(identifier, publicationDate, listenerId, event,
439-
eventHash, completionDate.isNull() ? null : completionDate.asZonedDateTime().toInstant());
579+
var publication = new Neo4jEventPublication(identifier, publicationDate, listenerId, event, eventHash,
580+
completionDate.isNull() ? null : completionDate.asZonedDateTime().toInstant(),
581+
Status.valueOf(status), completionAttempts != Values.NULL ? completionAttempts.asInt() : 0,
582+
lastResubmissionDate.isNull() ? null : lastResubmissionDate.asZonedDateTime().toInstant());
440583

441584
return new Neo4jEventPublicationAdapter(publication);
442585

spring-modulith-events/spring-modulith-events-neo4j/src/test/java/org/springframework/modulith/events/neo4j/Neo4jEventPublicationRepositoryTest.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import org.springframework.context.annotation.Configuration;
3838
import org.springframework.context.annotation.Import;
3939
import org.springframework.core.env.Environment;
40+
import org.springframework.modulith.events.EventPublication;
41+
import org.springframework.modulith.events.core.EventPublicationRepository;
4042
import org.springframework.modulith.events.core.EventSerializer;
4143
import org.springframework.modulith.events.core.PublicationTargetIdentifier;
4244
import org.springframework.modulith.events.core.TargetEventPublication;
@@ -295,6 +297,101 @@ void marksPublicationAsCompletedById() {
295297
}
296298
}
297299

300+
@Test // GH-1337
301+
void countsByStatus() {
302+
303+
var event = new TestEvent("first");
304+
var publication = createPublication(event);
305+
306+
assertOneByStatus(EventPublication.Status.PUBLISHED);
307+
308+
repository.markFailed(publication.getIdentifier());
309+
assertOneByStatus(EventPublication.Status.FAILED);
310+
311+
var resubmitted = repository.markResubmitted(publication.getIdentifier(), Instant.now());
312+
313+
assertThat(resubmitted).isTrue();
314+
assertOneByStatus(EventPublication.Status.RESUBMITTED);
315+
}
316+
317+
@Test // GH-1337
318+
void looksUpFailedPublication() {
319+
320+
var event = new TestEvent("first");
321+
var publication = createPublication(event);
322+
323+
repository.markFailed(publication.getIdentifier());
324+
325+
assertThat(repository.findFailedPublications(EventPublicationRepository.FailedCriteria.ALL))
326+
.extracting(TargetEventPublication::getIdentifier)
327+
.containsExactly(publication.getIdentifier());
328+
}
329+
330+
@Test // GH-1337
331+
void claimsResubmissionOnce() {
332+
333+
var event = new TestEvent("first");
334+
var publication = createPublication(event);
335+
336+
repository.markFailed(publication.getIdentifier());
337+
338+
var now = Instant.now();
339+
340+
assertThat(repository.markResubmitted(publication.getIdentifier(), now)).isTrue();
341+
assertThat(repository.markResubmitted(publication.getIdentifier(), now)).isFalse();
342+
}
343+
344+
@Test // GH-1337
345+
void marksPublicationAsProcessing() {
346+
347+
var event = new TestEvent("first");
348+
var publication = createPublication(event);
349+
350+
repository.markProcessing(publication.getIdentifier());
351+
}
352+
353+
@Test // GH-1337
354+
void looksUpFailedPublicationInBatch() {
355+
356+
var event = new TestEvent("first");
357+
var publication = createPublication(event);
358+
359+
repository.markFailed(publication.getIdentifier());
360+
361+
assertThat(repository.findFailedPublications(EventPublicationRepository.FailedCriteria.ALL.withItemsToRead(10)))
362+
.extracting(TargetEventPublication::getIdentifier)
363+
.containsExactly(publication.getIdentifier());
364+
}
365+
366+
@Test // GH-1337
367+
void looksUpFailedPublicationWithReferenceDate() throws Exception {
368+
369+
var event = new TestEvent("first");
370+
var publication = createPublication(event);
371+
372+
repository.markFailed(publication.getIdentifier());
373+
374+
Thread.sleep(200);
375+
376+
var criteria = EventPublicationRepository.FailedCriteria.ALL
377+
.withPublicationsPublishedBefore(publication.getPublicationDate().plusMillis(50));
378+
379+
assertThat(repository.findFailedPublications(criteria))
380+
.extracting(TargetEventPublication::getIdentifier)
381+
.containsExactly(publication.getIdentifier());
382+
}
383+
384+
private void assertOneByStatus(EventPublication.Status reference) {
385+
386+
for (var status : EventPublication.Status.values()) {
387+
388+
var expected = status == reference ? 1 : 0;
389+
390+
assertThat(repository.findByStatus(status).size()).isEqualTo(expected);
391+
assertThat(repository.countByStatus(status)).isEqualTo(expected);
392+
}
393+
}
394+
298395
private TargetEventPublication createPublication(Object event) {
299396

300397
var token = event.toString();

0 commit comments

Comments
 (0)