diff --git a/funcx_forwarder/forwarder.py b/funcx_forwarder/forwarder.py index 731fd70..96eee37 100644 --- a/funcx_forwarder/forwarder.py +++ b/funcx_forwarder/forwarder.py @@ -243,9 +243,12 @@ def initialize_endpoint_queues(self): ''' Initialize the three queues over which the forwarder communicates with endpoints TaskQueue in mode='server' binds to all interfaces by default ''' + # SNDTIMEO for the tasks_q should be 0 because we do not want to allow + # send blocking in the main forwarder loop (an exception will occur instead + # if the send can't happen immediately) self.tasks_q = TaskQueue('127.0.0.1', port=self.tasks_port, - RCVTIMEO=1, + SNDTIMEO=0, keys_dir=self.keys_dir, mode='server') self.results_q = TaskQueue('127.0.0.1', @@ -316,7 +319,7 @@ def handle_endpoint_connection(self): are sent from the interchange -> forwarder on the task_q ''' try: - b_ep_id, reg_message = self.tasks_q.get(timeout=0) # timeout in ms # Update to 0ms + b_ep_id, reg_message = self.tasks_q.get(block=False, timeout=0) # timeout in ms # Update to 0ms # At this point ep_id is authenticated by means having the client keys. ep_id = b_ep_id.decode('utf-8')