Skip to content
This repository was archived by the owner on Feb 22, 2022. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions funcx_forwarder/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,12 @@ def initialize_endpoint_queues(self):
''' Initialize the three queues over which the forwarder communicates with endpoints
TaskQueue in mode='server' binds to all interfaces by default
'''
# SNDTIMEO for the tasks_q should be 0 because we do not want to allow
# send blocking in the main forwarder loop (an exception will occur instead
# if the send can't happen immediately)
self.tasks_q = TaskQueue('127.0.0.1',
port=self.tasks_port,
RCVTIMEO=1,
SNDTIMEO=0,
keys_dir=self.keys_dir,
mode='server')
self.results_q = TaskQueue('127.0.0.1',
Expand Down Expand Up @@ -316,7 +319,7 @@ def handle_endpoint_connection(self):
are sent from the interchange -> forwarder on the task_q
'''
try:
b_ep_id, reg_message = self.tasks_q.get(timeout=0) # timeout in ms # Update to 0ms
b_ep_id, reg_message = self.tasks_q.get(block=False, timeout=0) # timeout in ms # Update to 0ms
# At this point ep_id is authenticated by means having the client keys.
ep_id = b_ep_id.decode('utf-8')

Expand Down