Fix QueueMessageStream isCompleted (#4356)#4389
Fix QueueMessageStream isCompleted (#4356)#4389wadhwaroh-lang wants to merge 6 commits intoAxonIQ:mainfrom
Conversation
…Framework#4356) Remove override that tied isCompleted to queue.isEmpty(), so completion matches AbstractMessageStream (complete/completeExceptionally/close only). - Document behavior on QueueMessageStream; align isClosed javadoc - Extend MessageStreamTest with hook for peek expectations - Adjust QueueMessageStreamTest and add regression test
…ramework#4356) When the producer signals completion before the queue is drained, use hasNextAvailable() to decide concat, error-continue, and completion callbacks.
|
Could a maintainer please add Type: Bug, Priority 2: Should, and milestone Release 5.1.1 to match #4356? |
invokeCompletionHandlerIfCompleted() previously called hasNextAvailable() first, which can advance peek/next state on the delegate. For failed streams (e.g. ContinuousMessageStream with onComplete + first().asCompletableFuture), that could leave FirstResult completing normally instead of exceptionally. Reorder guards so error() is evaluated first; logic is unchanged for successful streams.
…lbackMessageStream - Read delegate failure once in error() and skip invokeCompletionHandlerIfCompleted when the delegate already failed, so completion handlers never run on error paths. - Structure invokeCompletionHandlerIfCompleted as isCompleted, then failure, then hasNextAvailable, matching successful-completion semantics for AxonIQ#4356. - In hasNextAvailable, only call invoke when the delegate is completed with no error, avoiding redundant invoke work on failed streams.
…g entries - FlowControlledResponseSender: emit ReplyChannel complete/error only when upstream is completed and the queue is drained; align responseSendingLoop so we do not spin when completion is pending but demand is zero. - FluxUtils.FluxStreamAdapter: emit terminal Flux signal at most once when bridging MessageStream to Flux.
…entries Aligns FluxUtils.of with QueueMessageStream semantics where isCompleted() can be true while the queue is non-empty; avoids completing the Flux before all entries are emitted under backpressure.
Done @wadhwaroh-lang, would you mind explaining in the description of the PR what you've done and why you've done it? |
|
We're also looking into this bug, and it is quite complicated. We're hoping to move more of the base stream functionality to the I'm pretty sure there is a problem with The |
|
Hi @smcvb — thanks for the feedback, and sorry the scope wasn’t clear from the description alone. What we’re fixing (#4356) Why the PR isn’t only QueueMessageStream + one test There are also small adjustments around completion callbacks and error handling on delegating streams so that failed streams don’t run success-style completion logic or advance state in a way that masks failures (relevant for tests and usage that combine completion callbacks with MessageStream). Tests I’m happy to tighten the PR (e.g. split commits or trim anything you’d rather handle in a follow-up) once you’ve had a look. Thanks again :-) |
|
Hi @hjohn — thanks for the context; it helps to know you’re consolidating behavior on AbstractMessageStream and unifying implementations. That direction makes sense, especially for subtle cases like this. I’m fine with you taking a close look at ConcatenatingMessageStream (and any related paths) in this PR — if something should be done differently once things live more centrally on AbstractMessageStream, I’m happy to adjust or drop those changes in favour of your approach. On isCompletedWhenPeekedWithoutConsumingFromCompletedSubject: I agree with the goal that all streams should behave the same from the test suite’s point of view. I added that hook only to express queue-specific “completed but not drained” behaviour in the shared MessageStreamTest matrix. If you’d rather not special-case it there, I can remove the hook and cover that only in QueueMessageStream-focused tests (or whatever structure you prefer), so the abstract tests stay uniform. Let me know what fits best with the refactor you have in mind — I’m glad to align the PR with that. |
| if (first.hasNextAvailable()) { | ||
| return first.next(); | ||
| } |
There was a problem hiding this comment.
This is excellent, and something we also discovered a while back. The isCompleted is a stateless function, and a stream may be completed but has not transitioned to this state yet after the last call to next (it must transition after that call as otherwise it may call completion handlers before it delivered the last element).
In order to do so, we wait for another interaction with the stream that requires knowledge of a potential next element (hasNextAvailable, next or peek).
By calling hasNextAvailable here, you trigger this state change to completed (if it was on the verge of completion) meaning that the later isCompleted check here will work as expected now!
|
I've addressed this now in #4396 |
|
Thanks for your efforts here, @wadhwaroh-lang. I know that @hjohn, while working on #4396, made sure to validate your solution as well to incorporate it whenever it was applicable. Note that this does not mean we do not value your contribution, @wadhwaroh-lang! |
Fixes #4356
What we’re fixing (#4356)
QueueMessageStream had completion semantics that didn’t match AbstractMessageStream: completion is tied to the producer calling complete / completeExceptionally / close, not to the queue being drained. After aligning QueueMessageStream with that, isCompleted() can be true while hasNextAvailable() is still true (buffered entries remain).
Why the PR isn’t only QueueMessageStream + one test
Several components assume something like “if the stream is completed, we’re done / we can emit a terminal signal,” without also checking that nothing is left to read. With the corrected QueueMessageStream semantics, those paths had to be updated so we don’t finish the reply channel / Flux too early or emit completion more than once — e.g. FlowControlledResponseSender (Axon Server query responses) and FluxUtils when bridging a MessageStream to Reactor.
There are also small adjustments around completion callbacks and error handling on delegating streams so that failed streams don’t run success-style completion logic or advance state in a way that masks failures (relevant for tests and usage that combine completion callbacks with MessageStream).
Tests
Tests were extended where we assert the new completion vs “still data in the queue” behavior and the downstream effects above.