Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions ducktape/tests/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import namedtuple, defaultdict
from collections import namedtuple, defaultdict, deque
import copy
import logging
import multiprocessing
Expand All @@ -38,11 +38,13 @@
from ducktape.errors import TimeoutError

DEFAULT_MP_JOIN_TIMEOUT = 30
SLOW_EVENT_THRESHOLD_S = 5

# Used in a log line to indicate issues that are potentially "corrupting" in the sense
# that they may cause tests that have nothing to do with the original issue to fail.
# After such an issue occurs, later test results should be treated with suspicion.
CORRUPTING_FAILURE_TAG = "CORRUPTING_FAILURE"
FATAL_ERROR_TAG = "FATAL_ERROR"

class Receiver(object):
def __init__(self, min_port, max_port):
Expand Down Expand Up @@ -131,6 +133,7 @@ def __init__(self, cluster, session_context, session_logger, tests, deflake_num,
self._client_procs = {} # track client processes running tests
self.active_tests = {}
self.finished_tests = {}
self._recent_messages = deque(maxlen=100)
self.test_schedule_log = []
self.finish_join_timeout = finish_join_timeout

Expand Down Expand Up @@ -266,10 +269,22 @@ def run_all_tests(self):
if self._expect_client_requests:
try:
event = self.receiver.recv(timeout=int(int(self.session_context.test_runner_timeout) * 1.2)) # test_runner_timeout is enforced in the client; add 20% on top to guard against the client being unable to report back to the server
self._handle(event)
handle_start = time.time()
try:
self._handle(event)
finally:
handle_elapsed = time.time() - handle_start
if handle_elapsed > SLOW_EVENT_THRESHOLD_S:
self._log(logging.WARNING,
f"Event processing took {handle_elapsed:.1f}s for "
f"event_type={event.get('event_type')}, "
f"test_id={event.get('test_id')}, test_index={event.get('test_index')}")
except Exception as e:
err_str = "Exception receiving message: %s: %s, active_tests: \n %s \n" % (str(type(e)), str(e), self.active_tests_debug())
err_str = "%s Exception receiving message: %s: %s, active_tests: \n %s \n" % (FATAL_ERROR_TAG, str(type(e)), str(e), self.active_tests_debug())
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should CI on this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gousteris sorry I didn't understand the comment. Are you saying to do a CI run before checking in?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I meant if CI should fail when a fatal error is raised

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think it should and it will as there is a raise below in this same exception handler.

err_str += "\n" + traceback.format_exc(limit=16)
err_str += "\nRecent messages:\n"
for msg in self._recent_messages:
err_str += f" {msg}\n"
self._log(logging.ERROR, err_str)

# All processes are on the same machine, so treat communication failure as a fatal error
Expand Down Expand Up @@ -353,6 +368,12 @@ def _preallocate_subcluster(self, test_context):
self._test_cluster[TestKey(test_context.test_id, self.test_counter)] = FiniteSubcluster(allocated)

def _handle(self, event):
self._recent_messages.append({
"test_key": TestKey(event.get("test_id"), event.get("test_index")),
"event_type": event.get("event_type"),
"event_id": event.get("event_id"),
"message_id": event.get("message_id"),
})
self._log(logging.DEBUG, str(event))

if event["event_type"] == ClientEventFactory.READY:
Expand Down Expand Up @@ -383,6 +404,18 @@ def _handle_finished(self, event):
test_key = TestKey(event["test_id"], event["test_index"])
self.receiver.send(self.event_response.finished(event))

if test_key not in self.active_tests:
if test_key in self.finished_tests:
elapsed = time.time() - self.finished_tests[test_key]["event_time"]
detail = f"already FINISHED {elapsed:.1f}s ago"
else:
detail = "never seen before"
self._log(logging.WARNING,
f"{CORRUPTING_FAILURE_TAG} received FINISHED for {test_key} not in active_tests "
f"({detail}, event_id={event.get('event_id')}, "
f"message_id={event.get('message_id')}). Ignoring.")
return

result = event['result']
if result.test_status == FAIL and self.exit_first:
self.stop_testing = True
Expand Down
11 changes: 8 additions & 3 deletions ducktape/tests/runner_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ def run_client(*args, **kwargs):


class Sender(object):
REQUEST_TIMEOUT_MS = 3000
# Must be greater than DEFAULT_MP_JOIN_TIMEOUT (30s) in runner.py,
# since the driver blocks on _join_test_process during FINISHED handling
# and non-FINISHED events are not idempotent (can't safely retry).
REQUEST_TIMEOUT_MS = 35000
NUM_RETRIES = 5

serde: SerDe
Expand All @@ -59,13 +62,15 @@ def __init__(
server_host: str,
server_port: int,
message_supplier: ClientEventFactory,
logger: logging.Logger
logger: logging.Logger,
request_timeout_ms: int = None,
):
self.serde = SerDe()
self.server_endpoint = "tcp://%s:%s" % (str(server_host), str(server_port))
self.zmq_context = zmq.Context()
self.socket = None
self.poller = zmq.Poller()
self.request_timeout_ms = request_timeout_ms if request_timeout_ms is not None else self.REQUEST_TIMEOUT_MS

self.message_supplier = message_supplier
self.logger = logger
Expand All @@ -88,7 +93,7 @@ def send(self, event, blocking=True):
waiting_for_reply = True

while waiting_for_reply:
sockets = dict(self.poller.poll(Sender.REQUEST_TIMEOUT_MS))
sockets = dict(self.poller.poll(self.request_timeout_ms))

if sockets.get(self.socket) == zmq.POLLIN:
reply = self.socket.recv()
Expand Down
7 changes: 4 additions & 3 deletions tests/runner/check_sender_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@


class CheckSenderReceiver(object):
def ready_response(self, client_id, port):
def ready_response(self, client_id, port, request_timeout_ms=None):
sender_event_factory = ClientEventFactory("test_1", 0, client_id)
sender = Sender(server_host='localhost', server_port=port,
message_supplier=sender_event_factory, logger=logging)
message_supplier=sender_event_factory, logger=logging,
request_timeout_ms=request_timeout_ms)
sender.send(sender_event_factory.ready())

def check_simple_messaging(self):
Expand Down Expand Up @@ -64,7 +65,7 @@ def check_timeout(self):
port = receiver.port

try:
p = mp.Process(target=self.ready_response, args=(client_id, port))
p = mp.Process(target=self.ready_response, args=(client_id, port, 100))
p.start()
with pytest.raises(TimeoutError):
receiver.recv(timeout=0)
Expand Down