From 0396f82d31aa3c03e7c4518db4620fc7f9c54680 Mon Sep 17 00:00:00 2001 From: Loonride <22580625+Loonride@users.noreply.github.com> Date: Mon, 23 Aug 2021 12:10:28 -0500 Subject: [PATCH] change rcvtimeo for tasks_q and prevent blocking --- funcx_forwarder/forwarder.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/funcx_forwarder/forwarder.py b/funcx_forwarder/forwarder.py index 731fd70..96eee37 100644 --- a/funcx_forwarder/forwarder.py +++ b/funcx_forwarder/forwarder.py @@ -243,9 +243,12 @@ def initialize_endpoint_queues(self): ''' Initialize the three queues over which the forwarder communicates with endpoints TaskQueue in mode='server' binds to all interfaces by default ''' + # SNDTIMEO for the tasks_q should be 0 because we do not want to allow + # send blocking in the main forwarder loop (an exception will occur instead + # if the send can't happen immediately) self.tasks_q = TaskQueue('127.0.0.1', port=self.tasks_port, - RCVTIMEO=1, + SNDTIMEO=0, keys_dir=self.keys_dir, mode='server') self.results_q = TaskQueue('127.0.0.1', @@ -316,7 +319,7 @@ def handle_endpoint_connection(self): are sent from the interchange -> forwarder on the task_q ''' try: - b_ep_id, reg_message = self.tasks_q.get(timeout=0) # timeout in ms # Update to 0ms + b_ep_id, reg_message = self.tasks_q.get(block=False, timeout=0) # timeout in ms # Update to 0ms # At this point ep_id is authenticated by means having the client keys. ep_id = b_ep_id.decode('utf-8')