Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions src/mktl/protocol/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,21 +329,70 @@ def __init__(self, port=None, avoid=set()):

self.port = trial

# Experiments with a multiprocessing.SimpleQueue and an ipc socket
# for notifications result in something like a 35% slowdown compared
# to using a queue.SimpleQueue and an inproc socket.

try:
# Available in Python 3.7+.
self.broadcasts = queue.SimpleQueue()
except AttributeError:
self.broadcasts = queue.Queue()

internal = "inproc://publish.Server.signal:%d" % (port)
self.broadcast_address = internal
self.broadcast_receive = zmq_context.socket(zmq.PAIR)
self.broadcast_receive.bind(internal)

self.broadcast_signal = zmq_context.socket(zmq.PAIR)
self.broadcast_signal.connect(internal)

self.publishing_thread = threading.Thread(target=self.run)
self.publishing_thread.daemon = True
self.publishing_thread.start()


def publish(self, message):
""" A *message* is a :class:`mktl.protocol.message.Broadcast` instance
intended for broadcast to any/all subscribers.
"""

self.broadcasts.put(message)
self.broadcast_signal.send(b'')


def _pub_outgoing(self):
""" Clear one broadcast notification and send one pending broadcast.
"""

self.broadcast_receive.recv(flags=zmq.NOBLOCK)
message = self.broadcasts.get(block=False)

parts = tuple(message)

# The lock around the ZeroMQ socket is necessary in a multithreaded
# A lock around the ZeroMQ socket is necessary in a multithreaded
# application; otherwise, if two different threads both invoke
# send_multipart(), the message parts can and will get mixed together.
# send_multipart(), the message parts can and will get mixed
# together. However, this send_multipart() call is now only called
# from a single thread handling all send/recv calls, so the
# lock is no longer in place.

self.socket_lock.acquire()
self.socket.send_multipart(parts)
self.socket_lock.release()


def run(self):
""" All send() calls are sequestered to this thread. Though not as
problematic as the REQ/REP case, it seems like good practice to
mirror the same structure here.
"""

poller = zmq.Poller()
poller.register(self.broadcast_receive, zmq.POLLIN)

while True:
sockets = poller.poll(10000) # milliseconds
for ignored in sockets:
self._pub_outgoing()


# end of class Server
Expand Down
Loading