diff --git a/ducktape/tests/runner.py b/ducktape/tests/runner.py index f19c8a295..14ea7e75c 100644 --- a/ducktape/tests/runner.py +++ b/ducktape/tests/runner.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple, defaultdict +from collections import namedtuple, defaultdict, deque import copy import logging import multiprocessing @@ -38,11 +38,13 @@ from ducktape.errors import TimeoutError DEFAULT_MP_JOIN_TIMEOUT = 30 +SLOW_EVENT_THRESHOLD_S = 5 # Used in a log line to indicate issues are are potentially "corrupting" in the sense # that they may cause tests that have nothing to do with the original issue to fail. # After such an issues occurs, later test results should be treated with suspicion. CORRUPTING_FAILURE_TAG = "CORRUPTING_FAILURE" +FATAL_ERROR_TAG = "FATAL_ERROR" class Receiver(object): def __init__(self, min_port, max_port): @@ -131,6 +133,7 @@ def __init__(self, cluster, session_context, session_logger, tests, deflake_num, self._client_procs = {} # track client processes running tests self.active_tests = {} self.finished_tests = {} + self._recent_messages = deque(maxlen=100) self.test_schedule_log = [] self.finish_join_timeout = finish_join_timeout @@ -266,10 +269,22 @@ def run_all_tests(self): if self._expect_client_requests: try: event = self.receiver.recv(timeout=int(int(self.session_context.test_runner_timeout) * 1.2)) # test_runner_timeout is handled in the client. adding 20% seconds on top, to guard against client not being able to report to the server - self._handle(event) + handle_start = time.time() + try: + self._handle(event) + finally: + handle_elapsed = time.time() - handle_start + if handle_elapsed > SLOW_EVENT_THRESHOLD_S: + self._log(logging.WARNING, + f"Event processing took {handle_elapsed:.1f}s for " + f"event_type={event.get('event_type')}, " + f"test_id={event.get('test_id')}, test_index={event.get('test_index')}") except Exception as e: - err_str = "Exception receiving message: %s: %s, active_tests: \n %s \n" % (str(type(e)), str(e), self.active_tests_debug()) + err_str = "%s Exception receiving message: %s: %s, active_tests: \n %s \n" % (FATAL_ERROR_TAG, str(type(e)), str(e), self.active_tests_debug()) err_str += "\n" + traceback.format_exc(limit=16) + err_str += "\nRecent messages:\n" + for msg in self._recent_messages: + err_str += f" {msg}\n" self._log(logging.ERROR, err_str) # All processes are on the same machine, so treat communication failure as a fatal error @@ -353,6 +368,12 @@ def _preallocate_subcluster(self, test_context): self._test_cluster[TestKey(test_context.test_id, self.test_counter)] = FiniteSubcluster(allocated) def _handle(self, event): + self._recent_messages.append({ + "test_key": TestKey(event.get("test_id"), event.get("test_index")), + "event_type": event.get("event_type"), + "event_id": event.get("event_id"), + "message_id": event.get("message_id"), + }) self._log(logging.DEBUG, str(event)) if event["event_type"] == ClientEventFactory.READY: @@ -383,6 +404,18 @@ def _handle_finished(self, event): test_key = TestKey(event["test_id"], event["test_index"]) self.receiver.send(self.event_response.finished(event)) + if test_key not in self.active_tests: + if test_key in self.finished_tests: + elapsed = time.time() - self.finished_tests[test_key]["event_time"] + detail = f"already FINISHED {elapsed:.1f}s ago" + else: + detail = "never seen before" + self._log(logging.WARNING, + f"{CORRUPTING_FAILURE_TAG} received FINISHED for {test_key} not in active_tests " + f"({detail}, event_id={event.get('event_id')}, " + f"message_id={event.get('message_id')}). Ignoring.") + return + result = event['result'] if result.test_status == FAIL and self.exit_first: self.stop_testing = True diff --git a/ducktape/tests/runner_client.py b/ducktape/tests/runner_client.py index 84fe8311c..aedf6da1a 100644 --- a/ducktape/tests/runner_client.py +++ b/ducktape/tests/runner_client.py @@ -43,7 +43,10 @@ def run_client(*args, **kwargs): class Sender(object): - REQUEST_TIMEOUT_MS = 3000 + # Must be greater than DEFAULT_MP_JOIN_TIMEOUT (30s) in runner.py, + # since the driver blocks on _join_test_process during FINISHED handling + # and non-FINISHED events are not idempotent (can't safely retry). + REQUEST_TIMEOUT_MS = 35000 NUM_RETRIES = 5 serde: SerDe @@ -59,13 +62,15 @@ def __init__( server_host: str, server_port: int, message_supplier: ClientEventFactory, - logger: logging.Logger + logger: logging.Logger, + request_timeout_ms: int = None, ): self.serde = SerDe() self.server_endpoint = "tcp://%s:%s" % (str(server_host), str(server_port)) self.zmq_context = zmq.Context() self.socket = None self.poller = zmq.Poller() + self.request_timeout_ms = request_timeout_ms if request_timeout_ms is not None else self.REQUEST_TIMEOUT_MS self.message_supplier = message_supplier self.logger = logger @@ -88,7 +93,7 @@ def send(self, event, blocking=True): waiting_for_reply = True while waiting_for_reply: - sockets = dict(self.poller.poll(Sender.REQUEST_TIMEOUT_MS)) + sockets = dict(self.poller.poll(self.request_timeout_ms)) if sockets.get(self.socket) == zmq.POLLIN: reply = self.socket.recv() diff --git a/tests/runner/check_sender_receiver.py b/tests/runner/check_sender_receiver.py index 7db0caeb7..f9838421a 100644 --- a/tests/runner/check_sender_receiver.py +++ b/tests/runner/check_sender_receiver.py @@ -27,10 +27,11 @@ class CheckSenderReceiver(object): - def ready_response(self, client_id, port): + def ready_response(self, client_id, port, request_timeout_ms=None): sender_event_factory = ClientEventFactory("test_1", 0, client_id) sender = Sender(server_host='localhost', server_port=port, - message_supplier=sender_event_factory, logger=logging) + message_supplier=sender_event_factory, logger=logging, + request_timeout_ms=request_timeout_ms) sender.send(sender_event_factory.ready()) def check_simple_messaging(self): @@ -64,7 +65,7 @@ def check_timeout(self): port = receiver.port try: - p = mp.Process(target=self.ready_response, args=(client_id, port)) + p = mp.Process(target=self.ready_response, args=(client_id, port, 100)) p.start() with pytest.raises(TimeoutError): receiver.recv(timeout=0)