diff --git a/custom_components/opendisplay/ble/__init__.py b/custom_components/opendisplay/ble/__init__.py index f1b03f7..1f3d2c4 100644 --- a/custom_components/opendisplay/ble/__init__.py +++ b/custom_components/opendisplay/ble/__init__.py @@ -2,8 +2,10 @@ from .color_scheme import ColorScheme # Re-export key classes and functions for backward compatibility from .connection import BLEConnection +from .esp32_ota import perform_esp32_ota from .image_upload import BLEImageUploader from .metadata import BLEDeviceMetadata +from .nrf_dfu import perform_dfu_update, parse_dfu_package from .operations import ( turn_led_on, turn_led_off, @@ -31,6 +33,11 @@ "BLEImageUploader", # Metadata "BLEDeviceMetadata", + # NRF DFU + "perform_dfu_update", + "parse_dfu_package", + # ESP32 OTA + "perform_esp32_ota", # Operations "turn_led_on", "turn_led_off", diff --git a/custom_components/opendisplay/ble/esp32_ota.py b/custom_components/opendisplay/ble/esp32_ota.py new file mode 100644 index 0000000..f1b32ab --- /dev/null +++ b/custom_components/opendisplay/ble/esp32_ota.py @@ -0,0 +1,143 @@ +"""ESP32 BLE OTA update implementation. + +Uses the OpenDisplay BLE OTA protocol (commands 0x0046/0x0047/0x0048) to flash +firmware directly over an existing BLE connection. This is used for ESP32-S3, +ESP32-C3, and ESP32-C6 devices. + +The firmware binary must be the **application-only** ``.bin`` (e.g. +``esp32-s3-N16R8.bin``), **not** the merged ``_full.bin`` which includes the +bootloader and partition table and would be rejected by ``Update.write()``. + +Protocol flow: + HA → [0x00, 0x46, size₀, size₁, size₂, size₃] → ACK {0x00, 0x46} + HA → [0x00, 0x47, chunk...] → ACK {0x00, 0x47} (repeat) + HA → [0x00, 0x48] → ACK {0x00, 0x48} → reboot +""" + +from __future__ import annotations + +import logging +import struct +from collections.abc import Callable +from typing import TYPE_CHECKING + +from .protocol_open_display import ( + CMD_OTA_DATA, + CMD_OTA_END, + CMD_OTA_START, + RESP_ERROR, + RESP_SUCCESS, +) + +if TYPE_CHECKING: + from .connection import BLEConnection + +_LOGGER = logging.getLogger(__name__) + +# Maximum firmware data bytes per BLE write (conservative for BLE MTU). +# The full packet is CMD_OTA_DATA (2 bytes) + payload, so the overall +# BLE write is ESP32_OTA_CHUNK_SIZE + 2. +ESP32_OTA_CHUNK_SIZE = 200 + + +def _check_ota_response(response: bytes, expected_cmd: int) -> None: + """Validate an OTA ACK response from the device. + + Args: + response: Raw response bytes from device + expected_cmd: Expected command echo byte (e.g. 0x46, 0x47, 0x48) + + Raises: + RuntimeError: If response indicates an error or is unexpected + """ + if len(response) < 2: + raise RuntimeError( + f"OTA response too short ({len(response)} bytes): {response.hex()}" + ) + status = response[0] + cmd_echo = response[1] + if status == RESP_ERROR: + raise RuntimeError( + f"Device rejected OTA command 0x{expected_cmd:02x}" + " (not supported on this platform)" + ) + if status != RESP_SUCCESS or cmd_echo != expected_cmd: + raise RuntimeError( + f"Unexpected OTA response for 0x{expected_cmd:02x}: {response.hex()}" + ) + + +async def perform_esp32_ota( + connection: BLEConnection, + firmware_data: bytes, + progress_callback: Callable[[int, int], None] | None = None, +) -> bool: + """Flash firmware to an ESP32 device over BLE using the OTA protocol. + + The device must be connected via BLEConnection using the OpenDisplay + service UUID. The firmware binary (.bin) is sent in three phases: + + 1. OTA Start – sends total firmware size + 2. OTA Data – streams firmware in chunks, ACK per chunk + 3. OTA End – finalises and triggers reboot + + Args: + connection: Active BLEConnection to the device + firmware_data: Raw application firmware binary (.bin) + progress_callback: Optional ``callback(bytes_sent, total_bytes)`` + + Returns: + True if the update completed successfully. + + Raises: + RuntimeError: If the device rejects any OTA command. + """ + total_size = len(firmware_data) + _LOGGER.info( + "Starting ESP32 BLE OTA for %s (%d bytes)", + connection.mac_address, + total_size, + ) + + # --- Step 1: OTA Start --------------------------------------------------- + start_payload = CMD_OTA_START + struct.pack(" DfuPackage: + """Parse an Adafruit nrfutil DFU package (.zip). + + The .zip contains: + - manifest.json + - *.dat (init packet) + - *.bin (firmware binary) + + Args: + zip_data: Raw bytes of the .zip DFU package + + Returns: + DfuPackage: Parsed package with init_packet and firmware + + Raises: + ValueError: If package is missing required files + """ + with zipfile.ZipFile(io.BytesIO(zip_data)) as zf: + dat_file = None + bin_file = None + + for name in zf.namelist(): + if name.endswith(".dat"): + dat_file = zf.read(name) + elif name.endswith(".bin"): + bin_file = zf.read(name) + + if not dat_file or not bin_file: + raise ValueError("DFU package missing .dat or .bin file") + + return DfuPackage(init_packet=dat_file, firmware=bin_file) + + +class NordicDfuController: + """BLE DFU controller for Nordic/Adafruit bootloader.""" + + def __init__(self, client: BleakClient) -> None: + self._client = client + self._response_event = asyncio.Event() + self._response_data: bytes = b"" + + def _notification_handler(self, _sender: int, data: bytearray) -> None: + """Handle DFU control point notifications.""" + self._response_data = bytes(data) + self._response_event.set() + + async def _write_control_point( + self, data: bytes, *, wait_response: bool = True + ) -> bytes: + """Write to DFU control point and optionally wait for response.""" + self._response_event.clear() + await self._client.write_gatt_char( + DFU_CONTROL_POINT_UUID, data, response=True + ) + + if wait_response: + await asyncio.wait_for(self._response_event.wait(), timeout=10.0) + return self._response_data + return b"" + + async def _write_data(self, data: bytes) -> None: + """Write to DFU data characteristic.""" + chunk_size = min(len(data), 200) + offset = 0 + while offset < len(data): + chunk = data[offset : offset + chunk_size] + await self._client.write_gatt_char( + DFU_PACKET_UUID, chunk, response=False + ) + offset += len(chunk) + + async def start(self) -> None: + """Initialize DFU process.""" + await self._client.start_notify( + DFU_CONTROL_POINT_UUID, self._notification_handler + ) + + prn_cmd = struct.pack(" None: + """Send init packet (command object).""" + select_cmd = struct.pack(" None: + """Send firmware data in objects. + + Args: + firmware: Firmware binary data + progress_callback: Optional callback(bytes_sent, total_bytes) + """ + select_cmd = struct.pack(" None: + """Validate DFU response.""" + if len(response) < 3: + raise BleakError(f"DFU response too short: {response.hex()}") + + if response[0] != DfuOpcode.RESPONSE: + raise BleakError( + f"Expected response opcode 0x60, got 0x{response[0]:02x}" + ) + + if response[1] != expected_opcode: + raise BleakError( + f"Response for wrong opcode: expected 0x{expected_opcode:02x}," + f" got 0x{response[1]:02x}" + ) + + if response[2] != DfuResult.SUCCESS: + raise BleakError( + f"DFU operation failed with result code: 0x{response[2]:02x}" + ) + + async def stop(self) -> None: + """Clean up DFU controller.""" + try: + await self._client.stop_notify(DFU_CONTROL_POINT_UUID) + except Exception: # noqa: BLE001 + _LOGGER.debug("Error stopping DFU notifications", exc_info=True) + + +async def perform_dfu_update( + mac_address: str, + dfu_package_data: bytes, + progress_callback=None, + scan_timeout: float = 30.0, +) -> bool: + """Perform complete DFU update on a device already in DFU bootloader mode. + + Args: + mac_address: Original device MAC address + dfu_package_data: Raw bytes of the .zip DFU package + progress_callback: Optional callback(bytes_sent, total_bytes) + scan_timeout: How long to scan for the DFU bootloader + + Returns: + bool: True if update completed successfully + """ + package = parse_dfu_package(dfu_package_data) + _LOGGER.info( + "DFU package: init=%d bytes, firmware=%d bytes", + len(package.init_packet), + len(package.firmware), + ) + + _LOGGER.info("Scanning for DFU bootloader (timeout=%ds)...", scan_timeout) + + def _match_dfu_device(_device, adv_data): + """Match a device advertising the DFU service.""" + if DFU_SERVICE_UUID in (adv_data.service_uuids or []): + return True + if adv_data.local_name and "dfu" in adv_data.local_name.lower(): + return True + return False + + dfu_device = await BleakScanner.find_device_by_filter( + _match_dfu_device, timeout=scan_timeout + ) + + if not dfu_device: + _LOGGER.error( + "DFU bootloader not found after scanning for %ds", scan_timeout + ) + return False + + _LOGGER.info( + "Found DFU bootloader: %s (%s)", dfu_device.name, dfu_device.address + ) + + async with BleakClient(dfu_device, timeout=15.0) as client: + dfu = NordicDfuController(client) + + try: + await dfu.start() + _LOGGER.info("Sending init packet...") + await dfu.send_init_packet(package.init_packet) + _LOGGER.info( + "Sending firmware (%d bytes)...", len(package.firmware) + ) + await dfu.send_firmware(package.firmware, progress_callback) + _LOGGER.info("DFU update completed successfully!") + return True + + except Exception as err: + _LOGGER.error("DFU update failed: %s", err) + raise + finally: + await dfu.stop() diff --git a/custom_components/opendisplay/ble/protocol_open_display.py b/custom_components/opendisplay/ble/protocol_open_display.py index fe0304e..c90ff8a 100644 --- a/custom_components/opendisplay/ble/protocol_open_display.py +++ b/custom_components/opendisplay/ble/protocol_open_display.py @@ -20,6 +20,15 @@ # OpenDisplay protocol constants CMD_READ_CONFIG = bytes([0x00, 0x40]) CMD_READ_FW_VERSION = bytes([0x00, 0x43]) +CMD_ENTER_DFU_OTA = bytes([0x00, 0x44]) +CMD_ENTER_DFU_SERIAL = bytes([0x00, 0x45]) +CMD_OTA_START = bytes([0x00, 0x46]) +CMD_OTA_DATA = bytes([0x00, 0x47]) +CMD_OTA_END = bytes([0x00, 0x48]) + +# Response codes +RESP_SUCCESS = 0x00 +RESP_ERROR = 0xFF def _format_config_summary(config: GlobalConfig, mac_address: str) -> str: @@ -417,6 +426,52 @@ def get_last_config(self) -> GlobalConfig | None: """ return self._last_config + async def enter_dfu_mode(self, connection: "BLEConnection") -> bool: + """Send command to enter OTA DFU bootloader mode. + + The device will: + 1. Respond with [0x00, 0x44] (success) or [0xFF, 0x44] (error) + 2. Reset into the Adafruit BLE DFU bootloader + 3. The BLE connection will be lost + + Args: + connection: Active BLE connection to device + + Returns: + bool: True if device acknowledged the command + + Raises: + BLEProtocolError: If device returns error response + """ + response = await connection.write_command_with_response( + CMD_ENTER_DFU_OTA, timeout=5.0 + ) + + if len(response) >= 2: + status = response[0] + cmd_echo = response[1] + + if status == RESP_SUCCESS and cmd_echo == 0x44: + _LOGGER.info( + "Device %s acknowledged DFU mode entry", + connection.mac_address, + ) + return True + if status == RESP_ERROR: + _LOGGER.error( + "Device %s rejected DFU mode entry" + " (not supported on this platform)", + connection.mac_address, + ) + return False + + _LOGGER.warning( + "Unexpected DFU response from %s: %s", + connection.mac_address, + response.hex(), + ) + return False + async def read_firmware_version(self, connection: "BLEConnection") -> dict: """Read firmware version using command 0x0043. diff --git a/custom_components/opendisplay/strings.json b/custom_components/opendisplay/strings.json index 7a974b6..785eba6 100644 --- a/custom_components/opendisplay/strings.json +++ b/custom_components/opendisplay/strings.json @@ -279,7 +279,7 @@ "preview_features": { "opendisplay_ble_updates": { "name": "OpenDisplay firmware updates", - "description": "Shows a firmware update entity for OpenDisplay BLE tags that compares the installed firmware to the latest OpenDisplay_BLE GitHub release. Install is not available yet." + "description": "Shows a firmware update entity for OpenDisplay BLE tags that compares the installed firmware to the latest GitHub release. Supports OTA installation for NRF52840 (Nordic DFU) and ESP32 (BLE OTA) devices." } }, "exceptions": { @@ -373,6 +373,18 @@ "config_flow_invalid_config": { "message": "Device returned invalid configuration data." }, "config_flow_missing_config": { "message": "Device returned no configuration data." }, "refresh_config_failed": { "message": "Failed to refresh configuration: {error}." }, - "multiple_errors": { "message": "Multiple errors occurred:\n{errors}" } + "multiple_errors": { "message": "Multiple errors occurred:\n{errors}" }, + "dfu_already_in_progress": { "message": "A firmware update is already in progress for {mac_address}." }, + "dfu_no_target_version": { "message": "No target firmware version available for update." }, + "dfu_download_failed": { "message": "Failed to download DFU package for version {version}: HTTP {status_code}." }, + "dfu_invalid_package": { "message": "Downloaded DFU package is invalid: {error}." }, + "dfu_mode_rejected": { "message": "Device {mac_address} rejected DFU mode command. The device may not be an NRF52840 or may not support OTA updates." }, + "dfu_bootloader_not_found": { "message": "DFU bootloader not found for {mac_address} after {timeout}s. The device may have failed to enter DFU mode." }, + "dfu_flash_failed": { "message": "DFU firmware flash failed for {mac_address}: {error}." }, + "dfu_update_failed": { "message": "Firmware update failed for {mac_address}: {error}." }, + "esp32_ota_rejected": { "message": "Device {mac_address} rejected ESP32 OTA command: {error}." }, + "esp32_ota_failed": { "message": "ESP32 OTA update failed for {mac_address}: {error}." }, + "ota_unknown_ic_type": { "message": "Unknown IC type {ic_type} for {mac_address}. Cannot determine OTA method." }, + "ota_firmware_not_found": { "message": "Could not find firmware asset for {chip_name} in release {version}." } } } diff --git a/custom_components/opendisplay/update.py b/custom_components/opendisplay/update.py index 104493f..4c7eeef 100644 --- a/custom_components/opendisplay/update.py +++ b/custom_components/opendisplay/update.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from datetime import datetime, timedelta import logging @@ -11,11 +12,15 @@ UpdateEntityFeature, ) from homeassistant.core import HomeAssistant, callback +from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.entity import EntityCategory from homeassistant.helpers.entity_platform import AddEntitiesCallback -from .ble import BLEDeviceMetadata +from .ble import BLEConnection, BLEDeviceMetadata, get_protocol_by_name +from .ble.esp32_ota import perform_esp32_ota +from .ble.nrf_dfu import parse_dfu_package, perform_dfu_update +from .ble.protocol_open_display import OpenDisplayProtocol from .const import DOMAIN from .entity import OpenDisplayBLEEntity from .runtime_data import OpenDisplayBLERuntimeData @@ -27,6 +32,30 @@ DEFAULT_RELEASE_URL = "https://github.com/OpenDisplay-org/Firmware/releases" CACHE_DURATION = timedelta(hours=6) +# IC type values from OpenDisplay TLV config (system.ic_type) +IC_TYPE_NRF52840 = 1 +IC_TYPE_ESP32_S3 = 2 +IC_TYPE_ESP32_C3 = 3 +IC_TYPE_ESP32_C6 = 4 + +# Mapping from IC type to firmware asset search prefix for GitHub releases. +# NRF52840 uses the DFU .zip package (contains init packet + binary required +# by the Nordic DFU bootloader; .hex/.uf2 lack the init packet). +# ESP32 variants use the application-only .bin (excludes _full.bin merged images). +_IC_TYPE_ASSET_PREFIXES: dict[int, str] = { + IC_TYPE_NRF52840: "NRF52840", + IC_TYPE_ESP32_S3: "esp32-s3-", + IC_TYPE_ESP32_C3: "esp32-c3-", + IC_TYPE_ESP32_C6: "esp32-c6-", +} + +_IC_TYPE_NAMES: dict[int, str] = { + IC_TYPE_NRF52840: "NRF52840", + IC_TYPE_ESP32_S3: "ESP32-S3", + IC_TYPE_ESP32_C3: "ESP32-C3", + IC_TYPE_ESP32_C6: "ESP32-C6", +} + async def async_setup_entry( hass: HomeAssistant, entry, async_add_entities: AddEntitiesCallback @@ -87,7 +116,11 @@ class OpenDisplayBleUpdateEntity(OpenDisplayBLEEntity, UpdateEntity): _attr_translation_key = "opendisplay_ble_firmware" _attr_device_class = UpdateDeviceClass.FIRMWARE _attr_entity_category = EntityCategory.DIAGNOSTIC - _attr_supported_features = UpdateEntityFeature.RELEASE_NOTES + _attr_supported_features = ( + UpdateEntityFeature.INSTALL + | UpdateEntityFeature.RELEASE_NOTES + | UpdateEntityFeature.PROGRESS + ) _attr_should_poll = True _attr_entity_registry_enabled_default = True @@ -108,6 +141,7 @@ def __init__( self._mac = runtime_data.mac_address self._name = runtime_data.name self._session = async_get_clientsession(hass) + self._is_updating = False super().__init__(self._mac, self._name, entry) self._attr_unique_id = f"opendisplay_ble_{self._mac}_firmware_update" self._attr_installed_version = self._compute_installed_version() @@ -206,9 +240,283 @@ async def async_update(self) -> None: else: _LOGGER.debug("Failed to fetch OpenDisplay firmware latest version: %s", msg) + def _get_ic_type(self) -> int | None: + """Return IC type from device metadata, or None if unavailable.""" + return ( + (self._entry_data.device_metadata or {}) + .get("open_display_config", {}) + .get("system", {}) + .get("ic_type") + ) + def version_is_newer(self, latest_version: str, installed_version: str) -> bool: """Use AwesomeVersion for comparison.""" try: return AwesomeVersion(latest_version) > AwesomeVersion(installed_version) except Exception: return latest_version != installed_version + + async def async_install( + self, version: str | None, backup: bool, **kwargs + ) -> None: + """Install firmware update via BLE OTA. + + The update method depends on the device IC type: + - NRF52840: Download .zip DFU package → enter DFU bootloader → Nordic DFU protocol + - ESP32-S3/C3/C6: Download .bin → stream via BLE OTA commands (0x0046/47/48) + """ + if self._is_updating: + _LOGGER.warning("Update already in progress for %s", self._mac) + return + + self._is_updating = True + + try: + target_version = version or self._latest_version + if not target_version: + raise HomeAssistantError("No target version available") + + ic_type = self._get_ic_type() + _LOGGER.info( + "Starting OTA for %s (ic_type=%s, version=%s)", + self._mac, + ic_type, + target_version, + ) + + self._attr_in_progress = True + self.async_write_ha_state() + + if ic_type == IC_TYPE_NRF52840: + await self._install_nrf52840(target_version) + elif ic_type in (IC_TYPE_ESP32_S3, IC_TYPE_ESP32_C3, IC_TYPE_ESP32_C6): + await self._install_esp32(target_version, ic_type) + else: + raise HomeAssistantError( + f"Unknown IC type {ic_type} — cannot determine OTA method" + ) + + # Mark update as complete + self._attr_installed_version = target_version + self._attr_in_progress = False + self.async_write_ha_state() + + _LOGGER.info( + "Firmware update complete for %s: now running %s", + self._mac, + target_version, + ) + + except Exception: + self._attr_in_progress = False + self.async_write_ha_state() + raise + finally: + self._is_updating = False + + # ------------------------------------------------------------------ + # NRF52840: Nordic DFU via bootloader + # ------------------------------------------------------------------ + + async def _install_nrf52840(self, target_version: str) -> None: + """Install firmware on NRF52840 via Nordic DFU bootloader. + + Downloads ``NRF52840.zip`` (the Adafruit nrfutil DFU package) which + contains the init packet (``.dat``) and firmware binary (``.bin``). + The ``.hex`` and ``.uf2`` release assets do NOT contain the init + packet required by the Nordic DFU protocol, so the ``.zip`` is the + only viable format for BLE OTA on this platform. + """ + # Download DFU package + dfu_url = await self._get_firmware_download_url( + target_version, IC_TYPE_NRF52840 + ) + if not dfu_url: + raise HomeAssistantError( + f"Could not find NRF52840.zip in release {target_version}" + ) + + _LOGGER.info("Downloading NRF52840 DFU package from %s", dfu_url) + async with self._session.get(dfu_url) as resp: + if resp.status != 200: + raise HomeAssistantError( + f"Failed to download DFU package: HTTP {resp.status}" + ) + dfu_data = await resp.read() + + _LOGGER.info("Downloaded DFU package: %d bytes", len(dfu_data)) + + # Validate package structure + try: + parse_dfu_package(dfu_data) + except ValueError as err: + raise HomeAssistantError(f"Invalid DFU package: {err}") from err + + # Enter DFU bootloader via command 0x0044 + protocol = get_protocol_by_name("open_display") + assert isinstance(protocol, OpenDisplayProtocol) + + _LOGGER.info("Sending DFU mode command to %s", self._mac) + async with BLEConnection( + self.hass, self._mac, protocol.service_uuid, protocol + ) as conn: + success = await protocol.enter_dfu_mode(conn) + if not success: + raise HomeAssistantError( + "Device rejected DFU mode command (may not be NRF52840)" + ) + + # Wait for device to reset into bootloader + _LOGGER.info("Waiting for device to enter DFU bootloader...") + await asyncio.sleep(3) + + # Perform Nordic DFU flash + def _progress_callback(bytes_sent, total_bytes): + progress = int((bytes_sent / total_bytes) * 100) + self._attr_in_progress = progress + self.async_write_ha_state() + + success = await perform_dfu_update( + mac_address=self._mac, + dfu_package_data=dfu_data, + progress_callback=_progress_callback, + scan_timeout=30.0, + ) + if not success: + raise HomeAssistantError("NRF52840 DFU update failed") + + _LOGGER.info("NRF52840 DFU complete, waiting for reboot...") + await asyncio.sleep(5) + + # ------------------------------------------------------------------ + # ESP32: BLE OTA via commands 0x0046/0x0047/0x0048 + # ------------------------------------------------------------------ + + async def _install_esp32(self, target_version: str, ic_type: int) -> None: + """Install firmware on ESP32 via BLE OTA protocol. + + Downloads the application-only ``.bin`` (e.g. ``esp32-s3-N16R8.bin``), + NOT the merged ``_full.bin`` which includes the bootloader/partition + table and is only for initial USB flashing. + """ + fw_url = await self._get_firmware_download_url(target_version, ic_type) + if not fw_url: + chip_name = _IC_TYPE_NAMES.get(ic_type, f"ic_type={ic_type}") + raise HomeAssistantError( + f"Could not find firmware .bin for {chip_name}" + f" in release {target_version}" + ) + + _LOGGER.info("Downloading ESP32 firmware from %s", fw_url) + async with self._session.get(fw_url) as resp: + if resp.status != 200: + raise HomeAssistantError( + f"Failed to download firmware: HTTP {resp.status}" + ) + fw_data = await resp.read() + + _LOGGER.info("Downloaded ESP32 firmware: %d bytes", len(fw_data)) + + if len(fw_data) == 0: + raise HomeAssistantError("Downloaded firmware file is empty") + + # Stream firmware over BLE + protocol = get_protocol_by_name("open_display") + assert isinstance(protocol, OpenDisplayProtocol) + + def _progress_callback(bytes_sent, total_bytes): + progress = int((bytes_sent / total_bytes) * 100) + self._attr_in_progress = progress + self.async_write_ha_state() + + async with BLEConnection( + self.hass, self._mac, protocol.service_uuid, protocol + ) as conn: + await perform_esp32_ota( + connection=conn, + firmware_data=fw_data, + progress_callback=_progress_callback, + ) + + _LOGGER.info("ESP32 OTA complete, waiting for reboot...") + await asyncio.sleep(5) + + # ------------------------------------------------------------------ + # Firmware asset resolution + # ------------------------------------------------------------------ + + async def _get_firmware_download_url( + self, version: str, ic_type: int + ) -> str | None: + """Find the download URL for the correct firmware asset. + + Release assets and which one each platform needs: + + **NRF52840** — uses Nordic DFU bootloader (cmd 0x0044) which requires + a DFU package containing both an init packet (``.dat``) and firmware + binary (``.bin``). Only ``NRF52840.zip`` has both; the ``.hex`` and + ``.uf2`` assets lack the init packet and cannot be used for BLE DFU. + + **ESP32** — uses BLE OTA (cmds 0x0046/47/48) streaming a raw + application ``.bin`` via ``Update.write()``. The ``_full.bin`` merged + images (bootloader + partitions + app) must be excluded as they are + intended for initial USB flash only. + + Args: + version: Release version tag (e.g. "1.2" or "v1.2") + ic_type: Device IC type constant + + Returns: + Browser download URL for the asset, or *None* if not found. + """ + prefix = _IC_TYPE_ASSET_PREFIXES.get(ic_type) + if prefix is None: + return None + + tags_to_try = ( + [version, f"v{version}"] + if not version.startswith("v") + else [version, version[1:]] + ) + + for tag in tags_to_try: + url = ( + "https://api.github.com/repos/OpenDisplay-org/" + f"Firmware/releases/tags/{tag}" + ) + try: + async with self._session.get( + url, + headers={ + "Accept": "application/vnd.github+json", + "User-Agent": "HomeAssistant-OpenDisplay-OTA", + }, + ) as resp: + if resp.status != 200: + continue + data = await resp.json() + + for asset in data.get("assets", []): + name = asset.get("name", "") + if ic_type == IC_TYPE_NRF52840: + # Exact match for NRF52840.zip + if name == "NRF52840.zip": + return asset.get("browser_download_url") + else: + # ESP32: match prefix, must end with .bin, + # skip merged *_full.bin images + if ( + name.startswith(prefix) + and name.endswith(".bin") + and not name.endswith("_full.bin") + ): + return asset.get("browser_download_url") + except Exception: # noqa: BLE001 + _LOGGER.debug( + "Failed to query GitHub release for tag %s", + tag, + exc_info=True, + ) + continue + + return None