Skip to content

Commit 52ee064

Browse files
committed
GH-1336 - Event Publication Repository V2 implementation for MongoDB.
1 parent b5dfb70 commit 52ee064

File tree

3 files changed

+197
-18
lines changed

3 files changed

+197
-18
lines changed

spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublication.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.jspecify.annotations.Nullable;
2222
import org.springframework.data.annotation.PersistenceCreator;
2323
import org.springframework.data.mongodb.core.mapping.Document;
24+
import org.springframework.modulith.events.EventPublication.Status;
2425
import org.springframework.util.Assert;
2526

2627
/**
@@ -36,8 +37,11 @@ class MongoDbEventPublication {
3637
final Instant publicationDate;
3738
final String listenerId;
3839
final Object event;
40+
final @Nullable Instant lastResubmissionDate;
41+
final int completionAttempts;
3942

4043
@Nullable Instant completionDate;
44+
Status status;
4145

4246
/**
4347
* Creates a new {@link MongoDbEventPublication} for the given id, publication date, listener id, event and completion
@@ -51,7 +55,8 @@ class MongoDbEventPublication {
5155
*/
5256
@PersistenceCreator
5357
MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event,
54-
@Nullable Instant completionDate) {
58+
@Nullable Instant completionDate, @Nullable Status status, @Nullable Instant lastResubmissionDate,
59+
int completionAttempts) {
5560

5661
Assert.notNull(id, "Id must not be null!");
5762
Assert.notNull(publicationDate, "Publication date must not be null!");
@@ -63,17 +68,9 @@ class MongoDbEventPublication {
6368
this.listenerId = listenerId;
6469
this.event = event;
6570
this.completionDate = completionDate;
66-
}
67-
68-
/**
69-
* Creates a new {@link MongoDbEventPublication} for the given publication date, listener id and event.
70-
*
71-
* @param publicationDate must not be {@literal null}.
72-
* @param listenerId must not be {@literal null}.
73-
* @param event must not be {@literal null}.
74-
*/
75-
MongoDbEventPublication(UUID id, Instant publicationDate, String listenerId, Object event) {
76-
this(id, publicationDate, listenerId, event, null);
71+
this.status = status != null ? status : completionDate != null ? Status.COMPLETED : Status.PROCESSING;
72+
this.lastResubmissionDate = lastResubmissionDate;
73+
this.completionAttempts = completionAttempts;
7774
}
7875

7976
/**
@@ -85,7 +82,10 @@ class MongoDbEventPublication {
8582
MongoDbEventPublication markCompleted(Instant instant) {
8683

8784
Assert.notNull(instant, "Instant must not be null!");
85+
8886
this.completionDate = instant;
87+
this.status = Status.COMPLETED;
88+
8989
return this;
9090
}
9191
}

spring-modulith-events/spring-modulith-events-mongodb/src/main/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepository.java

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.data.mongodb.core.query.Query;
3737
import org.springframework.data.mongodb.core.query.Update;
3838
import org.springframework.data.util.TypeInformation;
39+
import org.springframework.modulith.events.EventPublication.Status;
3940
import org.springframework.modulith.events.core.EventPublicationRepository;
4041
import org.springframework.modulith.events.core.PublicationTargetIdentifier;
4142
import org.springframework.modulith.events.core.TargetEventPublication;
@@ -58,6 +59,10 @@ class MongoDbEventPublicationRepository implements EventPublicationRepository {
5859
private static final String ID = "id";
5960
private static final String LISTENER_ID = "listenerId";
6061
private static final String PUBLICATION_DATE = "publicationDate";
62+
private static final String STATUS = "status";
63+
private static final String COMPLETION_ATTEMPTS = "completionAttempts";
64+
private static final String LAST_RESUBMISSION_DATE = "lastResubmissionDate";
65+
6166
private static final Sort DEFAULT_SORT = Sort.by(PUBLICATION_DATE).ascending();
6267

6368
static final String ARCHIVE_COLLECTION = "event_publication_archive";
@@ -144,6 +149,36 @@ public void markCompleted(UUID identifier, Instant completionDate) {
144149
}
145150
}
146151

152+
/*
153+
* (non-Javadoc)
154+
* @see org.springframework.modulith.events.core.EventPublicationRepository#markFailed(java.util.UUID)
155+
*/
156+
@Override
157+
public void markFailed(UUID identifier) {
158+
159+
var query = query(where(ID).is(identifier).and(STATUS).ne(Status.FAILED));
160+
var update = Update.update(STATUS, Status.FAILED);
161+
162+
mongoTemplate.findAndModify(query, update, MongoDbEventPublication.class, collection);
163+
}
164+
165+
/*
166+
* (non-Javadoc)
167+
* @see org.springframework.modulith.events.core.EventPublicationRepository#markResubmitted(java.util.UUID, java.time.Instant)
168+
*/
169+
@Override
170+
public boolean markResubmitted(UUID identifier, Instant resubmissionDate) {
171+
172+
var query = query(where(ID).is(identifier).and(STATUS).ne(Status.RESUBMITTED));
173+
var update = Update.update(STATUS, Status.RESUBMITTED)
174+
.inc(COMPLETION_ATTEMPTS, 1)
175+
.set(LAST_RESUBMISSION_DATE, resubmissionDate);
176+
177+
var result = mongoTemplate.updateFirst(query, update, MongoDbEventPublication.class, collection);
178+
179+
return result.getModifiedCount() == 1;
180+
}
181+
147182
/*
148183
* (non-Javadoc)
149184
* @see org.springframework.modulith.events.core.EventPublicationRepository#findIncompletePublications()
@@ -188,6 +223,48 @@ public List<TargetEventPublication> findCompletedPublications() {
188223
return readMapped(defaultQuery(where(COMPLETION_DATE).ne(null)), archiveCollection);
189224
}
190225

226+
/*
227+
* (non-Javadoc)
228+
* @see org.springframework.modulith.events.core.EventPublicationRepository#findFailedPublications(org.springframework.modulith.events.core.EventPublicationRepository.FailedCriteria)
229+
*/
230+
@Override
231+
public List<TargetEventPublication> findFailedPublications(FailedCriteria criteria) {
232+
233+
var statusFailed = where(STATUS).is(Status.FAILED);
234+
var noStatusAndCompletionDate = where(STATUS).isNull().and(COMPLETION_DATE).isNull();
235+
var baseCriteria = new Criteria().orOperator(statusFailed, noStatusAndCompletionDate);
236+
237+
// Apply date delimiter
238+
var reference = criteria.getPublicationDateReference();
239+
240+
if (reference != null) {
241+
baseCriteria.and(PUBLICATION_DATE).lt(reference);
242+
}
243+
244+
// Apply limit
245+
var limit = criteria.getMaxItemsToRead();
246+
247+
if (limit > Integer.MAX_VALUE) {
248+
throw new IllegalArgumentException("Number of items to read needs to fit into an integer!");
249+
}
250+
251+
return readMapped(defaultQuery(baseCriteria).limit((int) limit));
252+
}
253+
254+
/*
255+
* (non-Javadoc)
256+
* @see org.springframework.modulith.events.core.EventPublicationRepository#countByStatus(org.springframework.modulith.events.EventPublication.Status)
257+
*/
258+
@Override
259+
public int countByStatus(Status status) {
260+
261+
var collection = status == Status.COMPLETED && completionMode == CompletionMode.ARCHIVE
262+
? archiveCollection
263+
: this.collection;
264+
265+
return (int) mongoTemplate.count(query(where(STATUS).is(status)), MongoDbEventPublication.class, collection);
266+
}
267+
191268
/*
192269
* (non-Javadoc)
193270
* @see org.springframework.modulith.events.core.EventPublicationRepository#deletePublications(java.util.List)
@@ -250,7 +327,11 @@ private static MongoDbEventPublication domainToDocument(TargetEventPublication p
250327
publication.getIdentifier(), //
251328
publication.getPublicationDate(), //
252329
publication.getTargetIdentifier().getValue(), //
253-
publication.getEvent());
330+
publication.getEvent(), //
331+
publication.getCompletionDate().orElse(null), //
332+
publication.getStatus(), //
333+
publication.getLastResubmissionDate(), //
334+
publication.getCompletionAttempts());
254335
}
255336

256337
private static TargetEventPublication documentToDomain(MongoDbEventPublication document) {
@@ -323,22 +404,27 @@ public boolean isPublicationCompleted() {
323404

324405
@Override
325406
public void markCompleted(Instant instant) {
326-
this.publication.completionDate = instant;
407+
this.publication.markCompleted(instant);
327408
}
328409

329410
@Override
330411
public Status getStatus() {
412+
413+
if (publication.status != null) {
414+
return publication.status;
415+
}
416+
331417
return publication.completionDate != null ? Status.COMPLETED : Status.PUBLISHED;
332418
}
333419

334420
@Override
335421
public int getCompletionAttempts() {
336-
return 1;
422+
return publication.completionAttempts;
337423
}
338424

339425
@Override
340426
public @Nullable Instant getLastResubmissionDate() {
341-
return null;
427+
return publication.lastResubmissionDate;
342428
}
343429

344430
/*

spring-modulith-events/spring-modulith-events-mongodb/src/test/java/org/springframework/modulith/events/mongodb/MongoDbEventPublicationRepositoryTest.java

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.springframework.context.annotation.Import;
3636
import org.springframework.core.env.Environment;
3737
import org.springframework.data.mongodb.core.MongoTemplate;
38+
import org.springframework.modulith.events.EventPublication.Status;
39+
import org.springframework.modulith.events.core.EventPublicationRepository.FailedCriteria;
3840
import org.springframework.modulith.events.core.PublicationTargetIdentifier;
3941
import org.springframework.modulith.events.core.TargetEventPublication;
4042
import org.springframework.modulith.events.support.CompletionMode;
@@ -299,6 +301,88 @@ void deletesPublicationsByIdentifier() {
299301
.matches(it -> it.getEvent().equals(second.getEvent()));
300302
}
301303

304+
@Test // GH-1336
305+
void looksUpFailedPublication() {
306+
307+
var event = new TestEvent("first");
308+
var publication = createPublication(event);
309+
310+
repository.markFailed(publication.getIdentifier());
311+
312+
assertThat(repository.findFailedPublications(FailedCriteria.ALL))
313+
.extracting(TargetEventPublication::getIdentifier)
314+
.containsExactly(publication.getIdentifier());
315+
}
316+
317+
@Test // GH-1336
318+
void claimsResubmissionOnce() {
319+
320+
var event = new TestEvent("first");
321+
var publication = createPublication(event);
322+
323+
repository.markFailed(publication.getIdentifier());
324+
325+
var now = Instant.now();
326+
327+
assertThat(repository.markResubmitted(publication.getIdentifier(), now)).isTrue();
328+
assertThat(repository.markResubmitted(publication.getIdentifier(), now)).isFalse();
329+
}
330+
331+
@Test // GH-1336
332+
void countsByStatus() {
333+
334+
var event = new TestEvent("first");
335+
var publication = createPublication(event);
336+
337+
assertOneByStatus(Status.PUBLISHED);
338+
339+
repository.markFailed(publication.getIdentifier());
340+
assertOneByStatus(Status.FAILED);
341+
342+
repository.markResubmitted(publication.getIdentifier(), Instant.now());
343+
assertOneByStatus(Status.RESUBMITTED);
344+
}
345+
346+
@Test // GH-1336
347+
void marksPublicationAsProcessing() {
348+
349+
var event = new TestEvent("first");
350+
var publication = createPublication(event);
351+
352+
repository.markProcessing(publication.getIdentifier());
353+
}
354+
355+
@Test // GH-1336
356+
void looksUpFailedPublicationInBatch() {
357+
358+
var event = new TestEvent("first");
359+
var publication = createPublication(event);
360+
361+
repository.markFailed(publication.getIdentifier());
362+
363+
assertThat(repository.findFailedPublications(FailedCriteria.ALL.withItemsToRead(10)))
364+
.extracting(TargetEventPublication::getIdentifier)
365+
.containsExactly(publication.getIdentifier());
366+
}
367+
368+
@Test // GH-1321
369+
void looksUpFailedPublicationWithReferenceDate() throws Exception {
370+
371+
var event = new TestEvent("first");
372+
var publication = createPublication(event);
373+
374+
repository.markFailed(publication.getIdentifier());
375+
376+
Thread.sleep(200);
377+
378+
var criteria = FailedCriteria.ALL
379+
.withPublicationsPublishedBefore(publication.getPublicationDate().plusMillis(50));
380+
381+
assertThat(repository.findFailedPublications(criteria))
382+
.extracting(TargetEventPublication::getIdentifier)
383+
.containsExactly(publication.getIdentifier());
384+
}
385+
302386
private TargetEventPublication createPublication(Object event) {
303387
return createPublication(event, TARGET_IDENTIFIER);
304388
}
@@ -309,8 +393,17 @@ private TargetEventPublication createPublication(Object event, PublicationTarget
309393

310394
private void savePublicationAt(LocalDateTime date) {
311395

312-
mongoTemplate.save(
313-
new MongoDbEventPublication(UUID.randomUUID(), date.toInstant(ZoneOffset.UTC), "", "", null));
396+
var now = date.toInstant(ZoneOffset.UTC);
397+
var publication = new MongoDbEventPublication(UUID.randomUUID(), now, "", "", null, Status.PUBLISHED, now, 1);
398+
399+
mongoTemplate.save(publication);
400+
}
401+
402+
private void assertOneByStatus(Status reference) {
403+
404+
for (var status : Status.values()) {
405+
assertThat(repository.countByStatus(status)).isEqualTo(status == reference ? 1 : 0);
406+
}
314407
}
315408
}
316409

0 commit comments

Comments
 (0)