From 69d6bfbc7db1810209769fdc8291b10b50b3a35f Mon Sep 17 00:00:00 2001 From: Eleni Alexi Date: Fri, 22 Nov 2024 10:51:43 -0500 Subject: [PATCH 1/5] Codec classes for Message encoding & decoding. MessageCodec ( base class) / JsonMessageCodec ( for JSON format) / BinaryMessageCodec ( for binary) --- src/compas_eve/core.py | 141 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/src/compas_eve/core.py b/src/compas_eve/core.py index 768f254..5b3bc85 100644 --- a/src/compas_eve/core.py +++ b/src/compas_eve/core.py @@ -58,6 +58,147 @@ def unadvertise(self, topic): pass +class MessageCodec: + """ + Base class for message codecs. + + This class defines the interface for message encoding and decoding. + Subclasses should implement the `encode` and `decode` methods to handle + specific serialization formats. + """ + def encode(self, message): + """ + Encode a message into a serialized format. + + Parameters + ---------- + message : Message or dict + The message to encode. + + Returns + ------- + bytes or str + The encoded message. + + Raises + ------ + NotImplementedError + If the method is not implemented by a subclass. + """ + raise NotImplementedError + + def decode(self, message): + """ + Decode a serialized message back into a `Message` object. + + Parameters + ---------- + message : bytes or str + The serialized message to decode. + + Returns + ------- + Message + The decoded message. + + Raises + ------ + NotImplementedError + If the method is not implemented by a subclass. + """ + raise NotImplementedError + + +class JsonMessageCodec(MessageCodec): + """ + Message codec for JSON serialization. + + This codec handles encoding and decoding messages using JSON format. + It supports messages that are instances of `Message` or dictionaries. + """ + def encode(self, message): + """ + Encode a message into a JSON string. + + Parameters + ---------- + message : Message or dict + The message to encode. + + Returns + ------- + str + The JSON-encoded message. + """ + if isinstance(message, Message): + data = message.data + else: + data = message + return json_dumps(data) + + def decode(self, message): + """ + Decode a JSON string back into a `Message` object. + + Parameters + ---------- + message : bytes or str + The JSON-encoded message to decode. + + Returns + ------- + Message + The decoded message. + """ + data = json_loads(message.decode("utf-8")) + return Message(**data) + + +class BinaryMessageCodec(MessageCodec): + """ + Message codec for binary serialization using MessagePack. + + This codec handles encoding and decoding messages using MessagePack. + """ + def encode(self, message): + """ + Encode a message into a binary format using MessagePack. + + Parameters + ---------- + message : Message or dict + The message to encode. + + Returns + ------- + bytes + The MessagePack-encoded message. + """ + + if isinstance(message, Message): + data = message.data + else: + data = message + return msgpack.packb(data) + + def decode(self, message): + """ + Decode a MessagePack binary message back into a `Message` object. + + Parameters + ---------- + message : bytes + The MessagePack-encoded message to decode. + + Returns + ------- + Message + The decoded message. + """ + data = msgpack.unpackb(message) + return Message(**data) + + class Message(object): """Message objects used for publishing and subscribing to/from topics. From faab5cb188d7220cbc48eeb2829e219711f8f284 Mon Sep 17 00:00:00 2001 From: Eleni Alexi Date: Fri, 22 Nov 2024 10:58:32 -0500 Subject: [PATCH 2/5] Integration into `Transport` class for user selection of message coding type --- src/compas_eve/core.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/compas_eve/core.py b/src/compas_eve/core.py index 5b3bc85..07e5a0b 100644 --- a/src/compas_eve/core.py +++ b/src/compas_eve/core.py @@ -1,5 +1,6 @@ from compas.data import json_dumps from compas.data import json_loads +import msgpack DEFAULT_TRANSPORT = None @@ -32,9 +33,10 @@ def set_default_transport(transport): class Transport(object): """Defines the base interface for different transport implementations.""" - def __init__(self, *args, **kwargs): + def __init__(self, codec=None, *args, **kwargs): super(Transport, self).__init__(*args, **kwargs) self._id_counter = 0 + self.codec = codec or JsonMessageCodec() @property def id_counter(self): @@ -43,6 +45,7 @@ def id_counter(self): return self._id_counter def publish(self, topic, message): + self.codec.encode(message) pass def subscribe(self, topic, callback): From 3c3757a9a4ee70d1e7c00901bbe60e4ffbd0c691 Mon Sep 17 00:00:00 2001 From: Eleni Alexi Date: Fri, 22 Nov 2024 11:11:18 -0500 Subject: [PATCH 3/5] Class imports at package level & changing message encoding to selected Encoder (MessageCodec) --- src/compas_eve/__init__.py | 4 ++++ src/compas_eve/mqtt/mqtt_paho.py | 11 +++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/compas_eve/__init__.py b/src/compas_eve/__init__.py index 8dadd66..1aec875 100644 --- a/src/compas_eve/__init__.py +++ b/src/compas_eve/__init__.py @@ -44,6 +44,8 @@ EchoSubscriber, Transport, Topic, + JsonMessageCodec, + BinaryMessageCodec, get_default_transport, set_default_transport, ) @@ -61,6 +63,8 @@ "Subscriber", "EchoSubscriber", "Topic", + "JsonMessageCodec", + "BinaryMessageCodec", "Transport", "get_default_transport", "set_default_transport", diff --git a/src/compas_eve/mqtt/mqtt_paho.py b/src/compas_eve/mqtt/mqtt_paho.py index 6ded69e..576e883 100644 --- a/src/compas_eve/mqtt/mqtt_paho.py +++ b/src/compas_eve/mqtt/mqtt_paho.py @@ -60,8 +60,8 @@ def publish(self, topic, message): """ def _callback(**kwargs): - json_message = topic._message_to_json(message) - self.client.publish(topic.name, json_message) + encoded_message = self.codec.encode(message) + self.client.publish(topic.name, encoded_message) self.on_ready(_callback) @@ -87,8 +87,11 @@ def subscribe(self, topic, callback): subscribe_id = "{}:{}".format(event_key, id(callback)) def _local_callback(msg): - msg = topic._message_from_json(msg.payload.decode()) - callback(msg) + decoded_message = self.codec.decode(msg.payload) + callback(decoded_message) + + # msg = topic._message_from_json(msg.payload.decode()) + # callback(msg) def _subscribe_callback(**kwargs): self.client.subscribe(topic.name) From 7651adc87d6652354d103660e8aeb63f1b4b2cb5 Mon Sep 17 00:00:00 2001 From: Eleni Alexi Date: Fri, 22 Nov 2024 11:12:15 -0500 Subject: [PATCH 4/5] Authors & Changelog & requirements --- AUTHORS.md | 1 + CHANGELOG.md | 4 ++++ requirements.txt | 1 + 3 files changed, 6 insertions(+) diff --git a/AUTHORS.md b/AUTHORS.md index 385a531..3756494 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -1,3 +1,4 @@ # Authors - Gonzalo Casas <> [@gonzalocasas](https://github.com/gonzalocasas) +- Eleni Vasiliki Alexi <> [@elenialex](https://github.com/elenialex) diff --git a/CHANGELOG.md b/CHANGELOG.md index ebd8439..643ea2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +* Add binary serialization support to `Transport` class. +* Add classes `MessageCodec`, `JsonMessageCodec`, `BinaryMessageCodec`. +* Add `msgpack` depedency for binary serialization support. + ### Changed ### Removed diff --git a/requirements.txt b/requirements.txt index c9d1c1e..235581a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ compas>=1.17.6 paho-mqtt >=1, <2 +msgpack From 5a26c111b3947b88c48dab00b374cdc863afdc7c Mon Sep 17 00:00:00 2001 From: Eleni Alexi Date: Fri, 22 Nov 2024 11:17:43 -0500 Subject: [PATCH 5/5] lint & format --- src/compas_eve/core.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/compas_eve/core.py b/src/compas_eve/core.py index 07e5a0b..a1dfa29 100644 --- a/src/compas_eve/core.py +++ b/src/compas_eve/core.py @@ -69,6 +69,7 @@ class MessageCodec: Subclasses should implement the `encode` and `decode` methods to handle specific serialization formats. """ + def encode(self, message): """ Encode a message into a serialized format. @@ -119,6 +120,7 @@ class JsonMessageCodec(MessageCodec): This codec handles encoding and decoding messages using JSON format. It supports messages that are instances of `Message` or dictionaries. """ + def encode(self, message): """ Encode a message into a JSON string. @@ -163,6 +165,7 @@ class BinaryMessageCodec(MessageCodec): This codec handles encoding and decoding messages using MessagePack. """ + def encode(self, message): """ Encode a message into a binary format using MessagePack.