From 48c9716b4520f073883be81e14f82a0e99d10848 Mon Sep 17 00:00:00 2001 From: Sakari Date: Sun, 23 Jun 2013 09:33:46 +0200 Subject: [PATCH 1/3] Add multipart support, remove custom framing --- txzmq/pubsub.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/txzmq/pubsub.py b/txzmq/pubsub.py index 154325b..623633b 100644 --- a/txzmq/pubsub.py +++ b/txzmq/pubsub.py @@ -12,7 +12,7 @@ class ZmqPubConnection(ZmqConnection): """ socketType = constants.PUB - def publish(self, message, tag=''): + def publish(self, tag, message): """ Broadcast L{message} with specified L{tag}. @@ -21,7 +21,17 @@ def publish(self, message, tag=''): @param tag: message tag @type tag: C{str} """ - self.send(tag + '\0' + message) + self.publish_multipart([tag, message]) + + def publish_multipart(self, message): + """ + Broadcast L{message} with specified L{tag} + + @param message: message data + @type message: C{list} + """ + + self.send(message) class ZmqSubConnection(ZmqConnection): @@ -54,14 +64,9 @@ def messageReceived(self, message): @param message: message data """ - if len(message) == 2: - # compatibility receiving of tag as first part - # of multi-part message - self.gotMessage(message[1], message[0]) - else: - self.gotMessage(*reversed(message[0].split('\0', 1))) - - def gotMessage(self, message, tag): + self.gotMessage(*message) + + def gotMessage(self, *args): """ Called on incoming message recevied by subscriber From f0e1a111d9ff2038009ff2009b012463f2af720b Mon Sep 17 00:00:00 2001 From: Sakari Date: Sun, 23 Jun 2013 10:12:32 +0200 Subject: [PATCH 2/3] Ensure API level compatibility (Wire level is NOT preserved) --- txzmq/pubsub.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txzmq/pubsub.py b/txzmq/pubsub.py index 623633b..b2496ff 100644 --- a/txzmq/pubsub.py +++ b/txzmq/pubsub.py @@ -12,7 +12,7 @@ class ZmqPubConnection(ZmqConnection): """ socketType = constants.PUB - def publish(self, tag, message): + def publish(self, message, tag=''): """ Broadcast L{message} with specified L{tag}. From e39bb99899dd79af5b5eafdf52d9e753a970831d Mon Sep 17 00:00:00 2001 From: Sakari Date: Sun, 23 Jun 2013 16:47:52 +0200 Subject: [PATCH 3/3] Fix tests --- .project | 17 +++++++++++++++++ .pydevproject | 8 ++++++++ txzmq/test/test_pubsub.py | 6 +++--- txzmq/test/test_reqrep.py | 4 ++-- 4 files changed, 30 insertions(+), 5 deletions(-) create mode 100644 .project create mode 100644 .pydevproject diff --git a/.project b/.project new file mode 100644 index 0000000..1e6e561 --- /dev/null +++ b/.project @@ -0,0 +1,17 @@ + + + txZMQ + + + + + + org.python.pydev.PyDevBuilder + + + + + + org.python.pydev.pythonNature + + diff --git a/.pydevproject b/.pydevproject new file mode 100644 index 0000000..6144e7a --- /dev/null +++ b/.pydevproject @@ -0,0 +1,8 @@ + + + +/txZMQ + +python 2.7 +Default + diff --git a/txzmq/test/test_pubsub.py b/txzmq/test/test_pubsub.py index 2010c01..d42f5cd 100644 --- a/txzmq/test/test_pubsub.py +++ b/txzmq/test/test_pubsub.py @@ -16,7 +16,7 @@ class ZmqTestSubConnection(ZmqSubConnection): - def gotMessage(self, message, tag): + def gotMessage(self, tag, message): if not hasattr(self, 'messages'): self.messages = [] @@ -64,10 +64,10 @@ def tearDown(self): def test_send_recv(self): r = ZmqTestSubConnection( - self.factory, ZmqEndpoint(ZmqEndpointType.bind, "ipc://test-sock")) + self.factory, ZmqEndpoint(ZmqEndpointType.bind, "inproc://test-sock")) s = ZmqPubConnection( self.factory, ZmqEndpoint(ZmqEndpointType.connect, - "ipc://test-sock")) + "inproc://test-sock")) r.subscribe('tag') diff --git a/txzmq/test/test_reqrep.py b/txzmq/test/test_reqrep.py index fde1713..d53b4be 100644 --- a/txzmq/test/test_reqrep.py +++ b/txzmq/test/test_reqrep.py @@ -25,9 +25,9 @@ class ZmqREQREPConnectionTestCase(unittest.TestCase): def setUp(self): self.factory = ZmqFactory() - b = ZmqEndpoint(ZmqEndpointType.bind, "ipc://#3") + b = ZmqEndpoint(ZmqEndpointType.bind, "inproc://#3") self.r = ZmqTestREPConnection(self.factory, b) - c = ZmqEndpoint(ZmqEndpointType.connect, "ipc://#3") + c = ZmqEndpoint(ZmqEndpointType.connect, "inproc://#3") self.s = ZmqREQConnection(self.factory, c, identity='client') def tearDown(self):