From 28c24f9622ee900ffca08f91197a79e69e1db5bc Mon Sep 17 00:00:00 2001 From: gabriel Date: Tue, 24 Mar 2026 22:39:36 +0100 Subject: [PATCH 1/3] feat: add initial support for encryption --- README.md | 47 ++++++ pyproject.toml | 1 + src/opendisplay/__init__.py | 8 + src/opendisplay/crypto.py | 129 ++++++++++++++++ src/opendisplay/device.py | 176 ++++++++++++++-------- src/opendisplay/exceptions.py | 26 ++++ src/opendisplay/models/config.py | 42 ++++++ src/opendisplay/protocol/__init__.py | 4 + src/opendisplay/protocol/commands.py | 32 ++++ src/opendisplay/protocol/config_parser.py | 29 ++-- src/opendisplay/protocol/responses.py | 86 ++++++++++- 11 files changed, 505 insertions(+), 75 deletions(-) create mode 100644 src/opendisplay/crypto.py diff --git a/README.md b/README.md index fc87ad0..1cb06c9 100644 --- a/README.md +++ b/README.md @@ -293,6 +293,53 @@ async with OpenDisplayDevice(mac_address="BB:CC:DD:EE:FF:00") as device: `import_config_json()` raises `ValueError` if required packets (`system`, `manufacturer`, `power`) or all display packets are missing. JSON packet id `38` (`wifi_config` / TLV `0x26`) is supported for import/export. +### Encryption + +Devices with firmware encryption enabled require authentication before accepting any commands (except `read_firmware_version`). Pass the 16-byte AES-128 master key to the constructor — authentication and session setup happen automatically before the first interrogation. + +```python +from opendisplay import OpenDisplayDevice + +key = bytes.fromhex("0102030405060708090a0b0c0d0e0f10") + +async with OpenDisplayDevice(mac_address="AA:BB:CC:DD:EE:FF", encryption_key=key) as device: + await device.upload_image(image) +``` + +All commands are transparently encrypted after authentication. Devices without encryption enabled work exactly as before — the `encryption_key` parameter is ignored if the device does not require it. + +#### Getting the key + +The encryption key is set when configuring the device via the [Open Display Config Builder](https://opendisplay.org/firmware/config/) web tool. It can be read from the device config once authenticated: + +```python +async with OpenDisplayDevice(mac_address="AA:BB:CC:DD:EE:FF", encryption_key=key) as device: + sc = device.config.security_config + print(sc.encryption_key.hex()) # 32-char hex string + print(sc.encryption_enabled_flag) # True + print(sc.rewrite_allowed) # True if WRITE_CONFIG works without auth + print(sc.session_timeout_seconds) # How long before re-authentication is needed +``` + +#### Error handling + +```python +from opendisplay import AuthenticationRequiredError, AuthenticationFailedError + +try: + async with OpenDisplayDevice(mac_address="AA:BB:CC:DD:EE:FF") as device: + pass +except AuthenticationRequiredError: + # Device has encryption enabled but no key was provided + # (or the session expired and re-authentication is needed) + print("This device requires an encryption key") +except AuthenticationFailedError: + # A key was provided but the device rejected it (wrong key or rate-limited) + print("Wrong encryption key") +``` + +Both exceptions are subclasses of `AuthenticationError`, which can be used as a catch-all when the distinction doesn't matter. + ### Rebooting the Device Remotely reboot the device (useful after configuration changes or troubleshooting): diff --git a/pyproject.toml b/pyproject.toml index 869ebe2..04b84b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,6 +35,7 @@ dependencies = [ "numpy>=1.24.0,!=2.4.0", "bleak-retry-connector>=3.5.0", "epaper-dithering==0.6.3", + "cryptography>=41.0.0", ] [project.optional-dependencies] diff --git a/src/opendisplay/__init__.py b/src/opendisplay/__init__.py index 6fd954c..5f2c8ab 100644 --- a/src/opendisplay/__init__.py +++ b/src/opendisplay/__init__.py @@ -9,6 +9,9 @@ from .device import OpenDisplayDevice, prepare_image from .discovery import discover_devices from .exceptions import ( + AuthenticationError, + AuthenticationFailedError, + AuthenticationRequiredError, BLEConnectionError, BLETimeoutError, ConfigParseError, @@ -34,6 +37,7 @@ LedConfig, ManufacturerData, PowerOption, + SecurityConfig, SensorData, SystemConfig, WifiConfig, @@ -64,6 +68,9 @@ "prepare_image", # Exceptions "OpenDisplayError", + "AuthenticationError", + "AuthenticationFailedError", + "AuthenticationRequiredError", "BLEConnectionError", "BLETimeoutError", "ProtocolError", @@ -82,6 +89,7 @@ "SensorData", "DataBus", "BinaryInputs", + "SecurityConfig", "WifiConfig", # Models - Other "DeviceCapabilities", diff --git a/src/opendisplay/crypto.py b/src/opendisplay/crypto.py new file mode 100644 index 0000000..95302dc --- /dev/null +++ b/src/opendisplay/crypto.py @@ -0,0 +1,129 @@ +"""AES-128-CCM/CMAC crypto helpers for OpenDisplay BLE encryption. + +Implements the application-layer encryption protocol used by firmware >= commit b04a22b. +All operations match the firmware's mbedtls/CryptoCell implementations exactly. +""" + +from __future__ import annotations + +import os + +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.ciphers.aead import AESCCM +from cryptography.hazmat.primitives.cmac import CMAC + +# Firmware placeholder device ID (hardcoded in firmware, never changes) +_DEVICE_ID = bytes([0x00, 0x00, 0x00, 0x01]) + +# CCM auth tag length used by firmware +_TAG_LEN = 12 + + +def aes_cmac(key: bytes, data: bytes) -> bytes: + """Compute AES-128-CMAC.""" + c = CMAC(algorithms.AES(key)) + c.update(data) + return c.finalize() + + +def aes_ecb_encrypt(key: bytes, block: bytes) -> bytes: + """Encrypt a single 16-byte block with AES-ECB (used in KDF only).""" + cipher = Cipher(algorithms.AES(key), modes.ECB()) # noqa: S305 — intentional single-block KDF step + enc = cipher.encryptor() + return enc.update(block) + enc.finalize() + + +def derive_session_key(master_key: bytes, client_nonce: bytes, server_nonce: bytes) -> bytes: + """Derive per-session AES-128 key from master key and nonces. + + Matches firmware deriveSessionKey(): + 1. CMAC(master_key, label || 0x00 || device_id || client_nonce || server_nonce || 0x00 0x80) + 2. AES-ECB(master_key, counter_be(1, 8 bytes) || intermediate[0:8]) + """ + label = b"OpenDisplay session" + cmac_input = label + b"\x00" + _DEVICE_ID + client_nonce + server_nonce + bytes([0x00, 0x80]) + intermediate = aes_cmac(master_key, cmac_input) + + # counter = 1, big-endian 8 bytes + counter_be = (1).to_bytes(8, "big") + final_input = counter_be + intermediate[:8] + return aes_ecb_encrypt(master_key, final_input) + + +def derive_session_id(session_key: bytes, client_nonce: bytes, server_nonce: bytes) -> bytes: + """Derive 8-byte session ID from session key and nonces. + + Matches firmware deriveSessionId(): + AES-CMAC(session_key, client_nonce || server_nonce)[0:8] + """ + return aes_cmac(session_key, client_nonce + server_nonce)[:8] + + +def compute_challenge_response(master_key: bytes, server_nonce: bytes, client_nonce: bytes) -> bytes: + """Compute CMAC challenge proof sent to device in step 2 of auth. + + CMAC(master_key, server_nonce || client_nonce || device_id) + """ + return aes_cmac(master_key, server_nonce + client_nonce + _DEVICE_ID) + + +def get_nonce(session_id: bytes, counter: int) -> bytes: + """Build the 16-byte full nonce: session_id(8) || counter_be(8).""" + return session_id + counter.to_bytes(8, "big") + + +def encrypt_command(session_key: bytes, session_id: bytes, counter: int, cmd: bytes, payload: bytes) -> bytes: + """Encrypt a command payload for sending to the device. + + Returns full BLE write bytes: [cmd:2][nonce_full:16][ciphertext][tag:12] + + The CCM nonce is nonce_full[3:16] (13 bytes). + AD = cmd bytes (2 bytes). + Plaintext = [len(payload):1][payload]. + """ + nonce_full = get_nonce(session_id, counter) + ccm_nonce = nonce_full[3:] # 13 bytes + ad = cmd # 2-byte command code as AAD + plaintext = bytes([len(payload)]) + payload + + aesccm = AESCCM(session_key, tag_length=_TAG_LEN) + ciphertext_and_tag = aesccm.encrypt(ccm_nonce, plaintext, ad) + ciphertext = ciphertext_and_tag[:-_TAG_LEN] + tag = ciphertext_and_tag[-_TAG_LEN:] + + return cmd + nonce_full + ciphertext + tag + + +def decrypt_response(session_key: bytes, raw: bytes) -> tuple[int, bytes]: + """Decrypt an encrypted response notification from the device. + + Parses [cmd:2][nonce_full:16][ciphertext][tag:12]. + Returns (cmd_code, plaintext_payload). + + Raises: + ValueError: If data is too short or tag verification fails. + """ + min_len = 2 + 16 + 1 + _TAG_LEN # cmd + nonce + 1-byte payload + tag + if len(raw) < min_len: + raise ValueError(f"Encrypted response too short: {len(raw)} bytes") + + cmd_code = int.from_bytes(raw[:2], "big") + nonce_full = raw[2:18] + ciphertext = raw[18:-_TAG_LEN] + tag = raw[-_TAG_LEN:] + ccm_nonce = nonce_full[3:] # 13 bytes + ad = raw[:2] # cmd bytes as AAD + + aesccm = AESCCM(session_key, tag_length=_TAG_LEN) + # cryptography library expects ciphertext+tag concatenated + decrypted = aesccm.decrypt(ccm_nonce, ciphertext + tag, ad) + + # First byte is payload length + payload_len = decrypted[0] + payload = decrypted[1 : 1 + payload_len] + return cmd_code, payload + + +def generate_client_nonce() -> bytes: + """Generate 16 cryptographically random bytes for the client nonce.""" + return os.urandom(16) diff --git a/src/opendisplay/device.py b/src/opendisplay/device.py index 9ac01ab..4d1aaa1 100644 --- a/src/opendisplay/device.py +++ b/src/opendisplay/device.py @@ -8,6 +8,14 @@ from epaper_dithering import ColorScheme, DitherMode, dither_image from PIL import Image +from .crypto import ( + compute_challenge_response, + decrypt_response, + derive_session_id, + derive_session_key, + encrypt_command, + generate_client_nonce, +) from .display_palettes import PANELS_4GRAY, get_palette_for_display from .encoding import ( compress_image_data, @@ -15,7 +23,7 @@ encode_image, fit_image, ) -from .exceptions import BLETimeoutError, ProtocolError +from .exceptions import AuthenticationRequiredError, BLETimeoutError, ProtocolError from .models.capabilities import DeviceCapabilities from .models.config import GlobalConfig from .models.enums import BoardManufacturer, FitMode, RefreshMode, Rotation @@ -25,6 +33,8 @@ CHUNK_SIZE, MAX_COMPRESSED_SIZE, CommandCode, + build_authenticate_step1, + build_authenticate_step2, build_direct_write_data_command, build_direct_write_end_command, build_direct_write_start_compressed, @@ -39,7 +49,13 @@ serialize_config, validate_ack_response, ) -from .protocol.responses import check_response_type, strip_command_echo, unpack_command_code +from .protocol.responses import ( + check_response_type, + parse_authenticate_challenge, + parse_authenticate_success, + strip_command_echo, + unpack_command_code, +) from .transport import BLEConnection if TYPE_CHECKING: @@ -200,6 +216,7 @@ def __init__( max_attempts: int = 4, use_services_cache: bool = True, use_measured_palettes: bool = True, + encryption_key: bytes | None = None, ): """Initialize OpenDisplay device. @@ -214,16 +231,10 @@ def __init__( max_attempts: Maximum connection attempts for bleak-retry-connector (default: 4) use_services_cache: Enable GATT service caching for faster reconnections (default: True) use_measured_palettes: Use measured color palettes when available (default: True) + encryption_key: 16-byte AES-128 master key for encrypted devices (optional). Raises: ValueError: If neither or both mac_address and device_name provided - - Examples: - # Using MAC address (existing behavior) - device = OpenDisplayDevice(mac_address="AA:BB:CC:DD:EE:FF") - - # Using device name (new feature) - device = OpenDisplayDevice(device_name="OpenDisplay-A123") """ # Validation: exactly one of mac_address or device_name must be provided if mac_address and device_name: @@ -249,6 +260,12 @@ def __init__( self._capabilities = capabilities self._fw_version: FirmwareVersion | None = None + # Encryption session state (populated by authenticate()) + self._encryption_key = encryption_key + self._session_key: bytes | None = None + self._session_id: bytes | None = None + self._nonce_counter: int = 0 + async def __aenter__(self) -> OpenDisplayDevice: """Connect and optionally interrogate device.""" @@ -288,6 +305,10 @@ async def __aenter__(self) -> OpenDisplayDevice: await self._conn.connect() + # Authenticate before any other commands if key provided + if self._encryption_key is not None: + await self.authenticate(self._encryption_key) + # Auto-interrogate if no config or capabilities provided if self._config is None and self._capabilities is None: _LOGGER.info("No config provided, auto-interrogating device") @@ -316,6 +337,69 @@ def _conn(self) -> BLEConnection: raise RuntimeError("Device not connected") return self._connection + async def _write(self, data: bytes) -> None: + """Write a command, encrypting it if an active session exists.""" + if self._session_key is not None and self._session_id is not None: + cmd = data[:2] + payload = data[2:] + self._nonce_counter += 1 + encrypted = encrypt_command(self._session_key, self._session_id, self._nonce_counter, cmd, payload) + await self._conn.write_command(encrypted) + else: + await self._conn.write_command(data) + + async def _read(self, timeout: float) -> bytes: + """Read a response, decrypting it if an active session exists. + + Raises: + AuthenticationRequiredError: If device returns 0xFE (encryption required, no active session) + """ + raw = await self._conn.read_response(timeout=timeout) + if self._session_key is not None: + cmd_code, payload = decrypt_response(self._session_key, raw) + return cmd_code.to_bytes(2, "big") + payload + # Firmware returns [cmd_high, cmd_low, 0xFE] (3 bytes) when a command + # requires authentication but no session is active. + if len(raw) == 3 and raw[2] == 0xFE: + raise AuthenticationRequiredError( + "Device requires an encryption key — pass encryption_key=bytes.fromhex('...') to OpenDisplayDevice" + ) + return raw + + async def authenticate(self, key: bytes) -> None: + """Perform two-step challenge-response authentication with the device. + + After successful authentication, all subsequent commands and responses + are transparently encrypted/decrypted via _write() and _read(). + + Args: + key: 16-byte AES-128 master key + + Raises: + AuthenticationFailedError: If the device rejects the key or is rate-limited + InvalidResponseError: If device sends malformed response + """ + _LOGGER.debug("Authenticating with device %s", self.mac_address) + + # Step 1: Request server nonce + await self._conn.write_command(build_authenticate_step1()) + challenge_response = await self._conn.read_response(timeout=self.TIMEOUT_ACK) + server_nonce = parse_authenticate_challenge(challenge_response) + + # Step 2: Prove key knowledge, receive server proof + client_nonce = generate_client_nonce() + challenge = compute_challenge_response(key, server_nonce, client_nonce) + await self._conn.write_command(build_authenticate_step2(client_nonce, challenge)) + success_response = await self._conn.read_response(timeout=self.TIMEOUT_ACK) + parse_authenticate_success(success_response) # raises on wrong key / error + + # Derive session key and ID + self._session_key = derive_session_key(key, client_nonce, server_nonce) + self._session_id = derive_session_id(self._session_key, client_nonce, server_nonce) + self._nonce_counter = 0 + + _LOGGER.info("Authentication successful, session established") + def _ensure_capabilities(self) -> DeviceCapabilities: """Ensure device capabilities are available. @@ -426,10 +510,10 @@ async def interrogate(self) -> GlobalConfig: # Send read config command cmd = build_read_config_command() - await self._conn.write_command(cmd) + await self._write(cmd) # Read first chunk - response = await self._conn.read_response(timeout=self.TIMEOUT_FIRST_CHUNK) + response = await self._read(self.TIMEOUT_FIRST_CHUNK) chunk_data = strip_command_echo(response, CommandCode.READ_CONFIG) # Parse first chunk header @@ -440,7 +524,7 @@ async def interrogate(self) -> GlobalConfig: # Read remaining chunks while len(tlv_data) < total_length: - next_response = await self._conn.read_response(timeout=self.TIMEOUT_CHUNK) + next_response = await self._read(self.TIMEOUT_CHUNK) next_chunk_data = strip_command_echo(next_response, CommandCode.READ_CONFIG) # Skip chunk number field (2 bytes) and append data @@ -515,7 +599,7 @@ async def reboot(self) -> None: # Build and send reboot command cmd = build_reboot_command() - await self._conn.write_command(cmd) + await self._write(cmd) # Device will reset immediately - no ACK expected _LOGGER.info("Reboot command sent to %s - device will reset (connection will drop)", self.mac_address) @@ -558,10 +642,10 @@ async def activate_led( led_instance=led_instance, flash_config=flash_config, ) - await self._conn.write_command(cmd) + await self._write(cmd) response_timeout = self.TIMEOUT_REFRESH if timeout is None else timeout - response = await self._conn.read_response(timeout=response_timeout) + response = await self._read(response_timeout) # Firmware LED errors use 0xFF73 + error code payload. if len(response) >= 2 and unpack_command_code(response) == 0xFF73: @@ -587,20 +671,6 @@ async def write_config(self, config: GlobalConfig) -> None: ValueError: If config serialization fails or exceeds size limit BLEConnectionError: If write fails ProtocolError: If device returns error response - - Example: - async with OpenDisplayDevice(mac_address="AA:BB:CC:DD:EE:FF") as device: - # Read current config - config = device.config - - # Modify config - config.displays[0].rotation = 180 - - # Write back to device - await device.write_config(config) - - # Reboot to apply changes - await device.reboot() """ _LOGGER.debug("Writing config to device %s", self.mac_address) @@ -632,19 +702,19 @@ async def write_config(self, config: GlobalConfig) -> None: # Send first command _LOGGER.debug("Sending first config chunk (%d bytes)", len(first_cmd)) - await self._conn.write_command(first_cmd) + await self._write(first_cmd) # Wait for ACK - response = await self._conn.read_response(timeout=self.TIMEOUT_ACK) + response = await self._read(self.TIMEOUT_ACK) validate_ack_response(response, CommandCode.WRITE_CONFIG) # Send remaining chunks if needed for i, chunk_cmd in enumerate(chunk_cmds, start=1): _LOGGER.debug("Sending config chunk %d/%d (%d bytes)", i, len(chunk_cmds), len(chunk_cmd)) - await self._conn.write_command(chunk_cmd) + await self._write(chunk_cmd) # Wait for ACK after each chunk - response = await self._conn.read_response(timeout=self.TIMEOUT_ACK) + response = await self._read(self.TIMEOUT_ACK) validate_ack_response(response, CommandCode.WRITE_CONFIG_CHUNK) _LOGGER.info("Config written successfully to %s", self.mac_address) @@ -721,22 +791,7 @@ def _prepare_image( fit: FitMode = FitMode.STRETCH, rotate: Rotation = Rotation.ROTATE_0, ) -> tuple[bytes, bytes | None, Image.Image]: - """Prepare image for upload. - - Handles optional source rotation, fitting, dithering, encoding, - and optional compression. - - Args: - image: PIL Image to prepare - dither_mode: Dithering algorithm to use - compress: Whether to compress the image data - tone_compression: Dynamic range compression ("auto", or 0.0-1.0) - fit: How to map the image to display dimensions - rotate: Source image rotation enum (0/90/180/270) - - Returns: - Tuple of (uncompressed_data, compressed_data or None, processed_image) - """ + """Prepare image for upload. Internal wrapper for the module-level prepare_image().""" panel_ic_type = self._config.displays[0].panel_ic_type if self._config and self._config.displays else None return prepare_image( image, @@ -780,15 +835,8 @@ async def upload_image( refresh_mode: Display refresh mode (default: FULL) dither_mode: Dithering algorithm (default: BURKES) compress: Enable zlib compression (default: True) - tone_compression: Dynamic range compression (default: "auto"). - "auto" = analyze image and fit to display range. - 0.0 = disabled, 0.0-1.0 = fixed linear compression. - Only applies to measured palettes. + tone_compression: Dynamic range compression ("auto" or 0.0–1.0, default: "auto"). fit: How to map the image to display dimensions (default: CONTAIN). - STRETCH - distort to fill exact dimensions - CONTAIN - scale to fit, pad with white - COVER - scale to cover, crop overflow - CROP - center-crop at native resolution, pad if smaller rotate: Source image rotation enum, applied before fit/encoding. Raises: @@ -913,10 +961,10 @@ async def _execute_upload( start_cmd = build_direct_write_start_uncompressed() remaining_compressed = None - await self._conn.write_command(start_cmd) + await self._write(start_cmd) # 2. Wait for START ACK (identical for both protocols) - response = await self._conn.read_response(timeout=self.TIMEOUT_ACK) + response = await self._read(self.TIMEOUT_ACK) validate_ack_response(response, CommandCode.DIRECT_WRITE_START) # 3. Send data chunks @@ -932,10 +980,10 @@ async def _execute_upload( # 4. Send END command if needed (identical for both protocols) if not auto_completed: end_cmd = build_direct_write_end_command(refresh_mode.value) - await self._conn.write_command(end_cmd) + await self._write(end_cmd) # Wait for END ACK (90s timeout for display refresh) - response = await self._conn.read_response(timeout=self.TIMEOUT_REFRESH) + response = await self._read(self.TIMEOUT_REFRESH) validate_ack_response(response, CommandCode.DIRECT_WRITE_END) async def _send_data_chunks(self, image_data: bytes) -> bool: @@ -968,14 +1016,14 @@ async def _send_data_chunks(self, image_data: bytes) -> bool: # Send DATA command data_cmd = build_direct_write_data_command(chunk_data) - await self._conn.write_command(data_cmd) + await self._write(data_cmd) bytes_sent += len(chunk_data) chunks_sent += 1 # Wait for response after every chunk (PIPELINE_CHUNKS=1) try: - response = await self._conn.read_response(timeout=self.TIMEOUT_ACK) + response = await self._read(self.TIMEOUT_ACK) except BLETimeoutError: # Timeout on response - firmware might be doing display refresh # This happens when the chunk completes directWriteTotalBytes @@ -986,7 +1034,7 @@ async def _send_data_chunks(self, image_data: bytes) -> bool: ) # Wait up to 90 seconds for the END response - response = await self._conn.read_response(timeout=self.TIMEOUT_REFRESH) + response = await self._read(self.TIMEOUT_REFRESH) # Check what response we got (firmware can send 0x0072 on ANY chunk, not just last!) command, _ = check_response_type(response) diff --git a/src/opendisplay/exceptions.py b/src/opendisplay/exceptions.py index c9cbb51..bf40d1b 100644 --- a/src/opendisplay/exceptions.py +++ b/src/opendisplay/exceptions.py @@ -39,6 +39,32 @@ class InvalidResponseError(ProtocolError): pass +class AuthenticationError(ProtocolError): + """Base class for authentication errors.""" + + pass + + +class AuthenticationFailedError(AuthenticationError): + """Authentication was attempted but rejected by the device. + + Raised when the device returns a bad-key or rate-limit status during the + challenge-response handshake. The configured key is likely wrong. + """ + + pass + + +class AuthenticationRequiredError(AuthenticationError): + """Command rejected because no authenticated session exists. + + Raised when the device returns 0xFE — encryption is enabled but no session + has been established. Either no key was provided or the session expired. + """ + + pass + + class ImageEncodingError(OpenDisplayError): """Failed to encode image.""" diff --git a/src/opendisplay/models/config.py b/src/opendisplay/models/config.py index 86cbea5..1a22a4c 100644 --- a/src/opendisplay/models/config.py +++ b/src/opendisplay/models/config.py @@ -612,6 +612,47 @@ def to_bytes(self) -> bytes: ) +@dataclass +class SecurityConfig: + """Security and encryption configuration (TLV packet type 0x27). + + Size: 64 bytes (firmware fixed layout, excluding packet header) + """ + + encryption_enabled: int # uint8: 0=disabled, 1=enabled + encryption_key: bytes # 16-byte AES-128 master key (all-zero means disabled) + session_timeout_seconds: int # uint16 LE: 0 = no timeout + flags: int # uint8 bitfield (see flag properties below) + reset_pin: int # uint8: pin number for hardware reset + reserved: bytes # 43 bytes + + SIZE: ClassVar[int] = 64 + + @property + def encryption_enabled_flag(self) -> bool: + """True if encryption is both enabled and key is non-zero.""" + return self.encryption_enabled != 0 and any(self.encryption_key) + + @property + def rewrite_allowed(self) -> bool: + """Bit 0: allow unauthenticated WRITE_CONFIG even when encryption is on.""" + return bool(self.flags & 0x01) + + @classmethod + def from_bytes(cls, data: bytes) -> SecurityConfig: + """Parse from TLV packet data.""" + if len(data) < cls.SIZE: + raise ValueError(f"Invalid SecurityConfig size: {len(data)} < {cls.SIZE}") + return cls( + encryption_enabled=data[0], + encryption_key=bytes(data[1:17]), + session_timeout_seconds=int.from_bytes(data[17:19], "little"), + flags=data[19], + reset_pin=data[20], + reserved=bytes(data[21:64]), + ) + + @dataclass class GlobalConfig: """Complete device configuration parsed from TLV data. @@ -631,6 +672,7 @@ class GlobalConfig: data_buses: list[DataBus] = field(default_factory=list) binary_inputs: list[BinaryInputs] = field(default_factory=list) wifi_config: WifiConfig | None = None + security_config: SecurityConfig | None = None # Metadata version: int = 0 diff --git a/src/opendisplay/protocol/__init__.py b/src/opendisplay/protocol/__init__.py index f94b644..6b3152d 100644 --- a/src/opendisplay/protocol/__init__.py +++ b/src/opendisplay/protocol/__init__.py @@ -7,6 +7,8 @@ PIPELINE_CHUNKS, SERVICE_UUID, CommandCode, + build_authenticate_step1, + build_authenticate_step2, build_direct_write_data_command, build_direct_write_end_command, build_direct_write_start_compressed, @@ -29,6 +31,8 @@ __all__ = [ "CommandCode", + "build_authenticate_step1", + "build_authenticate_step2", "SERVICE_UUID", "MANUFACTURER_ID", "CHUNK_SIZE", diff --git a/src/opendisplay/protocol/commands.py b/src/opendisplay/protocol/commands.py index e8a951f..dfbe5d3 100644 --- a/src/opendisplay/protocol/commands.py +++ b/src/opendisplay/protocol/commands.py @@ -19,6 +19,9 @@ class CommandCode(IntEnum): READ_FW_VERSION = 0x0043 # Read firmware version REBOOT = 0x000F # Reboot device + # Authentication command (firmware with encryption support) + AUTHENTICATE = 0x0050 # Two-step challenge-response authentication + # Image upload commands (direct write mode) DIRECT_WRITE_START = 0x0070 # Start direct write transfer DIRECT_WRITE_DATA = 0x0071 # Send image data chunk @@ -257,3 +260,32 @@ def build_write_config_command(config_data: bytes) -> tuple[bytes, list[bytes]]: remaining_data = remaining_data[CONFIG_CHUNK_SIZE:] return first_chunk, chunks + + +def build_authenticate_step1() -> bytes: + """Build step-1 auth command: request a server nonce. + + Returns: + Command bytes: [0x0050][0x00] + """ + cmd = CommandCode.AUTHENTICATE.to_bytes(2, byteorder="big") + return cmd + b"\x00" + + +def build_authenticate_step2(client_nonce: bytes, challenge_response: bytes) -> bytes: + """Build step-2 auth command: prove knowledge of the master key. + + Args: + client_nonce: 16 random bytes generated by the client + challenge_response: AES-CMAC(master_key, server_nonce || client_nonce || device_id) + + Returns: + Command bytes: [0x0050][client_nonce:16][challenge_response:16] + """ + if len(client_nonce) != 16: + raise ValueError(f"client_nonce must be 16 bytes, got {len(client_nonce)}") + if len(challenge_response) != 16: + raise ValueError(f"challenge_response must be 16 bytes, got {len(challenge_response)}") + + cmd = CommandCode.AUTHENTICATE.to_bytes(2, byteorder="big") + return cmd + client_nonce + challenge_response diff --git a/src/opendisplay/protocol/config_parser.py b/src/opendisplay/protocol/config_parser.py index f32f9c6..9acc772 100644 --- a/src/opendisplay/protocol/config_parser.py +++ b/src/opendisplay/protocol/config_parser.py @@ -14,6 +14,7 @@ LedConfig, ManufacturerData, PowerOption, + SecurityConfig, SensorData, SystemConfig, WifiConfig, @@ -32,9 +33,11 @@ PACKET_TYPE_DATABUS = 0x24 PACKET_TYPE_BINARY_INPUT = 0x25 PACKET_TYPE_WIFI_CONFIG = 0x26 +PACKET_TYPE_SECURITY_CONFIG = 0x27 WIFI_CONFIG_SIZE = 160 WIFI_CONFIG_LEGACY_SIZE = 65 +SECURITY_CONFIG_SIZE = 64 def parse_config_response(raw_data: bytes) -> GlobalConfig: @@ -153,6 +156,7 @@ def parse_tlv_config(data: bytes, version: int = 1) -> GlobalConfig: data_buses = [] binary_inputs = [] wifi_config = None + security_config = None for (packet_type, packet_number), packet_data in packets.items(): if packet_type == PACKET_TYPE_SYSTEM: @@ -173,16 +177,19 @@ def parse_tlv_config(data: bytes, version: int = 1) -> GlobalConfig: binary_inputs.append(_parse_binary_inputs(packet_data)) elif packet_type == PACKET_TYPE_WIFI_CONFIG: wifi_config = _parse_wifi_config(packet_data) - - missing_required = [] - if system is None: - missing_required.append("system") - if manufacturer is None: - missing_required.append("manufacturer") - if power is None: - missing_required.append("power") - if not displays: - missing_required.append("display") + elif packet_type == PACKET_TYPE_SECURITY_CONFIG: + security_config = SecurityConfig.from_bytes(packet_data) + + missing_required = [ + name + for name, present in [ + ("system", system), + ("manufacturer", manufacturer), + ("power", power), + ("display", displays), + ] + if not present + ] if missing_required: raise ConfigParseError("Missing required packet(s): " + ", ".join(missing_required)) @@ -200,6 +207,7 @@ def parse_tlv_config(data: bytes, version: int = 1) -> GlobalConfig: data_buses=data_buses, binary_inputs=binary_inputs, wifi_config=wifi_config, + security_config=security_config, version=version, # From firmware wrapper minor_version=1, # Not stored in device (only single version byte exists) loaded=True, @@ -225,6 +233,7 @@ def _get_packet_size(packet_type: int) -> int | None: PACKET_TYPE_DATABUS: 30, # Fixed: was 28 PACKET_TYPE_BINARY_INPUT: 30, # Fixed: was 29 PACKET_TYPE_WIFI_CONFIG: 160, + PACKET_TYPE_SECURITY_CONFIG: SECURITY_CONFIG_SIZE, } return sizes.get(packet_type) diff --git a/src/opendisplay/protocol/responses.py b/src/opendisplay/protocol/responses.py index 99898a9..37e598c 100644 --- a/src/opendisplay/protocol/responses.py +++ b/src/opendisplay/protocol/responses.py @@ -4,10 +4,16 @@ import struct -from ..exceptions import InvalidResponseError +from ..exceptions import AuthenticationFailedError, AuthenticationRequiredError, InvalidResponseError from ..models.firmware import FirmwareVersion from .commands import RESPONSE_HIGH_BIT_FLAG, CommandCode +# Status bytes returned in AUTHENTICATE responses +_AUTH_STATUS_OK = 0x00 +_AUTH_STATUS_WRONG_KEY = 0x01 +_AUTH_STATUS_ENCRYPTION_NOT_CONFIGURED = 0x03 +_AUTH_STATUS_RATE_LIMITED = 0x04 + def unpack_command_code(data: bytes, offset: int = 0) -> int: """Extract 2-byte big-endian command code from response data. @@ -83,6 +89,84 @@ def validate_ack_response(data: bytes, expected_command: int) -> None: raise InvalidResponseError(f"ACK mismatch: expected 0x{expected_command:04x}, got 0x{response_code:04x}") +def parse_authenticate_challenge(data: bytes) -> bytes: + """Parse step-1 AUTHENTICATE response and return the server nonce. + + Expected format: [0x0050][status:1][server_nonce:16] + + Args: + data: Raw BLE notification from device + + Returns: + 16-byte server nonce + + Raises: + AuthenticationError: If device returns an error status + InvalidResponseError: If response format is invalid + """ + if len(data) < 2: + raise InvalidResponseError(f"Auth challenge response too short: {len(data)} bytes") + + echo = unpack_command_code(data) + if echo not in (0x0050, 0x0050 | RESPONSE_HIGH_BIT_FLAG): + raise InvalidResponseError(f"Auth challenge echo mismatch: got 0x{echo:04x}") + + if len(data) < 3: + raise InvalidResponseError("Auth challenge response missing status byte") + + status = data[2] + if status == _AUTH_STATUS_ENCRYPTION_NOT_CONFIGURED: + raise AuthenticationRequiredError("Device does not have encryption configured") + if status == _AUTH_STATUS_RATE_LIMITED: + raise AuthenticationFailedError("Authentication rate limit exceeded — wait before retrying") + if status != _AUTH_STATUS_OK: + raise AuthenticationFailedError(f"Auth challenge failed with status 0x{status:02x}") + + if len(data) < 19: # 2 echo + 1 status + 16 nonce + raise InvalidResponseError(f"Auth challenge response too short for nonce: {len(data)} bytes (need 19)") + + return data[3:19] + + +def parse_authenticate_success(data: bytes) -> bytes: + """Parse step-2 AUTHENTICATE response and return the server proof. + + Expected format: [0x0050][status:1][server_response:16] + + Args: + data: Raw BLE notification from device + + Returns: + 16-byte server response (proof that device also knows the key) + + Raises: + AuthenticationError: If device rejects the challenge response + InvalidResponseError: If response format is invalid + """ + if len(data) < 2: + raise InvalidResponseError(f"Auth success response too short: {len(data)} bytes") + + echo = unpack_command_code(data) + if echo not in (0x0050, 0x0050 | RESPONSE_HIGH_BIT_FLAG): + raise InvalidResponseError(f"Auth success echo mismatch: got 0x{echo:04x}") + + if len(data) < 3: + raise InvalidResponseError("Auth success response missing status byte") + + status = data[2] + if status == _AUTH_STATUS_WRONG_KEY: + raise AuthenticationFailedError("Authentication failed: wrong encryption key") + if status == _AUTH_STATUS_RATE_LIMITED: + raise AuthenticationFailedError("Authentication rate limit exceeded — wait before retrying") + if status != _AUTH_STATUS_OK: + raise AuthenticationFailedError(f"Authentication failed with status 0x{status:02x}") + + if len(data) < 19: # 2 echo + 1 status + 16 server_response + raise InvalidResponseError(f"Auth success response too short for server proof: {len(data)} bytes (need 19)") + + return data[3:19] + + def parse_firmware_version(data: bytes) -> FirmwareVersion: """Parse firmware version response. From 3214ead07f4c4f48ede373764fb84f84993541c8 Mon Sep 17 00:00:00 2001 From: gabriel Date: Wed, 25 Mar 2026 00:10:11 +0100 Subject: [PATCH 2/3] feat: support reauthentication --- src/opendisplay/device.py | 57 ++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 27 deletions(-) diff --git a/src/opendisplay/device.py b/src/opendisplay/device.py index 4d1aaa1..d7e6fb8 100644 --- a/src/opendisplay/device.py +++ b/src/opendisplay/device.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import time from typing import TYPE_CHECKING from epaper_dithering import ColorScheme, DitherMode, dither_image @@ -265,6 +266,7 @@ def __init__( self._session_key: bytes | None = None self._session_id: bytes | None = None self._nonce_counter: int = 0 + self._auth_time: float | None = None # monotonic timestamp of last successful auth async def __aenter__(self) -> OpenDisplayDevice: """Connect and optionally interrogate device.""" @@ -340,6 +342,7 @@ def _conn(self) -> BLEConnection: async def _write(self, data: bytes) -> None: """Write a command, encrypting it if an active session exists.""" if self._session_key is not None and self._session_id is not None: + await self._reauthenticate_if_needed() cmd = data[:2] payload = data[2:] self._nonce_counter += 1 @@ -348,6 +351,24 @@ async def _write(self, data: bytes) -> None: else: await self._conn.write_command(data) + async def _reauthenticate_if_needed(self) -> None: + """Re-authenticate proactively at 90% of session_timeout_seconds.""" + if self._encryption_key is None or self._auth_time is None: + return + if self._config is None or self._config.security_config is None: + return + timeout = self._config.security_config.session_timeout_seconds + if timeout == 0: + return + elapsed = time.monotonic() - self._auth_time + if elapsed >= timeout * 0.9: + _LOGGER.info( + "Session approaching timeout (%.0fs / %ds), re-authenticating", + elapsed, + timeout, + ) + await self.authenticate(self._encryption_key) + async def _read(self, timeout: float) -> bytes: """Read a response, decrypting it if an active session exists. @@ -397,6 +418,7 @@ async def authenticate(self, key: bytes) -> None: self._session_key = derive_session_key(key, client_nonce, server_nonce) self._session_id = derive_session_id(self._session_key, client_nonce, server_nonce) self._nonce_counter = 0 + self._auth_time = time.monotonic() _LOGGER.info("Authentication successful, session established") @@ -664,6 +686,12 @@ async def write_config(self, config: GlobalConfig) -> None: to the device using the WRITE_CONFIG (0x0041) command with automatic chunking for large configs. + On encrypted devices this command is sent encrypted (normal flow). + If the device has the ``rewrite_allowed`` flag set in its SecurityConfig, + the firmware also accepts unencrypted WRITE_CONFIG — useful for + provisioning without knowing the current key (connect with + ``config=`` or ``capabilities=`` to skip interrogation). + Args: config: GlobalConfig to write to device @@ -720,20 +748,10 @@ async def write_config(self, config: GlobalConfig) -> None: _LOGGER.info("Config written successfully to %s", self.mac_address) def export_config_json(self, file_path: str) -> None: - """Export device config to JSON file. - - Exports the configuration in a format compatible with the - Open Display Config Builder web tool. - - Args: - file_path: Path to save JSON file + """Export device config to JSON file (Open Display Config Builder format). Raises: ValueError: If no config loaded - - Example: - async with OpenDisplayDevice(mac_address="AA:BB:CC:DD:EE:FF") as device: - device.export_config_json("my_config.json") """ if not self._config: raise ValueError("No config loaded - interrogate device first") @@ -751,26 +769,11 @@ def export_config_json(self, file_path: str) -> None: @staticmethod def import_config_json(file_path: str) -> GlobalConfig: - """Import config from JSON file. - - Imports configuration from a JSON file created by the - Open Display Config Builder web tool or exported by - export_config_json(). - - Args: - file_path: Path to JSON file - - Returns: - GlobalConfig instance + """Import config from JSON file (Open Display Config Builder format). Raises: FileNotFoundError: If file not found ValueError: If JSON invalid - - Example: - config = OpenDisplayDevice.import_config_json("my_config.json") - async with OpenDisplayDevice(mac_address="AA:BB:CC:DD:EE:FF") as device: - await device.write_config(config) """ import json From 9a3eb87d86c0e845ee3e9bbd2a374d26dd9d03c4 Mon Sep 17 00:00:00 2001 From: gabriel Date: Wed, 25 Mar 2026 00:15:02 +0100 Subject: [PATCH 3/3] tests: add encryption tests --- tests/unit/test_auth_protocol.py | 150 ++++++++++++++++++ tests/unit/test_crypto.py | 259 +++++++++++++++++++++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 tests/unit/test_auth_protocol.py create mode 100644 tests/unit/test_crypto.py diff --git a/tests/unit/test_auth_protocol.py b/tests/unit/test_auth_protocol.py new file mode 100644 index 0000000..e2caaee --- /dev/null +++ b/tests/unit/test_auth_protocol.py @@ -0,0 +1,150 @@ +"""Tests for authenticate command builders and response parsers.""" + +import pytest + +from opendisplay.exceptions import AuthenticationFailedError, AuthenticationRequiredError, InvalidResponseError +from opendisplay.protocol.commands import build_authenticate_step1, build_authenticate_step2 +from opendisplay.protocol.responses import parse_authenticate_challenge, parse_authenticate_success + +_SERVER_NONCE = bytes(range(16)) +_CLIENT_NONCE = bytes(range(16, 32)) +_CHALLENGE = bytes(range(32, 48)) +_SERVER_PROOF = bytes(range(48, 64)) + + +class TestBuildAuthenticateStep1: + """Tests for build_authenticate_step1.""" + + def test_returns_3_bytes(self): + """Step-1 command is exactly 3 bytes.""" + cmd = build_authenticate_step1() + assert len(cmd) == 3 + + def test_exact_bytes(self): + """Step-1 command is [0x00, 0x50, 0x00].""" + assert build_authenticate_step1() == b"\x00\x50\x00" + + +class TestBuildAuthenticateStep2: + """Tests for build_authenticate_step2.""" + + def test_returns_34_bytes(self): + """Step-2 command is cmd(2) + client_nonce(16) + challenge(16) = 34 bytes.""" + cmd = build_authenticate_step2(_CLIENT_NONCE, _CHALLENGE) + assert len(cmd) == 34 + + def test_command_prefix(self): + """First two bytes are the AUTHENTICATE command code.""" + cmd = build_authenticate_step2(_CLIENT_NONCE, _CHALLENGE) + assert cmd[:2] == b"\x00\x50" + + def test_nonce_and_challenge_embedded(self): + """client_nonce and challenge_response appear verbatim in output.""" + cmd = build_authenticate_step2(_CLIENT_NONCE, _CHALLENGE) + assert cmd[2:18] == _CLIENT_NONCE + assert cmd[18:34] == _CHALLENGE + + def test_wrong_nonce_length_raises(self): + """Raises ValueError when client_nonce is not 16 bytes.""" + with pytest.raises(ValueError, match="client_nonce"): + build_authenticate_step2(b"\x00" * 15, _CHALLENGE) + + def test_wrong_challenge_length_raises(self): + """Raises ValueError when challenge_response is not 16 bytes.""" + with pytest.raises(ValueError, match="challenge_response"): + build_authenticate_step2(_CLIENT_NONCE, b"\x00" * 8) + + +class TestParseAuthenticateChallenge: + """Tests for parse_authenticate_challenge.""" + + def _valid(self, nonce: bytes = _SERVER_NONCE) -> bytes: + return b"\x00\x50\x00" + nonce + + def test_valid_returns_nonce(self): + """Valid response returns 16-byte server nonce.""" + nonce = parse_authenticate_challenge(self._valid()) + assert nonce == _SERVER_NONCE + + def test_valid_with_high_bit(self): + """High-bit echo (ACK flag) is also accepted.""" + data = b"\x80\x50\x00" + _SERVER_NONCE + nonce = parse_authenticate_challenge(data) + assert nonce == _SERVER_NONCE + + def test_nonce_is_16_bytes(self): + """Returned nonce is always 16 bytes.""" + nonce = parse_authenticate_challenge(self._valid()) + assert len(nonce) == 16 + + def test_status_wrong_key_raises_auth_failed(self): + """Status 0x01 (wrong key) raises AuthenticationFailedError.""" + data = b"\x00\x50\x01" + _SERVER_NONCE + with pytest.raises(AuthenticationFailedError): + parse_authenticate_challenge(data) + + def test_status_not_configured_raises_auth_required(self): + """Status 0x03 (not configured) raises AuthenticationRequiredError.""" + data = b"\x00\x50\x03" + _SERVER_NONCE + with pytest.raises(AuthenticationRequiredError): + parse_authenticate_challenge(data) + + def test_status_rate_limited_raises_auth_failed(self): + """Status 0x04 (rate limited) raises AuthenticationFailedError.""" + data = b"\x00\x50\x04" + _SERVER_NONCE + with pytest.raises(AuthenticationFailedError): + parse_authenticate_challenge(data) + + def test_too_short_raises_invalid(self): + """Response shorter than 19 bytes raises InvalidResponseError.""" + with pytest.raises(InvalidResponseError): + parse_authenticate_challenge(b"\x00\x50\x00" + b"\x00" * 10) + + def test_empty_raises_invalid(self): + """Empty response raises InvalidResponseError.""" + with pytest.raises(InvalidResponseError): + parse_authenticate_challenge(b"") + + +class TestParseAuthenticateSuccess: + """Tests for parse_authenticate_success.""" + + def _valid(self, proof: bytes = _SERVER_PROOF) -> bytes: + return b"\x00\x50\x00" + proof + + def test_valid_returns_proof(self): + """Valid response returns 16-byte server proof.""" + proof = parse_authenticate_success(self._valid()) + assert proof == _SERVER_PROOF + + def test_valid_with_high_bit(self): + """High-bit echo is accepted.""" + data = b"\x80\x50\x00" + _SERVER_PROOF + proof = parse_authenticate_success(data) + assert proof == _SERVER_PROOF + + def test_proof_is_16_bytes(self): + """Returned proof is 16 bytes.""" + assert len(parse_authenticate_success(self._valid())) == 16 + + def test_status_wrong_key_raises_auth_failed(self): + """Status 0x01 raises AuthenticationFailedError with 'wrong' in message.""" + data = b"\x00\x50\x01" + _SERVER_PROOF + with pytest.raises(AuthenticationFailedError, match="wrong"): + parse_authenticate_success(data) + + def test_status_rate_limited_raises_auth_failed(self): + """Status 0x04 raises AuthenticationFailedError.""" + data = b"\x00\x50\x04" + _SERVER_PROOF + with pytest.raises(AuthenticationFailedError): + parse_authenticate_success(data) + + def test_too_short_raises_invalid(self): + """Response shorter than 19 bytes raises InvalidResponseError.""" + with pytest.raises(InvalidResponseError): + parse_authenticate_success(b"\x00\x50\x00" + b"\x00" * 5) + + def test_empty_raises_invalid(self): + """Empty response raises InvalidResponseError.""" + with pytest.raises(InvalidResponseError): + parse_authenticate_success(b"") diff --git a/tests/unit/test_crypto.py b/tests/unit/test_crypto.py new file mode 100644 index 0000000..55a2708 --- /dev/null +++ b/tests/unit/test_crypto.py @@ -0,0 +1,259 @@ +"""Tests for opendisplay.crypto.""" + +import pytest + +from opendisplay.crypto import ( + aes_cmac, + aes_ecb_encrypt, + compute_challenge_response, + decrypt_response, + derive_session_id, + derive_session_key, + encrypt_command, + generate_client_nonce, + get_nonce, +) + +_RFC4493_KEY = bytes.fromhex("2b7e151628aed2a6abf7158809cf4f3c") + + +class TestAesCmac: + """RFC 4493 test vectors for AES-CMAC.""" + + def test_empty_message(self): + """RFC 4493 example 1: key=2b7e..., msg=empty → bb1d...""" + result = aes_cmac(_RFC4493_KEY, b"") + assert result.hex() == "bb1d6929e95937287fa37d129b756746" + + def test_sixteen_byte_message(self): + """RFC 4493 example 2: key=2b7e..., msg=6bc1...172a → 070a...""" + msg = bytes.fromhex("6bc1bee22e409f96e93d7e117393172a") + result = aes_cmac(_RFC4493_KEY, msg) + assert result.hex() == "070a16b46b4d4144f79bdd9dd04a287c" + + def test_returns_16_bytes(self): + """Output is always 16 bytes.""" + result = aes_cmac(bytes(16), b"some data") + assert len(result) == 16 + + +class TestAesEcbEncrypt: + """NIST AES-ECB known-answer test.""" + + def test_known_vector(self): + """NIST AESAVS vector: key=2b7e..., pt=6bc1... → 3ad7...""" + key = bytes.fromhex("2b7e151628aed2a6abf7158809cf4f3c") + plaintext = bytes.fromhex("6bc1bee22e409f96e93d7e117393172a") + result = aes_ecb_encrypt(key, plaintext) + assert result.hex() == "3ad77bb40d7a3660a89ecaf32466ef97" + + def test_returns_16_bytes(self): + """Output is always 16 bytes for a 16-byte block.""" + result = aes_ecb_encrypt(bytes(16), bytes(16)) + assert len(result) == 16 + + +class TestDeriveSessionKey: + """Tests for derive_session_key.""" + + def test_returns_16_bytes(self): + """Session key is 16 bytes.""" + key = derive_session_key(bytes(16), bytes(16), bytes(16)) + assert len(key) == 16 + + def test_deterministic(self): + """Same inputs produce the same key.""" + master = bytes(range(16)) + cn = bytes(range(16, 32)) + sn = bytes(range(32, 48)) + assert derive_session_key(master, cn, sn) == derive_session_key(master, cn, sn) + + def test_changes_with_master_key(self): + """Different master key → different session key.""" + cn = bytes(16) + sn = bytes(16) + k1 = derive_session_key(bytes(16), cn, sn) + k2 = derive_session_key(bytes([1] * 16), cn, sn) + assert k1 != k2 + + def test_changes_with_client_nonce(self): + """Different client nonce → different session key.""" + master = bytes(16) + sn = bytes(16) + k1 = derive_session_key(master, bytes(16), sn) + k2 = derive_session_key(master, bytes([1] * 16), sn) + assert k1 != k2 + + def test_changes_with_server_nonce(self): + """Different server nonce → different session key.""" + master = bytes(16) + cn = bytes(16) + k1 = derive_session_key(master, cn, bytes(16)) + k2 = derive_session_key(master, cn, bytes([1] * 16)) + assert k1 != k2 + + +class TestDeriveSessionId: + """Tests for derive_session_id.""" + + def test_returns_8_bytes(self): + """Session ID is 8 bytes.""" + sid = derive_session_id(bytes(16), bytes(16), bytes(16)) + assert len(sid) == 8 + + def test_deterministic(self): + """Same inputs → same session ID.""" + sk = bytes(range(16)) + cn = bytes(range(16, 32)) + sn = bytes(range(32, 48)) + assert derive_session_id(sk, cn, sn) == derive_session_id(sk, cn, sn) + + def test_changes_with_inputs(self): + """Different session key → different session ID.""" + cn = bytes(16) + sn = bytes(16) + sid1 = derive_session_id(bytes(16), cn, sn) + sid2 = derive_session_id(bytes([0xFF] * 16), cn, sn) + assert sid1 != sid2 + + +class TestComputeChallengeResponse: + """Tests for compute_challenge_response.""" + + def test_returns_16_bytes(self): + """Challenge response is 16 bytes.""" + result = compute_challenge_response(bytes(16), bytes(16), bytes(16)) + assert len(result) == 16 + + def test_deterministic(self): + """Same inputs → same response.""" + master = bytes(range(16)) + sn = bytes(range(16, 32)) + cn = bytes(range(32, 48)) + r1 = compute_challenge_response(master, sn, cn) + r2 = compute_challenge_response(master, sn, cn) + assert r1 == r2 + + def test_changes_with_key(self): + """Different master key → different response.""" + sn = bytes(16) + cn = bytes(16) + r1 = compute_challenge_response(bytes(16), sn, cn) + r2 = compute_challenge_response(bytes([0xAB] * 16), sn, cn) + assert r1 != r2 + + def test_changes_with_nonces(self): + """Different nonces → different response.""" + master = bytes(16) + r1 = compute_challenge_response(master, bytes(16), bytes(16)) + r2 = compute_challenge_response(master, bytes([1] * 16), bytes(16)) + assert r1 != r2 + + def test_matches_direct_cmac(self): + """compute_challenge_response is CMAC(master, server_nonce || client_nonce || device_id).""" + master = bytes(range(16)) + sn = bytes(range(16, 32)) + cn = bytes(range(32, 48)) + device_id = bytes([0x00, 0x00, 0x00, 0x01]) + + result = compute_challenge_response(master, sn, cn) + expected = aes_cmac(master, sn + cn + device_id) + assert result == expected + + +class TestEncryptDecryptCommand: + """Round-trip and format tests for encrypt_command / decrypt_response.""" + + def _make_session(self): + session_key = bytes(range(16)) + session_id = bytes(range(8)) + return session_key, session_id + + def test_round_trip(self): + """Encrypt then decrypt recovers the original payload.""" + session_key, session_id = self._make_session() + cmd = bytes([0x00, 0x50]) + payload = b"hello world" + counter = 1 + + encrypted = encrypt_command(session_key, session_id, counter, cmd, payload) + cmd_code, recovered = decrypt_response(session_key, encrypted) + + assert recovered == payload + assert cmd_code == 0x0050 + + def test_output_format(self): + """Encrypted output is cmd(2) + nonce_full(16) + ciphertext + tag(12).""" + session_key, session_id = self._make_session() + cmd = bytes([0x00, 0x70]) + payload = b"\xab\xcd" + counter = 42 + + encrypted = encrypt_command(session_key, session_id, counter, cmd, payload) + + assert encrypted[:2] == cmd + # nonce_full at [2:18] + assert len(encrypted[2:18]) == 16 + # tag is last 12 bytes + assert len(encrypted[-12:]) == 12 + # total: 2 + 16 + (1+2) + 12 = 33 + assert len(encrypted) == 2 + 16 + (1 + len(payload)) + 12 + + def test_nonce_encodes_counter(self): + """Nonce field contains session_id and counter.""" + session_key, session_id = self._make_session() + cmd = bytes([0x00, 0x50]) + payload = b"" + counter = 7 + + encrypted = encrypt_command(session_key, session_id, counter, cmd, payload) + nonce_full = encrypted[2:18] + + assert nonce_full[:8] == session_id + assert nonce_full[8:] == counter.to_bytes(8, "big") + + def test_different_counters_produce_different_output(self): + """Each counter value produces a distinct ciphertext.""" + session_key, session_id = self._make_session() + cmd = bytes([0x00, 0x50]) + payload = b"test" + + enc1 = encrypt_command(session_key, session_id, 1, cmd, payload) + enc2 = encrypt_command(session_key, session_id, 2, cmd, payload) + assert enc1 != enc2 + + def test_decrypt_too_short_raises(self): + """decrypt_response raises ValueError when data is too short.""" + with pytest.raises(ValueError, match="too short"): + decrypt_response(bytes(16), b"\x00" * 10) + + +class TestGetNonce: + """Tests for get_nonce.""" + + def test_nonce_length(self): + """Full nonce is 16 bytes.""" + nonce = get_nonce(bytes(8), 0) + assert len(nonce) == 16 + + def test_nonce_structure(self): + """session_id(8) || counter_be(8).""" + sid = bytes(range(8)) + counter = 256 + nonce = get_nonce(sid, counter) + assert nonce[:8] == sid + assert nonce[8:] == counter.to_bytes(8, "big") + + +class TestGenerateClientNonce: + """Tests for generate_client_nonce.""" + + def test_returns_16_bytes(self): + """Client nonce is 16 bytes.""" + assert len(generate_client_nonce()) == 16 + + def test_random(self): + """Two consecutive calls return different values.""" + n1 = generate_client_nonce() + n2 = generate_client_nonce() + assert n1 != n2