From 17b0fe0dc54fe3e14cbaa0022c64581deee45c8d Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Tue, 3 Mar 2026 12:59:41 -0300 Subject: [PATCH 1/2] runner: handle duplicate FINISHED messages, improve diagnostics The driver's _handle_finished calls _join_test_process inline, which blocks the single-threaded ZMQ REP loop for up to 30s. During that block, clients retry their FINISHED messages, which queue up on the REP socket. When the driver unblocks, duplicate FINISHED messages cause a KeyError on `del self.active_tests[test_key]` that kills the entire session. Changes: - Add idempotency guard in _handle_finished: check active_tests and finished_tests before processing. If the test was already finished, log elapsed time since original event_time. Return early instead of crashing with KeyError. - Track last 100 received messages in a deque ring buffer. On fatal exception in the main event loop, dump the recent message history (test_key, event_type, event_id, message_id). - Add FATAL_ERROR_TAG to the exception handler log line. - Log a warning when event processing exceeds SLOW_EVENT_THRESHOLD_S (5s), using try/finally around _handle() so it fires on both success and exception paths. - Increase Sender.REQUEST_TIMEOUT_MS from 3s to 35s so the first send attempt outlasts the 30s join timeout. This is needed because non-FINISHED events are not idempotent and cannot safely be retried. --- ducktape/tests/runner.py | 39 ++++++++++++++++++++++++++++++--- ducktape/tests/runner_client.py | 5 ++++- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/ducktape/tests/runner.py b/ducktape/tests/runner.py index f19c8a295..14ea7e75c 100644 --- a/ducktape/tests/runner.py +++ b/ducktape/tests/runner.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple, defaultdict +from collections import namedtuple, defaultdict, deque import copy import logging import multiprocessing @@ -38,11 +38,13 @@ from ducktape.errors import TimeoutError DEFAULT_MP_JOIN_TIMEOUT = 30 +SLOW_EVENT_THRESHOLD_S = 5 # Used in a log line to indicate issues are are potentially "corrupting" in the sense # that they may cause tests that have nothing to do with the original issue to fail. # After such an issues occurs, later test results should be treated with suspicion. CORRUPTING_FAILURE_TAG = "CORRUPTING_FAILURE" +FATAL_ERROR_TAG = "FATAL_ERROR" class Receiver(object): def __init__(self, min_port, max_port): @@ -131,6 +133,7 @@ def __init__(self, cluster, session_context, session_logger, tests, deflake_num, self._client_procs = {} # track client processes running tests self.active_tests = {} self.finished_tests = {} + self._recent_messages = deque(maxlen=100) self.test_schedule_log = [] self.finish_join_timeout = finish_join_timeout @@ -266,10 +269,22 @@ def run_all_tests(self): if self._expect_client_requests: try: event = self.receiver.recv(timeout=int(int(self.session_context.test_runner_timeout) * 1.2)) # test_runner_timeout is handled in the client. adding 20% seconds on top, to guard against client not being able to report to the server - self._handle(event) + handle_start = time.time() + try: + self._handle(event) + finally: + handle_elapsed = time.time() - handle_start + if handle_elapsed > SLOW_EVENT_THRESHOLD_S: + self._log(logging.WARNING, + f"Event processing took {handle_elapsed:.1f}s for " + f"event_type={event.get('event_type')}, " + f"test_id={event.get('test_id')}, test_index={event.get('test_index')}") except Exception as e: - err_str = "Exception receiving message: %s: %s, active_tests: \n %s \n" % (str(type(e)), str(e), self.active_tests_debug()) + err_str = "%s Exception receiving message: %s: %s, active_tests: \n %s \n" % (FATAL_ERROR_TAG, str(type(e)), str(e), self.active_tests_debug()) err_str += "\n" + traceback.format_exc(limit=16) + err_str += "\nRecent messages:\n" + for msg in self._recent_messages: + err_str += f" {msg}\n" self._log(logging.ERROR, err_str) # All processes are on the same machine, so treat communication failure as a fatal error @@ -353,6 +368,12 @@ def _preallocate_subcluster(self, test_context): self._test_cluster[TestKey(test_context.test_id, self.test_counter)] = FiniteSubcluster(allocated) def _handle(self, event): + self._recent_messages.append({ + "test_key": TestKey(event.get("test_id"), event.get("test_index")), + "event_type": event.get("event_type"), + "event_id": event.get("event_id"), + "message_id": event.get("message_id"), + }) self._log(logging.DEBUG, str(event)) if event["event_type"] == ClientEventFactory.READY: @@ -383,6 +404,18 @@ def _handle_finished(self, event): test_key = TestKey(event["test_id"], event["test_index"]) self.receiver.send(self.event_response.finished(event)) + if test_key not in self.active_tests: + if test_key in self.finished_tests: + elapsed = time.time() - self.finished_tests[test_key]["event_time"] + detail = f"already FINISHED {elapsed:.1f}s ago" + else: + detail = "never seen before" + self._log(logging.WARNING, + f"{CORRUPTING_FAILURE_TAG} received FINISHED for {test_key} not in active_tests " + f"({detail}, event_id={event.get('event_id')}, " + f"message_id={event.get('message_id')}). Ignoring.") + return + result = event['result'] if result.test_status == FAIL and self.exit_first: self.stop_testing = True diff --git a/ducktape/tests/runner_client.py b/ducktape/tests/runner_client.py index 84fe8311c..fb87917c5 100644 --- a/ducktape/tests/runner_client.py +++ b/ducktape/tests/runner_client.py @@ -43,7 +43,10 @@ def run_client(*args, **kwargs): class Sender(object): - REQUEST_TIMEOUT_MS = 3000 + # Must be greater than DEFAULT_MP_JOIN_TIMEOUT (30s) in runner.py, + # since the driver blocks on _join_test_process during FINISHED handling + # and non-FINISHED events are not idempotent (can't safely retry). + REQUEST_TIMEOUT_MS = 35000 NUM_RETRIES = 5 serde: SerDe From 3ea5e7248de91d28c6fc42ce46c67b31e89dd9df Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Tue, 3 Mar 2026 13:10:50 -0300 Subject: [PATCH 2/2] sender: make request_timeout_ms configurable per-instance Allow overriding REQUEST_TIMEOUT_MS via constructor param, defaulting to the class constant (35s). This lets the check_timeout test pass a small timeout (100ms) so the spawned Sender subprocess finishes in ~0.5s instead of ~175s (5 retries * 35s). --- ducktape/tests/runner_client.py | 6 ++++-- tests/runner/check_sender_receiver.py | 7 ++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/ducktape/tests/runner_client.py b/ducktape/tests/runner_client.py index fb87917c5..aedf6da1a 100644 --- a/ducktape/tests/runner_client.py +++ b/ducktape/tests/runner_client.py @@ -62,13 +62,15 @@ def __init__( server_host: str, server_port: int, message_supplier: ClientEventFactory, - logger: logging.Logger + logger: logging.Logger, + request_timeout_ms: int = None, ): self.serde = SerDe() self.server_endpoint = "tcp://%s:%s" % (str(server_host), str(server_port)) self.zmq_context = zmq.Context() self.socket = None self.poller = zmq.Poller() + self.request_timeout_ms = request_timeout_ms if request_timeout_ms is not None else self.REQUEST_TIMEOUT_MS self.message_supplier = message_supplier self.logger = logger @@ -91,7 +93,7 @@ def send(self, event, blocking=True): waiting_for_reply = True while waiting_for_reply: - sockets = dict(self.poller.poll(Sender.REQUEST_TIMEOUT_MS)) + sockets = dict(self.poller.poll(self.request_timeout_ms)) if sockets.get(self.socket) == zmq.POLLIN: reply = self.socket.recv() diff --git a/tests/runner/check_sender_receiver.py b/tests/runner/check_sender_receiver.py index 7db0caeb7..f9838421a 100644 --- a/tests/runner/check_sender_receiver.py +++ b/tests/runner/check_sender_receiver.py @@ -27,10 +27,11 @@ class CheckSenderReceiver(object): - def ready_response(self, client_id, port): + def ready_response(self, client_id, port, request_timeout_ms=None): sender_event_factory = ClientEventFactory("test_1", 0, client_id) sender = Sender(server_host='localhost', server_port=port, - message_supplier=sender_event_factory, logger=logging) + message_supplier=sender_event_factory, logger=logging, + request_timeout_ms=request_timeout_ms) sender.send(sender_event_factory.ready()) def check_simple_messaging(self): @@ -64,7 +65,7 @@ def check_timeout(self): port = receiver.port try: - p = mp.Process(target=self.ready_response, args=(client_id, port)) + p = mp.Process(target=self.ready_response, args=(client_id, port, 100)) p.start() with pytest.raises(TimeoutError): receiver.recv(timeout=0)