[#3250] feat: Persistent Streams aligned with Axon Framework 5 API#4039
[#3250] feat: Persistent Streams aligned with Axon Framework 5 API#4039MateuszNaKodach wants to merge 10 commits intomainfrom
Conversation
…ction` and `AggregateEventConverter`
…verter for easier testing
…ing connection with converter and filtering events by aggregate
… definition,factory,builder
…stentStreamMessageSource`
…tStreamMessageSource` - wait for gRPC call to end
…ntional close
Add closing flag to PersistentStreamConnection to distinguish between
intentional close (user called close()) and unexpected disconnection
(network error, server disconnect).
When close() is called:
- Set closing flag to true before closing the stream
- streamClosed() callback checks flag and skips reconnection scheduling
- messageAvailable() callback checks flag and skips task submission
This prevents RejectedExecutionException when gRPC callbacks arrive
after the scheduler has been shut down during intentional close.
…ersistentStreamMessageSource` - wait for gRPC call to end" This reverts commit 3a41b2d.
…`PersistentStreamSettings`
…`PersistentStreamSettings` - SequentialPolicy as default
| if (!events.isEmpty()) { | ||
| // Pass null ProcessingContext - the SubscribingEventProcessor will create its own UnitOfWork. | ||
| // fixme: We lose TrackingToken so we cannot determine if it's replay or not! | ||
| FutureUtils.joinAndUnwrap(eventsBatchConsumer.apply(events, null)); |
There was a problem hiding this comment.
There we have a significant problem (maybe require even a change of the API). There is no way how to pass TrackingToken from Message.Entry Context to be procesessed along with the events. It'd be the best to have eventsBatchConsumer which accepts MessageStream.Entry(ies).
Now if we pass null, we cannot determine if it's a replay of not.
Before we had here EventWithToken, so the TrackingToken was passed to consumer along with each event. Thanks to the PersistentStreamEvent#isReplay we knew which one is a replay.
| private void streamClosed(Throwable throwable) { | ||
| persistentStreamHolder.set(null); | ||
| if (throwable != null) { | ||
| if (throwable != null && !closing.get()) { |
There was a problem hiding this comment.
I think it should be introduced to Axon Framework 4 as well.
We check if we ONLY reconnect if it's an exception, not intentional closing.
| */ | ||
| @Override | ||
| @Nonnull | ||
| public MessageStream.Entry<EventMessage> apply(PersistentStreamEvent persistentStreamEvent) { |
There was a problem hiding this comment.
In AF4 we were convertin to TrackedEventMessage - we don't have it anymore, so in order to preserve TrackingToken, we return here the entry so EventMessage along with Context.
|


This project shows current progress and may be the starting point for futher development.
What I have done:
expanded interface
SubscribableEventSourceto support filtering byEventCriteriaported
PersistentStreamConnection- with fix for reconnecting after intentional stream closednew class
PersistentStreamEventConverter- converting PersistentStreamEvents to MessageStream.Entryported
PersistentStreamMessageSource- with inital support for filtering, but there is a problem with passing TrackingToken (check comment for this PR)ported
PersistentStreamEventSourceDefinitionported
PersistentStreamMessageSourceFactoryported
PersistentStreamScheduledExecutorBuildernew class
PersistentStreamSequencingPolicies- with a dictionaryAggregateEventConverter- new class that converts legacy Axon Server events to AF5AxonServerConfiguration- restoredPersistentStreamSettings- with default sequencing policy changed toSequentialPolicyadded tests, especially IntegrationTest for persistent streams
TODO: