Skip to content

Commit b5dfb70

Browse files
committed
GH-1321 - Fix application of FailedCriteria in failed publication lookups.
We now correctly build up limit clauses for database requiring the ANSI syntax. Also, we convert the reference Instant into a Timestamp before binding it as parameter value.
1 parent 37dbae8 commit b5dfb70

File tree

5 files changed

+76
-32
lines changed

5 files changed

+76
-32
lines changed

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import java.time.Clock;
1919
import java.time.Duration;
20-
import java.time.Instant;
2120
import java.util.Collection;
2221
import java.util.Iterator;
2322
import java.util.Map;
@@ -35,6 +34,7 @@
3534
import org.springframework.modulith.events.EventPublication;
3635
import org.springframework.modulith.events.EventPublication.Status;
3736
import org.springframework.modulith.events.ResubmissionOptions;
37+
import org.springframework.modulith.events.core.EventPublicationRepository.FailedCriteria;
3838
import org.springframework.transaction.annotation.Propagation;
3939
import org.springframework.transaction.annotation.Transactional;
4040
import org.springframework.util.Assert;
@@ -234,18 +234,9 @@ public void processFailedPublications(ResubmissionOptions options, Consumer<Targ
234234
return;
235235
}
236236

237-
var criteria = new EventPublicationRepository.FailedCriteria() {
238-
239-
@Override
240-
public Instant getPublicationDateReference() {
241-
return clock.instant().minus(options.getMinAge());
242-
}
243-
244-
@Override
245-
public int getMaxItemsToRead() {
246-
return Math.min(options.getBatchSize(), options.getBatchSize() - currentlyResubmitted);
247-
}
248-
};
237+
var criteria = FailedCriteria.ALL
238+
.withPublicationsPublishedBefore(clock.instant().minus(options.getMinAge()))
239+
.withItemsToRead(Math.min(options.getBatchSize(), options.getBatchSize() - currentlyResubmitted));
249240

250241
processPublications(events.findFailedPublications(criteria), options.getFilter(), consumer);
251242
}

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/EventPublicationRepository.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -209,20 +209,17 @@ default int countByStatus(Status status) {
209209
return 0;
210210
}
211211

212-
interface FailedCriteria {
212+
static class FailedCriteria {
213213

214-
public static FailedCriteria ALL = new FailedCriteria() {
214+
public static FailedCriteria ALL = new FailedCriteria(-1, null);
215215

216-
@Override
217-
public int getMaxItemsToRead() {
218-
return -1;
219-
}
216+
private final long maxItemsToRead;
217+
private final @Nullable Instant publicationDateReference;
220218

221-
@Override
222-
public @Nullable Instant getPublicationDateReference() {
223-
return null;
224-
}
225-
};
219+
private FailedCriteria(long maxItemsToRead, @Nullable Instant publicationDateReference) {
220+
this.maxItemsToRead = maxItemsToRead;
221+
this.publicationDateReference = publicationDateReference;
222+
}
226223

227224
/**
228225
* The reference date to use as cutoff when selecting failed
@@ -231,12 +228,24 @@ public int getMaxItemsToRead() {
231228
* @return can be {@literal null}.
232229
*/
233230
@Nullable
234-
Instant getPublicationDateReference();
231+
public Instant getPublicationDateReference() {
232+
return publicationDateReference;
233+
}
234+
235+
public FailedCriteria withPublicationsPublishedBefore(Instant reference) {
236+
return new FailedCriteria(maxItemsToRead, reference);
237+
}
235238

236239
/**
237240
* The number of {@link org.springframework.modulith.events.EventPublication}s to read. Return -1 to indicate you
238241
* want to read all items.
239242
*/
240-
int getMaxItemsToRead();
243+
public long getMaxItemsToRead() {
244+
return maxItemsToRead;
245+
}
246+
247+
public FailedCriteria withItemsToRead(long itemsToRead) {
248+
return new FailedCriteria(itemsToRead, publicationDateReference);
249+
}
241250
}
242251
}

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/DatabaseType.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,11 @@ UUID databaseToUUID(Object id) {
8787
boolean isSchemaSupported() {
8888
return false;
8989
}
90+
91+
@Override
92+
String getLimitClause(long limit) {
93+
return " OFFSET 0 ROWS FETCH NEXT " + limit + " ROWS ONLY";
94+
}
9095
},
9196

