diff --git a/homgarapi/__init__.py b/homgarapi/__init__.py index 2962e90..9718fb0 100644 --- a/homgarapi/__init__.py +++ b/homgarapi/__init__.py @@ -1,10 +1,15 @@ -""" -homgarapi. - -A client library for interacting with the HomGar API -""" +"""HomGar API client library.""" __version__ = "0.0.1" -__author__ = 'Rembrand van Lakwijk' +__author__ = "Rembrand van Lakwijk" + +from .api import HomgarApi, HomgarApiException, load_product_models +from .auth import AuthRetryManager, AuthRetryPolicy -from .api import HomgarApi, HomgarApiException +__all__ = [ + "AuthRetryManager", + "AuthRetryPolicy", + "HomgarApi", + "HomgarApiException", + "load_product_models", +] diff --git a/homgarapi/__main__.py b/homgarapi/__main__.py index 97eb1d7..20da2ed 100644 --- a/homgarapi/__main__.py +++ b/homgarapi/__main__.py @@ -1,63 +1,309 @@ -import logging -import pickle +"""Command line entry point for the HomGar API demo.""" + from argparse import ArgumentParser +from collections.abc import Mapping, MutableMapping +import json +import logging from pathlib import Path -from platformdirs import user_cache_dir +import pickle +from typing import Any +from platformdirs import user_cache_dir import yaml -from homgarapi.api import HomgarApi -from homgarapi.logutil import get_logger, TRACE +from .api import HomgarApi +from .devices import MODEL_CODE_MAPPING +from .dp_spec_builder import ( + extract_model_specs, + load_product_models_payload, + save_model_specs, +) +from .logutil import TRACE, get_logger logging.addLevelName(TRACE, "TRACE") logger = get_logger(__file__) -def demo(api: HomgarApi, config): - api.ensure_logged_in(config['email'], config['password']) +def _extract_models_from_payload(payload: Mapping[str, Any]) -> list[Mapping[str, Any]]: + """Return the list of models contained in a product models payload.""" + + models = payload.get("models") + if isinstance(models, list): + return models + data_section = payload.get("data") + if isinstance(data_section, Mapping): + nested_models = data_section.get("models") + if isinstance(nested_models, list): + return nested_models + return [] + + +def _generate_model_specs_if_requested( + api: HomgarApi, + *, + output_path: Path, + model_codes: list[int] | None, + source_path: Path | None, + product_models_payload: Mapping[str, Any] | None, +) -> Mapping[str, Any] | None: + """Generate a trimmed datapoint cache when triggered by the CLI.""" + + codes = sorted({int(code) for code in model_codes}) if model_codes else sorted(MODEL_CODE_MAPPING.keys()) + + models_payload: list[Mapping[str, Any]] = [] + if source_path is not None: + models_payload = load_product_models_payload(source_path) + if not models_payload: + logger.error( + "No product models were loaded from %s; skipping model spec generation", + source_path, + ) + return product_models_payload + else: + if product_models_payload is None: + product_models_payload = api.get_product_models() + models_payload = _extract_models_from_payload(product_models_payload) + if not models_payload: + logger.warning( + "Product models payload did not contain model entries; unable to generate specs", + ) + return product_models_payload + + trimmed_specs = extract_model_specs(models_payload, codes) + if not trimmed_specs: + logger.warning( + "No datapoint specifications generated for model codes: %s", + ", ".join(str(code) for code in codes), + ) + return product_models_payload + + save_model_specs(trimmed_specs, output_path) + logger.info( + "Wrote model specs for %d model codes to %s", + len(trimmed_specs), + output_path, + ) + return product_models_payload + + +def demo(api: HomgarApi, config: Mapping[str, str]) -> None: + """Run a simple demonstration against the HomGar API. + + :param api: Instantiated `HomgarApi` client. + :param config: Mapping containing at least ``email`` and ``password`` keys. + """ + api.ensure_logged_in(config["email"], config["password"]) for home in api.get_homes(): - print(f"({home.hid}) {home.name}:") + logger.info("Home (%s) %s", home.hid, home.name) for hub in api.get_devices_for_hid(home.hid): - print(f" - {hub}") + logger.info(" Hub %s", hub) api.get_device_status(hub) for subdevice in hub.subdevices: - print(f" + {subdevice}") + description = str(subdevice) + updated = getattr(subdevice, "updated_in_last_poll", False) + if not updated: + description = f"{description} [offline]" + logger.info(" Subdevice %s", description) + +def main() -> None: + """Parse CLI arguments and run the demo. -def main(): + :raises TypeError: If the configuration file does not contain a mapping. + """ argparse = ArgumentParser( description="Demo of HomGar API client library", - prog="homgarapi" + prog="homgarapi", + ) + argparse.add_argument( + "-v", "--verbose", action="store_true", help="Verbose (DEBUG) mode" + ) + argparse.add_argument( + "-vv", "--very-verbose", action="store_true", help="Very verbose (TRACE) mode" + ) + argparse.add_argument( + "-c", + "--cache", + type=Path, + help="Cache file to use. Should be writable, will be created if it does not exist.", + ) + argparse.add_argument( + "--email", + help="HomGar account email address (overrides value in configuration file)", + ) + argparse.add_argument( + "--password", + help="HomGar account password (overrides value in configuration file)", + ) + argparse.add_argument( + "--unknown-output", + type=Path, + help="Optional path to write unsupported device report (YAML)", + ) + argparse.add_argument( + "--dictionary-output", + nargs="?", + type=Path, + const=Path("dictionary.json"), + help=( + "Write the platform dictionary response to disk. Supply an optional path, " + "or omit to use './dictionary.json'." + ), + ) + argparse.add_argument( + "--product-models-output", + nargs="?", + type=Path, + const=Path("product_models.json"), + help=( + "Write the product models response to disk. Supply an optional path, " + "or omit to use './product_models.json'." + ), + ) + argparse.add_argument( + "--model-specs-output", + nargs="?", + type=Path, + const=Path("model_specs.json"), + help=( + "Generate trimmed datapoint specifications for supported model codes. " + "Supply an optional path, or omit to use './model_specs.json'." + ), + ) + argparse.add_argument( + "--model-specs-code", + action="append", + type=int, + dest="model_specs_codes", + help="Model code to include when generating model specs. Can be provided multiple times.", + ) + argparse.add_argument( + "--model-specs-source", + type=Path, + help=( + "Optional path to an existing product_models.json file to use when " + "generating model specs. When omitted, data is fetched from the API." + ), + ) + argparse.add_argument( + "config", + nargs="?", + type=Path, + help="Optional YAML file containing authentication details", ) - argparse.add_argument("-v", "--verbose", action='store_true', help="Verbose (DEBUG) mode") - argparse.add_argument("-vv", "--very-verbose", action='store_true', help="Very verbose (TRACE) mode") - argparse.add_argument("-c", "--cache", type=Path, help="Cache file to use. Should be writable, will be created if it does not exist.") - argparse.add_argument("config", type=Path, help="Yaml file containing email and password to use to log in") args = argparse.parse_args() - logging.basicConfig(level=TRACE if args.very_verbose else logging.DEBUG if args.verbose else logging.INFO) + logging.basicConfig( + level=TRACE + if args.very_verbose + else logging.DEBUG + if args.verbose + else logging.INFO + ) - cache_file = args.cache or (Path(user_cache_dir("homgarapi", ensure_exists=True)) / "cache.pickle") - config_file = args.config + cache_file: Path = args.cache or ( + Path(user_cache_dir("homgarapi", ensure_exists=True)) / "cache.pickle" + ) + config_file: Path | None = args.config - cache = {} + cache: MutableMapping[str, Any] = {} try: - with open(cache_file, 'rb') as f: - cache = pickle.load(f) - except OSError as e: + with cache_file.open("rb") as cache_handle: + cache = pickle.load(cache_handle) + except OSError: logger.info("Could not load cache, starting fresh") - with open(config_file, 'rb') as f: - config = yaml.unsafe_load(f) + config_mapping: dict[str, str] = {} + if config_file is not None: + with config_file.open("rb") as config_handle: + try: + config = yaml.safe_load(config_handle) + except yaml.YAMLError as exc: + raise ValueError( + f"Failed parsing configuration file {config_file}" + ) from exc + if not isinstance(config, Mapping): + msg = f"Configuration file {config_file} must contain a mapping" + raise TypeError(msg) + config_mapping.update({str(key): str(value) for key, value in config.items()}) + + if args.email is not None: + config_mapping["email"] = args.email + if args.password is not None: + config_mapping["password"] = args.password + + required_keys = {"email", "password"} + missing = required_keys - set(config_mapping) + if missing: + msg = ( + "Authentication details must include both email and password. " + f"Missing: {', '.join(sorted(missing))}" + ) + raise ValueError(msg) try: api = HomgarApi(cache) - demo(api, config) + product_models_payload: Mapping[str, Any] | None = None + + demo(api, config_mapping) + + def _write_json(output_path: Path, payload: Any, description: str) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8") as output_handle: + json.dump(payload, output_handle, indent=2, sort_keys=True) + logger.info("Wrote %s to %s", description, output_path) + + if args.dictionary_output: + dictionary_payload = api.get_dictionary() + _write_json(args.dictionary_output, dictionary_payload, "dictionary data") + + if args.product_models_output: + product_models_payload = api.get_product_models() + _write_json( + args.product_models_output, + product_models_payload, + "product models data", + ) + + if args.model_specs_output: + product_models_payload = _generate_model_specs_if_requested( + api, + output_path=args.model_specs_output, + model_codes=args.model_specs_codes, + source_path=args.model_specs_source, + product_models_payload=product_models_payload, + ) + + unknown_devices_raw = api.get_unknown_devices() + if unknown_devices_raw: + unknown_devices: list[dict[str, Any]] = [] + for device in unknown_devices_raw: + enriched = dict(device) + if status_values := enriched.pop("status_values", None): + enriched["status_payloads"] = status_values + unknown_devices.append(enriched) + output_path = args.unknown_output or cache_file.with_name( + "unknown_devices.yaml" + ) + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8") as output_handle: + yaml.safe_dump( + {"unknown_devices": unknown_devices}, + output_handle, + sort_keys=False, + allow_unicode=True, + ) + logger.warning( + "Unsupported devices detected. Details written to %s. " + "Please share this file when requesting support for new hardware.", + output_path, + ) finally: - with open(cache_file, 'wb') as f: - pickle.dump(cache, f) + cache_file.parent.mkdir(parents=True, exist_ok=True) + with cache_file.open("wb") as cache_write_handle: + pickle.dump(cache, cache_write_handle) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/homgarapi/api.py b/homgarapi/api.py index 9ab9503..d4d29c4 100644 --- a/homgarapi/api.py +++ b/homgarapi/api.py @@ -1,172 +1,535 @@ +"""Client wrapper for the HomGar REST API.""" + +from __future__ import annotations + import binascii +from collections.abc import Mapping, MutableMapping, Sequence +from datetime import UTC, datetime, timedelta import hashlib +import json import os -from datetime import datetime, timedelta -from typing import Optional, List +from pathlib import Path +import time +from typing import Any, cast import requests -from homgarapi.devices import HomgarHome, MODEL_CODE_MAPPING, HomgarHubDevice -from homgarapi.logutil import TRACE, get_logger +from .auth import AuthRetryManager, AuthRetryPolicy +from .constants import build_model_list +from .devices import MODEL_CODE_MAPPING, HomgarDevice, HomgarHome, HomgarHubDevice +from .logutil import TRACE, get_logger logger = get_logger(__file__) - - class HomgarApiException(Exception): - def __init__(self, code, msg): - super().__init__() - self.code = code - self.msg = msg - - def __str__(self): - s = f"HomGar API returned code {self.code}" - if self.msg: - s += f" ('{self.msg}')" - return s + """Raised when the HomGar API returns a non-success response.""" + def __init__(self, code: int, message: str | None) -> None: + """Store the API error information.""" + super().__init__(code, message) + self.code = code + self.message = message or "" + def __str__(self) -> str: + """Return a readable representation of the error.""" + base = f"HomGar API returned code {self.code}" + return f"{base} ('{self.message}')" if self.message else base class HomgarApi: + """Thin client around the HomGar REST endpoints.""" + def __init__( - self, - auth_cache: Optional[dict] = None, - api_base_url: str = "https://region3.homgarus.com", - requests_session: requests.Session = None - ): + self, + auth_cache: MutableMapping[str, Any] | None = None, + api_base_url: str = "https://region3.homgarus.com", + requests_session: requests.Session | None = None, + ) -> None: + """Initialise the API client. + + :param auth_cache: Mutable mapping used to persist authentication data between runs. + :param api_base_url: Base URL for the HomGar API, without a trailing slash. + :param requests_session: Optional `requests.Session` to reuse; a new one is created when omitted. """ - Create an object for interacting with the Homgar API - :param auth_cache: A dictionary in which authentication information will be stored. - Save this dict on exit and supply it again next time constructing this object to avoid logging in - if a valid token is still present. - :param api_base_url: The base URL for the Homgar API. Omit trailing slash. - :param requests_session: Optional requests lib session to use. New session is created if omitted. + self.session: requests.Session = requests_session or requests.Session() + self.cache: MutableMapping[str, Any] = auth_cache or {} + self.base = api_base_url.rstrip("/") + self._auth_manager = AuthRetryManager(policy=AuthRetryPolicy()) + self._unknown_devices: dict[ + tuple[str | None, str | None, str | None], dict[str, Any] + ] = {} + self._unknown_device_status: dict[tuple[str | None, str], list[str]] = {} + + def _request( + self, + method: str, + url: str, + *, + with_auth: bool = True, + headers: Mapping[str, str] | None = None, + **kwargs: Any, + ) -> requests.Response: + """Make an HTTP request against the HomGar API. + + :param method: HTTP verb to use (GET, POST, ...). + :param url: Fully qualified URL to call. + :param with_auth: Whether to include the cached authentication token. + :param headers: Additional HTTP headers to include with the request. + :param kwargs: Extra keyword arguments forwarded to `requests.Session.request`. + :returns: The HTTP response object. """ - self.session = requests_session or requests.Session() - self.cache = auth_cache or {} - self.base = api_base_url + request_headers = {"lang": "en", "appCode": "1", **(headers or {})} + if with_auth: + token = self.cache.get("token") + if not token: + msg = "Authentication token missing from cache" + raise HomgarApiException(-1, msg) + request_headers["auth"] = str(token) - def _request(self, method, url, with_auth=True, headers=None, **kwargs): logger.log(TRACE, "%s %s %s", method, url, kwargs) - headers = {"lang": "en", "appCode": "1", **(headers or {})} - if with_auth: - headers["auth"] = self.cache["token"] - response = self.session.request(method, url, headers=headers, **kwargs) + response = self.session.request(method, url, headers=request_headers, **kwargs) logger.log(TRACE, "-[%03d]-> %s", response.status_code, response.text) return response - def _request_json(self, method, path, **kwargs): - response = self._request(method, self.base + path, **kwargs).json() - code = response.get('code') + def _request_json(self, method: str, path: str, **kwargs: Any) -> Any: + """Perform a JSON request and return the inner `data` payload. + + :param method: HTTP verb to use (GET, POST, ...). + :param path: Path relative to the configured base URL. + :param kwargs: Extra keyword arguments forwarded to `_request`. + :returns: The JSON `data` field when the call succeeds. + :raises HomgarApiException: If the API reports a non-zero code. + """ + response = self._request(method, f"{self.base}{path}", **kwargs).json() + code = int(response.get("code", -1)) if code != 0: - raise HomgarApiException(code, response.get('msg')) - return response.get('data') + raise HomgarApiException(code, response.get("msg")) + return response.get("data") - def _get_json(self, path, **kwargs): + def _get_json(self, path: str, **kwargs: Any) -> Any: return self._request_json("GET", path, **kwargs) - def _post_json(self, path, body, **kwargs): + def _post_json(self, path: str, body: Mapping[str, Any], **kwargs: Any) -> Any: return self._request_json("POST", path, json=body, **kwargs) - def login(self, email: str, password: str, area_code="31") -> None: + def get_product_models(self) -> Mapping[str, Any]: + """Retrieve the list of supported product models from the API. + + :returns: Mapping containing model metadata (`version`, `modelCodes`, `models`, ...). """ - Perform a new login. - :param email: Account e-mail - :param password: Account password - :param area_code: Seems to need to be the phone country code associated with the account, e.g. "31" for NL + return cast( + Mapping[str, Any], self._get_json("/app/common/core/productModel/json") + ) + + def get_dictionary(self) -> Mapping[str, Any]: + """Retrieve platform dictionary metadata (currency, soil types, etc.).""" + data = self._get_json("/app/common/core/dict") + return cast(Mapping[str, Any], data or {}) + + def login(self, email: str, password: str, area_code: str = "31") -> None: + """Perform a login and cache the resulting tokens. + + :param email: HomGar account e-mail address. + :param password: HomGar account password. + :param area_code: Phone country code associated with the account (for example ``"31"`` for NL). """ - data = self._post_json("/auth/basic/app/login", { + payload = { "areaCode": area_code, "phoneOrEmail": email, - "password": hashlib.md5(password.encode('utf-8')).hexdigest(), - "deviceId": binascii.b2a_hex(os.urandom(16)).decode('utf-8') - }, with_auth=False) - self.cache['email'] = email - self.cache['token'] = data.get('token') - self.cache['token_expires'] = datetime.utcnow().timestamp() + data.get('tokenExpired') - self.cache['refresh_token'] = data.get('refreshToken') - - def get_homes(self) -> List[HomgarHome]: - """ - Retrieves all HomgarHome objects associated with the logged in account. - Requires first logging in. - :return: List of HomgarHome objects + "password": hashlib.md5(password.encode("utf-8")).hexdigest(), + "deviceId": binascii.b2a_hex(os.urandom(16)).decode("utf-8"), + } + data = self._post_json("/auth/basic/app/login", payload, with_auth=False) + expires_in: int = int(data.get("tokenExpired", 0)) + self.cache["email"] = email + self.cache["token"] = data.get("token") + self.cache["token_expires"] = datetime.now(tz=UTC).timestamp() + expires_in + self.cache["refresh_token"] = data.get("refreshToken") + + def get_homes(self) -> list[HomgarHome]: + """Return all homes linked to the current account. + + :returns: List of `HomgarHome` instances. """ data = self._get_json("/app/member/appHome/list") - return [HomgarHome(hid=h.get('hid'), name=h.get('homeName')) for h in data] + homes_data: Sequence[Mapping[str, Any]] = data or [] + result: list[HomgarHome] = [] + for home in homes_data: + hid_value = home.get("hid") + if not isinstance(hid_value, (str, int)): + hid_value = "" if hid_value is None else str(hid_value) + name_value = home.get("homeName") + if not isinstance(name_value, str): + name_value = "" if name_value is None else str(name_value) + result.append(HomgarHome(hid=hid_value, name=name_value)) + return result - def get_devices_for_hid(self, hid: str) -> List[HomgarHubDevice]: - """ - Retrieves a device tree associated with the home identified by the given hid (home ID). - This function returns a list of hubs associated with the home. Each hub contains associated - subdevices that use the hub as gateway. - :param hid: The home ID to retrieve hubs and associated subdevices for - :return: List of hubs with associated subdevicse + def get_devices_for_hid(self, hid: str) -> list[HomgarHubDevice]: + """Return hubs and subdevices for the provided home id. + + :param hid: The home identifier supplied by HomGar. + :returns: List of `HomgarHubDevice` objects populated with subdevices. """ data = self._get_json("/app/device/getDeviceByHid", params={"hid": str(hid)}) - hubs = [] - - def device_base_props(dev_data): - return dict( - model=dev_data.get('model'), - model_code=dev_data.get('modelCode'), - name=dev_data.get('name'), - did=dev_data.get('did'), - mid=dev_data.get('mid'), - address=dev_data.get('addr'), - port_number=dev_data.get('portNumber'), - alerts=dev_data.get('alerts'), - ) + hubs: list[HomgarHubDevice] = [] + devices: Sequence[Mapping[str, Any]] = data or [] + + def device_base_props(dev_data: Mapping[str, Any]) -> dict[str, Any]: + return { + "model": dev_data.get("model"), + "model_code": dev_data.get("modelCode"), + "name": dev_data.get("name"), + "did": dev_data.get("did"), + "mid": dev_data.get("mid"), + "address": dev_data.get("addr"), + "port_number": dev_data.get("portNumber"), + "alerts": dev_data.get("alerts"), + "device_name": dev_data.get("deviceName"), + "product_key": dev_data.get("productKey"), + } - def get_device_class(dev_data): - model_code = dev_data.get('modelCode') - if model_code not in MODEL_CODE_MAPPING: - logger.warning("Unknown device '%s' with modelCode %d", dev_data.get('model'), model_code) + def get_device_class(dev_data: Mapping[str, Any]) -> type[HomgarDevice] | None: + model_code_value = dev_data.get("modelCode") + if not isinstance(model_code_value, (str, int)): + return None + try: + model_code_int = int(model_code_value) + except (TypeError, ValueError): return None - return MODEL_CODE_MAPPING[model_code] - - for hub_data in data: - subdevices = [] - for subdevice_data in hub_data.get('subDevices', []): - did = subdevice_data.get('did') - if did == 1: - # Display hub + + device_class = MODEL_CODE_MAPPING.get(model_code_int) + if device_class is None: + logger.warning( + "Unknown device '%s' with modelCode %s", + dev_data.get("model"), + model_code_value, + ) + return device_class + + for hub_data in devices: + subdevices: list[HomgarDevice] = [] + for subdevice_data in hub_data.get("subDevices", []): + did = subdevice_data.get("did") + if did == 1: # display hub continue subdevice_class = get_device_class(subdevice_data) if subdevice_class is None: + self._record_unknown_device(subdevice_data) continue - subdevices.append(subdevice_class(**device_base_props(subdevice_data))) + subdevice = subdevice_class(**device_base_props(subdevice_data)) + self._enrich_device_metadata(subdevice, subdevice_data) + subdevices.append(subdevice) hub_class = get_device_class(hub_data) - if hub_class is None: - hub_class = HomgarHubDevice - - hubs.append(hub_class( - **device_base_props(hub_data), - subdevices=subdevices - )) + if hub_class is not None and issubclass(hub_class, HomgarHubDevice): + hub_instance = hub_class( + **device_base_props(hub_data), + subdevices=subdevices, + ) + else: + hub_instance = HomgarHubDevice( + **device_base_props(hub_data), + subdevices=subdevices, + ) + self._record_unknown_device(hub_data) + self._enrich_device_metadata(hub_instance, hub_data) + hubs.append(hub_instance) return hubs def get_device_status(self, hub: HomgarHubDevice) -> None: + """Request status updates for all devices attached to a hub. + + :param hub: Hub whose subdevices should be updated in place. """ - Updates the device status of all subdevices associated with the given hub device. - :param hub: The hub to update - """ - data = self._get_json("/app/device/getDeviceStatus", params={"mid": str(hub.mid)}) - id_map = {status_id: device for device in [hub, *hub.subdevices] for status_id in device.get_device_status_ids()} + status_payload: Sequence[Mapping[str, Any]] | None = None - for subdevice_status in data['subDeviceStatus']: - device = id_map.get(subdevice_status['id']) - if device is not None: - device.set_device_status(subdevice_status) + if hub.device_name and hub.product_key: + body = { + "devices": [ + { + "deviceName": hub.device_name, + "mid": str(hub.mid), + "productKey": hub.product_key, + } + ] + } + try: + response = self._post_json("/app/device/multipleDeviceStatus", body) + if isinstance(response, Sequence) and response: + first_entry = response[0] + if isinstance(first_entry, Mapping): + raw_status = first_entry.get("status", []) + if isinstance(raw_status, Sequence): + status_payload = [ + item for item in raw_status if isinstance(item, Mapping) + ] + except HomgarApiException as err: + logger.debug("multipleDeviceStatus failed: %s", err) - def ensure_logged_in(self, email: str, password: str, area_code: str = "31") -> None: - """ - Ensures this API object has valid credentials. - Attempts to verify the token stored in the auth cache. If invalid, attempts to login. - See login() for parameter info. + if status_payload is None: + data = self._get_json( + "/app/device/getDeviceStatus", params={"mid": str(hub.mid)} + ) + raw_status = ( + data.get("subDeviceStatus", []) if isinstance(data, Mapping) else [] + ) + if isinstance(raw_status, Sequence): + status_payload = [ + item for item in raw_status if isinstance(item, Mapping) + ] + else: + status_payload = [] + + id_map: dict[str, HomgarDevice] = {} + for homgar_device in (hub, *hub.subdevices): + homgar_device.updated_in_last_poll = False + status_ids = list(homgar_device.get_device_status_ids()) + device_did = getattr(homgar_device, "did", None) + if device_did is not None: + status_ids.append(str(device_did)) + for status_id in status_ids: + id_map[str(status_id)] = homgar_device + + for subdevice_status in status_payload: + status_id_raw = subdevice_status.get("id") + if status_id_raw is None: + continue + status_key = str(status_id_raw) + matched_device: HomgarDevice | None = ( + id_map.get(status_key) + or id_map.get(status_key.upper()) + or id_map.get(status_key.lower()) + ) + if matched_device is None: + self._record_unknown_status(hub, status_key, subdevice_status) + logger.debug( + "Unmatched status entry for hub %s (%s): id=%s payload=%s", + getattr(hub, "name", hub.mid), + hub.mid, + status_key, + subdevice_status, + ) + continue + matched_device.set_device_status(subdevice_status) + matched_device.updated_in_last_poll = True + + def ensure_logged_in( + self, email: str, password: str, area_code: str = "31" + ) -> None: + """Ensure a valid token is present, refreshing if required. + + :param email: HomGar account e-mail address. + :param password: HomGar account password. + :param area_code: Phone country code associated with the account. """ - if ( - self.cache.get('email') != email or - datetime.fromtimestamp(self.cache.get('token_expires', 0)) - datetime.utcnow() < timedelta(minutes=60) - ): + cache_email = self.cache.get("email") + expires_at = float(self.cache.get("token_expires", 0)) + remaining = datetime.fromtimestamp(expires_at, tz=UTC) - datetime.now(tz=UTC) + if cache_email != email or remaining < timedelta(minutes=60): self.login(email, password, area_code=area_code) + + def ensure_logged_in_with_retries( + self, + email: str, + password: str, + area_code: str = "31", + *, + max_retries: int | None = None, + ) -> None: + """Ensure a valid session exists with retry/backoff behaviour.""" + policy = self._auth_manager.policy + original_max = policy.max_retries + if max_retries is not None: + policy.max_retries = max_retries + + def attempt() -> None: + self.ensure_logged_in(email, password, area_code) + + def wrap(reason: str, detail: float | None) -> HomgarApiException: + if reason == "rate_limited": + wait_msg = ( + f"Too many login failures, please wait {detail:.1f}s before retrying" + if detail is not None + else "Too many login failures, please wait before retrying" + ) + return HomgarApiException(-1, wait_msg) + return HomgarApiException(-1, reason) + + attempt_limit = max(1, policy.max_retries) + attempt_count = 0 + last_error: HomgarApiException | None = None + success = False + + try: + while attempt_count < attempt_limit: + attempt_count += 1 + attempt_ts = datetime.now(tz=UTC).timestamp() + try: + self._auth_manager.execute( + attempt_ts=attempt_ts, + func=attempt, + wrap_exception=wrap, + ) + success = True + if attempt_count > 1: + logger.debug( + "Successfully logged in to HomGar API after %d attempts", + attempt_count, + ) + break + except HomgarApiException as err: + last_error = err + error_code = getattr(err, "code", None) + error_message = str(getattr(err, "message", "")).lower() + if error_code == "invalid_auth": + raise + if "too many login failures" in error_message: + break + if attempt_count >= attempt_limit: + break + backoff = min( + float(policy.max_backoff), + float(policy.min_backoff) * (2 ** (attempt_count - 1)), + ) + logger.debug( + "Login attempt %d/%d failed (%s); retrying in %.1fs", + attempt_count, + attempt_limit, + err, + backoff, + ) + time.sleep(backoff) + finally: + policy.max_retries = original_max + if not success: + if last_error is not None: + raise last_error + raise HomgarApiException(-1, "login_failed") + logger.debug("Successfully logged in to HomGar API") + + def health_check(self) -> bool: + """Return True if the API responds to a basic call.""" + try: + self.get_homes() + except HomgarApiException: + return False + return True + + def get_unknown_devices(self) -> list[Mapping[str, Any]]: + """Return all device payloads that could not be classified.""" + result: list[Mapping[str, Any]] = [] + for key, payload in self._unknown_devices.items(): + mid_str, did_str, addr_str = key + candidate_keys: list[tuple[str | None, str]] = [] + if mid_str is not None and did_str is not None: + candidate_keys.append((mid_str, did_str.upper())) + if mid_str is not None and addr_str is not None: + try: + addr_int = int(addr_str) + except (TypeError, ValueError): + formatted_addr: str | None = None + if isinstance(addr_str, str) and addr_str.upper().startswith("D"): + formatted_addr = addr_str.upper() + else: + formatted_addr = None + if formatted_addr is not None: + candidate_keys.append((mid_str, formatted_addr)) + elif isinstance(addr_str, str): + candidate_keys.append((mid_str, addr_str.upper())) + else: + candidate_keys.append((mid_str, f"D{addr_int:02d}")) + candidate_keys.append((mid_str, f"D{addr_int}")) + candidate_keys.append((mid_str, str(addr_str).upper())) + status_values: list[str] = [] + for candidate in candidate_keys: + values = self._unknown_device_status.get(candidate) + if values: + for value in values: + if value not in status_values: + status_values.append(value) + enriched = dict(payload) + if status_values: + enriched["status_values"] = status_values + result.append(enriched) + return result + + def _record_unknown_device(self, payload: Mapping[str, Any]) -> None: + """Record a device payload that lacks a known model mapping.""" + key = self._build_unknown_device_key(payload) + existing = self._unknown_devices.get(key) + payload_copy = dict(payload) + if existing is not None: + existing.update(payload_copy) + else: + self._unknown_devices[key] = payload_copy + logger.warning( + "Encountered unsupported device (model=%s, modelCode=%s).", + payload.get("model"), + payload.get("modelCode"), + ) + + def _enrich_device_metadata( + self, device: HomgarDevice, payload: Mapping[str, Any] + ) -> None: + """Populate additional metadata on device objects when available.""" + mac = payload.get("mac") + if isinstance(mac, str) and mac: + setattr(device, "mac", mac) + soft_ver = payload.get("softVer") + if isinstance(soft_ver, str) and soft_ver: + setattr(device, "soft_version", soft_ver) + + def _build_unknown_device_key( + self, payload: Mapping[str, Any] + ) -> tuple[str | None, str | None, str | None]: + """Return a stable key for an unknown device payload.""" + mid_raw = payload.get("mid") + did_raw = payload.get("did") + addr_raw = payload.get("addr") + return ( + self._stringify_identifier(mid_raw), + self._stringify_identifier(did_raw), + self._stringify_identifier(addr_raw), + ) + + @staticmethod + def _stringify_identifier(value: Any) -> str | None: + """Coerce identifiers to string form for dictionary keys.""" + if value is None: + return None + if isinstance(value, str): + stripped = value.strip() + return stripped or None + try: + return str(int(value)) + except (TypeError, ValueError): + return str(value) + + def _record_unknown_status( + self, + hub: HomgarHubDevice, + status_key: str, + payload: Mapping[str, Any], + ) -> None: + """Track raw status payloads for devices without model mappings.""" + value = payload.get("value") + if not isinstance(value, str): + return + mid = getattr(hub, "mid", None) + mid_str = self._stringify_identifier(mid) + status_norm = status_key.strip().upper() + key = (mid_str, status_norm) + values = self._unknown_device_status.setdefault(key, []) + if value not in values: + values.append(value) +def load_product_models( + path: str | os.PathLike[str] | None = None, +) -> Mapping[int, Mapping[str, Any]]: + """Load product model metadata from the bundled JSON file.""" + + if path is not None: + target = Path(path) + payload = json.loads(target.read_text(encoding="utf-8")) + models: list[Mapping[str, Any]] = payload["data"]["models"] + return {int(model["modelCode"]): model for model in models} + + models = build_model_list() + return {int(model["modelCode"]): model for model in models} diff --git a/homgarapi/auth.py b/homgarapi/auth.py new file mode 100644 index 0000000..ff8174e --- /dev/null +++ b/homgarapi/auth.py @@ -0,0 +1,103 @@ +"""Authentication retry helpers for HomGar API interactions.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +import random + +from .logutil import get_logger + +_LOGGER = get_logger(__file__) + + +@dataclass(slots=True) +class AuthRetryPolicy: + """Configuration for login retry behaviour.""" + + max_retries: int = 3 + retry_timeout: float = 300.0 + min_backoff: float = 1.0 + max_backoff: float = 60.0 + + +class AuthRetryManager: + """Stateful helper that applies retry/backoff policy for authentication.""" + + def __init__( + self, + *, + policy: AuthRetryPolicy | None = None, + auth_error_predicate: Callable[[str], bool] | None = None, + connection_error_predicate: Callable[[str], bool] | None = None, + ) -> None: + """Initialise the retry manager with optional predicates and policy.""" + self.policy = policy or AuthRetryPolicy() + self._retry_count = 0 + self._last_attempt_ts: float = 0.0 + self._auth_error_predicate = auth_error_predicate or ( + lambda err: any( + indicator in err + for indicator in ("401", "403", "unauthorized", "forbidden", "invalid credentials") + ) + ) + self._connection_error_predicate = connection_error_predicate or ( + lambda err: any( + indicator in err for indicator in ("timeout", "connection", "network", "unreachable") + ) + ) + + @property + def retry_count(self) -> int: + """Return the number of consecutive failed attempts.""" + return self._retry_count + + @property + def last_attempt(self) -> float: + """Return timestamp of the last login attempt.""" + return self._last_attempt_ts + + def execute( + self, + *, + attempt_ts: float, + func: Callable[[], None], + wrap_exception: Callable[[str, float | None], Exception], + ) -> None: + """Execute a login attempt honouring policy.""" + self._ensure_not_rate_limited(attempt_ts, wrap_exception) + + try: + func() + except Exception as err: + self._retry_count += 1 + self._last_attempt_ts = attempt_ts + error_str = str(err).lower() + if self._auth_error_predicate(error_str): + raise wrap_exception("invalid_auth", None) from err + if self._connection_error_predicate(error_str): + raise wrap_exception("connection_timeout", None) from err + raise wrap_exception("login_failed", None) from err + + self._retry_count = 0 + self._last_attempt_ts = attempt_ts + + def _ensure_not_rate_limited( + self, + attempt_ts: float, + wrap_exception: Callable[[str, float | None], Exception], + ) -> None: + """Ensure we are allowed to attempt another login.""" + if self._retry_count < self.policy.max_retries: + return + elapsed = attempt_ts - self._last_attempt_ts + if elapsed < self.policy.retry_timeout: + backoff = self._calculate_backoff_delay(self._retry_count) + raise wrap_exception("rate_limited", backoff) + self._retry_count = 0 + + def _calculate_backoff_delay(self, attempt: int) -> float: + """Return exponential backoff delay with jitter.""" + base_delay = float(min(self.policy.max_backoff, 2**attempt)) + jitter = random.uniform(0.1, 0.5) + return max(float(self.policy.min_backoff), base_delay * jitter) diff --git a/homgarapi/constants.py b/homgarapi/constants.py new file mode 100644 index 0000000..a282e9e --- /dev/null +++ b/homgarapi/constants.py @@ -0,0 +1,100 @@ +"""Static datapoint definitions for supported HomGar devices.""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + +# Minimal datapoint specifications required for supported devices. +# Keys: model code -> datapoint code -> specification details used by the decoder. +PRODUCT_MODEL_SPECS: Mapping[int, Mapping[int, Mapping[str, Any]]] = { + 72: { + 2: {"identity": "S_SMART_VOICE", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 9: {"identity": "STA_TEM", "dataType": 1, "dataTypeSub": 6, "length": 2, "decimal": 1}, + 10: {"identity": "STA_RH", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": 0}, + 25: {"identity": "STA_ILLUMINANCE", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": 1}, + 31: {"identity": "STA_BAT", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 32: {"identity": "STA_RSSI", "dataType": 1, "dataTypeSub": 2, "length": 1, "decimal": None}, + }, + 87: { + 13: {"identity": "STA_TOTAL_RAIN", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 31: {"identity": "STA_BAT", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 32: {"identity": "STA_RSSI", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 43: {"identity": "STA_HOUR_RAIN", "dataType": 1, "dataTypeSub": 2, "length": 2, "decimal": 1}, + 44: {"identity": "STA_DAY_RAIN", "dataType": 1, "dataTypeSub": 2, "length": 2, "decimal": 1}, + 45: {"identity": "STA_7DAY_RAIN", "dataType": 1, "dataTypeSub": 2, "length": 2, "decimal": 1}, + }, + 89: { + 2: {"identity": "S_SMART_VOICE", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 9: {"identity": "STA_TEM", "dataType": 1, "dataTypeSub": 6, "length": 2, "decimal": 1}, + 10: {"identity": "STA_RH", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": 0}, + 22: {"identity": "STA_TREND", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 27: {"identity": "STA_CO2", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 31: {"identity": "STA_BAT", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 33: {"identity": "MAX_TEM", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 34: {"identity": "MAX_RH", "dataType": 1, "dataTypeSub": 2, "length": 2, "decimal": None}, + 47: {"identity": "MAX_CO2", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + }, + 261: { + 1: {"identity": "CTL_WATER", "dataType": 1, "dataTypeSub": 2, "length": 2, "decimal": None}, + 2: {"identity": "STA_ALARM", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 11: {"identity": "CTL_SET_DELAY", "dataType": 0, "dataTypeSub": 0, "length": None, "decimal": None}, + 13: {"identity": "ATTR_SHARE_FLOW", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 15: {"identity": "STA_LASTUSAGE", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 19: {"identity": "STA_DURATION", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 21: {"identity": "STA_EVTIME", "dataType": 5, "dataTypeSub": 10, "length": 4, "decimal": None}, + 30: {"identity": "STA_WKSTATE", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 31: {"identity": "STA_BAT", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 32: {"identity": "STA_RSSI", "dataType": 1, "dataTypeSub": 2, "length": 2, "decimal": None}, + 51: {"identity": "STA_RSSI2", "dataType": 1, "dataTypeSub": 5, "length": 1, "decimal": None}, + 52: {"identity": "STA_EVTIME2", "dataType": 5, "dataTypeSub": 10, "length": 4, "decimal": None}, + }, + 262: { + 2: {"identity": "S_SMART_VOICE", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 9: {"identity": "STA_TEM", "dataType": 1, "dataTypeSub": 6, "length": 2, "decimal": 1}, + 10: {"identity": "STA_RH", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": 0}, + 22: {"identity": "STA_TREND", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 31: {"identity": "STA_BAT", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 33: {"identity": "MAX_TEM", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 34: {"identity": "MAX_RH", "dataType": 1, "dataTypeSub": 2, "length": 2, "decimal": None}, + }, + 268: { + 9: {"identity": "STA_TEM", "dataType": 1, "dataTypeSub": 6, "length": 2, "decimal": 1}, + 22: {"identity": "STA_TREND", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 31: {"identity": "STA_BAT", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 33: {"identity": "MAX_TEM", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + }, + 277: { + 32: {"identity": "STA_RSSI", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 31: {"identity": "STA_BAT", "dataType": 1, "dataTypeSub": 1, "length": 1, "decimal": None}, + 14: {"identity": "STA_VFLOW", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 21: {"identity": "STA_EVTIME", "dataType": 5, "dataTypeSub": 10, "length": 4, "decimal": None}, + 46: {"identity": "STA_CUR_FLOW", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": 1}, + 19: {"identity": "STA_DURATION", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 15: {"identity": "STA_LASTUSAGE", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": 1}, + 49: {"identity": "STA_LAST_DURATION", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + 26: {"identity": "STA_TOTAL_TODAY", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": 1}, + 20: {"identity": "STA_WATER_TOTAL", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": 1}, + 50: {"identity": "STA_OTHER_TOTAL", "dataType": 1, "dataTypeSub": 4, "length": 4, "decimal": None}, + }, + 264: {}, + 273: {}, + 289: {}, +} + + +def build_model_list() -> list[Mapping[str, Any]]: + """Return a list of synthetic model entries compatible with the API metadata schema.""" + + models: list[Mapping[str, Any]] = [] + for model_code, dps in PRODUCT_MODEL_SPECS.items(): + models.append( + { + "modelCode": model_code, + "dp": [ + {"dpCode": dp_code, "identity": spec["identity"], "specs": dict(spec)} + for dp_code, spec in sorted(dps.items()) + ], + } + ) + return models diff --git a/homgarapi/devices.py b/homgarapi/devices.py index 14f9245..73db3a3 100644 --- a/homgarapi/devices.py +++ b/homgarapi/devices.py @@ -1,314 +1,1260 @@ +"""Device model definitions for the HomGar API client.""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable, Mapping +from dataclasses import dataclass +from datetime import UTC, datetime import re -from typing import List +from typing import Any, ClassVar, Final + +from .dp_decoder import decode_status_payload +from .logutil import get_logger + +ParsedStats = tuple[int | None, int | None, int | None, int | None] + +STATS_VALUE_REGEX: Final[re.Pattern[str]] = re.compile(r"^(\d+)\((\d+)/(\d+)/(\d+)\)") + +_LOGGER = get_logger(__file__) + + +@dataclass(frozen=True) +class DeviceSensorMapping: + """Mapping between a sensor key and the function providing its value.""" + + key: str + value_fn: Callable[[Any], Any] + allow_none: bool = True + + def is_available(self, device: Any) -> bool: + """Return True if the sensor has data available on the device.""" + try: + value = self.value_fn(device) + except (AttributeError, KeyError, TypeError, ValueError): + return False + return value is not None or self.allow_none + + +def attr_mapping( + key: str, + *, + attr: str | None = None, + allow_none: bool = True, +) -> DeviceSensorMapping: + """Build a device sensor mapping backed by a simple attribute lookup.""" + + attr_name = attr or key + + def _lookup(device: Any) -> Any: + return getattr(device, attr_name, None) + + return DeviceSensorMapping(key, _lookup, allow_none=allow_none) + + +def _convert_signal_strength(raw: float) -> int: + """Convert signal strength readings to signed dBm values when required.""" + try: + value = int(raw) + except (TypeError, ValueError): + return int(raw) if isinstance(raw, bool) else 0 + # HomGar reports RSSI as unsigned where values > 127 should be interpreted as negative. + if value > 127: + return value - 256 + return value + + +def _battery_percentage_from_state(raw: Any) -> int | None: + """Convert the common battery state datapoint (1=full, 2=medium, 3=low) to percentage.""" + value = _safe_int(raw) + if value is None: + return None + return {1: 100, 2: 50, 3: 0}.get(value, value) + + +def _append_detail_text(base: str, detail: str) -> str: + """Append a formatted detail segment to the base description.""" + detail = detail.strip() + if not detail: + return base + return f"{base}: {detail}" if ":" not in base else f"{base} / {detail}" + -STATS_VALUE_REGEX = re.compile(r'^(\d+)\((\d+)/(\d+)/(\d+)\)') +def _safe_int(value: Any) -> int | None: + """Return ``int(value)`` or ``None`` if conversion fails.""" + try: + return int(value) + except (TypeError, ValueError): + return None -def _parse_stats_value(s): - if match := STATS_VALUE_REGEX.fullmatch(s): - return int(match.group(1)), int(match.group(2)), int(match.group(3)), int(match.group(4)) - else: - return None, None, None, None +def _safe_float(value: Any) -> float | None: + """Return ``float(value)`` or ``None`` if conversion fails.""" + try: + return float(value) + except (TypeError, ValueError): + return None -def _temp_to_mk(f): - return round(1000 * ((int(f) * .1 - 32) * 5 / 9 + 273.15)) +def _mk_to_celsius(value: int | None) -> float | None: + """Convert millikelvin to Celsius.""" + if value is None: + return None + return round(value * 1e-3 - 273.15, 1) + + +def _parse_stats_value(value: str) -> ParsedStats: + """Parse a stats string of the format 'value(max/min/trend)'.""" + match = STATS_VALUE_REGEX.fullmatch(value) + if match: + return ( + int(match.group(1)), + int(match.group(2)), + int(match.group(3)), + int(match.group(4)), + ) + return (None, None, None, None) + + +def _temp_to_mk(raw_fahrenheit_tenths: str | int) -> int: + """Convert tenths of degrees Fahrenheit to millikelvin.""" + raw_value = int(raw_fahrenheit_tenths) + celsius = (raw_value * 0.1 - 32.0) * 5 / 9 + return round((celsius + 273.15) * 1000) + + +def _celsius_to_mk(value: float) -> int: + """Convert degrees Celsius to millikelvin.""" + return round((value + 273.15) * 1000) + + +def _decode_packed_fahrenheit_extrema(raw_value: int | str | None) -> tuple[int | None, int | None]: + """Decode packed Fahrenheit extrema into millikelvin maxima and minima.""" + if raw_value is None: + return (None, None) + packed = _safe_int(raw_value) + if packed is None: + return (None, None) + max_f = (packed >> 16) & 0xFFFF + min_f = packed & 0xFFFF + return (_temp_to_mk(max_f), _temp_to_mk(min_f)) class HomgarHome: - """ - Represents a home in Homgar. - A home can have a number of hubs, each of which can contain sensors/controllers (subdevices). - """ - def __init__(self, hid, name): - self.hid = hid - self.name = name + """Representation of a HomGar home.""" + + hid: str + name: str + + def __init__(self, hid: str | int, name: str | None) -> None: + """Initialise the home model.""" + self.hid = str(hid) + self.name = name or "" class HomgarDevice: - """ - Base class for Homgar devices; both hubs and subdevices. - Each device has a model (name and code), name, some identifiers and may have alerts. - """ + """Base class for HomGar devices.""" - FRIENDLY_DESC = "Unknown HomGar device" + FRIENDLY_DESC: ClassVar[str] = "Unknown HomGar device" + INCLUDE_BASE_TEMPERATURE: ClassVar[bool] = True + INCLUDE_BASE_HUMIDITY: ClassVar[bool] = True - def __init__(self, model, model_code, name, did, mid, alerts, **kwargs): - self.model = model - self.model_code = model_code - self.name = name - self.did = did # the unique device identifier of this device itself - self.mid = mid # the unique identifier of the sensor network - self.alerts = alerts + def __init__( + self, + *, + model: str | None, + model_code: int | None, + name: str | None, + did: str | int | None, + mid: str | int | None, + alerts: Iterable[Any] | None = None, + device_name: str | None = None, + product_key: str | None = None, + **_: Any, + ) -> None: + """Initialise a device with metadata returned by the API.""" + self.model: str | None = model + self.model_code: int | None = ( + int(model_code) if model_code is not None else None + ) + self.name: str = name or "Unknown" + self.did: str = str(did) if did is not None else "unknown" + self.mid: str = str(mid) if mid is not None else "unknown" + self.alerts: list[Any] = list(alerts or []) + self.device_name: str | None = device_name + self.product_key: str | None = product_key + self.status_fields: dict[str, Any] = {} + self.last_status_payload: Mapping[str, Any] | None = None + self.last_seen: str | None = None + self.last_seen_ts: float | None = None - self.address = None - self.rf_rssi = None + self.address: int | None = None + self.rf_rssi: int | None = None + self.updated_in_last_poll: bool = False - def __str__(self): - return f"{self.FRIENDLY_DESC} \"{self.name}\" (DID {self.did})" + def __str__(self) -> str: + """Return a human readable description.""" + return f'{self.FRIENDLY_DESC} "{self.name}" (DID {self.did})' - def get_device_status_ids(self) -> List[str]: - """ - The response for /app/device/getDeviceStatus contains a subDeviceStatus for each of the subdevices. - This function returns which IDs in the subDeviceStatus apply to this device. - Usually this is just Dxx where xx is the device address, but the hub has some additional special keys. - set_device_status() will be called on this object for all subDeviceStatus entries matching any of the - return IDs. - :return: The subDeviceStatus this device should listen to. - """ + def get_device_status_ids(self) -> list[str]: + """Return status identifiers that apply to this device.""" return [] - def set_device_status(self, api_obj: dict) -> None: - """ - Called after a call to /app/device/getDeviceStatus with an entry from $.data.subDeviceStatus - that matches one of the IDs returned by get_device_status_ids(). - Should update the device status with the contents of the given API response. - :param api_obj: The $.data.subDeviceStatus API response that should be used to update this device's status - """ - if api_obj['id'] == f"D{self.address:02d}": - self._parse_status_d_value(api_obj['value']) + def set_device_status(self, api_obj: Mapping[str, Any]) -> None: + """Update the device state with data from the API.""" + self.last_status_payload = api_obj + time_val = api_obj.get("time") + timestamp: float | None = None + if isinstance(time_val, (int, float)): + timestamp = float(time_val) + elif isinstance(time_val, str): + try: + timestamp = float(time_val) + except ValueError: + timestamp = None + if timestamp is not None: + if timestamp > 1e12: + timestamp /= 1000 + try: + seen_dt = datetime.fromtimestamp(timestamp, tz=UTC) + except (OverflowError, OSError, ValueError): + seen_dt = None + if seen_dt is not None: + self.last_seen_ts = timestamp + self.last_seen = seen_dt.isoformat() + if self.address is None: + return + if api_obj.get("id") == f"D{self.address:02d}": + value = api_obj.get("value", "") + if isinstance(value, str): + self._parse_status_d_value(value) - def _parse_status_d_value(self, val: str) -> None: - """ - Parses a $.data.subDeviceStatus[x].value field for an entry with ID 'Dxx' where xx is the device address. - These fields consist of a common part and a device-specific part separated by a ';'. - This call should update the device status. - :param val: Value of the $.data.subDeviceStatus[x].value field to apply - """ - general_str, specific_str = val.split(';') + def supports_sensor(self, sensor_key: str) -> bool: + """Return True if this device supports the requested sensor key.""" + return True + + def _parse_status_d_value(self, payload: str) -> None: + """Parse the common and device-specific sections of a status payload.""" + if ";" not in payload: + self._parse_general_status_d_value(payload) + # Also process device-specific data when the payload does not contain + # a dedicated separator (newer TLV payloads use this format). + self._parse_device_specific_status_d_value(payload) + return + general_str, specific_str = payload.split(";", 1) self._parse_general_status_d_value(general_str) self._parse_device_specific_status_d_value(specific_str) - def _parse_general_status_d_value(self, s: str): - """ - Parses the part of a $.data.subDeviceStatus[x].value field before the ';' character, - which has the same format for all subdevices. It has three ','-separated fields. The first and last fields - are always '1' in my case, I presume it's to do with battery state / connection state. - The second field is the RSSI in dBm. - :param s: The value to parse and apply - """ - unknown_1, rf_rssi, unknown_2 = s.split(',') - self.rf_rssi = int(rf_rssi) + def _parse_general_status_d_value(self, value: str) -> None: + """Parse the general section, capturing the RF RSSI if present.""" + parts = value.split(",") + if len(parts) >= 2: + try: + self.rf_rssi = int(parts[1]) + except ValueError: + self.rf_rssi = None - def _parse_device_specific_status_d_value(self, s: str): - """ - Parses the part of a $.data.subDeviceStatus[x].value field after the ';' character, - which is in a device-specific format. - Should update the device state. - :param s: The value to parse and apply - """ - raise NotImplementedError() + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Parse the device-specific section of the payload.""" + raise NotImplementedError + + def _decode_tlv_payload( + self, + value: str, + *, + model_code: int, + log_label: str | None = None, + ) -> dict[str, Any] | None: + """Decode a TLV payload, updating status fields and returning values.""" + if not value.startswith(("10#", "11#")): + return None + + decoded = decode_status_payload(value, model_code=model_code) + vals = decoded.values + self.status_fields.update(vals) + + if hasattr(self, "raw_status"): + setattr(self, "raw_status", value) + + if log_label: + _LOGGER.debug("Decoded %s payload for %s: %s", log_label, self.did, vals) + + return vals + + def _update_signal_strength(self, value: Any) -> None: + """Update signal strength attributes from a raw reading.""" + strength = _safe_int(value) + if strength is None: + return + converted = _convert_signal_strength(strength) + self.rf_rssi = converted + if hasattr(self, "signal_strength"): + setattr(self, "signal_strength", converted) + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings for common device metrics.""" + if self.rf_rssi is not None: + yield attr_mapping("rf_rssi", allow_none=False) + + if self.INCLUDE_BASE_TEMPERATURE and hasattr(type(self), "temperature_c"): + yield attr_mapping("temperature", attr="temperature_c") + + if self.INCLUDE_BASE_HUMIDITY and hasattr(type(self), "humidity_pct"): + yield attr_mapping("humidity", attr="humidity_pct") + + if getattr(self, "HAS_BATTERY", True): + if hasattr(self, "battery_level"): + yield attr_mapping( + "battery", + attr="battery_level", + ) + if hasattr(self, "battery_state"): + yield attr_mapping( + "battery_state", + attr="battery_state", + ) + + if self.last_seen_ts is not None: + yield DeviceSensorMapping( + "last_seen", + lambda dev: datetime.fromtimestamp(dev.last_seen_ts, tz=UTC) + if getattr(dev, "last_seen_ts", None) is not None + else None, + ) class HomgarHubDevice(HomgarDevice): - """ - A hub acts as a gateway for sensors and actuators (subdevices). - A home contains an arbitrary number of hubs, each of which contains an arbitrary number of subdevices. - """ - def __init__(self, subdevices, **kwargs): + """A hub acts as a gateway for sensors and actuators.""" + + def __init__( + self, *, subdevices: Iterable[HomgarDevice] | None = None, **kwargs: Any + ) -> None: + """Initialise the hub and store its subdevices.""" super().__init__(**kwargs) self.address = 1 - self.subdevices = subdevices + self.subdevices: list[HomgarDevice] = list(subdevices or []) - def __str__(self): + def __str__(self) -> str: + """Return a human readable description for the hub.""" return f"{super().__str__()} with {len(self.subdevices)} subdevices" - def _parse_device_specific_status_d_value(self, s): - pass + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Store raw payload data produced by the hub.""" + self.alerts.append({"raw_status": value}) class HomgarSubDevice(HomgarDevice): - """ - A subdevice is a device that is associated with a hub. - It can be a sensor or an actuator. - """ - def __init__(self, address, port_number, **kwargs): + """A device that is associated with a hub.""" + + def __init__(self, *, address: int, port_number: int, **kwargs: Any) -> None: + """Initialise the subdevice address and port metadata.""" super().__init__(**kwargs) - self.address = address # device address within the sensor network - self.port_number = port_number # the number of ports on the device, e.g. 2 for the 2-zone water timer + self.address = address + self.port_number = port_number + self.signal_strength: int | None = None - def __str__(self): + def __str__(self) -> str: + """Return a human readable description for the subdevice.""" return f"{super().__str__()} at address {self.address}" - def get_device_status_ids(self): + def get_device_status_ids(self) -> list[str]: + """Return identifiers for status updates relevant to this device.""" return [f"D{self.address:02d}"] - def _parse_device_specific_status_d_value(self, s): - pass + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Store raw payload for subclasses that do not override parsing.""" + self.alerts.append({"raw_status": value}) class RainPointDisplayHub(HomgarHubDevice): - MODEL_CODES = [264] - FRIENDLY_DESC = "Irrigation Display Hub" + """RainPoint irrigation display hub.""" + + MODEL_CODES: ClassVar[list[int]] = [264] + FRIENDLY_DESC: ClassVar[str] = "Irrigation Display Hub" + HAS_BATTERY: ClassVar[bool] = False + INCLUDE_BASE_TEMPERATURE: ClassVar[bool] = False + INCLUDE_BASE_HUMIDITY: ClassVar[bool] = False - def __init__(self, **kwargs): + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the display hub.""" super().__init__(**kwargs) - self.wifi_rssi = None - self.battery_state = None - self.connected = None - - self.temp_mk_current = None - self.temp_mk_daily_max = None - self.temp_mk_daily_min = None - self.temp_trend = None - self.hum_current = None - self.hum_daily_max = None - self.hum_daily_min = None - self.hum_trend = None - self.press_pa_current = None - self.press_pa_daily_max = None - self.press_pa_daily_min = None - self.press_trend = None - - def get_device_status_ids(self): + self.wifi_rssi: int | None = None + self.battery_state: int | None = None + self.connected: bool | None = None + + self.temp_mk_current: int | None = None + self.temp_mk_daily_max: int | None = None + self.temp_mk_daily_min: int | None = None + self.temp_trend: int | None = None + self.hum_current: int | None = None + self.hum_daily_max: int | None = None + self.hum_daily_min: int | None = None + self.hum_trend: int | None = None + self.press_pa_current: int | None = None + self.press_pa_daily_max: int | None = None + self.press_pa_daily_min: int | None = None + self.press_trend: int | None = None + self.raw_status: str | None = None + + def get_device_status_ids(self) -> list[str]: + """Return identifiers for hub-specific status updates.""" return ["connected", "state", "D01"] - def set_device_status(self, api_obj): - dev_id = api_obj['id'] - val = api_obj['value'] - if dev_id == "state": - self.battery_state, self.wifi_rssi = [int(s) for s in val.split(',')] + def set_device_status(self, api_obj: Mapping[str, Any]) -> None: + """Handle hub-specific updates before delegating to the base class.""" + dev_id = api_obj.get("id") + val = api_obj.get("value") + if dev_id == "state" and isinstance(val, str): + parts = [segment for segment in val.split(",") if segment] + if len(parts) >= 2: + try: + self.wifi_rssi = int(parts[1]) + except ValueError: + self.wifi_rssi = None elif dev_id == "connected": - self.connected = int(val) == 1 + self.connected = str(val) == "1" else: super().set_device_status(api_obj) - def _parse_device_specific_status_d_value(self, s): - """ - Observed example value: - 781(781/723/1),52(64/50/1),P=10213(10222/10205/1), + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Parse the display hub payload into temperature, humidity, and pressure statistics. + + Observed example value: ``781(781/723/1),52(64/50/1),P=10213(10222/10205/1)``. - Deduced meaning: - temp[.1F](day-max/day-min/trend?),humidity[%](day-max/day-min/trend?),P=pressure[Pa](day-max/day-min/trend?), + Deduced meaning: temperature, humidity, and pressure with day statistics. """ - temp_str, hum_str, press_str, *_ = s.split(',') - self.temp_mk_current, self.temp_mk_daily_max, self.temp_mk_daily_min, self.temp_trend = [_temp_to_mk(v) for v in _parse_stats_value(temp_str)] - self.hum_current, self.hum_daily_max, self.hum_daily_min, self.hum_trend = _parse_stats_value(hum_str) - self.press_pa_current, self.press_pa_daily_max, self.press_pa_daily_min, self.press_trend = _parse_stats_value(press_str[2:]) - def __str__(self): - s = super().__str__() - if self.temp_mk_current: - s += f": {self.temp_mk_current*1e-3:.1f}K / {self.hum_current}% / {self.press_pa_current}Pa" - return s + parts = value.split(",") + if len(parts) < 3: + self.raw_status = value + return + temp_str, hum_str, press_str = parts[0], parts[1], parts[2] + temp_stats = _parse_stats_value(temp_str) + converted_temp = tuple( + _temp_to_mk(stat) if stat is not None else None for stat in temp_stats + ) + ( + self.temp_mk_current, + self.temp_mk_daily_max, + self.temp_mk_daily_min, + self.temp_trend, + ) = converted_temp + self.hum_current, self.hum_daily_max, self.hum_daily_min, self.hum_trend = ( + _parse_stats_value(hum_str) + ) + press_stats = _parse_stats_value(press_str[2:]) + ( + self.press_pa_current, + self.press_pa_daily_max, + self.press_pa_daily_min, + self.press_trend, + ) = press_stats + + def __str__(self) -> str: + """Return a human readable description including current readings.""" + base = super().__str__() + if self.temp_mk_current is not None: + celsius = self.temp_mk_current * 1e-3 - 273.15 + base += ( + f": {celsius:.1f}°C / {self.hum_current}% / {self.press_pa_current}Pa" + ) + return base + + @property + def temperature_c(self) -> float | None: + """Return current temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_current) + + @property + def temperature_c_max(self) -> float | None: + """Return daily maximum temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_daily_max) + + @property + def temperature_c_min(self) -> float | None: + """Return daily minimum temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_daily_min) + + @property + def humidity_pct(self) -> int | None: + """Return current relative humidity percentage.""" + return self.hum_current + + @property + def humidity_pct_max(self) -> int | None: + """Return daily maximum relative humidity percentage.""" + return self.hum_daily_max + + @property + def humidity_pct_min(self) -> int | None: + """Return daily minimum relative humidity percentage.""" + return self.hum_daily_min + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the display hub.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("pressure", attr="press_pa_current") + yield attr_mapping("wifi_rssi", attr="wifi_rssi", allow_none=False) + + +class RainPointGatewayHub(HomgarHubDevice): + """RainPoint gateway hub used by newer hardware revisions.""" + + MODEL_CODES: ClassVar[list[int]] = [273, 289] + FRIENDLY_DESC: ClassVar[str] = "RainPoint Gateway Hub" + HAS_BATTERY: ClassVar[bool] = False + INCLUDE_BASE_TEMPERATURE: ClassVar[bool] = False + INCLUDE_BASE_HUMIDITY: ClassVar[bool] = False + + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the gateway hub.""" + super().__init__(**kwargs) + self.battery_level: int | None = None + self.wifi_rssi: int | None = None + self.connected: bool | None = None + + def get_device_status_ids(self) -> list[str]: + """Return identifiers for gateway-specific status updates.""" + return ["connected", "state", "D01"] + + def set_device_status(self, api_obj: Mapping[str, Any]) -> None: + """Handle gateway specific updates before delegating to the base class.""" + dev_id = api_obj.get("id") + val = api_obj.get("value") + if dev_id == "state" and isinstance(val, str): + parts = [segment for segment in val.split(",") if segment] + if len(parts) > 1: + try: + self.wifi_rssi = int(parts[1]) + except ValueError: + self.wifi_rssi = None + elif dev_id == "connected": + self.connected = str(val) == "1" + else: + super().set_device_status(api_obj) + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the gateway hub.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("wifi_rssi", attr="wifi_rssi", allow_none=False) + yield DeviceSensorMapping( + "battery_state", + lambda dev: dev.status_fields.get("battery_state") + if getattr(dev, "status_fields", None) + else None, + allow_none=False, + ) class RainPointSoilMoistureSensor(HomgarSubDevice): - MODEL_CODES = [72] - FRIENDLY_DESC = "Soil Moisture Sensor" + """RainPoint soil moisture sensor.""" + + MODEL_CODES: ClassVar[list[int]] = [72] + FRIENDLY_DESC: ClassVar[str] = "Soil Moisture Sensor" - def __init__(self, **kwargs): + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the soil sensor.""" super().__init__(**kwargs) - self.temp_mk_current = None - self.moist_percent_current = None - self.light_lux_current = None + self.temp_mk_current: int | None = None + self.moist_percent_current: int | None = None + self.light_lux_current: float | None = None + self.battery_state: str | None = None + self.battery_level_raw: int | None = None + self.battery_level: int | None = None + self.signal_strength: int | None = None + self.raw_status: str | None = None - def _parse_device_specific_status_d_value(self, s): - """ - Observed example value: - 766,52,G=31351 + def _apply_decoded_values(self, values: Mapping[str, Any]) -> None: + """Apply decoded sensor values to attributes and diagnostics.""" - Deduced meaning: - temp[.1F],soil-moisture[%],G=light[.1lux] - """ - temp_str, moist_str, light_str = s.split(',') - self.temp_mk_current = _temp_to_mk(temp_str) - self.moist_percent_current = int(moist_str) - self.light_lux_current = int(light_str[2:]) * .1 + def _store(field: str, val: Any) -> None: + if val is None: + self.status_fields.pop(field, None) + else: + self.status_fields[field] = val + + temp_in_values = False + temp_c: float | None = None + + if "temperature_mk" in values: + temp_in_values = True + temp_mk = _safe_int(values.get("temperature_mk")) + self.temp_mk_current = temp_mk + if temp_mk is not None: + temp_c = _mk_to_celsius(temp_mk) + + if "temperature_c" in values: + temp_in_values = True + temp_c = _safe_float(values.get("temperature_c")) + if temp_c is not None: + self.temp_mk_current = _celsius_to_mk(temp_c) + else: + self.temp_mk_current = None + elif "temperature_f" in values and not temp_in_values: + temp_in_values = True + temp_f = _safe_float(values.get("temperature_f")) + if temp_f is not None: + temp_c = round((temp_f - 32.0) * 5.0 / 9.0, 1) + self.temp_mk_current = _celsius_to_mk(temp_c) + else: + self.temp_mk_current = None - def __str__(self): - s = super().__str__() - if self.temp_mk_current: - s += f": {self.temp_mk_current*1e-3-273.15:.1f}°C / {self.moist_percent_current}% / {self.light_lux_current:.1f}lx" - return s + if temp_in_values: + _store("temperature_c", temp_c) + + if "humidity_pct" in values: + moist = _safe_int(values.get("humidity_pct")) + self.moist_percent_current = moist + _store("humidity_pct", moist) + + if "illuminance_lux" in values: + lux = _safe_float(values.get("illuminance_lux")) + self.light_lux_current = lux + _store("illuminance_lux", lux) + + battery_raw_present = "battery_state_raw" in values + if battery_raw_present: + battery_raw = _safe_int(values.get("battery_state_raw")) + self.battery_level_raw = battery_raw + _store("battery_state_raw", battery_raw) + self.battery_level = _battery_percentage_from_state(battery_raw) + _store("battery_level", self.battery_level) + elif "battery_level" in values: + battery_level = _safe_int(values.get("battery_level")) + self.battery_level = battery_level + _store("battery_level", battery_level) + + if "battery_state" in values: + battery_state = values.get("battery_state") + self.battery_state = ( + str(battery_state) if battery_state is not None else None + ) + _store("battery_state", self.battery_state) + elif battery_raw_present and self.battery_state is not None: + # Keep decoder-derived state when raw value disappears. + _store("battery_state", self.battery_state) + + if "signal_strength" in values: + signal = values.get("signal_strength") + if signal is not None: + self._update_signal_strength(signal) + _store("signal_strength", self.signal_strength) + _store("rf_rssi", self.rf_rssi) + else: + self.rf_rssi = None + self.signal_strength = None + _store("signal_strength", None) + _store("rf_rssi", None) + + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Parse the soil moisture payload into temperature, soil, and light readings.""" + if (vals := self._decode_tlv_payload( + value, + model_code=72, + log_label="soil", + )) is not None: + self._apply_decoded_values(vals) + return + + self.raw_status = value + + def __str__(self) -> str: + """Return a human readable description including current readings.""" + base = super().__str__() + temp_c = self.temperature_c + moisture = self.moist_percent_current + if temp_c is not None and moisture is not None: + detail_parts = [f"{temp_c:.1f}°C", f"{moisture}%"] + if self.light_lux_current is not None: + detail_parts.append(f"{self.light_lux_current:.1f}lx") + base = _append_detail_text(base, " / ".join(detail_parts)) + elif self.light_lux_current is not None: + base = _append_detail_text(base, f"{self.light_lux_current:.1f}lx") + if self.battery_level is not None: + base = _append_detail_text(base, f"Battery {self.battery_level}%") + return base + + @property + def temperature_c(self) -> float | None: + """Return current soil temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_current) + + @property + def humidity_pct(self) -> int | None: + """Return soil humidity percentage if available.""" + return self.moist_percent_current + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the soil sensor.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("light", attr="light_lux_current") class RainPointRainSensor(HomgarSubDevice): - MODEL_CODES = [87] - FRIENDLY_DESC = "High Precision Rain Sensor" + """RainPoint rainfall sensor.""" - def __init__(self, **kwargs): + MODEL_CODES: ClassVar[list[int]] = [87] + FRIENDLY_DESC: ClassVar[str] = "High Precision Rain Sensor" + INCLUDE_BASE_TEMPERATURE: ClassVar[bool] = False + INCLUDE_BASE_HUMIDITY: ClassVar[bool] = False + + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the rain sensor.""" super().__init__(**kwargs) - self.rainfall_mm_total = None - self.rainfall_mm_hour = None - self.rainfall_mm_daily = None - self.rainfall_mm_total = None + self.rainfall_mm_total: float | None = None + self.rainfall_mm_hour: float | None = None + self.rainfall_mm_daily: float | None = None + self.rainfall_mm_7days: float | None = None + self.battery_level_raw: int | None = None + self.battery_level: int | None = None + self.battery_state: str | None = None + self.signal_strength: int | None = None + self.raw_status: str | None = None - def _parse_device_specific_status_d_value(self, s): - """ - Observed example value: - R=270(0/0/270) + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Parse the rainfall payload into rolling accumulation metrics.""" + if (vals := self._decode_tlv_payload( + value, + model_code=87, + log_label="rain", + )) is not None: + def _scaled(name: str) -> float | None: + measured = _safe_float(vals.get(name)) + return measured * 0.1 if measured is not None else None - Deduced meaning: - R=total?[.1mm](hour?[.1mm]/24hours?[.1mm]/7days?[.1mm]) - """ - self.rainfall_mm_total, self.rainfall_mm_hour, self.rainfall_mm_daily, self.rainfall_mm_7days = [.1*v for v in _parse_stats_value(s[2:])] + if (total := _scaled("STA_TOTAL_RAIN")) is not None: + self.rainfall_mm_total = total + if (hour := _scaled("STA_HOUR_RAIN")) is not None: + self.rainfall_mm_hour = hour + if (daily := _scaled("STA_DAY_RAIN")) is not None: + self.rainfall_mm_daily = daily + if (seven_day := _scaled("STA_7DAY_RAIN")) is not None: + self.rainfall_mm_7days = seven_day + + if (signal := vals.get("signal_strength")) is not None: + self._update_signal_strength(signal) + + if (battery_raw := vals.get("battery_state_raw")) is not None: + battery_raw_int = _safe_int(battery_raw) + self.battery_level_raw = battery_raw_int + self.battery_level = _battery_percentage_from_state(battery_raw_int) + if self.battery_level is not None: + self.status_fields["battery_level"] = self.battery_level + if "battery_state" in vals: + battery_state = vals["battery_state"] + self.battery_state = str(battery_state) if battery_state is not None else None + return + + self.raw_status = value - def __str__(self): - s = super().__str__() - if self.rainfall_mm_total: - s += f": {self.rainfall_mm_total}mm total / {self.rainfall_mm_hour}mm 1h / {self.rainfall_mm_daily}mm 24h / {self.rainfall_mm_7days}mm 7days" - return s + def __str__(self) -> str: + """Return a human readable description including rainfall totals.""" + base = super().__str__() + detail_parts: list[str] = [] + if self.rainfall_mm_total is not None: + detail_parts.append(f"{self.rainfall_mm_total}mm total") + if self.rainfall_mm_hour is not None: + detail_parts.append(f"{self.rainfall_mm_hour}mm 1h") + if self.rainfall_mm_daily is not None: + detail_parts.append(f"{self.rainfall_mm_daily}mm 24h") + if self.rainfall_mm_7days is not None: + detail_parts.append(f"{self.rainfall_mm_7days}mm 7days") + if detail_parts: + base = _append_detail_text(base, " / ".join(detail_parts)) + if self.battery_level is not None: + base = _append_detail_text(base, f"Battery {self.battery_level}%") + return base + + def supports_sensor(self, sensor_key: str) -> bool: + """Disable RF RSSI sensor when the API does not provide it.""" + if sensor_key == "rf_rssi": + return False + return super().supports_sensor(sensor_key) + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the rain sensor.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("rainfall_total", attr="rainfall_mm_total") + yield attr_mapping("rainfall_hourly", attr="rainfall_mm_hour") + yield attr_mapping("rainfall_daily", attr="rainfall_mm_daily") + yield attr_mapping("rainfall_weekly", attr="rainfall_mm_7days") class RainPointAirSensor(HomgarSubDevice): - MODEL_CODES = [262] - FRIENDLY_DESC = "Outdoor Air Humidity Sensor" + """RainPoint outdoor air sensor.""" + + MODEL_CODES: ClassVar[list[int]] = [262] + FRIENDLY_DESC: ClassVar[str] = "Outdoor Air Humidity Sensor" - def __init__(self, **kwargs): + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the air sensor.""" super().__init__(**kwargs) - self.temp_mk_current = None - self.temp_mk_daily_max = None - self.temp_mk_daily_min = None - self.temp_trend = None - self.hum_current = None - self.hum_daily_max = None - self.hum_daily_min = None - self.hum_trend = None - - def _parse_device_specific_status_d_value(self, s): - """ - Observed example value: - 755(1020/588/1),54(91/24/1), + self.temp_mk_current: int | None = None + self.temp_mk_daily_max: int | None = None + self.temp_mk_daily_min: int | None = None + self.temp_trend: int | None = None + self.hum_current: int | None = None + self.hum_daily_max: int | None = None + self.hum_daily_min: int | None = None + self.hum_trend: int | None = None + self.battery_state: str | None = None + self.battery_level_raw: int | None = None + self.battery_level: int | None = None + self.raw_status: str | None = None + self.temp_c_max: float | None = None + self.temp_c_min: float | None = None + self.hum_pct_max: int | None = None + self.hum_pct_min: int | None = None - Deduced meaning: - temp[.1F](day-max/day-min/trend?),humidity[%](day-max/day-min/trend?) - """ - temp_str, hum_str, *_ = s.split(',') - self.temp_mk_current, self.temp_mk_daily_max, self.temp_mk_daily_min, self.temp_trend = [_temp_to_mk(v) for v in _parse_stats_value(temp_str)] - self.hum_current, self.hum_daily_max, self.hum_daily_min, self.hum_trend = _parse_stats_value(hum_str) + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Parse the air sensor payload into temperature and humidity statistics.""" + if (vals := self._decode_tlv_payload( + value, + model_code=262, + log_label="air", + )) is not None: + max_mk, min_mk = _decode_packed_fahrenheit_extrema(vals.get("MAX_TEM")) + if max_mk is not None: + self.temp_mk_daily_max = max_mk + self.temp_c_max = _mk_to_celsius(max_mk) + if min_mk is not None: + self.temp_mk_daily_min = min_mk + self.temp_c_min = _mk_to_celsius(min_mk) + + if (temp_c := _safe_float(vals.get("temperature_c"))) is not None: + self.temp_mk_current = _celsius_to_mk(temp_c) + self.temp_c_max = self.temp_c_max or temp_c + self.temp_c_min = self.temp_c_min or temp_c + + if (humidity := _safe_int(vals.get("humidity_pct"))) is not None: + self.hum_current = humidity + self.hum_pct_max = self.hum_pct_max or self.hum_current + self.hum_pct_min = self.hum_pct_min or self.hum_current + + if (humidity_stats := _safe_int(vals.get("MAX_RH"))) is not None: + self.hum_daily_max = (humidity_stats >> 8) & 0xFF + self.hum_daily_min = humidity_stats & 0xFF + self.hum_pct_max = self.hum_daily_max + self.hum_pct_min = self.hum_daily_min + + if (battery_raw := _safe_int(vals.get("battery_state_raw"))) is not None: + self.battery_level_raw = battery_raw + self.battery_level = _battery_percentage_from_state(battery_raw) + if self.battery_level is not None: + self.status_fields["battery_level"] = self.battery_level + if "battery_state" in vals: + self.battery_state = vals["battery_state"] + return + + self.raw_status = value + + def __str__(self) -> str: + """Return a human readable description including temperature and humidity.""" + base = super().__str__() + temp_c = self.temperature_c + if temp_c is not None: + detail_parts = [f"{temp_c:.1f}°C"] + if self.hum_current is not None: + detail_parts.append(f"{self.hum_current}%") + base = _append_detail_text(base, " / ".join(detail_parts)) + elif self.hum_current is not None: + base = _append_detail_text(base, f"{self.hum_current}%") + if self.battery_level is not None: + base = _append_detail_text(base, f"Battery {self.battery_level}%") + return base + + @property + def temperature_c(self) -> float | None: + """Return current air temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_current) + + @property + def temperature_c_max(self) -> float | None: + """Return daily maximum air temperature in Celsius.""" + return self.temp_c_max or _mk_to_celsius(self.temp_mk_daily_max) + + @property + def temperature_c_min(self) -> float | None: + """Return daily minimum air temperature in Celsius.""" + return self.temp_c_min or _mk_to_celsius(self.temp_mk_daily_min) + + @property + def humidity_pct(self) -> int | None: + """Return current relative humidity percentage.""" + return self.hum_current + + @property + def humidity_pct_max(self) -> int | None: + """Return daily maximum relative humidity percentage.""" + return self.hum_pct_max or self.hum_daily_max + + @property + def humidity_pct_min(self) -> int | None: + """Return daily minimum relative humidity percentage.""" + return self.hum_pct_min or self.hum_daily_min + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the air sensor.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("temperature_max", attr="temperature_c_max") + yield attr_mapping("temperature_min", attr="temperature_c_min") + yield attr_mapping("humidity_max", attr="humidity_pct_max") + yield attr_mapping("humidity_min", attr="humidity_pct_min") + + +class RainPointCO2Sensor(HomgarSubDevice): + """RainPoint CO₂ sensor.""" + + MODEL_CODES: ClassVar[list[int]] = [89] + FRIENDLY_DESC: ClassVar[str] = "CO₂ Sensor" + + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the CO₂ sensor.""" + super().__init__(**kwargs) + self.temp_mk_current: int | None = None + self.hum_current: int | None = None + self.co2_ppm: int | None = None + self.co2_min_ppm: int | None = None + self.co2_max_ppm: int | None = None + self.co2_alert_ppm: int | None = None + self.battery_state: str | None = None + self.battery_level_raw: int | None = None + self.battery_level: int | None = None + self.raw_status: str | None = None + + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Decode TLV payload with CO₂, temperature and humidity measurements.""" + if (vals := self._decode_tlv_payload( + value, + model_code=89, + log_label="CO₂", + )) is not None: + if (co2_int := _safe_int(vals.get("STA_CO2"))) is not None: + self.co2_ppm = co2_int & 0xFFFF + self.co2_alert_ppm = (co2_int >> 16) & 0xFFFF + self.status_fields["co2_ppm"] = self.co2_ppm + self.status_fields["co2_alert_ppm"] = self.co2_alert_ppm + + if (limits_int := _safe_int(vals.get("MAX_CO2"))) is not None: + self.co2_max_ppm = limits_int & 0xFFFF + self.co2_min_ppm = (limits_int >> 16) & 0xFFFF + elif ( + (co2_bytes := vals.get("dp_8")) + and isinstance(co2_bytes, bytes) + and len(co2_bytes) >= 4 + ): + self.co2_min_ppm = int.from_bytes(co2_bytes[0:2], "little") + self.co2_max_ppm = int.from_bytes(co2_bytes[2:4], "little") + + if self.co2_min_ppm is not None: + self.status_fields["co2_min_ppm"] = self.co2_min_ppm + if self.co2_max_ppm is not None: + self.status_fields["co2_max_ppm"] = self.co2_max_ppm + + if (temp_c := _safe_float(vals.get("temperature_c"))) is not None: + self.temp_mk_current = _celsius_to_mk(temp_c) + if (humidity := _safe_int(vals.get("humidity_pct"))) is not None: + self.hum_current = humidity + if (battery_raw := _safe_int(vals.get("battery_state_raw"))) is not None: + self.battery_level_raw = battery_raw + self.battery_level = _battery_percentage_from_state(battery_raw) + if self.battery_level is not None: + self.status_fields["battery_level"] = self.battery_level + if "battery_state" in vals: + self.battery_state = vals["battery_state"] + return - def __str__(self): - s = super().__str__() - if self.temp_mk_current: - s += f": {self.temp_mk_current*1e-3-273.15:.1f}°C / {self.hum_current}%" - return s + self.raw_status = value + + @property + def temperature_c(self) -> float | None: + """Return current ambient temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_current) + + @property + def humidity_pct(self) -> int | None: + """Return current relative humidity percentage.""" + return self.hum_current + + def supports_sensor(self, sensor_key: str) -> bool: + """Expose extra CO₂-specific sensors while keeping default handling.""" + if sensor_key.startswith("co2"): + return True + return super().supports_sensor(sensor_key) + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the CO₂ sensor.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("co2", attr="co2_ppm") + yield attr_mapping("co2_min", attr="co2_min_ppm") + yield attr_mapping("co2_max", attr="co2_max_ppm") + yield attr_mapping("co2_alert", attr="co2_alert_ppm") + + def __str__(self) -> str: + """Return human readable description including CO₂, temperature, humidity.""" + base = super().__str__() + details: list[str] = [] + if self.co2_ppm is not None: + details.append(f"{self.co2_ppm}ppm CO₂") + temp = self.temperature_c + if temp is not None: + details.append(f"{temp:.1f}°C") + if self.hum_current is not None: + details.append(f"{self.hum_current}% RH") + if self.battery_level is not None: + details.append(f"Battery {self.battery_level}%") + if not details: + return base + return f"{base}: {', '.join(details)}" + + +class DiivooWaterFlowMeter(HomgarSubDevice): + """Diivoo water usage meter.""" + + MODEL_CODES: ClassVar[list[int]] = [277] + FRIENDLY_DESC: ClassVar[str] = "Diivoo Water Flow Meter" + INCLUDE_BASE_TEMPERATURE: ClassVar[bool] = False + INCLUDE_BASE_HUMIDITY: ClassVar[bool] = False + + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the water meter.""" + super().__init__(**kwargs) + self.flow_rate_lpm: float | None = None + self.water_usage_total_liters: float | None = None + self.water_usage_today_liters: float | None = None + self.water_last_usage_liters: float | None = None + self.water_other_total_liters: float | None = None + self.current_duration_seconds: int | None = None + self.last_duration_seconds: int | None = None + self.last_event_time: int | None = None + self.battery_level: int | None = None + self.battery_level_raw: int | None = None + self.battery_state: str | None = None + self.raw_status: str | None = None + + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Decode TLV payload with flow and water usage metrics.""" + if (vals := self._decode_tlv_payload( + value, + model_code=277, + log_label="diivoo_water", + )) is not None: + def _scaled(name: str, scale: float = 0.1) -> float | None: + measured = _safe_float(vals.get(name)) + return measured * scale if measured is not None else None + + if (total := _scaled("STA_WATER_TOTAL")) is not None: + self.water_usage_total_liters = total + self.status_fields["water_usage_total_liters"] = total + if (today := _scaled("STA_TOTAL_TODAY")) is not None: + self.water_usage_today_liters = today + self.status_fields["water_usage_today_liters"] = today + if (last := _scaled("STA_LASTUSAGE")) is not None: + self.water_last_usage_liters = last + self.status_fields["water_last_usage_liters"] = last + if (other_total := _safe_float(vals.get("STA_OTHER_TOTAL"))) is not None: + self.water_other_total_liters = other_total + self.status_fields["water_other_total_liters"] = other_total + if (flow := _scaled("STA_CUR_FLOW")) is not None: + self.flow_rate_lpm = flow + self.status_fields["water_flow_lpm"] = flow + + if (duration := _safe_int(vals.get("STA_DURATION"))) is not None: + self.current_duration_seconds = duration + self.status_fields["watering_duration_seconds"] = duration + if (last_duration := _safe_int(vals.get("STA_LAST_DURATION"))) is not None: + self.last_duration_seconds = last_duration + self.status_fields["watering_last_duration_seconds"] = last_duration + if (event_time := _safe_int(vals.get("STA_EVTIME"))) is not None: + self.last_event_time = event_time + self.status_fields["last_event_epoch"] = event_time + + if (signal := vals.get("signal_strength")) is not None: + self._update_signal_strength(signal) + + if (battery_raw := _safe_int(vals.get("battery_state_raw"))) is not None: + self.battery_level = _battery_percentage_from_state(battery_raw) + self.battery_level_raw = battery_raw + if self.battery_level is not None: + self.status_fields["battery_level"] = self.battery_level + + if "battery_state" in vals: + self.battery_state = vals["battery_state"] + return + + self.raw_status = value + + def __str__(self) -> str: + """Return a human readable description including flow and totals.""" + base = super().__str__() + details: list[str] = [] + if self.water_usage_total_liters is not None: + details.append(f"{self.water_usage_total_liters:.1f} L total") + if self.water_usage_today_liters is not None: + details.append(f"{self.water_usage_today_liters:.1f} L today") + if self.flow_rate_lpm is not None: + details.append(f"{self.flow_rate_lpm:.1f} L/min") + if details: + base = _append_detail_text(base, " / ".join(details)) + return base + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the water meter.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("water_usage_total", attr="water_usage_total_liters") + yield attr_mapping("water_usage_today", attr="water_usage_today_liters") + yield attr_mapping("water_usage_last", attr="water_last_usage_liters") + yield attr_mapping("water_flow", attr="flow_rate_lpm") + yield attr_mapping("watering_duration", attr="current_duration_seconds") + yield attr_mapping("watering_last_duration", attr="last_duration_seconds") + + +class RainPointPoolSensor(HomgarSubDevice): + """RainPoint pool temperature sensor.""" + + MODEL_CODES: ClassVar[list[int]] = [268] + FRIENDLY_DESC: ClassVar[str] = "Pool Temperature Sensor" + INCLUDE_BASE_TEMPERATURE: ClassVar[bool] = True + INCLUDE_BASE_HUMIDITY: ClassVar[bool] = False + + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the pool sensor.""" + super().__init__(**kwargs) + self.water_temp_c: float | None = None + self.temp_mk_current: int | None = None + self.temp_mk_daily_max: int | None = None + self.temp_mk_daily_min: int | None = None + self.water_temp_f: float | None = None + self.battery_level: int | None = None + self.battery_level_raw: int | None = None + self.raw_status: str | None = None + self.trend_raw: int | None = None + self.battery_state: str | None = None + self.signal_strength: int | None = None + + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Decode the raw payload using the product model definitions.""" + if (vals := self._decode_tlv_payload( + value, + model_code=268, + log_label="pool", + )) is not None: + if (temp_c := _safe_float(vals.get("temperature_c"))) is not None: + self.water_temp_c = temp_c + self.temp_mk_current = _celsius_to_mk(temp_c) + if (temp_f := _safe_float(vals.get("temperature_f"))) is not None: + self.water_temp_f = temp_f + + max_mk, min_mk = _decode_packed_fahrenheit_extrema(vals.get("MAX_TEM")) + if max_mk is not None: + self.temp_mk_daily_max = max_mk + if min_mk is not None: + self.temp_mk_daily_min = min_mk + + if (battery_raw := _safe_int(vals.get("battery_state_raw"))) is not None: + self.battery_level = _battery_percentage_from_state(battery_raw) + self.battery_level_raw = battery_raw + if self.battery_level is not None: + self.status_fields["battery_level"] = self.battery_level + if "battery_state" in vals: + self.battery_state = vals["battery_state"] + if (signal := vals.get("signal_strength")) is not None: + self._update_signal_strength(signal) + if (trend := _safe_int(vals.get("trend_raw"))) is not None: + self.trend_raw = trend + return + + self.raw_status = value + + @property + def temperature_c(self) -> float | None: + """Return current pool temperature in Celsius for compatibility.""" + return _mk_to_celsius(self.temp_mk_current) + + @property + def water_temperature_c(self) -> float | None: + """Return current pool water temperature in Celsius.""" + return self.water_temp_c + + @property + def water_temperature_c_max(self) -> float | None: + """Return daily maximum pool water temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_daily_max) + + @property + def water_temperature_c_min(self) -> float | None: + """Return daily minimum pool water temperature in Celsius.""" + return _mk_to_celsius(self.temp_mk_daily_min) + + def supports_sensor(self, sensor_key: str) -> bool: + """Disable RF RSSI sensor when not reported by the device.""" + if sensor_key == "rf_rssi": + return False + return super().supports_sensor(sensor_key) + + def __str__(self) -> str: + """Return human readable description including water temperature.""" + base = super().__str__() + details: list[str] = [] + if self.water_temp_c is not None: + details.append(f"{self.water_temp_c:.1f}°C water") + if self.water_temp_f is not None: + details.append(f"{self.water_temp_f:.1f}°F water") + if details: + base = _append_detail_text(base, " / ".join(details)) + if self.battery_level is not None: + base = _append_detail_text(base, f"Battery {self.battery_level}%") + return base + + def iter_sensor_mappings(self) -> Iterable[DeviceSensorMapping]: + """Return sensor mappings exposed by the pool sensor.""" + yield from super().iter_sensor_mappings() + yield attr_mapping("pool_water_temp_max", attr="water_temperature_c_max") + yield attr_mapping("pool_water_temp_min", attr="water_temperature_c_min") + + +class DiivooGatewayHub(HomgarHubDevice): + """Diivoo Wi-Fi gateway hub.""" + + MODEL_CODES: ClassVar[list[int]] = [256] + FRIENDLY_DESC: ClassVar[str] = "Diivoo WiFi Hub" + HAS_BATTERY: ClassVar[bool] = False + INCLUDE_BASE_TEMPERATURE: ClassVar[bool] = False + INCLUDE_BASE_HUMIDITY: ClassVar[bool] = False class RainPoint2ZoneTimer(HomgarSubDevice): - MODEL_CODES = [261] - FRIENDLY_DESC = "2-Zone Water Timer" + """RainPoint two-zone water timer.""" - def _parse_device_specific_status_d_value(self, s): - """ - TODO deduce meaning of these fields. - Observed example value: - 0,9,0,0,0,0|0,1291,0,0,0,0 + MODEL_CODES: ClassVar[list[int]] = [261] + FRIENDLY_DESC: ClassVar[str] = "2-Zone Water Timer" + + def __init__(self, **kwargs: Any) -> None: + """Initialise placeholder attributes for the timer.""" + super().__init__(**kwargs) + self.zone_status: str | None = None - What we know so far: - left/right zone separated by '|' character - fields for each zone: ?,last-usage[.1l],?,?,?,? + def _parse_device_specific_status_d_value(self, value: str) -> None: + """Store the raw zone status for further analysis. + + Observed example value: ``0,9,0,0,0,0|0,1291,0,0,0,0``. """ - pass + + self.zone_status = value -MODEL_CODE_MAPPING = { - code: clazz - for clazz in ( +MODEL_CODE_MAPPING: dict[int, type[HomgarDevice]] = { + code: device_class + for device_class in ( RainPointDisplayHub, + RainPointGatewayHub, RainPointSoilMoistureSensor, RainPointRainSensor, RainPointAirSensor, - RainPoint2ZoneTimer - ) for code in clazz.MODEL_CODES + RainPointCO2Sensor, + DiivooWaterFlowMeter, + RainPointPoolSensor, + DiivooGatewayHub, + RainPoint2ZoneTimer, + ) + for code in device_class.MODEL_CODES } diff --git a/homgarapi/dp_decoder.py b/homgarapi/dp_decoder.py new file mode 100644 index 0000000..2fa0d64 --- /dev/null +++ b/homgarapi/dp_decoder.py @@ -0,0 +1,193 @@ +"""Helpers for decoding HomGar/Tuya raw device status payloads.""" + +from __future__ import annotations + +from collections.abc import Mapping +from dataclasses import dataclass +from functools import lru_cache +from typing import Any + +from .dp_spec_builder import get_model_dp_specs + + +@dataclass(frozen=True) +class DecodedStatus: + """Container with decoded datapoint values.""" + + values: dict[str, Any] + raw_items: list[tuple[int, bytes]] + + +@lru_cache(maxsize=64) +def _get_dp_specs_by_model(model_code: int) -> dict[int, Mapping[str, Any]]: + """Return datapoint specifications for the provided model code.""" + raw_specs = get_model_dp_specs(model_code) + return {dp_code: dict(spec) for dp_code, spec in raw_specs.items()} + +_DP_OVERRIDES: dict[int, dict[int, str]] = { + 87: { + 4: "STA_HOUR_RAIN", + 5: "STA_DAY_RAIN", + 6: "STA_7DAY_RAIN", + }, + 268: {10: "signal_strength"}, +} + +_BATTERY_STATE_LABELS: dict[int, str] = { + 1: "normal", + 2: "low", + 3: "critical", +} + + +def _strip_prefix(value: str) -> tuple[bytes, bool]: + """Strip the leading header (`10#…`) and return bytes + z3 flag.""" + if not value: + return b"", False + + z3 = False + payload_hex = value + if "#" in value: + hdr, payload_hex = value.split("#", 1) + if len(hdr) >= 2: + z3 = hdr[1] == "1" + payload_hex = payload_hex.strip() + if len(payload_hex) % 2 == 1: + payload_hex = "0" + payload_hex + return bytes.fromhex(payload_hex), z3 + + +def _iter_tlv_items(data: bytes, z3: bool) -> list[tuple[int, bytes]]: + """Yield `(dp_code, raw_bytes)` pairs from the encoded payload.""" + items: list[tuple[int, bytes]] = [] + idx = 0 + + if z3 and idx < len(data): + idx += 1 # drop leading dp id for z3 streams + + while idx < len(data): + header = data[idx] + idx += 1 + + if (header >> 7) & 1 == 0: + dp_code = (header >> 4) & 0x07 + items.append((dp_code, b"")) + continue + + code_bits = (header >> 2) & 0x1F + length = (header & 0x03) + 1 + + if code_bits <= 30: + dp_code = code_bits + 8 + else: + if idx >= len(data): + break + dp_code = data[idx] + idx += 1 + + if idx + length > len(data): + break + + value = data[idx : idx + length] + idx += length + items.append((dp_code, value)) + + return items + + +def _decode_numeric(value: bytes, specs: Mapping[str, Any]) -> float | int: + """Decode Tuya numeric value respecting decimal and signedness.""" + decimal_raw = specs.get("decimal") + decimal: int + if isinstance(decimal_raw, int): + decimal = decimal_raw + elif isinstance(decimal_raw, str) and decimal_raw.isdigit(): + decimal = int(decimal_raw) + else: + decimal = 0 + + data_type_sub = specs.get("dataTypeSub") + signed = str(data_type_sub) == "6" + + raw: int = int.from_bytes(value, byteorder="little", signed=signed) + if decimal: + result: float = float(raw) / (10**decimal) + return result + return raw + + +def decode_status_payload( + value: str, + *, + model_code: int, + prefer_celsius: bool = True, +) -> DecodedStatus: + """Decode a HomGar raw status payload.""" + data, z3 = _strip_prefix(value) + items = _iter_tlv_items(data, z3) + specs_map = _get_dp_specs_by_model(model_code) + + decoded: dict[str, Any] = {} + + for dp_code, raw in items: + specs = specs_map.get(dp_code) + identity = specs.get("identity") if specs else None + override_identity = _DP_OVERRIDES.get(model_code, {}).get(dp_code) + if override_identity: + identity = override_identity + + if not identity: + decoded[f"dp_{dp_code}"] = raw + continue + + data_type = specs.get("dataType") if specs else None + + processed: Any + if identity == "signal_strength": + processed = int.from_bytes(raw or b"\x00", byteorder="little", signed=False) + elif data_type == 1 and raw and specs: + processed = _decode_numeric(raw, specs) + elif data_type == 2 and raw: + processed = int.from_bytes(raw, byteorder="little", signed=False) + else: + processed = raw + + if identity == "STA_TEM": + temp_f = float(processed) + decoded["temperature_f"] = temp_f + if prefer_celsius: + decoded["temperature_c"] = round((temp_f - 32.0) * 5.0 / 9.0, 1) + else: + decoded["temperature_c"] = temp_f + elif identity in {"STA_HOUR_RAIN", "STA_DAY_RAIN", "STA_7DAY_RAIN"}: + if isinstance(processed, bytes): + processed_value = int.from_bytes(processed, byteorder="little", signed=False) + else: + processed_value = int(processed) + decoded[identity] = processed_value + elif identity == "STA_RH": + decoded["humidity_pct"] = processed + elif identity == "STA_ILLUMINANCE": + decoded["illuminance_lux"] = processed + elif identity == "STA_CO2": + if isinstance(processed, bytes): + processed_value = int.from_bytes( + processed, + byteorder="little", + signed=False, + ) + else: + processed_value = int(processed) + decoded[identity] = processed_value + continue + elif identity == "STA_BAT": + decoded["battery_state_raw"] = processed + decoded["battery_state"] = _BATTERY_STATE_LABELS.get(processed, processed) + elif identity == "STA_RSSI": + decoded["signal_strength"] = processed + elif identity == "STA_TREND": + decoded["trend_raw"] = processed + else: + decoded[identity] = processed + + return DecodedStatus(values=decoded, raw_items=items) diff --git a/homgarapi/dp_spec_builder.py b/homgarapi/dp_spec_builder.py new file mode 100644 index 0000000..a0d7d3c --- /dev/null +++ b/homgarapi/dp_spec_builder.py @@ -0,0 +1,226 @@ +"""Utilities for constructing dynamic datapoint specs from product model metadata.""" + +from __future__ import annotations + +from collections.abc import Iterable, Mapping +import json +from pathlib import Path +from typing import Any + +from .constants import PRODUCT_MODEL_SPECS + +_DEFAULT_PRODUCT_MODELS_PATH = Path(__file__).parent / "product_models.json" +_DEFAULT_MODEL_SPECS_PATH = Path(__file__).parent / "model_specs.json" + +_DYNAMIC_MODEL_SPECS: dict[int, dict[int, dict[str, Any]]] = {} +_MODEL_SPEC_CACHE_STATE: dict[str, bool] = {"loaded": False} +_MODELS_PAYLOAD_CACHE: dict[str, list[Mapping[str, Any]]] = {} + + +def _ensure_cached_model_specs_loaded() -> None: + """Populate dynamic specs from the generated cache file if present.""" + + if _MODEL_SPEC_CACHE_STATE["loaded"]: + return + _MODEL_SPEC_CACHE_STATE["loaded"] = True + if not _DEFAULT_MODEL_SPECS_PATH.exists(): + return + try: + payload = json.loads(_DEFAULT_MODEL_SPECS_PATH.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return + + if not isinstance(payload, Mapping): + return + + for model_code_str, dp_map in payload.items(): + try: + model_code = int(model_code_str) + except (TypeError, ValueError): + continue + if model_code in PRODUCT_MODEL_SPECS: + continue + if not isinstance(dp_map, Mapping): + continue + converted: dict[int, dict[str, Any]] = {} + for dp_code_str, spec in dp_map.items(): + try: + dp_code = int(dp_code_str) + except (TypeError, ValueError): + continue + if isinstance(spec, Mapping): + converted[dp_code] = dict(spec) + if converted: + _DYNAMIC_MODEL_SPECS[model_code] = converted + + +def _load_product_models_payload(path: Path) -> list[Mapping[str, Any]]: + """Return the list of model definitions from a product models JSON file.""" + + source_key = str(path.resolve()) + if source_key in _MODELS_PAYLOAD_CACHE: + return _MODELS_PAYLOAD_CACHE[source_key] + try: + raw = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return [] + + models_candidate: Any = [] + if isinstance(raw, Mapping): + if isinstance(raw.get("models"), list): + models_candidate = raw["models"] + else: + data = raw.get("data") + if isinstance(data, Mapping) and isinstance(data.get("models"), list): + models_candidate = data["models"] + + if isinstance(models_candidate, list): + models: list[Mapping[str, Any]] = [ + entry for entry in models_candidate if isinstance(entry, Mapping) + ] + else: + models = [] + + _MODELS_PAYLOAD_CACHE[source_key] = models + return models + + +def load_product_models_payload(path: Path | None = None) -> list[Mapping[str, Any]]: + """Load the raw product models payload from disk.""" + + target = path or _DEFAULT_PRODUCT_MODELS_PATH + if not target.exists(): + return [] + return list(_load_product_models_payload(target)) + + +def extract_model_specs( + models: Iterable[Mapping[str, Any]], + model_codes: Iterable[int], +) -> dict[int, dict[int, dict[str, Any]]]: + """Build a trimmed datapoint spec mapping for the requested model codes.""" + + requested = {int(code) for code in model_codes} + include_all = not requested + result: dict[int, dict[int, dict[str, Any]]] = {} + + for model in models: + model_code_raw = model.get("modelCode") + if isinstance(model_code_raw, int): + model_code = model_code_raw + elif isinstance(model_code_raw, str) and model_code_raw.isdigit(): + model_code = int(model_code_raw) + else: + continue + if not include_all and model_code not in requested: + continue + dp_entries = model.get("dp") + if not isinstance(dp_entries, list): + continue + specs_map: dict[int, dict[str, Any]] = {} + for dp in dp_entries: + if not isinstance(dp, Mapping): + continue + dp_code_raw = dp.get("dpCode") + if isinstance(dp_code_raw, int): + dp_code = dp_code_raw + elif isinstance(dp_code_raw, str) and dp_code_raw.isdigit(): + dp_code = int(dp_code_raw) + else: + continue + raw_specs = dp.get("specs") + specs_mapping = raw_specs if isinstance(raw_specs, Mapping) else {} + specs_map[dp_code] = { + "identity": dp.get("identity"), + "dataType": specs_mapping.get("dataType"), + "dataTypeSub": specs_mapping.get("dataTypeSub"), + "length": specs_mapping.get("length"), + "decimal": specs_mapping.get("decimal"), + } + result[model_code] = specs_map + return result + + +def ensure_model_specs( + model_codes: Iterable[int], + *, + source_path: Path | None = None, +) -> dict[int, dict[int, dict[str, Any]]]: + """Ensure datapoint specs are available for the requested model codes.""" + + codes = [int(code) for code in model_codes] + _ensure_cached_model_specs_loaded() + + resolved: dict[int, dict[int, dict[str, Any]]] = {} + missing: set[int] = set() + + for code in codes: + if code in PRODUCT_MODEL_SPECS: + resolved[code] = { + int(dp_code): dict(spec) + for dp_code, spec in PRODUCT_MODEL_SPECS[code].items() + } + elif code in _DYNAMIC_MODEL_SPECS: + resolved[code] = _DYNAMIC_MODEL_SPECS[code] + else: + missing.add(code) + + if not missing: + return resolved + + path = source_path or _DEFAULT_PRODUCT_MODELS_PATH + if not path.exists(): + return resolved + + models = _load_product_models_payload(path) + extracted = extract_model_specs(models, missing) + + for code in missing: + specs = extracted.get(code, {}) + if code not in PRODUCT_MODEL_SPECS: + _DYNAMIC_MODEL_SPECS[code] = specs + resolved[code] = specs + + return resolved + + +def get_model_dp_specs( + model_code: int, + *, + source_path: Path | None = None, +) -> dict[int, dict[str, Any]]: + """Return datapoint specifications for a single model code.""" + + specs = ensure_model_specs([model_code], source_path=source_path) + return specs.get(model_code, {}) + + +def save_model_specs( + specs: Mapping[int, Mapping[int, Mapping[str, Any]]], + path: Path | None = None, + *, + update_cache: bool = True, +) -> None: + """Persist a trimmed spec cache to disk.""" + + target = path or _DEFAULT_MODEL_SPECS_PATH + serialisable = { + str(model_code): {str(dp_code): dict(dp_spec) for dp_code, dp_spec in dp_map.items()} + for model_code, dp_map in specs.items() + } + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(json.dumps(serialisable, indent=2, sort_keys=True), encoding="utf-8") + + if update_cache and (path is None or target == _DEFAULT_MODEL_SPECS_PATH): + _ensure_cached_model_specs_loaded() + for model_code, dp_map in specs.items(): + if model_code in PRODUCT_MODEL_SPECS: + continue + converted_specs: dict[int, dict[str, Any]] = {} + for dp_code, spec in dp_map.items(): + try: + dp_code_int = int(dp_code) + except (TypeError, ValueError): + continue + converted_specs[dp_code_int] = dict(spec) + _DYNAMIC_MODEL_SPECS[int(model_code)] = converted_specs diff --git a/homgarapi/logutil.py b/homgarapi/logutil.py index 2746bc7..51fe552 100644 --- a/homgarapi/logutil.py +++ b/homgarapi/logutil.py @@ -1,8 +1,16 @@ +"""Logging helpers for the HomGar API package.""" + import logging from pathlib import Path TRACE = logging.DEBUG - 1 +"""Custom TRACE logging level used by the HomGar client.""" def get_logger(file: str) -> logging.Logger: + """Return a module-level logger by stem name of the provided path. + + :param file: File path whose stem will be used as the logger name. + :returns: A logger instance tied to the provided filename stem. + """ return logging.getLogger(Path(file).stem)