1818import jakarta .persistence .EntityManager ;
1919
2020import java .time .Instant ;
21+ import java .util .ArrayList ;
2122import java .util .List ;
2223import java .util .Objects ;
2324import java .util .Optional ;
2425import java .util .UUID ;
26+ import java .util .function .Function ;
2527import java .util .stream .IntStream ;
2628
2729import org .jspecify .annotations .Nullable ;
30+ import org .springframework .modulith .events .EventPublication .Status ;
2831import org .springframework .modulith .events .core .EventPublicationRepository ;
2932import org .springframework .modulith .events .core .EventSerializer ;
3033import org .springframework .modulith .events .core .PublicationTargetIdentifier ;
3134import org .springframework .modulith .events .core .TargetEventPublication ;
35+ import org .springframework .modulith .events .jpa .updating .DefaultJpaEventPublication ;
3236import org .springframework .modulith .events .support .CompletionMode ;
3337import org .springframework .stereotype .Repository ;
3438import org .springframework .transaction .annotation .Transactional ;
@@ -93,7 +97,8 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
9397
9498 private static final String MARK_COMPLETED_BY_ID = """
9599 update DefaultJpaEventPublication p
96- set p.completionDate = ?2
100+ set p.status = org.springframework.modulith.events.EventPublication$Status.COMPLETED,
101+ p.completionDate = ?2
97102 where p.id = ?1
98103 """ ;
99104
@@ -117,16 +122,49 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
117122
118123 private static final String DELETE_COMPLETED = """
119124 delete
120- from %s p
121- where
122- p.completionDate is not null
125+ from %s p
126+ where p.completionDate is not null
123127 """ ;
124128
125129 private static final String DELETE_COMPLETED_BEFORE = """
126130 delete
127- from %s p
128- where
129- p.completionDate < ?1
131+ from %s p
132+ where p.completionDate < ?1
133+ """ ;
134+
135+ private static final String UPDATE = """
136+ update DefaultJpaEventPublication p
137+ set p.status = ?1
138+ where p.id = ?2
139+ and status != ?1
140+ """ ;
141+
142+ private static final String MARK_RESUBMITTED = """
143+ update DefaultJpaEventPublication p
144+ set p.status = org.springframework.modulith.events.EventPublication$Status.RESUBMITTED,
145+ p.completionAttempts = p.completionAttempts + 1,
146+ p.lastResubmissionDate = ?1
147+ where p.id = ?2
148+ and p.status != org.springframework.modulith.events.EventPublication$Status.RESUBMITTED
149+ """ ;
150+
151+ private static final String COUNT_BY_STATUS = """
152+ select count(p.id)
153+ from %s p
154+ where p.status = ?1
155+ """ ;
156+
157+ private static final String FIND_BY_STATUS = """
158+ select p
159+ from %s p
160+ where p.status = ?1
161+ """ ;
162+
163+ private static final String FIND_FAILED = """
164+ select p
165+ from DefaultJpaEventPublication p
166+ where p.status = org.springframework.modulith.events.EventPublication$Status.FAILED
167+ or (p.status is null and p.completionDate is null)
130168 """ ;
131169
132170 private static final int DELETE_BATCH_SIZE = 100 ;
@@ -136,6 +174,7 @@ class JpaEventPublicationRepository implements EventPublicationRepository {
136174 private final CompletionMode completionMode ;
137175
138176 private final String getCompleted , deleteCompleted , deleteCompletedBefore ;
177+ private final Function <Status , String > entityNameByStatus ;
139178
140179 /**
141180 * Creates a new {@link JpaEventPublicationRepository} for the given {@link EntityManager} and
@@ -157,6 +196,10 @@ public JpaEventPublicationRepository(EntityManager entityManager, EventSerialize
157196
158197 var archiveEntityName = getCompletedEntityType ().getSimpleName ();
159198
199+ this .entityNameByStatus = status -> status == Status .COMPLETED
200+ ? archiveEntityName
201+ : JpaEventPublication .getIncompleteType ().getSimpleName ();
202+
160203 this .getCompleted = COMPLETE .formatted (archiveEntityName );
161204 this .deleteCompleted = DELETE_COMPLETED .formatted (archiveEntityName );
162205 this .deleteCompletedBefore = DELETE_COMPLETED_BEFORE .formatted (archiveEntityName );
@@ -174,6 +217,15 @@ public TargetEventPublication create(TargetEventPublication publication) {
174217 return publication ;
175218 }
176219
220+ @ Override
221+ public void markProcessing (UUID identifier ) {
222+
223+ entityManager .createQuery (UPDATE )
224+ .setParameter (1 , Status .PROCESSING )
225+ .setParameter (2 , identifier )
226+ .executeUpdate ();
227+ }
228+
177229 /*
178230 * (non-Javadoc)
179231 * @see org.springframework.modulith.events.EventPublicationRepository#markCompleted(java.lang.Object, org.springframework.modulith.events.PublicationTargetIdentifier, java.time.Instant)
@@ -240,6 +292,34 @@ public void markCompleted(UUID identifier, Instant completionDate) {
240292 }
241293 }
242294
295+ /*
296+ * (non-Javadoc)
297+ * @see org.springframework.modulith.events.core.EventPublicationRepository#markFailed(java.util.UUID)
298+ */
299+ @ Override
300+ public void markFailed (UUID identifier ) {
301+
302+ entityManager .createQuery (UPDATE )
303+ .setParameter (1 , Status .FAILED )
304+ .setParameter (2 , identifier )
305+ .executeUpdate ();
306+ }
307+
308+ /*
309+ * (non-Javadoc)
310+ * @see org.springframework.modulith.events.core.EventPublicationRepository#markResubmitted(java.util.UUID, java.time.Instant)
311+ */
312+ @ Override
313+ public boolean markResubmitted (UUID identifier , Instant resubmissionDate ) {
314+
315+ var result = entityManager .createQuery (MARK_RESUBMITTED )
316+ .setParameter (1 , resubmissionDate )
317+ .setParameter (2 , identifier )
318+ .executeUpdate ();
319+
320+ return result == 1 ;
321+ }
322+
243323 /*
244324 * (non-Javadoc)
245325 * @see org.springframework.modulith.events.EventPublicationRepository#findIncompletePublications()
@@ -292,8 +372,43 @@ public List<TargetEventPublication> findCompletedPublications() {
292372 var type = getCompletedEntityType ();
293373
294374 return entityManager .createQuery (getCompleted , type )
295- .getResultList ()
296- .stream ()
375+ .getResultStream ()
376+ .map (this ::entityToDomain )
377+ .toList ();
378+ }
379+
380+ /*
381+ * (non-Javadoc)
382+ * @see org.springframework.modulith.events.core.EventPublicationRepository#findFailedPublications(org.springframework.modulith.events.core.EventPublicationRepository.FailedCriteria)
383+ */
384+ @ Override
385+ public List <TargetEventPublication > findFailedPublications (FailedCriteria criteria ) {
386+
387+ var query = FIND_FAILED ;
388+
389+ var instant = criteria .getPublicationDateReference ();
390+ var args = new ArrayList <>();
391+
392+ if (instant != null ) {
393+ query += " and p.publicationDate < ?1" ;
394+ args .add (instant );
395+ }
396+
397+ query += " order by p.publicationDate asc" ;
398+
399+ var jpaQuery = entityManager .createQuery (query , DefaultJpaEventPublication .class );
400+
401+ for (int i = 0 ; i < args .size (); i ++) {
402+ jpaQuery = jpaQuery .setParameter (i + 1 , args .get (i ));
403+ }
404+
405+ var itemsToRead = criteria .getMaxItemsToRead ();
406+
407+ if (itemsToRead != -1 ) {
408+ jpaQuery .setMaxResults (((Long ) itemsToRead ).intValue ());
409+ }
410+
411+ return jpaQuery .getResultStream ()
297412 .map (this ::entityToDomain )
298413 .toList ();
299414 }
@@ -333,6 +448,36 @@ public void deleteCompletedPublicationsBefore(Instant instant) {
333448 .executeUpdate ();
334449 }
335450
451+ /*
452+ * (non-Javadoc)
453+ * @see org.springframework.modulith.events.core.EventPublicationRepository#findByStatus(org.springframework.modulith.events.EventPublication.Status)
454+ */
455+ @ Override
456+ public List <TargetEventPublication > findByStatus (Status status ) {
457+
458+ var query = entityNameByStatus .andThen (FIND_BY_STATUS ::formatted ).apply (status );
459+
460+ return entityManager .createQuery (query , JpaEventPublication .class )
461+ .setParameter (1 , status )
462+ .getResultStream ()
463+ .map (this ::entityToDomain )
464+ .toList ();
465+ }
466+
467+ /*
468+ * (non-Javadoc)
469+ * @see org.springframework.modulith.events.core.EventPublicationRepository#countByStatus(org.springframework.modulith.events.EventPublication.Status)
470+ */
471+ @ Override
472+ public int countByStatus (Status status ) {
473+
474+ var query = entityNameByStatus .andThen (COUNT_BY_STATUS ::formatted ).apply (status );
475+
476+ return ((Long ) entityManager .createQuery (query )
477+ .setParameter (1 , status )
478+ .getSingleResult ()).intValue ();
479+ }
480+
336481 /**
337482 * Returns the type representing completed event publications.
338483 *
@@ -364,7 +509,8 @@ private JpaEventPublication domainToEntity(TargetEventPublication domain) {
364509 var event = domain .getEvent ();
365510
366511 return JpaEventPublication .of (domain .getIdentifier (), domain .getPublicationDate (),
367- domain .getTargetIdentifier ().getValue (), serializeEvent (event ), event .getClass ());
512+ domain .getTargetIdentifier ().getValue (), serializeEvent (event ), event .getClass (), domain .getStatus (),
513+ domain .getLastResubmissionDate (), domain .getCompletionAttempts ());
368514 }
369515
370516 private TargetEventPublication entityToDomain (JpaEventPublication entity ) {
0 commit comments