diff --git a/src/mktl/protocol/publish.py b/src/mktl/protocol/publish.py index cb44828..4c2ea93 100644 --- a/src/mktl/protocol/publish.py +++ b/src/mktl/protocol/publish.py @@ -329,21 +329,70 @@ def __init__(self, port=None, avoid=set()): self.port = trial + # Experiments with a multiprocessing.SimpleQueue and an ipc socket + # for notifications result in something like a 35% slowdown compared + # to using a queue.SimpleQueue and an inproc socket. + + try: + # Available in Python 3.7+. + self.broadcasts = queue.SimpleQueue() + except AttributeError: + self.broadcasts = queue.Queue() + + internal = "inproc://publish.Server.signal:%d" % (port) + self.broadcast_address = internal + self.broadcast_receive = zmq_context.socket(zmq.PAIR) + self.broadcast_receive.bind(internal) + + self.broadcast_signal = zmq_context.socket(zmq.PAIR) + self.broadcast_signal.connect(internal) + + self.publishing_thread = threading.Thread(target=self.run) + self.publishing_thread.daemon = True + self.publishing_thread.start() + def publish(self, message): """ A *message* is a :class:`mktl.protocol.message.Broadcast` instance intended for broadcast to any/all subscribers. """ + self.broadcasts.put(message) + self.broadcast_signal.send(b'') + + + def _pub_outgoing(self): + """ Clear one broadcast notification and send one pending broadcast. + """ + + self.broadcast_receive.recv(flags=zmq.NOBLOCK) + message = self.broadcasts.get(block=False) + parts = tuple(message) - # The lock around the ZeroMQ socket is necessary in a multithreaded + # A lock around the ZeroMQ socket is necessary in a multithreaded # application; otherwise, if two different threads both invoke - # send_multipart(), the message parts can and will get mixed together. + # send_multipart(), the message parts can and will get mixed + # together. However, this send_multipart() call is now only called + # from a single thread handling all send/recv calls, so the + # lock is no longer in place. - self.socket_lock.acquire() self.socket.send_multipart(parts) - self.socket_lock.release() + + + def run(self): + """ All send() calls are sequestered to this thread. Though not as + problematic as the REQ/REP case, it seems like good practice to + mirror the same structure here. + """ + + poller = zmq.Poller() + poller.register(self.broadcast_receive, zmq.POLLIN) + + while True: + sockets = poller.poll(10000) # milliseconds + for ignored in sockets: + self._pub_outgoing() # end of class Server