1515 */
1616package org .springframework .modulith .events .neo4j ;
1717
18+ import static org .neo4j .cypherdsl .core .Cypher .*;
19+
1820import java .time .Instant ;
1921import java .time .ZoneOffset ;
2022import java .util .ArrayList ;
2325import java .util .Objects ;
2426import java .util .Optional ;
2527import java .util .UUID ;
28+ import java .util .function .Function ;
2629
2730import org .neo4j .cypherdsl .core .Cypher ;
2831import org .neo4j .cypherdsl .core .Node ;
4851 *
4952 * @author Gerrit Meier
5053 * @author Oliver Drotbohm
54+ * @author Cora Iberkleid
5155 * @since 1.1
5256 */
5357@ Transactional
@@ -61,85 +65,115 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
6165 private static final String PUBLICATION_DATE = "publicationDate" ;
6266 private static final String COMPLETION_DATE = "completionDate" ;
6367
64- private static final Node EVENT_PUBLICATION_NODE = Cypher . node ("Neo4jEventPublication" )
68+ private static final Node EVENT_PUBLICATION_NODE = node ("Neo4jEventPublication" )
6569 .named ("neo4jEventPublication" );
6670
67- private static final Statement INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT = Cypher
68- .match (EVENT_PUBLICATION_NODE )
69- .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (Cypher .parameter (EVENT_HASH )))
70- .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (Cypher .parameter (LISTENER_ID )))
71+ private static final Node EVENT_PUBLICATION_COMPLETED_NODE = node ("Neo4jEventPublicationCompleted" )
72+ .named ("neo4jEventPublicationCompleted" );
73+
74+ private static final Statement INCOMPLETE_BY_EVENT_AND_TARGET_IDENTIFIER_STATEMENT = match (EVENT_PUBLICATION_NODE )
75+ .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
76+ .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
7177 .and (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).isNull ())
7278 .returning (EVENT_PUBLICATION_NODE )
7379 .build ();
7480
75- private static final Statement DELETE_BY_EVENT_AND_LISTENER_ID = Cypher . match (EVENT_PUBLICATION_NODE )
76- .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (Cypher . parameter (EVENT_HASH )))
77- .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (Cypher . parameter (LISTENER_ID )))
81+ private static final Statement DELETE_BY_EVENT_AND_LISTENER_ID = match (EVENT_PUBLICATION_NODE )
82+ .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
83+ .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
7884 .delete (EVENT_PUBLICATION_NODE )
7985 .build ();
8086
81- private static final Statement DELETE_BY_ID_STATEMENT = Cypher . match (EVENT_PUBLICATION_NODE )
82- .where (EVENT_PUBLICATION_NODE .property (ID ).in (Cypher . parameter (ID )))
87+ private static final Statement DELETE_BY_ID_STATEMENT = match (EVENT_PUBLICATION_NODE )
88+ .where (EVENT_PUBLICATION_NODE .property (ID ).in (parameter (ID )))
8389 .delete (EVENT_PUBLICATION_NODE )
8490 .build ();
8591
86- private static final Statement DELETE_COMPLETED_STATEMENT = Cypher . match (EVENT_PUBLICATION_NODE )
87- .where (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).isNotNull ())
88- .delete (EVENT_PUBLICATION_NODE )
92+ private static final Function < Node , Statement > DELETE_COMPLETED_STATEMENT = node -> match (node )
93+ .where (node .property (COMPLETION_DATE ).isNotNull ())
94+ .delete (node )
8995 .build ();
9096
91- private static final Statement DELETE_COMPLETED_BEFORE_STATEMENT = Cypher . match (EVENT_PUBLICATION_NODE )
92- .where (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ).lt (Cypher . parameter (PUBLICATION_DATE )))
93- .and (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).isNotNull ())
94- .delete (EVENT_PUBLICATION_NODE )
97+ private static final Function < Node , Statement > DELETE_COMPLETED_BEFORE_STATEMENT = node -> match (node )
98+ .where (node .property (PUBLICATION_DATE ).lt (parameter (PUBLICATION_DATE )))
99+ .and (node .property (COMPLETION_DATE ).isNotNull ())
100+ .delete (node )
95101 .build ();
96102
97- private static final Statement INCOMPLETE_PUBLISHED_BEFORE_STATEMENT = Cypher
98- .match (EVENT_PUBLICATION_NODE )
99- .where (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ).lt (Cypher .parameter (PUBLICATION_DATE )))
103+ private static final Statement INCOMPLETE_PUBLISHED_BEFORE_STATEMENT = match (EVENT_PUBLICATION_NODE )
104+ .where (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ).lt (parameter (PUBLICATION_DATE )))
100105 .and (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).isNull ())
101106 .returning (EVENT_PUBLICATION_NODE )
102107 .orderBy (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ))
103108 .build ();
104109
105110 private static final Statement CREATE_STATEMENT = Cypher .create (EVENT_PUBLICATION_NODE )
106- .set (EVENT_PUBLICATION_NODE .property (ID ).to (Cypher . parameter (ID )))
107- .set (EVENT_PUBLICATION_NODE .property (EVENT_SERIALIZED ).to (Cypher . parameter (EVENT_SERIALIZED )))
108- .set (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).to (Cypher . parameter (EVENT_HASH )))
109- .set (EVENT_PUBLICATION_NODE .property (EVENT_TYPE ).to (Cypher . parameter (EVENT_TYPE )))
110- .set (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).to (Cypher . parameter (LISTENER_ID )))
111- .set (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ).to (Cypher . parameter (PUBLICATION_DATE )))
111+ .set (EVENT_PUBLICATION_NODE .property (ID ).to (parameter (ID )))
112+ .set (EVENT_PUBLICATION_NODE .property (EVENT_SERIALIZED ).to (parameter (EVENT_SERIALIZED )))
113+ .set (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).to (parameter (EVENT_HASH )))
114+ .set (EVENT_PUBLICATION_NODE .property (EVENT_TYPE ).to (parameter (EVENT_TYPE )))
115+ .set (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).to (parameter (LISTENER_ID )))
116+ .set (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ).to (parameter (PUBLICATION_DATE )))
112117 .build ();
113118
114- private static final Statement COMPLETE_STATEMENT = Cypher . match (EVENT_PUBLICATION_NODE )
115- .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (Cypher . parameter (EVENT_HASH )))
116- .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (Cypher . parameter (LISTENER_ID )))
119+ private static final Statement COMPLETE_STATEMENT = match (EVENT_PUBLICATION_NODE )
120+ .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
121+ .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
117122 .and (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).isNull ())
118- .set (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).to (Cypher .parameter (COMPLETION_DATE )))
123+ .set (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
124+ .build ();
125+
126+ private static final Statement COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT = match (EVENT_PUBLICATION_NODE )
127+ .where (EVENT_PUBLICATION_NODE .property (ID ).eq (parameter (ID )))
128+ .and (not (exists (match (EVENT_PUBLICATION_COMPLETED_NODE )
129+ .where (EVENT_PUBLICATION_COMPLETED_NODE .property (ID ).eq (parameter (ID )))
130+ .returning (literalTrue ()).build ())))
131+ .with (EVENT_PUBLICATION_NODE )
132+ .create (EVENT_PUBLICATION_COMPLETED_NODE )
133+ .set (EVENT_PUBLICATION_COMPLETED_NODE .property (ID ).to (EVENT_PUBLICATION_NODE .property (ID )))
134+ .set (EVENT_PUBLICATION_COMPLETED_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
135+ .build ();
136+
137+ private static final Statement COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT = match (EVENT_PUBLICATION_NODE )
138+ .where (EVENT_PUBLICATION_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
139+ .and (EVENT_PUBLICATION_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
140+ .and (not (exists (match (EVENT_PUBLICATION_COMPLETED_NODE )
141+ .where (EVENT_PUBLICATION_COMPLETED_NODE .property (EVENT_HASH ).eq (parameter (EVENT_HASH )))
142+ .and (EVENT_PUBLICATION_COMPLETED_NODE .property (LISTENER_ID ).eq (parameter (LISTENER_ID )))
143+ .returning (literalTrue ()).build ())))
144+ .with (EVENT_PUBLICATION_NODE )
145+ .create (EVENT_PUBLICATION_COMPLETED_NODE )
146+ .set (EVENT_PUBLICATION_COMPLETED_NODE .property (ID ).to (EVENT_PUBLICATION_NODE .property (ID )))
147+ .set (EVENT_PUBLICATION_COMPLETED_NODE .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
119148 .build ();
120149
121- private static final Statement COMPLETE_BY_ID_STATEMENT = Cypher . match (EVENT_PUBLICATION_NODE )
122- .where (EVENT_PUBLICATION_NODE .property (ID ).eq (Cypher . parameter (ID )))
123- .set (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).to (Cypher . parameter (COMPLETION_DATE )))
150+ private static final Function < Node , Statement > COMPLETE_BY_ID_STATEMENT = node -> match (node )
151+ .where (node .property (ID ).eq (parameter (ID )))
152+ .set (node .property (COMPLETION_DATE ).to (parameter (COMPLETION_DATE )))
124153 .build ();
125154
126- private static final ResultStatement INCOMPLETE_STATEMENT = Cypher . match (EVENT_PUBLICATION_NODE )
155+ private static final ResultStatement INCOMPLETE_STATEMENT = match (EVENT_PUBLICATION_NODE )
127156 .where (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).isNull ())
128157 .returning (EVENT_PUBLICATION_NODE )
129158 .orderBy (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ))
130159 .build ();
131160
132- private static final ResultStatement ALL_COMPLETED_STATEMENT = Cypher . match (EVENT_PUBLICATION_NODE )
133- .where (EVENT_PUBLICATION_NODE .property (COMPLETION_DATE ).isNotNull ())
134- .returning (EVENT_PUBLICATION_NODE )
135- .orderBy (EVENT_PUBLICATION_NODE .property (PUBLICATION_DATE ))
161+ private static final Function < Node , ResultStatement > ALL_COMPLETED_STATEMENT = node -> match (node )
162+ .where (node .property (COMPLETION_DATE ).isNotNull ())
163+ .returning (node )
164+ .orderBy (node .property (PUBLICATION_DATE ))
136165 .build ();
137166
138167 private final Neo4jClient neo4jClient ;
139168 private final Renderer renderer ;
140169 private final EventSerializer eventSerializer ;
141170 private final CompletionMode completionMode ;
142171
172+ private final Statement deleteCompletedStatement ;
173+ private final Statement deleteCompletedBeforeStatement ;
174+ private final Statement completedByIdStatement ;
175+ private final ResultStatement allCompletedStatement ;
176+
143177 Neo4jEventPublicationRepository (Neo4jClient neo4jClient , Configuration cypherDslConfiguration ,
144178 EventSerializer eventSerializer , CompletionMode completionMode ) {
145179
@@ -152,6 +186,13 @@ class Neo4jEventPublicationRepository implements EventPublicationRepository {
152186 this .renderer = Renderer .getRenderer (cypherDslConfiguration );
153187 this .eventSerializer = eventSerializer ;
154188 this .completionMode = completionMode ;
189+
190+ var archiveNode = completionMode == CompletionMode .ARCHIVE ? EVENT_PUBLICATION_COMPLETED_NODE : EVENT_PUBLICATION_NODE ;
191+
192+ this .deleteCompletedStatement = DELETE_COMPLETED_STATEMENT .apply (archiveNode );
193+ this .deleteCompletedBeforeStatement = DELETE_COMPLETED_BEFORE_STATEMENT .apply (archiveNode );
194+ this .completedByIdStatement = COMPLETE_BY_ID_STATEMENT .apply (archiveNode );
195+ this .allCompletedStatement = ALL_COMPLETED_STATEMENT .apply (archiveNode );
155196 }
156197
157198 /*
@@ -201,6 +242,18 @@ public void markCompleted(Object event, PublicationTargetIdentifier identifier,
201242 .bind (identifier .getValue ()).to (LISTENER_ID )
202243 .run ();
203244
245+ } else if (completionMode == CompletionMode .ARCHIVE ) {
246+
247+ neo4jClient .query (renderer .render (COMPLETE_IN_ARCHIVE_BY_EVENT_AND_LISTENER_ID_STATEMENT ))
248+ .bind (eventHash ).to (EVENT_HASH )
249+ .bind (identifier .getValue ()).to (LISTENER_ID )
250+ .bind (Values .value (completionDate .atOffset (ZoneOffset .UTC ))).to (COMPLETION_DATE )
251+ .run ();
252+ neo4jClient .query (renderer .render (DELETE_BY_EVENT_AND_LISTENER_ID ))
253+ .bind (eventHash ).to (EVENT_HASH )
254+ .bind (identifier .getValue ()).to (LISTENER_ID )
255+ .run ();
256+
204257 } else {
205258
206259 neo4jClient .query (renderer .render (COMPLETE_STATEMENT ))
@@ -223,13 +276,22 @@ public void markCompleted(UUID identifier, Instant completionDate) {
223276
224277 deletePublications (List .of (identifier ));
225278
279+ } else if (completionMode == CompletionMode .ARCHIVE ) {
280+
281+ neo4jClient .query (renderer .render (COMPLETE_IN_ARCHIVE_BY_ID_STATEMENT ))
282+ .bind ("" ).to (ID )
283+ .bind (Values .value (completionDate .atOffset (ZoneOffset .UTC ))).to (COMPLETION_DATE )
284+ .run ();
285+ deletePublications (List .of (identifier ));
286+
226287 } else {
227288
228- neo4jClient .query (renderer .render (COMPLETE_BY_ID_STATEMENT ))
289+ neo4jClient .query (renderer .render (completedByIdStatement ))
229290 .bind (Values .value (identifier .toString ())).to (ID )
230291 .bind (Values .value (completionDate .atOffset (ZoneOffset .UTC ))).to (COMPLETION_DATE )
231292 .run ();
232293 }
294+
233295 }
234296
235297 /*
@@ -287,7 +349,7 @@ public Optional<TargetEventPublication> findIncompletePublicationsByEventAndTarg
287349 @ Override
288350 public List <TargetEventPublication > findCompletedPublications () {
289351
290- return new ArrayList <>(neo4jClient .query (renderer .render (ALL_COMPLETED_STATEMENT ))
352+ return new ArrayList <>(neo4jClient .query (renderer .render (allCompletedStatement ))
291353 .fetchAs (TargetEventPublication .class )
292354 .mappedBy (this ::mapRecordToPublication )
293355 .all ());
@@ -313,7 +375,7 @@ public void deletePublications(List<UUID> identifiers) {
313375 @ Override
314376 @ Transactional
315377 public void deleteCompletedPublications () {
316- neo4jClient .query (renderer .render (DELETE_COMPLETED_STATEMENT )).run ();
378+ neo4jClient .query (renderer .render (deleteCompletedStatement )).run ();
317379 }
318380
319381 /*
@@ -324,7 +386,7 @@ public void deleteCompletedPublications() {
324386 @ Transactional
325387 public void deleteCompletedPublicationsBefore (Instant instant ) {
326388
327- neo4jClient .query (renderer .render (DELETE_COMPLETED_BEFORE_STATEMENT ))
389+ neo4jClient .query (renderer .render (deleteCompletedBeforeStatement ))
328390 .bind (Values .value (instant .atOffset (ZoneOffset .UTC ))).to (PUBLICATION_DATE )
329391 .run ();
330392 }
0 commit comments