-
Notifications
You must be signed in to change notification settings - Fork 3
Conversation
funcx_forwarder/forwarder.py
Outdated
| ''' | ||
| try: | ||
| b_ep_id, reg_message = self.tasks_q.get(timeout=0) # timeout in ms # Update to 0ms | ||
| b_ep_id, reg_message = self.tasks_q.get(block=False, timeout=0) # timeout in ms # Update to 0ms |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code before was flawed because block=True is the default. However, it was not blocking endlessly I believe only because the task_q RCVTIMEO=1 was set in initialization (1ms of blocking before there is a raise ). We should do this here instead to make it clear that this get call is not blocking in the main forwarder loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I believe the logic was to block, but upto the RCVTIMEO, after which we get a zmq.Again exception. Switching to a non-blocking poll of 0ms makes sense given how infrequently there are registration messages.
| logger.debug("Configuring server") | ||
| self.zmq_socket = self.context.socket(zmq.ROUTER) | ||
| self.zmq_socket.set(zmq.ROUTER_MANDATORY, 1) | ||
| self.zmq_socket.set(zmq.ROUTER_HANDOVER, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need this because we want new zmq socket ids (we use endpoint id) to replace the old sockets with the same id. Not doing so means an old socket with a given id can block a new socket from communicating that is given that same id on a new registration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this was not an issue with the tasks_q from the forwarder because since it was outgoing the forwarder could realize the client was not reachable, so when it sends heartbeats over this channel it simply thinks the same client socket has reconnected. This is why heartbeats were still reaching newly connected sockets, since they were going over this channel.
However, with the results_q, the forwarder has no way of realizing that the socket is unreachable since it is not sending messages over this channel, only receiving messages. So it still thinks the old socket is connected when a new socket tries to connect.
b5587e2 to
31f87d0
Compare
yadudoc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Loonride, The changes to the ordering and setting of socket options looks correct. The change to blocking behavior for the TaskQueue for registration messages seem small but these can often be troublesome in practice. So, I would recommend splitting this change out to a separate PR.
7e7d64c to
445267a
Compare
yadudoc
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to go.
Related PR: globus/globus-compute#565