diff --git a/AUTHORS.md b/AUTHORS.md index 385a531..3756494 100644 --- a/AUTHORS.md +++ b/AUTHORS.md @@ -1,3 +1,4 @@ # Authors - Gonzalo Casas <> [@gonzalocasas](https://github.com/gonzalocasas) +- Eleni Vasiliki Alexi <> [@elenialex](https://github.com/elenialex) diff --git a/CHANGELOG.md b/CHANGELOG.md index ebd8439..643ea2d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +* Add binary serialization support to `Transport` class. +* Add classes `MessageCodec`, `JsonMessageCodec`, `BinaryMessageCodec`. +* Add `msgpack` depedency for binary serialization support. + ### Changed ### Removed diff --git a/requirements.txt b/requirements.txt index c9d1c1e..235581a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ compas>=1.17.6 paho-mqtt >=1, <2 +msgpack diff --git a/src/compas_eve/__init__.py b/src/compas_eve/__init__.py index 8dadd66..1aec875 100644 --- a/src/compas_eve/__init__.py +++ b/src/compas_eve/__init__.py @@ -44,6 +44,8 @@ EchoSubscriber, Transport, Topic, + JsonMessageCodec, + BinaryMessageCodec, get_default_transport, set_default_transport, ) @@ -61,6 +63,8 @@ "Subscriber", "EchoSubscriber", "Topic", + "JsonMessageCodec", + "BinaryMessageCodec", "Transport", "get_default_transport", "set_default_transport", diff --git a/src/compas_eve/core.py b/src/compas_eve/core.py index 768f254..a1dfa29 100644 --- a/src/compas_eve/core.py +++ b/src/compas_eve/core.py @@ -1,5 +1,6 @@ from compas.data import json_dumps from compas.data import json_loads +import msgpack DEFAULT_TRANSPORT = None @@ -32,9 +33,10 @@ def set_default_transport(transport): class Transport(object): """Defines the base interface for different transport implementations.""" - def __init__(self, *args, **kwargs): + def __init__(self, codec=None, *args, **kwargs): super(Transport, self).__init__(*args, **kwargs) self._id_counter = 0 + self.codec = codec or JsonMessageCodec() @property def id_counter(self): @@ -43,6 +45,7 @@ def id_counter(self): return self._id_counter def publish(self, topic, message): + self.codec.encode(message) pass def subscribe(self, topic, callback): @@ -58,6 +61,150 @@ def unadvertise(self, topic): pass +class MessageCodec: + """ + Base class for message codecs. + + This class defines the interface for message encoding and decoding. + Subclasses should implement the `encode` and `decode` methods to handle + specific serialization formats. + """ + + def encode(self, message): + """ + Encode a message into a serialized format. + + Parameters + ---------- + message : Message or dict + The message to encode. + + Returns + ------- + bytes or str + The encoded message. + + Raises + ------ + NotImplementedError + If the method is not implemented by a subclass. + """ + raise NotImplementedError + + def decode(self, message): + """ + Decode a serialized message back into a `Message` object. + + Parameters + ---------- + message : bytes or str + The serialized message to decode. + + Returns + ------- + Message + The decoded message. + + Raises + ------ + NotImplementedError + If the method is not implemented by a subclass. + """ + raise NotImplementedError + + +class JsonMessageCodec(MessageCodec): + """ + Message codec for JSON serialization. + + This codec handles encoding and decoding messages using JSON format. + It supports messages that are instances of `Message` or dictionaries. + """ + + def encode(self, message): + """ + Encode a message into a JSON string. + + Parameters + ---------- + message : Message or dict + The message to encode. + + Returns + ------- + str + The JSON-encoded message. + """ + if isinstance(message, Message): + data = message.data + else: + data = message + return json_dumps(data) + + def decode(self, message): + """ + Decode a JSON string back into a `Message` object. + + Parameters + ---------- + message : bytes or str + The JSON-encoded message to decode. + + Returns + ------- + Message + The decoded message. + """ + data = json_loads(message.decode("utf-8")) + return Message(**data) + + +class BinaryMessageCodec(MessageCodec): + """ + Message codec for binary serialization using MessagePack. + + This codec handles encoding and decoding messages using MessagePack. + """ + + def encode(self, message): + """ + Encode a message into a binary format using MessagePack. + + Parameters + ---------- + message : Message or dict + The message to encode. + + Returns + ------- + bytes + The MessagePack-encoded message. + """ + + if isinstance(message, Message): + data = message.data + else: + data = message + return msgpack.packb(data) + + def decode(self, message): + """ + Decode a MessagePack binary message back into a `Message` object. + + Parameters + ---------- + message : bytes + The MessagePack-encoded message to decode. + + Returns + ------- + Message + The decoded message. + """ + data = msgpack.unpackb(message) + return Message(**data) + + class Message(object): """Message objects used for publishing and subscribing to/from topics. diff --git a/src/compas_eve/mqtt/mqtt_paho.py b/src/compas_eve/mqtt/mqtt_paho.py index 6ded69e..576e883 100644 --- a/src/compas_eve/mqtt/mqtt_paho.py +++ b/src/compas_eve/mqtt/mqtt_paho.py @@ -60,8 +60,8 @@ def publish(self, topic, message): """ def _callback(**kwargs): - json_message = topic._message_to_json(message) - self.client.publish(topic.name, json_message) + encoded_message = self.codec.encode(message) + self.client.publish(topic.name, encoded_message) self.on_ready(_callback) @@ -87,8 +87,11 @@ def subscribe(self, topic, callback): subscribe_id = "{}:{}".format(event_key, id(callback)) def _local_callback(msg): - msg = topic._message_from_json(msg.payload.decode()) - callback(msg) + decoded_message = self.codec.decode(msg.payload) + callback(decoded_message) + + # msg = topic._message_from_json(msg.payload.decode()) + # callback(msg) def _subscribe_callback(**kwargs): self.client.subscribe(topic.name)