9297
ORACLE("oracle", "Oracle") {
@@ -105,6 +110,11 @@ UUID databaseToUUID(Object id) {
105110
boolean isSchemaSupported() {
106111
return false;
107112
}
113+
114+
@Override
115+
String getLimitClause(long limit) {
116+
return " FETCH FIRST " + limit + " ROWS ONLY";
117+
}
108118
};
109119

110120
static final String SCHEMA_NOT_SUPPORTED = "Setting the schema name is not supported!";
@@ -143,6 +153,10 @@ String getArchiveSchemaResourceFilename(boolean legacy) {
143153
return getSchemaBase(legacy) + "-archive.sql";
144154
}
145155

156+
String getLimitClause(long limit) {
157+
return " LIMIT " + limit;
158+
}
159+
146160
private String getSchemaBase(boolean legacy) {
147161
return "/schemas/" + (legacy ? "v1" : "v2") + "/schema-" + value;
148162
}

spring-modulith-events/spring-modulith-events-jdbc/src/main/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryV2.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -533,19 +533,18 @@ public List<TargetEventPublication> findFailedPublications(FailedCriteria criter
533533
if (instant != null) {
534534

535535
sql += """
536-
AND PUBLICATION_DATE < ?
536+
AND PUBLICATION_DATE < ?
537537
""";
538538

539-
args.add(instant);
539+
args.add(Timestamp.from(instant));
540540
}
541541

542+
sql += " ORDER BY PUBLICATION_DATE ASC";
543+
542544
var itemsToRead = criteria.getMaxItemsToRead();
543545

544546
if (itemsToRead != -1) {
545-
546-
sql += """
547-
LIMIT %s
548-
""".formatted(itemsToRead);
547+
sql += settings.getDatabaseType().getLimitClause(itemsToRead);
549548
}
550549

551550
var result = operations.query(asOneLine(sql), this::resultSetToPublications, args.toArray());

spring-modulith-events/spring-modulith-events-jdbc/src/test/java/org/springframework/modulith/events/jdbc/JdbcEventPublicationRepositoryV2IntegrationTests.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,37 @@ void marksPublicationAsProcessing() {
439439
repository.markProcessing(publication.getIdentifier());
440440
}
441441

442+
@Test // GH-1321
443+
void looksUpFailedPublicationInBatch() {
444+
445+
var event = new TestEvent("first");
446+
var publication = createPublication(event);
447+
448+
repository.markFailed(publication.getIdentifier());
449+
450+
assertThat(repository.findFailedPublications(FailedCriteria.ALL.withItemsToRead(10)))
451+
.extracting(TargetEventPublication::getIdentifier)
452+
.containsExactly(publication.getIdentifier());
453+
}
454+
455+
@Test // GH-1321
456+
void looksUpFailedPublicationWithReferenceDate() throws Exception {
457+
458+
var event = new TestEvent("first");
459+
var publication = createPublication(event);
460+
461+
repository.markFailed(publication.getIdentifier());
462+
463+
Thread.sleep(200);
464+
465+
var criteria = FailedCriteria.ALL
466+
.withPublicationsPublishedBefore(publication.getPublicationDate().plusMillis(50));
467+
468+
assertThat(repository.findFailedPublications(criteria))
469+
.extracting(TargetEventPublication::getIdentifier)
470+
.containsExactly(publication.getIdentifier());
471+
}
472+
442473
private void assertOneByStatus(Status reference) {
443474

444475
for (var status : Status.values()) {

0 commit comments

Comments
 (0)