Handle duplicate FINISHED messages, improve driver diagnostics#54
Open
travisdowns wants to merge 2 commits into0.12.x-redpandafrom
Open
Handle duplicate FINISHED messages, improve driver diagnostics#54travisdowns wants to merge 2 commits into0.12.x-redpandafrom
travisdowns wants to merge 2 commits into0.12.x-redpandafrom
Conversation
The driver's _handle_finished calls _join_test_process inline, which blocks the single-threaded ZMQ REP loop for up to 30s. During that block, clients retry their FINISHED messages, which queue up on the REP socket. When the driver unblocks, duplicate FINISHED messages cause a KeyError on `del self.active_tests[test_key]` that kills the entire session. Changes: - Add idempotency guard in _handle_finished: check active_tests and finished_tests before processing. If the test was already finished, log elapsed time since original event_time. Return early instead of crashing with KeyError. - Track last 100 received messages in a deque ring buffer. On fatal exception in the main event loop, dump the recent message history (test_key, event_type, event_id, message_id). - Add FATAL_ERROR_TAG to the exception handler log line. - Log a warning when event processing exceeds SLOW_EVENT_THRESHOLD_S (5s), using try/finally around _handle() so it fires on both success and exception paths. - Increase Sender.REQUEST_TIMEOUT_MS from 3s to 35s so the first send attempt outlasts the 30s join timeout. This is needed because non-FINISHED events are not idempotent and cannot safely be retried.
734900b to
17b0fe0
Compare
Allow overriding REQUEST_TIMEOUT_MS via constructor param, defaulting to the class constant (35s). This lets the check_timeout test pass a small timeout (100ms) so the spawned Sender subprocess finishes in ~0.5s instead of ~175s (5 retries * 35s).
Member
Author
|
@gousteris take a peek if you can. |
gousteris
reviewed
Mar 6, 2026
| f"test_id={event.get('test_id')}, test_index={event.get('test_index')}") | ||
| except Exception as e: | ||
| err_str = "Exception receiving message: %s: %s, active_tests: \n %s \n" % (str(type(e)), str(e), self.active_tests_debug()) | ||
| err_str = "%s Exception receiving message: %s: %s, active_tests: \n %s \n" % (FATAL_ERROR_TAG, str(type(e)), str(e), self.active_tests_debug()) |
Member
Author
There was a problem hiding this comment.
@gousteris sorry I didn't understand the comment. Are you saying to do a CI run before checking in?
There was a problem hiding this comment.
Sorry I meant if CI should fail when a fatal error is raised
Member
Author
There was a problem hiding this comment.
Yes, I think it should and it will as there is a raise below in this same exception handler.
gousteris
approved these changes
Mar 6, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
_handle_finishedby checkingactive_testsandfinished_testsbefore processing. Logs elapsed time since original and returns early instead of crashing withKeyError.FATAL_ERROR_TAGandSLOW_EVENT_THRESHOLD_S(5s) logging to the main event loop.Sender.REQUEST_TIMEOUT_MSfrom 3s to 35s so the first send attempt outlasts the 30s_join_test_processtimeout. Non-FINISHED events are not idempotent, so retries can cause duplicate processing.Depends on #53.
Test plan
check_runner.py+check_sender_receiver.py)