Refactor AbstractMessageStream and implementations#4396
Refactor AbstractMessageStream and implementations#4396hjohn wants to merge 2 commits intoaxon-5.1.xfrom
Conversation
4415738 to
4809dc0
Compare
smcvb
left a comment
There was a problem hiding this comment.
First and foremost: tremendous simplification of the concrete stream implementations! On the other end, the AbstractMessageStream became a tad more complex, let alone concretely validating the shift in a PR. I do have a bunch of concerns and some questions. Honestly not 100% sure if they all merit changes, but I am going to be protective here given the role of the MessageStream.
4809dc0 to
467d49c
Compare
7b9184f to
a9faf4e
Compare
smcvb
left a comment
There was a problem hiding this comment.
I think the use of synchronized could be documented, as it's a well thought off decision. Furthermore, the before-each assertion seems off to me. Lastly, the flux-pre-subscribe is AFAIK not the typical approach of Project Reactor-like components. Hence, I think it's best if we stick to what users expect when using this.
a9faf4e to
307cb20
Compare
smcvb
left a comment
There was a problem hiding this comment.
My concerns have been addressed, hence I'm approving this pull request.
68fa755 to
92ccac0
Compare
a4b160f to
f405fd2
Compare
a229e81 to
89199bf
Compare
- Move most of the complicated logic to AbstractMessageStream for streams deriving from it - Made AbstractMessageStream safe to extend without breaking its invariants - Based FluxMessageStream on AbstractMessageStream - Based ConcatenatingMessageStream on AbstractMessageStream - Simplified ConcatenatingMessageStream using active stream logic - Removed unused code from QueueMessageStream and renamed methods for clarity
89199bf to
97eb7d3
Compare
8f1d6f6 to
4923a30
Compare
| invokeCallbackSafely(); | ||
| } | ||
| }); | ||
| this.source.thenAccept(e -> signalProgress()); |
There was a problem hiding this comment.
bug: missing progress signal on exceptional future completion
| this.source.thenAccept(e -> signalProgress()); | |
| this.source.whenComplete((entry, throwable) -> signalProgress()); |
thenAccept only fires on successful completion. When the CompletableFuture completes exceptionally, signalProgress() is never called. If a consumer has registered a callback (via setCallback or reduce()), awaitingData = true, and
the callback will never fire — causing the stream to hang permanently.
fetchNext() already handles isCompletedExceptionally() correctly, so just the signal is missing.
| @Override | ||
| public void onError(Throwable t) { | ||
| peeked.add(FetchResult.error(t)); | ||
| seal(); |
There was a problem hiding this comment.
bug: missing signalProgress() call
in onComplete we have signalProgress() at the end of the method.
Should we have it here as well?
| if (!Boolean.TRUE.equals(processingContext.getResource(prepareCommitExecuted))) { | ||
| updateAppendPosition(marker); | ||
| } |
There was a problem hiding this comment.
Is it correct description of this change?
The Bug Fix in DefaultEventStoreTransaction
The source() method attaches a handler that updates the append position when a stream completes. But if source() was called multiple times (for multiple sourcing conditions), the onComplete callback fired multiple times, updating
the position each time — causing incorrect behavior.
Fix: a prepareCommitExecuted flag on the ProcessingContext:
Before:
source() call 1 ──► onComplete ──► updateAppendPosition() ✓
source() call 2 ──► onComplete ──► updateAppendPosition() ✗ (ran again!)
After:
source() call 1 ──► onComplete ──► prepareCommitExecuted? NO → updateAppendPosition()
set prepareCommitExecuted = true
source() call 2 ──► onComplete ──► prepareCommitExecuted? YES → skip ✓
By doing the above, this PR resolves #4356
This also fixes a bug in
DefaultEventStoreTransactionthat I ran into while fixing streams:Resolves #4200