Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions src/bydantic/framing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import typing as t
from .core import BitfieldT


class FramingProtocol(t.Protocol):
def unframe_data(self, data: bytes) -> t.Tuple[t.List[bytes], bytes]:
...

def frame_data(self, frames: t.Sequence[bytes]) -> bytes:
...


class SimpleFraming:
def __init__(
self,
delimiter: int,
escape_byte: int,
escape_map: t.Dict[int, int],
):
self.delimiter = delimiter
self.escape_byte = escape_byte
self.escape_map = escape_map

def unescape_frame(self, frame: t.ByteString) -> bytes:
inverse_map = {v: k for k, v in self.escape_map.items()}
unescaped = bytearray()
i = 0
while i < len(frame):
byte = frame[i]
if byte == self.escape_byte:
i += 1
if i >= len(frame):
break
esc = frame[i]
if esc not in inverse_map:
raise ValueError(
f"Invalid escape sequence: {self.escape_byte:02X} {esc:02X}"
)
unescaped.append(inverse_map[esc])
else:
unescaped.append(byte)
i += 1
return bytes(unescaped)

def unframe_data(self, data: bytes) -> t.Tuple[t.List[bytes], bytes]:
frames: t.List[bytes] = []
current_frame = bytearray()
i = 0

while i < len(data):
byte = data[i]

if byte == self.delimiter:
if current_frame:
frames.append(self.unescape_frame(current_frame))
current_frame.clear()
i += 1
else:
current_frame.append(byte)
i += 1

remaining = (
bytes([self.delimiter]) + current_frame if current_frame else b""
)

return frames, remaining

def frame_data(self, frames: t.Sequence[bytes]) -> bytes:
output = bytearray()
for frame in frames:
output.append(self.delimiter)
for byte in frame:
if byte in (self.delimiter, self.escape_byte):
output.append(self.escape_byte)
output.append(self.escape_map[byte])
else:
output.append(byte)
output.append(self.delimiter)

return bytes(output)


class BitfieldFramer(t.Generic[BitfieldT]):
def __init__(
self,
bitfield: t.Type[BitfieldT],
framing: FramingProtocol,
):
self.bitfield = bitfield
self.framing = framing

def from_bytes_batch(self, data: bytes) -> t.Tuple[t.List[BitfieldT], bytes]:
"""
Deserializes a batch of bitfields from a byte string, with framing.

Args:
data (bytes): The byte string to deserialize.

Returns:
t.Tuple[t.List[BitfieldT], bytes]: A tuple containing a list of
deserialized bitfields and any remaining bytes.
"""
frames, remaining = self.framing.unframe_data(data)
bitfields = [self.bitfield.from_bytes_exact(frame) for frame in frames]
return bitfields, remaining

def to_bytes(self, data: t.Sequence[BitfieldT]) -> bytes:
"""
Serializes the bitfield to a byte string, with framing.

Args:
data (BitfieldT): The bitfield to serialize.

Returns:
bytes: The serialized bitfield as a byte string.
"""
raw_data = tuple(frame.to_bytes() for frame in data)
return self.framing.frame_data(raw_data)
92 changes: 92 additions & 0 deletions tests/test_framing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import pytest
import bydantic as bd
from bydantic.framing import SimpleFraming, BitfieldFramer

# Define KissFraming as an instance of SimpleFraming
kissFraming = SimpleFraming(
delimiter=0xC0,
escape_byte=0xDB,
escape_map={
0xC0: 0xDC, # Frame delimiter
0xDB: 0xDD, # Escape byte
},
)


def test_frame_data():
frames = [
b"\x01\x02\x03",
b"\x04\x05\x06"
]

framed_data = kissFraming.frame_data(frames)

# Includes delimiter framing
expected_framed_data = b"\xC0\x01\x02\x03\xC0\xC0\x04\x05\x06\xC0"

assert framed_data == expected_framed_data


def test_unframe_data():
data = b"\xC0\x01\x02\x03\xC0\xC0\x04\x05\x06\xC0"

frames, remaining = kissFraming.unframe_data(data)

assert frames == [b"\x01\x02\x03", b"\x04\x05\x06"]
assert remaining == b""


def test_unframe_data_with_remaining():
data = b"\xC0\x01\x02\x03\xC0\xC0\x04\x05\x06\xC0\xC0\x07\x08"

frames, remaining = kissFraming.unframe_data(data)

assert frames == [b"\x01\x02\x03", b"\x04\x05\x06"]
assert remaining == b"\xC0\x07\x08"


def test_frame_data_with_escape():
frames = [
b"\x01\x02\xC0\x03",
b"\x04\xDB\x05"
]

framed_data = kissFraming.frame_data(frames)

expected_framed_data = b"\xC0\x01\x02\xDB\xDC\x03\xC0\xC0\x04\xDB\xDD\x05\xC0"
assert framed_data == expected_framed_data


def test_unframe_data_with_escaped_bytes():
data = b"\xC0\x01\x02\xDB\xDC\x03\xC0\xC0\x04\xDB\xDD\x05\xC0"

frames, remaining = kissFraming.unframe_data(data)

assert frames == [b"\x01\x02\xC0\x03", b"\x04\xDB\x05"]
assert remaining == b""


def test_unframe_data_invalid_escape():
data = b"\xC0\x01\x02\xDB\xFF\x03\xC0"

with pytest.raises(ValueError):
kissFraming.unframe_data(data)


def test_framed_bitfield():
class Foo(bd.Bitfield):
a: int = bd.uint_field(4)
b: int = bd.uint_field(4)

data = b"\xC0\x12\xC0\xC0\x12\xC0\xC0\x12\xC0"

framer = BitfieldFramer(Foo, kissFraming)

foo = Foo(a=1, b=2)

assert framer.to_bytes([foo, foo, foo]) == data

frames, remaining = framer.from_bytes_batch(data + b"\xC0\x12")

assert frames == [foo, foo, foo]
assert remaining == b"\xC0\x12"