From f9b5714e2963fe1303ad8f7726eb88c20947ad0e Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Thu, 1 Jan 2026 23:05:59 +0100 Subject: [PATCH 01/19] feat: Enhance APIClient to support legacy and modern API detection - Added automatic detection of API generation (modern vs legacy) in APIClient. - Implemented endpoint mapping for legacy Communication and Print Board. - Updated raw GET and POST methods to handle different API structures. - Introduced properties for API version and board type. - Enhanced error handling for connection issues and 404 responses. - Updated models to normalize node fields between API versions. - Added tests for both modern and legacy API interactions, including node actions and configurations. --- src/ducopy/cli.py | 206 ++++++++++++++++-- src/ducopy/rest/client.py | 435 ++++++++++++++++++++++++++++++++++++-- src/ducopy/rest/models.py | 19 +- src/ducopy/rest/utils.py | 40 ++-- tests/test_client.py | 184 +++++++++++++--- tests/test_restapi.py | 12 +- 6 files changed, 817 insertions(+), 79 deletions(-) diff --git a/src/ducopy/cli.py b/src/ducopy/cli.py index dcd7684..6da52b7 100644 --- a/src/ducopy/cli.py +++ b/src/ducopy/cli.py @@ -72,8 +72,17 @@ def validate_url(url: str) -> str: def print_output(data: Any, format: str) -> None: # noqa: ANN401 """Print output in the specified format.""" - if isinstance(data, BaseModel): # Check if data is a Pydantic model instance - data = data.dict() # Use `.dict()` for JSON serialization + # Recursively convert Pydantic models to dicts + def convert_to_dict(obj): + if isinstance(obj, BaseModel): + return obj.dict() if hasattr(obj, 'dict') else obj.model_dump() + elif isinstance(obj, dict): + return {k: convert_to_dict(v) for k, v in obj.items()} + elif isinstance(obj, (list, tuple)): + return [convert_to_dict(item) for item in obj] + return obj + + data = convert_to_dict(data) if format == "json": typer.echo(json.dumps(data, indent=4)) @@ -207,18 +216,21 @@ def raw_patch( def change_action_node( base_url: str, node_id: int, - action: Annotated[str, typer.Option(help="The action key to include in the JSON body")], - value: Annotated[str, typer.Option(help="The value key to include in the JSON body")], + action: Annotated[str, typer.Option(help="The action key (Connectivity Board only, use any value for Communication and Print Board)")], + value: Annotated[str, typer.Option(help="The state/value to set (e.g., AUTO, MAN1, AUT1)")], format: Annotated[str, typer.Option(help="Output format: pretty or json")] = "pretty", ) -> None: """ - Perform a POST action by sending a JSON body to the endpoint. + Change the action/state for a specific node. + + - Connectivity Board: Sends a POST request with JSON body to /action/nodes/{node_id} + - Communication and Print Board: Sends a GET request to /nodesetoperstate?node={node_id}&value={value} Args: base_url (str): The base URL of the API. node_id (int): The ID of the node to perform the action on. - action (str): The action key to include in the JSON body. - value (str): The value key to include in the JSON body. + action (str): The action key (used only for Connectivity Board validation). + value (str): The state/value to set (e.g., AUTO, MAN1, AUT1, MAN2, AUT2, etc.). format (str): Output format: pretty or json. """ base_url = validate_url(base_url) @@ -227,8 +239,8 @@ def change_action_node( response = facade.change_action_node(action=action, value=value, node_id=node_id) print_output(response, format) except Exception as e: - logger.error("Error performing POST action for node {}: {}", node_id, e) - typer.echo(f"Failed to perform POST action for node {node_id}: {e}") + logger.error("Error changing node action for node {}: {}", node_id, e) + typer.echo(f"Failed to change node action for node {node_id}: {e}") raise typer.Exit(code=1) @@ -280,8 +292,26 @@ def get_config_nodes( base_url = validate_url(base_url) facade = DucoPy(base_url) try: - response = facade.get_config_nodes() - print_output(response, format) + # Get generation info + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + # Get config data + config_data = facade.get_config_nodes() + + # Combine both + output = { + "generation_info": generation_info, + "config_nodes": config_data, + } + + print_output(output, format) except Exception as e: logger.error("Error fetching configuration for all nodes: {}", str(e)) typer.echo(f"Failed to fetch configuration for all nodes: {e}") @@ -306,10 +336,30 @@ def get_info( parameter: str = None, format: Annotated[str, typer.Option(help="Output format: pretty or json")] = "pretty", ) -> None: - """Retrieve general API information with optional filters.""" + """Retrieve general API information with optional filters. Also displays API generation info.""" base_url = validate_url(base_url) facade = DucoPy(base_url) - print_output(facade.get_info(module=module, submodule=submodule, parameter=parameter), format) + + # Get the generation info + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + # Get the regular info + info_data = facade.get_info(module=module, submodule=submodule, parameter=parameter) + + # Combine both + output = { + "generation_info": generation_info, + "api_info": info_data, + } + + print_output(output, format) @app.command() @@ -319,7 +369,27 @@ def get_nodes( """Retrieve list of all nodes.""" base_url = validate_url(base_url) facade = DucoPy(base_url) - print_output(facade.get_nodes(), format) + + # Get generation info + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + # Get nodes data + nodes_data = facade.get_nodes() + + # Combine both + output = { + "generation_info": generation_info, + "nodes": nodes_data, + } + + print_output(output, format) @app.command() @@ -329,7 +399,27 @@ def get_node_info( """Retrieve information for a specific node by ID.""" base_url = validate_url(base_url) facade = DucoPy(base_url) - print_output(facade.get_node_info(node_id=node_id), format) + + # Get generation info + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + # Get node info + node_data = facade.get_node_info(node_id=node_id) + + # Combine both + output = { + "generation_info": generation_info, + "node_info": node_data, + } + + print_output(output, format) @app.command() @@ -339,7 +429,27 @@ def get_config_node( """Retrieve configuration settings for a specific node.""" base_url = validate_url(base_url) facade = DucoPy(base_url) - print_output(facade.get_config_node(node_id=node_id), format) + + # Get generation info + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + # Get config data + config_data = facade.get_config_node(node_id=node_id) + + # Combine both + output = { + "generation_info": generation_info, + "config": config_data, + } + + print_output(output, format) @app.command() @@ -351,7 +461,27 @@ def get_action( """Retrieve action data with an optional filter.""" base_url = validate_url(base_url) facade = DucoPy(base_url) - print_output(facade.get_action(action=action), format) + + # Get generation info + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + # Get action data + action_data = facade.get_action(action=action) + + # Combine both + output = { + "generation_info": generation_info, + "action": action_data, + } + + print_output(output, format) @app.command() @@ -364,7 +494,27 @@ def get_actions_node( """Retrieve actions available for a specific node.""" base_url = validate_url(base_url) facade = DucoPy(base_url) - print_output(facade.get_actions_node(node_id=node_id, action=action), format) + + # Get generation info + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + # Get actions data + actions_data = facade.get_actions_node(node_id=node_id, action=action) + + # Combine both + output = { + "generation_info": generation_info, + "actions": actions_data, + } + + print_output(output, format) @app.command() @@ -377,6 +527,26 @@ def get_logs( print_output(facade.get_logs(), format) +@app.command() +def check_generation( + base_url: str, format: Annotated[str, typer.Option(help="Output format: pretty or json")] = "pretty" +) -> None: + """Check the board type (Connectivity Board or Communication and Print Board).""" + base_url = validate_url(base_url) + facade = DucoPy(base_url) + + generation_info = { + "generation": facade.client.generation, + "board_type": facade.client.board_type, + "api_version": facade.client.api_version, + "public_api_version": facade.client.public_api_version, + "is_modern_api": facade.client.is_modern_api, + "is_legacy_api": facade.client.is_legacy_api, + } + + print_output(generation_info, format) + + def entry_point() -> None: """Entry point for the CLI.""" app() # Run the Typer app diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index d5fc1e7..f6cd22d 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -43,6 +43,7 @@ ParameterConfig, NodesInfoResponse, ActionsChangeResponse, + PYDANTIC_V2, ) from ducopy.rest.utils import DucoUrlSession from loguru import logger @@ -53,13 +54,39 @@ class APIClient: - def __init__(self, base_url: HttpUrl, verify: bool = True) -> None: + # Mapping of Connectivity Board endpoints to Communication and Print Board endpoints + # Connectivity Board = modern API (family includes V1 and V2 boards) + # Communication and Print Board = legacy API + GEN1_ENDPOINT_MAP = { + "/info": "/boxinfoget", + "/api": "/boxinfoget", + "/info/nodes": "/nodelist", + "/config/nodes": "/boxinfoget", + } + # Pattern for node-specific endpoints (Connectivity Board -> Communication and Print Board) + # /info/nodes/{id} -> /nodeinfoget?node={id} + + def __init__(self, base_url: HttpUrl, verify: bool = True, auto_detect: bool = True) -> None: self.base_url = base_url if verify: - self.session = DucoUrlSession(base_url, verify=self._duco_pem()) + self.session = DucoUrlSession(base_url, verify=self._duco_pem(), endpoint_mapper=self._map_endpoint) else: - self.session = DucoUrlSession(base_url, verify=verify) + self.session = DucoUrlSession(base_url, verify=verify, endpoint_mapper=self._map_endpoint) + + # API generation tracking + self._api_version = None + self._public_api_version = None + self._generation = None + self._board_type = None + logger.info("APIClient initialized with base URL: {}", base_url) + + # Automatically detect generation if requested + if auto_detect: + try: + self.detect_generation() + except Exception as e: + logger.warning("Failed to auto-detect API generation: {}", e) def _duco_pem(self) -> str: """Enable certificate pinning.""" @@ -68,6 +95,258 @@ def _duco_pem(self) -> str: return str(pem_path) + def detect_generation(self) -> dict: + """ + Detect the API generation by trying HTTPS and HTTP with /info endpoint. + + This method determines whether we're communicating with: + - Connectivity Board (modern API): HTTPS, /info endpoint exists + Includes V1 and V2 variants + - Communication and Print Board (legacy API): HTTP only, /info returns 404 + + Detection logic: + 1. Try HTTPS with /info - if successful, it's Connectivity Board + 2. If HTTPS fails, try HTTP with /info + 3. If /info returns 404, it's Communication and Print Board + 4. If /info succeeds on HTTP, check version to determine board type + + Returns: + dict: API information including version details and board type + """ + logger.info("Detecting API generation...") + + # Check if we're using HTTPS + is_https = str(self.base_url).startswith('https://') + + try: + # Try to get /info endpoint directly (without mapping) + logger.debug("Attempting to fetch /info endpoint...") + response = self.session.get("/info") + response.raise_for_status() + info_response = response.json() + + # If we got here, /info exists + if is_https: + # HTTPS + /info exists = Connectivity Board (modern API) + self._generation = "modern" + self._board_type = "Connectivity Board" + logger.info("Detected Connectivity Board (modern API) - HTTPS with /info endpoint") + else: + # HTTP + /info exists - need to check version + logger.debug("Got /info on HTTP, checking version...") + try: + api_response = self.session.get("/api") + api_response.raise_for_status() + api_info = api_response.json() + + self._api_version = api_info.get("ApiVersion", {}).get("Val") + self._public_api_version = api_info.get("PublicApiVersion", {}).get("Val") + + if self._public_api_version: + version_str = str(self._public_api_version) + if "2." in version_str: + self._generation = "modern" + self._board_type = "Connectivity Board" + elif "1." in version_str: + self._generation = "legacy" + self._board_type = "Communication and Print Board" + else: + self._generation = "unknown" + self._board_type = "Unknown Board" + else: + self._generation = "legacy" + self._board_type = "Communication and Print Board" + except Exception: + # /info works but /api doesn't - assume modern + self._generation = "modern" + self._board_type = "Connectivity Board" + + logger.info( + "API generation detected: {} (Protocol: {}, Board: {})", + self._generation, + "HTTPS" if is_https else "HTTP", + self._board_type + ) + + return { + "generation": self._generation, + "api_version": self._api_version, + "public_api_version": self._public_api_version, + "protocol": "HTTPS" if is_https else "HTTP", + "board_type": self._board_type + } + + except Exception as e: + error_message = str(e) + + # Check if it's a 404 error on /info + if "404" in error_message: + # /info returns 404 = Communication and Print Board (legacy API) + self._generation = "legacy" + self._board_type = "Communication and Print Board" + logger.info("Detected Communication and Print Board (legacy API) - /info endpoint not found (404)") + + return { + "generation": self._generation, + "api_version": None, + "public_api_version": None, + "protocol": "HTTPS" if is_https else "HTTP", + "board_type": self._board_type + } + + # Check if it's a timeout or connection error with HTTPS + if is_https and ("timeout" in error_message.lower() or "connection" in error_message.lower()): + logger.warning("HTTPS connection failed. Communication and Print Board only supports HTTP.") + raise ConnectionError( + "Failed to connect via HTTPS. The Communication and Print Board only supports HTTP connections. " + "Please use 'http://' instead of 'https://' in the URL." + ) from e + + # Other error - connection failed or other issue + logger.error("Failed to detect API generation: {}", e) + self._generation = "unknown" + raise + + @property + def generation(self) -> str | None: + """ + Get the detected API generation. + + Returns: + str | None: 'modern', 'legacy', 'unknown', or None if not detected yet + """ + return self._generation + + @property + def api_version(self) -> str | None: + """ + Get the full API version string. + + Returns: + str | None: The API version or None if not detected yet + """ + return self._api_version + + @property + def public_api_version(self) -> str | None: + """ + Get the public API version string. + + Returns: + str | None: The public API version or None if not detected yet + """ + return self._public_api_version + + @property + def board_type(self) -> str | None: + """ + Get the detected board type. + + Returns: + str | None: 'Connectivity Board', 'Communication and Print Board', or None + """ + return self._board_type + + @property + def is_modern_api(self) -> bool: + """ + Check if the board uses the modern API (Connectivity Board). + + The Connectivity Board family includes V1 and V2 variants. + + Returns: + bool: True if Connectivity Board, False otherwise + """ + return self._generation == "modern" + + @property + def is_legacy_api(self) -> bool: + """ + Check if the board uses the legacy API (Communication and Print Board). + + Returns: + bool: True if Communication and Print Board, False otherwise + """ + return self._generation == "legacy" + + def _map_endpoint(self, endpoint: str) -> str: + """ + Map Connectivity Board endpoints to Communication and Print Board equivalents if using legacy API. + + Args: + endpoint: The Connectivity Board endpoint + + Returns: + str: The appropriate endpoint for the current board type + """ + # Don't map if generation hasn't been detected yet + if self._generation is None: + return endpoint + + if self.is_legacy_api: + # Handle direct mappings + if endpoint in self.GEN1_ENDPOINT_MAP: + mapped = self.GEN1_ENDPOINT_MAP[endpoint] + logger.debug("Mapped endpoint {} to Communication and Print Board endpoint: {}", endpoint, mapped) + return mapped + + # Handle pattern-based mappings for node-specific endpoints + # /config/nodes/{id} -> /nodeconfigget?node={id} + if endpoint.startswith("/config/nodes/"): + node_id = endpoint.split("/")[-1] + mapped = f"/nodeconfigget?node={node_id}" + logger.debug("Mapped endpoint {} to Communication and Print Board endpoint: {}", endpoint, mapped) + return mapped + + return endpoint + + def _transform_gen1_node_info(self, gen1_data: dict) -> dict: + """ + Transform Communication and Print Board node info response to Connectivity Board NodeInfo format. + + Communication and Print Board format: + {"node": 4, "devtype": "UNKN", "addr": 0, "state": "AUTO", "ovrl": 255, "cerr": 0, ...} + + Connectivity Board format: + {"Node": 4, "General": {"Type": {"Val": "..."}, "Addr": 0}, "Ventilation": {...}, ...} + """ + # Extract all sensor fields (snsr, co2, temp, rh, etc.) + sensor_fields = {} + known_sensor_keys = ["snsr", "co2", "temp", "rh", "CO2", "Temp", "RH", "Snsr"] + for key in known_sensor_keys: + if key in gen1_data: + sensor_fields[key.lower()] = gen1_data[key] + + # Also check for any other potential sensor fields by looking for numeric values + # that aren't already captured in other sections + known_non_sensor_keys = {"node", "devtype", "addr", "state", "ovrl", "cerr", "cntdwn", "endtime", "mode"} + for key, value in gen1_data.items(): + if key not in known_non_sensor_keys and isinstance(value, (int, float)): + sensor_fields[key.lower()] = value + + return { + "Node": gen1_data.get("node"), + "General": { + "Type": { + "Id": None, + "Val": gen1_data.get("devtype", "UNKN") + }, + "Addr": gen1_data.get("addr", 0) + }, + "NetworkDuco": { + "CommErrorCtr": gen1_data.get("cerr", 0) + } if gen1_data.get("cerr") is not None else None, + "Ventilation": { + "State": gen1_data.get("state"), + "FlowLvlOvrl": gen1_data.get("ovrl", 0), + "TimeStateRemain": gen1_data.get("cntdwn") if gen1_data.get("cntdwn", 0) != 0 else None, + "TimeStateEnd": gen1_data.get("endtime") if gen1_data.get("endtime", 0) != 0 else None, + "Mode": gen1_data.get("mode") if gen1_data.get("mode") != "-" else None, + "FlowLvlTgt": None, # Not available on Communication and Print Board + } if any(k in gen1_data for k in ["state", "ovrl", "mode"]) else None, + "Sensor": sensor_fields if sensor_fields else None, + } + def raw_get(self, endpoint: str, params: dict = None) -> dict: """ Perform a raw GET request to the specified endpoint. @@ -79,10 +358,13 @@ def raw_get(self, endpoint: str, params: dict = None) -> dict: Returns: dict: JSON response from the server. """ - logger.info("Performing raw GET request to endpoint: {} with params: {}", endpoint, params) - response = self.session.get(endpoint, params=params) + # Map endpoint if using Communication and Print Board + mapped_endpoint = self._map_endpoint(endpoint) + + logger.info("Performing raw GET request to endpoint: {} with params: {}", mapped_endpoint, params) + response = self.session.get(mapped_endpoint, params=params) response.raise_for_status() - logger.debug("Received response for raw GET request to endpoint: {}", endpoint) + logger.debug("Received response for raw GET request to endpoint: {}", mapped_endpoint) return response.json() def raw_post(self, endpoint: str, data: str | None = None) -> dict: @@ -123,16 +405,41 @@ def raw_patch(self, endpoint: str, data: str | None = None) -> dict: def post_action_node(self, action: str, value: str, node_id: int) -> ActionsChangeResponse: """ - Perform a POST action by sending a JSON body to the endpoint. + Perform an action on a node. + + For Connectivity Board: POST to /action/nodes/{node_id} with JSON body + For Communication and Print Board: GET to /nodesetoperstate?node={node_id}&value={value} Args: - action (str): The action key to include in the JSON body. - value (Any): The value key to include in the JSON body. + action (str): The action key (Connectivity Board only, ignored for Communication and Print Board). + value (str): The value/state to set (e.g., 'AUTO', 'AUT1', 'MAN1', etc.). node_id (int): The ID of the node to perform the action on. Returns: - dict: JSON response from the server. + ActionsChangeResponse: Response indicating success or failure. """ + # Communication and Print Board uses a simpler GET-based API + if self.is_legacy_api: + endpoint = "/nodesetoperstate" + logger.info("Setting node {} operation state to {} (Communication and Print Board)", node_id, value) + + response = self.session.get(endpoint, params={"node": node_id, "value": value}) + response.raise_for_status() + + # Parse HTML response for SUCCESS or FAILURE/FAILED + html_content = response.text.strip() + if "SUCCESS" in html_content.upper(): + logger.info("Successfully changed node {} state to {}", node_id, value) + # Return compatible response format + return ActionsChangeResponse(Action=value, Result="Success") + elif "FAIL" in html_content.upper(): + logger.error("Failed to change node {} state to {}: {}", node_id, value, html_content) + raise ValueError(f"Failed to change node {node_id} state to {value}: {html_content}") + else: + logger.warning("Unexpected response from node state change: {}", html_content[:100]) + raise ValueError(f"Unexpected response from board: {html_content[:100]}") + + # Connectivity Board uses the modern POST API with validation # Fetch available actions for the node logger.info("Fetching available actions for node ID: {}", node_id) available_actions = self.get_actions_node(node_id=node_id) @@ -184,6 +491,12 @@ def patch_config_node(self, node_id: int, config: ConfigNodeRequest) -> ConfigNo Returns: ConfigNodeResponse: The updated configuration response from the server. """ + if self.is_legacy_api: + raise NotImplementedError( + "Updating node configuration is not available on the Communication and Print Board. " + "This feature is only available on the Connectivity Board." + ) + logger.info("Updating configuration for node ID: {}", node_id) # Fetch current configuration of the node @@ -255,6 +568,31 @@ def get_config_nodes(self) -> NodesResponse: """ endpoint = "/config/nodes" logger.info("Fetching configuration for all nodes from endpoint: {}", endpoint) + + # Communication and Print Board doesn't have a /config/nodes endpoint - fetch each node individually + if self._generation == "legacy": + # First, get the node list + nodes_response = self.get_nodes() + node_configs = [] + node_ids = [node.Node for node in nodes_response.Nodes] if nodes_response.Nodes else [] + logger.info("Communication and Print Board detected - fetching config for {} nodes", len(node_ids)) + + for node_id in node_ids: + try: + config = self.get_config_node(node_id) + # ConfigNodeResponse and NodeConfig have the same fields, convert via dict + if PYDANTIC_V2: + node_configs.append(config.model_dump()) + else: + node_configs.append(config.dict()) + except Exception as e: + logger.warning("Failed to fetch config for node {}: {}", node_id, e) + # Continue with other nodes even if one fails + + data = {"Nodes": node_configs} + return NodesResponse(**data) + + # Connectivity Board has the /config/nodes endpoint response = self.session.get(endpoint) response.raise_for_status() logger.debug("Received configuration data for all nodes") @@ -263,7 +601,8 @@ def get_config_nodes(self) -> NodesResponse: def get_api_info(self) -> dict: """Fetch API version and available endpoints.""" logger.info("Fetching API information") - response = self.session.get("/api") + endpoint = self._map_endpoint("/api") + response = self.session.get(endpoint) response.raise_for_status() logger.debug("Received API information") return response.json() @@ -271,38 +610,84 @@ def get_api_info(self) -> dict: def get_info(self, module: str = None, submodule: str = None, parameter: str = None) -> dict: """Fetch general API information.""" params = {k: v for k, v in {"module": module, "submodule": submodule, "parameter": parameter}.items() if v} - logger.info("Fetching info with parameters: {}", params) - response = self.session.get("/info", params=params) + + # Map endpoint for Communication and Print Board + endpoint = self._map_endpoint("/info") + logger.info("get_info() called - generation: {}, using endpoint: {}", self._generation, endpoint) + + response = self.session.get(endpoint, params=params) response.raise_for_status() - logger.debug("Received general info") + logger.debug("Received general info from endpoint: {}", endpoint) return response.json() def get_nodes(self) -> NodesInfoResponse: """Retrieve list of all nodes.""" logger.info("Fetching list of all nodes") - response = self.session.get("/info/nodes") + endpoint = self._map_endpoint("/info/nodes") + response = self.session.get(endpoint) response.raise_for_status() logger.debug("Received nodes data") - return NodesInfoResponse(**response.json()) + + data = response.json() + + # Communication and Print Board returns {"nodelist": [1, 2, 3]} instead of {"Nodes": [...]} + # Fetch full info for each node to match Connectivity Board response structure + if self._generation == "legacy" and "nodelist" in data: + node_ids = data["nodelist"] + nodes = [] + logger.info("Communication and Print Board detected - fetching details for {} nodes", len(node_ids)) + for node_id in node_ids: + try: + node_info = self.get_node_info(node_id) + # Append the NodeInfo object directly, no need to convert to dict + nodes.append(node_info) + except Exception as e: + logger.warning("Failed to fetch info for node {}: {}", node_id, e) + # Continue with other nodes even if one fails + data = {"Nodes": nodes} + + return NodesInfoResponse(**data) def get_node_info(self, node_id: int) -> NodeInfo: """Retrieve detailed information for a specific node.""" logger.info("Fetching info for node ID: {}", node_id) - response = self.session.get(f"/info/nodes/{node_id}") + + # Communication and Print Board uses /nodeinfoget?node=X instead of /info/nodes/X + if self._generation == "legacy": + endpoint = "/nodeinfoget" + response = self.session.get(endpoint, params={"node": node_id}) + else: + endpoint = self._map_endpoint(f"/info/nodes/{node_id}") + response = self.session.get(endpoint) + response.raise_for_status() logger.debug("Received node info for node ID: {}", node_id) - return NodeInfo(**response.json()) # Direct instantiation for Pydantic 1.x + + data = response.json() + + # Transform Communication and Print Board response to Connectivity Board format + if self._generation == "legacy": + data = self._transform_gen1_node_info(data) + + return NodeInfo(**data) # Direct instantiation for Pydantic 1.x def get_config_node(self, node_id: int) -> ConfigNodeResponse: """Retrieve configuration settings for a specific node.""" logger.info("Fetching configuration for node ID: {}", node_id) - response = self.session.get(f"/config/nodes/{node_id}") + endpoint = self._map_endpoint(f"/config/nodes/{node_id}") + response = self.session.get(endpoint) response.raise_for_status() logger.debug("Received config for node ID: {}", node_id) return ConfigNodeResponse(**response.json()) # Direct instantiation for Pydantic 1.x def get_action(self, action: str = None) -> dict: """Retrieve action data.""" + if self.is_legacy_api: + raise NotImplementedError( + "Action retrieval is not available on the Communication and Print Board. " + "This feature is only available on the Connectivity Board." + ) + logger.info("Fetching action data for action: {}", action) params = {"action": action} if action else {} response = self.session.get("/action", params=params) @@ -312,6 +697,12 @@ def get_action(self, action: str = None) -> dict: def get_actions_node(self, node_id: int, action: str = None) -> ActionsResponse: """Retrieve available actions for a specific node.""" + if self.is_legacy_api: + raise NotImplementedError( + "Node actions are not available on the Communication and Print Board. " + "This feature is only available on the Connectivity Board." + ) + logger.info("Fetching actions for node ID: {} with action filter: {}", node_id, action) params = {"action": action} if action else {} response = self.session.get(f"/action/nodes/{node_id}", params=params) @@ -321,6 +712,12 @@ def get_actions_node(self, node_id: int, action: str = None) -> ActionsResponse: def get_logs(self) -> dict: """Retrieve API logs.""" + if self.is_legacy_api: + raise NotImplementedError( + "API logs are not available on the Communication and Print Board. " + "This feature is only available on the Connectivity Board." + ) + logger.info("Fetching API logs") response = self.session.get("/log/api") response.raise_for_status() diff --git a/src/ducopy/rest/models.py b/src/ducopy/rest/models.py index 43b6ea7..ba1ea6f 100644 --- a/src/ducopy/rest/models.py +++ b/src/ducopy/rest/models.py @@ -117,6 +117,13 @@ class NodeConfig(BaseModel): FlowLvlSwitch: ParameterConfig | None = None Name: ParameterConfig | None = None + @unified_validator() + def normalize_node_field(cls, values: dict[str, Any]) -> dict[str, Any]: + """Normalize 'node' (Communication and Print Board) to 'Node' (Connectivity Board)""" + if "node" in values and "Node" not in values: + values["Node"] = values.pop("node") + return values + class NodesResponse(BaseModel): Nodes: list[NodeConfig] @@ -222,6 +229,13 @@ class ConfigNodeResponse(BaseModel): FlowLvlSwitch: ParameterConfig | None = None Name: ParameterConfig | None = None + @unified_validator() + def normalize_node_field(cls, values: dict[str, Any]) -> dict[str, Any]: + """Normalize 'node' (Communication and Print Board) to 'Node' (Connectivity Board)""" + if "node" in values and "Node" not in values: + values["Node"] = values.pop("node") + return values + class ConfigNodeRequest(BaseModel): Name: str | None = None @@ -265,5 +279,6 @@ class ActionsResponse(BaseModel): class ActionsChangeResponse(BaseModel): - Code: int - Result: str \ No newline at end of file + Code: int | None = None + Result: str + Action: str | None = None \ No newline at end of file diff --git a/src/ducopy/rest/utils.py b/src/ducopy/rest/utils.py index 468295e..b326827 100644 --- a/src/ducopy/rest/utils.py +++ b/src/ducopy/rest/utils.py @@ -67,16 +67,19 @@ def cert_verify(self, conn: requests.adapters.HTTPAdapter, url: str, verify: boo class DucoUrlSession(requests.Session): - def __init__(self, base_url: str, verify: bool | str = True) -> None: + def __init__(self, base_url: str, verify: bool | str = True, endpoint_mapper: callable = None, timeout: int = 10) -> None: """ Initializes the BaseUrlSession with a base URL and optional SSL verification setting. Args: base_url (str): The base URL to prepend to relative URLs. verify (bool | str): Path to the certificate or a boolean indicating SSL verification. + endpoint_mapper (callable): Optional function to map endpoints for API generation compatibility. + timeout (int): Request timeout in seconds. Defaults to 10 seconds. """ super().__init__() self.base_url = base_url + self.timeout = timeout if isinstance(verify, str): # Configure SSLContext to ignore hostname verification @@ -94,6 +97,7 @@ def __init__(self, base_url: str, verify: bool | str = True) -> None: self.api_key: str | None = None self.api_key_timestamp: float = 0.0 self.api_key_cache_duration: int = 60 + self.endpoint_mapper: callable | None = endpoint_mapper logger.info("Initialized DucoUrlSession for base URL: {}", base_url) @@ -101,20 +105,31 @@ def _ensure_apikey(self) -> None: """Refresh API key if expired or missing.""" if not self.api_key or (time.time() - self.api_key_timestamp) > self.api_key_cache_duration: logger.debug("API key is missing or expired. Fetching a new one.") - req = self.request("GET", "/info", ensure_apikey=False) + endpoint = self.endpoint_mapper("/info") if self.endpoint_mapper else "/info" + req = self.request("GET", endpoint, ensure_apikey=False) req.raise_for_status() data = req.json() - ducomac = data["General"]["Lan"]["Mac"]["Val"] - ducoserial = data["General"]["Board"]["SerialBoardBox"]["Val"] - ducotime = data["General"]["Board"]["Time"]["Val"] - - apigen = ApiKeyGenerator() - self.api_key = apigen.generate_api_key(ducoserial, ducomac, ducotime) - self.api_key_timestamp = time.time() - - self.headers.update({"Api-Key": self.api_key}) - logger.debug(f"Api-Key: {self.api_key}") + # Check if this is a Connectivity Board (modern) API response with nested structure + # Connectivity Board has data["General"]["Lan"]["Mac"]["Val"] + # Communication and Print Board has a flatter structure and doesn't use API keys + if "Lan" in data.get("General", {}): + # Connectivity Board API - generate API key + ducomac = data["General"]["Lan"]["Mac"]["Val"] + ducoserial = data["General"]["Board"]["SerialBoardBox"]["Val"] + ducotime = data["General"]["Board"]["Time"]["Val"] + + apigen = ApiKeyGenerator() + self.api_key = apigen.generate_api_key(ducoserial, ducomac, ducotime) + self.api_key_timestamp = time.time() + + self.headers.update({"Api-Key": self.api_key}) + logger.debug(f"Api-Key: {self.api_key}") + else: + # Communication and Print Board - doesn't use API keys + logger.debug("Communication and Print Board detected - API keys not required") + self.api_key = "legacy-no-key-required" + self.api_key_timestamp = time.time() logger.info("API key refreshed at {}", time.ctime(self.api_key_timestamp)) def request( @@ -143,6 +158,7 @@ def request( url = urljoin(self.base_url, url) kwargs.setdefault("verify", self.verify) + kwargs.setdefault("timeout", self.timeout) max_retries = 3 for attempt in range(max_retries): diff --git a/tests/test_client.py b/tests/test_client.py index a1fa572..0d9019c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -25,9 +25,28 @@ def mock_info_endpoint(mock_requests: requests_mock.Mocker) -> None: mock_requests.get(f"{BASE_URL}/info", json=mock_data) +def mock_detection_endpoint_modern(mock_requests: requests_mock.Mocker) -> None: + """Mock the detection endpoint for Connectivity Board (modern API).""" + mock_data = { + "General": { + "Lan": {"Mac": {"Val": "00:00:00:00:00:00"}}, + "Board": {"SerialBoardBox": {"Val": "MOCKSERIAL123456"}, "Time": {"Val": 1730471603}}, + } + } + mock_requests.get(f"{BASE_URL}/info", json=mock_data) + + +def mock_detection_endpoint_legacy(mock_requests: requests_mock.Mocker) -> None: + """Mock the detection endpoint for Communication and Print Board (legacy API).""" + mock_requests.get(f"{BASE_URL}/info", status_code=404) + # Mock /boxinfoget for legacy API key generation + mock_data = {"General": {"Time": 1730471603}} + mock_requests.get(f"{BASE_URL}/boxinfoget", json=mock_data) + + @pytest.fixture def client() -> APIClient: - return APIClient(base_url=BASE_URL) + return APIClient(base_url=BASE_URL, auto_detect=False) @pytest.fixture @@ -37,7 +56,10 @@ def mock_requests() -> Any: # noqa: ANN401 def test_get_api_info(client: APIClient, mock_requests: requests_mock.Mocker) -> None: - mock_info_endpoint(mock_requests) + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + mock_data = load_mock_data("api_info.json") mock_requests.get(f"{BASE_URL}/api", json=mock_data) @@ -46,44 +68,65 @@ def test_get_api_info(client: APIClient, mock_requests: requests_mock.Mocker) -> def test_get_nodes(client: APIClient, mock_requests: requests_mock.Mocker) -> None: - mock_info_endpoint(mock_requests) + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + mock_data = load_mock_data("nodes.json") mock_requests.get(f"{BASE_URL}/info/nodes", json=mock_data) - nodes_response = NodesInfoResponse(**mock_data) # Instantiate NodesInfoResponse directly with data - assert isinstance(nodes_response, NodesInfoResponse) - assert len(nodes_response.Nodes) == len(mock_data["Nodes"]) + # Test the actual client method call + response = client.get_nodes() + assert isinstance(response, NodesInfoResponse) + assert len(response.Nodes) == len(mock_data["Nodes"]) def test_post_action_node(client: APIClient, mock_requests: requests_mock.Mocker) -> None: - mock_info_endpoint(mock_requests) + mock_detection_endpoint_modern(mock_requests) + # Set generation manually since auto-detect is disabled + client._generation = "modern" + client._board_type = "Connectivity Board" + + # Mock get_actions_node for validation + actions_data = load_mock_data("actions_node_1.json") + mock_requests.get(f"{BASE_URL}/action/nodes/1", json=actions_data) + mock_data = load_mock_data("set_actions_node_1.json") mock_requests.post(f"{BASE_URL}/action/nodes/1", json=mock_data) - action_response = ActionsChangeResponse(**mock_data) # Instantiate ActionsChangeResponse directly with data - assert isinstance(action_response, ActionsChangeResponse) - assert action_response.Code == mock_data["Code"] - assert action_response.Result == mock_data["Result"] + # Test the actual client method call - use AUTO which is valid in actions_node_1.json + response = client.post_action_node(action="SetVentilationState", value="AUTO", node_id=1) + assert isinstance(response, ActionsChangeResponse) + assert response.Code == mock_data.get("Code") + assert response.Result == mock_data["Result"] def test_get_node_info(client: APIClient, mock_requests: requests_mock.Mocker) -> None: - mock_info_endpoint(mock_requests) + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + mock_data = load_mock_data("node_1.json") mock_requests.get(f"{BASE_URL}/info/nodes/1", json=mock_data) - node_info = NodeInfo(**mock_data) # Instantiate NodeInfo directly with data - assert isinstance(node_info, NodeInfo) - assert node_info.Node == mock_data["Node"] + # Test the actual client method call + response = client.get_node_info(node_id=1) + assert isinstance(response, NodeInfo) + assert response.Node == mock_data["Node"] def test_get_config_node(client: APIClient, mock_requests: requests_mock.Mocker) -> None: - mock_info_endpoint(mock_requests) + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + mock_data = load_mock_data("config_node_1.json") mock_requests.get(f"{BASE_URL}/config/nodes/1", json=mock_data) - config_response = ConfigNodeResponse(**mock_data) # Instantiate ConfigNodeResponse directly with data - assert isinstance(config_response, ConfigNodeResponse) - assert config_response.Node == mock_data["Node"] + # Test the actual client method call + response = client.get_config_node(node_id=1) + assert isinstance(response, ConfigNodeResponse) + assert response.Node == mock_data["Node"] # Uncomment this test if needed @@ -97,19 +140,112 @@ def test_get_config_node(client: APIClient, mock_requests: requests_mock.Mocker) def test_get_actions_node(client: APIClient, mock_requests: requests_mock.Mocker) -> None: - mock_info_endpoint(mock_requests) + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + mock_data = load_mock_data("actions_node_1.json") mock_requests.get(f"{BASE_URL}/action/nodes/1", json=mock_data) - actions_response = ActionsResponse(**mock_data) # Instantiate ActionsResponse directly with data - assert isinstance(actions_response, ActionsResponse) - assert len(actions_response.Actions) == len(mock_data["Actions"]) + # Test the actual client method call + response = client.get_actions_node(node_id=1) + assert isinstance(response, ActionsResponse) + assert len(response.Actions) == len(mock_data["Actions"]) def test_get_logs(client: APIClient, mock_requests: requests_mock.Mocker) -> None: - mock_info_endpoint(mock_requests) + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + mock_data = load_mock_data("logs.json") mock_requests.get(f"{BASE_URL}/log/api", json=mock_data) logs_response = client.get_logs() assert logs_response == mock_data + +# Tests for Communication and Print Board (legacy API) + + +def test_post_action_node_legacy(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test post_action_node on Communication and Print Board using GET request.""" + mock_detection_endpoint_legacy(mock_requests) + client._generation = "legacy" + client._board_type = "Communication and Print Board" + + # Mock the GET request to /nodesetoperstate + mock_requests.get(f"{BASE_URL}/nodesetoperstate", text="SUCCESS") + + response = client.post_action_node(action="OperState", value="AUTO", node_id=1) + assert isinstance(response, ActionsChangeResponse) + assert response.Result == "Success" + assert response.Action == "AUTO" + assert response.Code is None + + +def test_post_action_node_legacy_failure(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test post_action_node failure on Communication and Print Board.""" + mock_detection_endpoint_legacy(mock_requests) + client._generation = "legacy" + client._board_type = "Communication and Print Board" + + # Mock the GET request to return FAILED + mock_requests.get(f"{BASE_URL}/nodesetoperstate", text="FAILED") + + with pytest.raises(ValueError, match="Failed to change node 1 state to INVALID"): + client.post_action_node(action="OperState", value="INVALID", node_id=1) + + +def test_get_node_info_legacy(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test get_node_info on Communication and Print Board.""" + mock_detection_endpoint_legacy(mock_requests) + client._generation = "legacy" + client._board_type = "Communication and Print Board" + + # Mock the legacy response format + mock_data = { + "node": 1, + "devtype": "VLVRH", + "addr": 2, + "state": "AUTO", + "ovrl": 255, + "cerr": 0, + "mode": "AUTO", + "snsr": 10, + "co2": 570, + "temp": 21.4, + "rh": 45.5 + } + mock_requests.get(f"{BASE_URL}/nodeinfoget", json=mock_data) + + response = client.get_node_info(node_id=1) + assert isinstance(response, NodeInfo) + assert response.Node == 1 + assert response.General.Type.Val == "VLVRH" + assert response.Sensor is not None + + +def test_get_nodes_legacy(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test get_nodes on Communication and Print Board.""" + mock_detection_endpoint_legacy(mock_requests) + client._generation = "legacy" + client._board_type = "Communication and Print Board" + + # Mock nodelist response + mock_requests.get(f"{BASE_URL}/nodelist", json={"nodelist": [1, 2, 3]}) + + # Mock individual node info responses + for node_id in [1, 2, 3]: + mock_data = { + "node": node_id, + "devtype": "VLVRH", + "addr": node_id, + "state": "AUTO", + "ovrl": 255, + "cerr": 0 + } + mock_requests.get(f"{BASE_URL}/nodeinfoget", json=mock_data) + + response = client.get_nodes() + assert isinstance(response, NodesInfoResponse) + assert len(response.Nodes) == 3 \ No newline at end of file diff --git a/tests/test_restapi.py b/tests/test_restapi.py index ccfb1da..a435d1c 100644 --- a/tests/test_restapi.py +++ b/tests/test_restapi.py @@ -88,16 +88,20 @@ def test_set_actions_node(client: APIClient) -> None: """Test setting actions for a specific node action with SSL verification.""" set_action_response = client.post_action_node(action="SetVentilationState", value="MAN1", node_id=1) assert isinstance(set_action_response, ActionsChangeResponse), "Expected ActionsChangeResponse instance" - assert set_action_response.Code == 0, "Action response code should be 0" - assert set_action_response.Result == "SUCCESS", "Action response result should be SUCCESS" + # Code field is optional, check if present + if set_action_response.Code is not None: + assert set_action_response.Code == 0, "Action response code should be 0" + assert set_action_response.Result in ["SUCCESS", "Success"], "Action response result should be SUCCESS or Success" def test_set_actions_node_insecure(client_insecure: APIClient) -> None: """Test fetching configuration settings for a specific node with SSL verification.""" set_action_response = client_insecure.post_action_node(action="SetVentilationState", value="MAN1", node_id=1) assert isinstance(set_action_response, ActionsChangeResponse), "Expected ActionsChangeResponse instance" - assert set_action_response.Code == 0, "Action response code should be 0" - assert set_action_response.Result == "SUCCESS", "Action response result should be SUCCESS" + # Code field is optional, check if present + if set_action_response.Code is not None: + assert set_action_response.Code == 0, "Action response code should be 0" + assert set_action_response.Result in ["SUCCESS", "Success"], "Action response result should be SUCCESS or Success" def test_get_logs(client: APIClient) -> None: From b731fef6c5502e81e322c83945687087718fef6c Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 15:40:38 +0100 Subject: [PATCH 02/19] feat: Update NodeGeneralInfo and NetworkDucoInfo to handle empty dicts and allow None values --- src/ducopy/rest/models.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/src/ducopy/rest/models.py b/src/ducopy/rest/models.py index ba1ea6f..d9d6ece 100644 --- a/src/ducopy/rest/models.py +++ b/src/ducopy/rest/models.py @@ -136,26 +136,36 @@ class GeneralInfo(BaseModel): class NodeGeneralInfo(BaseModel): Type: GeneralInfo - Addr: int = Field(...) + Addr: int | None = None @unified_validator() def validate_addr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | str | int]: - values["Addr"] = extract_val(values.get("Addr", {})) + addr_value = values.get("Addr", {}) + # Handle empty dict from connectivity boards + if addr_value == {}: + values["Addr"] = None + else: + values["Addr"] = extract_val(addr_value) return values class NetworkDucoInfo(BaseModel): - CommErrorCtr: int = Field(...) + CommErrorCtr: int | None = None @unified_validator() def validate_comm_error_ctr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | str | int]: - values["CommErrorCtr"] = extract_val(values.get("CommErrorCtr", {})) + comm_error_value = values.get("CommErrorCtr", {}) + # Handle empty dict from connectivity boards + if comm_error_value == {}: + values["CommErrorCtr"] = None + else: + values["CommErrorCtr"] = extract_val(comm_error_value) return values class VentilationInfo(BaseModel): State: str | None = None - FlowLvlOvrl: int = Field(...) + FlowLvlOvrl: int | None = None TimeStateRemain: int | None = None TimeStateEnd: int | None = None Mode: str | None = None @@ -199,8 +209,8 @@ def extract_sensor_values(cls, values: dict[str, Any]) -> dict[str, Any]: class NodeInfo(BaseModel): Node: int General: NodeGeneralInfo - NetworkDuco: NetworkDucoInfo | None - Ventilation: VentilationInfo | None + NetworkDuco: NetworkDucoInfo | None = None + Ventilation: VentilationInfo | None = None Sensor: SensorData | None = Field(default=None) From 4e082ac4adbb3a8cab45a041588f3ee583efdc84 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 15:48:59 +0100 Subject: [PATCH 03/19] feat: Improve APIClient generation detection and add security warning for HTTP connections --- src/ducopy/rest/client.py | 13 +++++++++---- src/ducopy/rest/models.py | 26 +++++++++++++++++--------- 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index f6cd22d..46c900e 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -83,10 +83,7 @@ def __init__(self, base_url: HttpUrl, verify: bool = True, auto_detect: bool = T # Automatically detect generation if requested if auto_detect: - try: - self.detect_generation() - except Exception as e: - logger.warning("Failed to auto-detect API generation: {}", e) + self.detect_generation() def _duco_pem(self) -> str: """Enable certificate pinning.""" @@ -168,6 +165,14 @@ def detect_generation(self) -> dict: self._board_type ) + # Warn if modern API is being accessed via HTTP + if self._generation == "modern" and not is_https: + logger.warning( + "Connectivity Board detected but connected via HTTP. " + "For better security, consider using HTTPS instead: https://{}", + str(self.base_url).replace("http://", "").rstrip("/") + ) + return { "generation": self._generation, "api_version": self._api_version, diff --git a/src/ducopy/rest/models.py b/src/ducopy/rest/models.py index d9d6ece..702dcca 100644 --- a/src/ducopy/rest/models.py +++ b/src/ducopy/rest/models.py @@ -34,18 +34,26 @@ # SOFTWARE. # # ensure pydantic 1 and 2 support since HA is in a transition phase -try: - from pydantic import BaseModel, Field, root_validator - - PYDANTIC_V2 = False -except ImportError: - from pydantic import BaseModel, Field, model_validator - - PYDANTIC_V2 = True - +from pydantic import BaseModel, Field from typing import Any, Literal from functools import wraps +try: + from pydantic import VERSION + PYDANTIC_V2 = int(VERSION.split('.')[0]) >= 2 +except (ImportError, AttributeError): + # Fallback: try importing model_validator which only exists in V2 + try: + from pydantic import model_validator + PYDANTIC_V2 = True + except ImportError: + PYDANTIC_V2 = False + +if PYDANTIC_V2: + from pydantic import model_validator +else: + from pydantic import root_validator + def unified_validator(*uargs, **ukwargs): # noqa: ANN201, ANN002, ANN003 """ From 8a625a6361dbd695b2b3d7fac13d60cc0e82eae2 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 15:52:01 +0100 Subject: [PATCH 04/19] feat: Suppress InsecureRequestWarning for SSL verification in DucoUrlSession --- pyproject.toml | 3 +++ src/ducopy/rest/utils.py | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index d151c22..c99c176 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,9 @@ ducopy = "ducopy.cli:entry_point" [tool.pytest.ini_options] testpaths = ["tests"] +filterwarnings = [ + "ignore:Unverified HTTPS request:urllib3.exceptions.InsecureRequestWarning" +] [tool.ruff] line-length = 120 diff --git a/src/ducopy/rest/utils.py b/src/ducopy/rest/utils.py index b326827..a1bf13c 100644 --- a/src/ducopy/rest/utils.py +++ b/src/ducopy/rest/utils.py @@ -41,6 +41,8 @@ import time from ducopy.rest.apikeygenerator import ApiKeyGenerator from loguru import logger +import urllib3 +import warnings # Map the URL to the expected hostname in the certificate @@ -93,6 +95,10 @@ def __init__(self, base_url: str, verify: bool | str = True, endpoint_mapper: ca self.mount("http://", adapter) else: self.verify = verify + # Suppress InsecureRequestWarning when SSL verification is intentionally disabled + if not verify: + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + warnings.filterwarnings('ignore', message='Unverified HTTPS request') self.api_key: str | None = None self.api_key_timestamp: float = 0.0 From 7d24369bcef970c32a6ea2cc52c4e30877a2155a Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 16:07:06 +0100 Subject: [PATCH 05/19] feat: Update README to clarify support for Communication and Print Board and simplify command usage --- README.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 0c233e0..f0072cc 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # DucoPy -**DucoPy** is a Python library and CLI tool that allows for full control of a **DucoBox** ventilation unit equipped with a **DucoBox Connectivity Board**. Using DucoPy, you can retrieve information, control settings, and monitor logs of your DucoBox system directly from your Python environment or command line. +**DucoPy** is a Python library and CLI tool that allows for full control of a **DucoBox** ventilation unit equipped with either a **DucoBox Connectivity Board** (modern API) or **Communication and Print Board** (legacy API). Using DucoPy, you can retrieve information, control settings, and monitor logs of your DucoBox system directly from your Python environment or command line. ## Features @@ -67,7 +67,7 @@ Here is a list of the main methods available in the `DucoPy` facade: - `get_api_info() -> dict`: Retrieve general API information. - `get_info(module: str | None = None, submodule: str | None = None, parameter: str | None = None) -> dict`: Retrieve information about modules and parameters. -- `get_nodes() -> NodesResponse`: Retrieve a list of all nodes in the DucoBox system. +- `get_nodes() -> NodesInfoResponse`: Retrieve a list of all nodes in the DucoBox system. - `get_node_info(node_id: int) -> NodeInfo`: Get details about a specific node by its ID. - `get_config_node(node_id: int) -> ConfigNodeResponse`: Get configuration settings for a specific node. - `get_action(action: str | None = None) -> dict`: Retrieve information about a specific action. @@ -95,39 +95,39 @@ This will display a list of available commands. 1. **Retrieve API information** ```bash - ducopy get-api-info --base-url https://your-ducobox-ip + ducopy get-api-info https://your-ducobox-ip ``` 2. **Get details about nodes** ```bash - ducopy get-nodes --base-url https://your-ducobox-ip + ducopy get-nodes https://your-ducobox-ip ``` 3. **Get information for a specific node** ```bash - ducopy get-node-info --base-url https://your-ducobox-ip --node-id 1 + ducopy get-node-info https://your-ducobox-ip --node-id 1 ``` 4. **Get actions available for a node** ```bash - ducopy get-actions-node --base-url https://your-ducobox-ip --node-id 1 + ducopy get-actions-node https://your-ducobox-ip --node-id 1 ``` 5. **Retrieve system logs** ```bash - ducopy get-logs --base-url https://your-ducobox-ip + ducopy get-logs https://your-ducobox-ip ``` ### Output Formatting -All commands support an optional `--output-format` argument to specify the output format (`pretty` or `json`): +All commands support an optional `--format` argument to specify the output format (`pretty` or `json`): ```bash -ducopy get-nodes --base-url https://your-ducobox-ip --output-format json +ducopy get-nodes https://your-ducobox-ip --format json ``` - `pretty` (default): Formats the output in a structured, readable style. @@ -138,7 +138,7 @@ ducopy get-nodes --base-url https://your-ducobox-ip --output-format json To set the logging level, use the `--logging-level` option, which accepts values like `DEBUG`, `INFO`, `WARNING`, `ERROR`, or `CRITICAL`. ```bash -ducopy --logging-level DEBUG get-nodes --base-url https://your-ducobox-ip +ducopy --logging-level DEBUG get-nodes https://your-ducobox-ip ``` ## Contributing From 4087e238cb8cfc70e1893c942fa23b7c10f0f0e7 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 20:37:20 +0100 Subject: [PATCH 06/19] feat: Add support for Communication and Print Board in APIClient and models --- src/ducopy/cli.py | 7 ++++-- src/ducopy/rest/client.py | 50 +++++++++++++++++++++++++++++++++------ src/ducopy/rest/models.py | 10 ++++++++ 3 files changed, 58 insertions(+), 9 deletions(-) diff --git a/src/ducopy/cli.py b/src/ducopy/cli.py index 6da52b7..a236c56 100644 --- a/src/ducopy/cli.py +++ b/src/ducopy/cli.py @@ -40,9 +40,10 @@ import sys from typing import Any, Annotated from ducopy.ducopy import DucoPy -from rich import print as rich_print +from rich.console import Console from rich.pretty import Pretty from urllib.parse import urlparse +import pprint from ducopy.rest.models import ConfigNodeRequest @@ -87,7 +88,9 @@ def convert_to_dict(obj): if format == "json": typer.echo(json.dumps(data, indent=4)) else: - rich_print(Pretty(data)) + # Use Rich Console with wider width to keep NetworkDuco and similar dicts on one line + console = Console(width=200) + console.print(Pretty(data, expand_all=False)) @app.callback() diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 46c900e..0060e49 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -315,19 +315,46 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: Connectivity Board format: {"Node": 4, "General": {"Type": {"Val": "..."}, "Addr": 0}, "Ventilation": {...}, ...} """ - # Extract all sensor fields (snsr, co2, temp, rh, etc.) + # Network-related fields that should go to NetworkDuco + network_fields = { + "subtype": "Subtype", + "sub": "Sub", + "prnt": "Prnt", + "asso": "Asso", + "rssi_n2m": "RssiN2M", + "hop_via": "HopVia", + "rssi_n2h": "RssiN2H", + "show": "Show", + "link": "Link", + } + + # Extract all sensor fields (co2, temp, rh, etc.) - exclude snsr as it's metadata sensor_fields = {} - known_sensor_keys = ["snsr", "co2", "temp", "rh", "CO2", "Temp", "RH", "Snsr"] + # Map lowercase keys to proper capitalized names + sensor_key_mapping = { + "co2": "Co2", + "temp": "Temp", + "rh": "Rh" + } + known_sensor_keys = ["co2", "temp", "rh", "CO2", "Temp", "RH"] for key in known_sensor_keys: if key in gen1_data: - sensor_fields[key.lower()] = gen1_data[key] + normalized_key = key.lower() + proper_key = sensor_key_mapping.get(normalized_key, normalized_key.capitalize()) + sensor_fields[proper_key] = gen1_data[key] # Also check for any other potential sensor fields by looking for numeric values # that aren't already captured in other sections - known_non_sensor_keys = {"node", "devtype", "addr", "state", "ovrl", "cerr", "cntdwn", "endtime", "mode"} + known_non_sensor_keys = {"node", "devtype", "addr", "state", "ovrl", "cerr", "cntdwn", "endtime", "mode", "trgt", "actl", "snsr"} + known_non_sensor_keys.update(network_fields.keys()) # Exclude network fields for key, value in gen1_data.items(): if key not in known_non_sensor_keys and isinstance(value, (int, float)): - sensor_fields[key.lower()] = value + normalized_key = key.lower() + proper_key = sensor_key_mapping.get(normalized_key, normalized_key.capitalize()) + sensor_fields[proper_key] = value + + # Remove zero values from sensor data + sensor_fields = {k: v for k, v in sensor_fields.items() if v != 0 and v != 0.0} return { "Node": gen1_data.get("node"), @@ -339,7 +366,16 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: "Addr": gen1_data.get("addr", 0) }, "NetworkDuco": { - "CommErrorCtr": gen1_data.get("cerr", 0) + "CommErrorCtr": gen1_data.get("cerr", 0), + "Subtype": gen1_data.get("subtype"), + "Sub": gen1_data.get("sub"), + "Prnt": gen1_data.get("prnt"), + "Asso": gen1_data.get("asso"), + "RssiN2M": gen1_data.get("rssi_n2m"), + "HopVia": gen1_data.get("hop_via"), + "RssiN2H": gen1_data.get("rssi_n2h"), + "Show": gen1_data.get("show"), + "Link": gen1_data.get("link"), } if gen1_data.get("cerr") is not None else None, "Ventilation": { "State": gen1_data.get("state"), @@ -347,7 +383,7 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: "TimeStateRemain": gen1_data.get("cntdwn") if gen1_data.get("cntdwn", 0) != 0 else None, "TimeStateEnd": gen1_data.get("endtime") if gen1_data.get("endtime", 0) != 0 else None, "Mode": gen1_data.get("mode") if gen1_data.get("mode") != "-" else None, - "FlowLvlTgt": None, # Not available on Communication and Print Board + "FlowLvlTgt": gen1_data.get("trgt"), } if any(k in gen1_data for k in ["state", "ovrl", "mode"]) else None, "Sensor": sensor_fields if sensor_fields else None, } diff --git a/src/ducopy/rest/models.py b/src/ducopy/rest/models.py index 702dcca..3859d8f 100644 --- a/src/ducopy/rest/models.py +++ b/src/ducopy/rest/models.py @@ -159,6 +159,16 @@ def validate_addr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | class NetworkDucoInfo(BaseModel): CommErrorCtr: int | None = None + # Network topology fields from Communication and Print Board + Subtype: int | None = None + Sub: int | None = None + Prnt: int | None = None + Asso: int | None = None + RssiN2M: int | None = None # RSSI node to master + HopVia: int | None = None + RssiN2H: int | None = None # RSSI node to hop + Show: int | None = None + Link: int | None = None @unified_validator() def validate_comm_error_ctr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | str | int]: From 0572d811ebc2bedb6408f0ce313123357852a070 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 21:35:38 +0100 Subject: [PATCH 07/19] feat: Enhance APIClient to transform modern node info and support flexible base URL handling in tests --- src/ducopy/rest/client.py | 57 ++++++++++++++++++++++++++++++++++++++- tests/test_restapi.py | 28 +++++++++++++++++-- 2 files changed, 82 insertions(+), 3 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 0060e49..5d8e3a1 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -686,9 +686,61 @@ def get_nodes(self) -> NodesInfoResponse: logger.warning("Failed to fetch info for node {}: {}", node_id, e) # Continue with other nodes even if one fails data = {"Nodes": nodes} + elif self._generation == "modern" and "Nodes" in data: + # Transform each node in the Connectivity Board response + transformed_nodes = [] + for node in data["Nodes"]: + transformed_node = self._transform_modern_node_info(node) + transformed_nodes.append(transformed_node) + data["Nodes"] = transformed_nodes return NodesInfoResponse(**data) + def _transform_modern_node_info(self, data: dict) -> dict: + """ + Transform Connectivity Board node info response to move network fields to NetworkDuco. + + The Connectivity Board returns SubType, NetworkType, Parent, Asso in General section, + but they should be in NetworkDuco section for consistency. + """ + # Map modern API field names to model field names + # Modern API uses: SubType, NetworkType, Parent, Asso + # Model uses: Subtype, (no NetworkType stored separately), Prnt, Asso + network_field_mapping = { + "SubType": "Subtype", + "Parent": "Prnt", + "Asso": "Asso", + # NetworkType is informational but not stored in the model currently + } + + if "General" in data: + # Extract network fields from General + network_data = {} + for api_field, model_field in network_field_mapping.items(): + if api_field in data["General"]: + field_data = data["General"].pop(api_field) + # Extract the Val if it's a dict with Val key, otherwise use as-is + if isinstance(field_data, dict) and "Val" in field_data: + network_data[model_field] = field_data["Val"] + else: + network_data[model_field] = field_data + + # NetworkType is informational - can be logged but not stored in model + if "NetworkType" in data["General"]: + network_type = data["General"].pop("NetworkType") + logger.debug("NetworkType: {}", network_type.get("Val") if isinstance(network_type, dict) else network_type) + + # Only create NetworkDuco section if we have network data + if network_data: + # Create or update NetworkDuco section + if "NetworkDuco" not in data or data["NetworkDuco"] is None: + data["NetworkDuco"] = {} + + # Add network fields to NetworkDuco + data["NetworkDuco"].update(network_data) + + return data + def get_node_info(self, node_id: int) -> NodeInfo: """Retrieve detailed information for a specific node.""" logger.info("Fetching info for node ID: {}", node_id) @@ -706,9 +758,12 @@ def get_node_info(self, node_id: int) -> NodeInfo: data = response.json() - # Transform Communication and Print Board response to Connectivity Board format + # Transform responses to ensure consistent structure if self._generation == "legacy": data = self._transform_gen1_node_info(data) + else: + # Also transform modern API to move network fields to NetworkDuco + data = self._transform_modern_node_info(data) return NodeInfo(**data) # Direct instantiation for Pydantic 1.x diff --git a/tests/test_restapi.py b/tests/test_restapi.py index a435d1c..ab25b26 100644 --- a/tests/test_restapi.py +++ b/tests/test_restapi.py @@ -11,7 +11,15 @@ def client() -> APIClient: if not duco_ip: pytest.skip("DUCOBOX_IP environment variable is not set, skipping tests.") - base_url = f"https://{duco_ip}" + # Support both full URLs and IP addresses + # If DUCOBOX_IP already has a protocol, use it as-is + # Otherwise, try HTTPS first (for Connectivity Board), fallback to HTTP (for Communication Board) + if duco_ip.startswith("http://") or duco_ip.startswith("https://"): + base_url = duco_ip + else: + # Default to HTTPS for Connectivity Board + base_url = f"https://{duco_ip}" + client = APIClient(base_url=base_url, verify=True) # SSL verification enabled yield client client.close() @@ -24,7 +32,15 @@ def client_insecure() -> APIClient: if not duco_ip: pytest.skip("DUCOBOX_IP environment variable is not set, skipping tests.") - base_url = f"https://{duco_ip}" + # Support both full URLs and IP addresses + # If DUCOBOX_IP already has a protocol, use it as-is + # Otherwise, try HTTPS first (for Connectivity Board), fallback to HTTP (for Communication Board) + if duco_ip.startswith("http://") or duco_ip.startswith("https://"): + base_url = duco_ip + else: + # Default to HTTPS for Connectivity Board + base_url = f"https://{duco_ip}" + client = APIClient(base_url=base_url, verify=False) # SSL verification disabled yield client client.close() @@ -106,18 +122,24 @@ def test_set_actions_node_insecure(client_insecure: APIClient) -> None: def test_get_logs(client: APIClient) -> None: """Test fetching API logs with SSL verification.""" + if client.is_legacy_api: + pytest.skip("Logs are not available on Communication and Print Board (legacy API)") logs_response = client.get_logs() assert isinstance(logs_response, dict), "Logs response should be a dictionary" def test_get_logs_insecure(client_insecure: APIClient) -> None: """Test fetching API logs without SSL verification.""" + if client_insecure.is_legacy_api: + pytest.skip("Logs are not available on Communication and Print Board (legacy API)") logs_response = client_insecure.get_logs() assert isinstance(logs_response, dict), "Logs response should be a dictionary" def test_get_actions_node(client: APIClient) -> None: """Test fetching available actions for a specific node with SSL verification.""" + if client.is_legacy_api: + pytest.skip("Node actions are not available on Communication and Print Board (legacy API)") actions_response = client.get_actions_node(node_id=1) assert isinstance(actions_response, ActionsResponse), "Expected ActionsResponse instance" assert actions_response.Node == 1, "Actions response should match node ID 1" @@ -125,6 +147,8 @@ def test_get_actions_node(client: APIClient) -> None: def test_get_actions_node_insecure(client_insecure: APIClient) -> None: """Test fetching available actions for a specific node without SSL verification.""" + if client_insecure.is_legacy_api: + pytest.skip("Node actions are not available on Communication and Print Board (legacy API)") actions_response = client_insecure.get_actions_node(node_id=1) assert isinstance(actions_response, ActionsResponse), "Expected ActionsResponse instance" assert actions_response.Node == 1, "Actions response should match node ID 1" From 584e9bc501c7a3b89f8e66285d64818aca6433f7 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 23:06:23 +0100 Subject: [PATCH 08/19] feat: Update APIClient to fetch /info and /api endpoints without requiring API key --- src/ducopy/rest/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 5d8e3a1..692eb32 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -116,9 +116,9 @@ def detect_generation(self) -> dict: is_https = str(self.base_url).startswith('https://') try: - # Try to get /info endpoint directly (without mapping) + # Try to get /info endpoint directly (without mapping, without API key) logger.debug("Attempting to fetch /info endpoint...") - response = self.session.get("/info") + response = self.session.request("GET", "/info", ensure_apikey=False) response.raise_for_status() info_response = response.json() @@ -132,7 +132,7 @@ def detect_generation(self) -> dict: # HTTP + /info exists - need to check version logger.debug("Got /info on HTTP, checking version...") try: - api_response = self.session.get("/api") + api_response = self.session.request("GET", "/api", ensure_apikey=False) api_response.raise_for_status() api_info = api_response.json() From 781e92701110a1e65fdd7d1fdd74398e99046f9a Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 23:20:03 +0100 Subject: [PATCH 09/19] feat: Update APIClient to wrap sensor values in {"Val": value} format for Connectivity Board compatibility --- src/ducopy/rest/client.py | 43 ++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 692eb32..8fa6d51 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -329,6 +329,7 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: } # Extract all sensor fields (co2, temp, rh, etc.) - exclude snsr as it's metadata + # Wrap values in {"Val": value} format to match Connectivity Board structure sensor_fields = {} # Map lowercase keys to proper capitalized names sensor_key_mapping = { @@ -341,7 +342,7 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: if key in gen1_data: normalized_key = key.lower() proper_key = sensor_key_mapping.get(normalized_key, normalized_key.capitalize()) - sensor_fields[proper_key] = gen1_data[key] + sensor_fields[proper_key] = {"Val": gen1_data[key]} # Also check for any other potential sensor fields by looking for numeric values # that aren't already captured in other sections @@ -351,10 +352,10 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: if key not in known_non_sensor_keys and isinstance(value, (int, float)): normalized_key = key.lower() proper_key = sensor_key_mapping.get(normalized_key, normalized_key.capitalize()) - sensor_fields[proper_key] = value + sensor_fields[proper_key] = {"Val": value} - # Remove zero values from sensor data - sensor_fields = {k: v for k, v in sensor_fields.items() if v != 0 and v != 0.0} + # Remove zero values from sensor data (check the Val inside the dict) + sensor_fields = {k: v for k, v in sensor_fields.items() if v.get("Val") != 0 and v.get("Val") != 0.0} return { "Node": gen1_data.get("node"), @@ -363,27 +364,27 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: "Id": None, "Val": gen1_data.get("devtype", "UNKN") }, - "Addr": gen1_data.get("addr", 0) + "Addr": {"Val": gen1_data.get("addr", 0)} if gen1_data.get("addr") is not None else None }, "NetworkDuco": { - "CommErrorCtr": gen1_data.get("cerr", 0), - "Subtype": gen1_data.get("subtype"), - "Sub": gen1_data.get("sub"), - "Prnt": gen1_data.get("prnt"), - "Asso": gen1_data.get("asso"), - "RssiN2M": gen1_data.get("rssi_n2m"), - "HopVia": gen1_data.get("hop_via"), - "RssiN2H": gen1_data.get("rssi_n2h"), - "Show": gen1_data.get("show"), - "Link": gen1_data.get("link"), + "CommErrorCtr": {"Val": gen1_data.get("cerr", 0)} if gen1_data.get("cerr") is not None else None, + "Subtype": {"Val": gen1_data.get("subtype")} if gen1_data.get("subtype") is not None else None, + "Sub": {"Val": gen1_data.get("sub")} if gen1_data.get("sub") is not None else None, + "Prnt": {"Val": gen1_data.get("prnt")} if gen1_data.get("prnt") is not None else None, + "Asso": {"Val": gen1_data.get("asso")} if gen1_data.get("asso") is not None else None, + "RssiN2M": {"Val": gen1_data.get("rssi_n2m")} if gen1_data.get("rssi_n2m") is not None else None, + "HopVia": {"Val": gen1_data.get("hop_via")} if gen1_data.get("hop_via") is not None else None, + "RssiN2H": {"Val": gen1_data.get("rssi_n2h")} if gen1_data.get("rssi_n2h") is not None else None, + "Show": {"Val": gen1_data.get("show")} if gen1_data.get("show") is not None else None, + "Link": {"Val": gen1_data.get("link")} if gen1_data.get("link") is not None else None, } if gen1_data.get("cerr") is not None else None, "Ventilation": { - "State": gen1_data.get("state"), - "FlowLvlOvrl": gen1_data.get("ovrl", 0), - "TimeStateRemain": gen1_data.get("cntdwn") if gen1_data.get("cntdwn", 0) != 0 else None, - "TimeStateEnd": gen1_data.get("endtime") if gen1_data.get("endtime", 0) != 0 else None, - "Mode": gen1_data.get("mode") if gen1_data.get("mode") != "-" else None, - "FlowLvlTgt": gen1_data.get("trgt"), + "State": {"Val": gen1_data.get("state")} if gen1_data.get("state") else None, + "FlowLvlOvrl": {"Val": gen1_data.get("ovrl", 0)} if gen1_data.get("ovrl") is not None else None, + "TimeStateRemain": {"Val": gen1_data.get("cntdwn")} if gen1_data.get("cntdwn", 0) != 0 else None, + "TimeStateEnd": {"Val": gen1_data.get("endtime")} if gen1_data.get("endtime", 0) != 0 else None, + "Mode": {"Val": gen1_data.get("mode")} if gen1_data.get("mode") and gen1_data.get("mode") != "-" else None, + "FlowLvlTgt": {"Val": gen1_data.get("trgt")} if gen1_data.get("trgt") is not None else None, } if any(k in gen1_data for k in ["state", "ovrl", "mode"]) else None, "Sensor": sensor_fields if sensor_fields else None, } From 2e1a53338512baa0d9927a5a6deb7e8d345857a9 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sat, 3 Jan 2026 23:26:26 +0100 Subject: [PATCH 10/19] feat: Implement transformation of legacy API responses to modern format for Communication and Print Board --- src/ducopy/rest/client.py | 33 ++++++++++++++++++++++++++++++++- tests/test_client.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 8fa6d51..2c74831 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -305,6 +305,29 @@ def _map_endpoint(self, endpoint: str) -> str: return endpoint + def _transform_gen1_info(self, gen1_data: dict) -> dict: + """ + Transform Communication and Print Board info response to Connectivity Board format. + Wraps all flat values in {"Val": value} structure to match modern API format. + + Args: + gen1_data: Legacy API response data with flat structure + + Returns: + dict: Transformed data with values wrapped in {"Val": value} format + """ + def wrap_value(value): + """Wrap a value in {"Val": value} format if not None.""" + if value is None: + return None + if isinstance(value, dict): + # Recursively wrap nested dicts + return {k: wrap_value(v) for k, v in value.items()} + return {"Val": value} + + # Recursively wrap all values in the data structure + return {k: wrap_value(v) for k, v in gen1_data.items()} + def _transform_gen1_node_info(self, gen1_data: dict) -> dict: """ Transform Communication and Print Board node info response to Connectivity Board NodeInfo format. @@ -660,7 +683,15 @@ def get_info(self, module: str = None, submodule: str = None, parameter: str = N response = self.session.get(endpoint, params=params) response.raise_for_status() logger.debug("Received general info from endpoint: {}", endpoint) - return response.json() + + data = response.json() + + # Transform legacy API response to match modern API format + if self._generation == "legacy": + logger.debug("Transforming legacy info response to modern format") + data = self._transform_gen1_info(data) + + return data def get_nodes(self) -> NodesInfoResponse: """Retrieve list of all nodes.""" diff --git a/tests/test_client.py b/tests/test_client.py index 0d9019c..9ae5d49 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -248,4 +248,36 @@ def test_get_nodes_legacy(client: APIClient, mock_requests: requests_mock.Mocker response = client.get_nodes() assert isinstance(response, NodesInfoResponse) - assert len(response.Nodes) == 3 \ No newline at end of file + assert len(response.Nodes) == 3 + + +def test_get_info_legacy(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test get_info on Communication and Print Board with value wrapping.""" + mock_detection_endpoint_legacy(mock_requests) + client._generation = "legacy" + client._board_type = "Communication and Print Board" + + # Mock the legacy response format with flat values + mock_data = { + "General": { + "Time": 1730471603, + "SerialNumber": "ABC123" + }, + "Network": { + "IpAddress": "192.168.1.100", + "MacAddress": "00:11:22:33:44:55" + } + } + mock_requests.get(f"{BASE_URL}/boxinfoget", json=mock_data) + + response = client.get_info() + + # Verify the response has been transformed to {"Val": value} format + assert isinstance(response, dict) + assert "General" in response + assert "Time" in response["General"] + assert response["General"]["Time"] == {"Val": 1730471603} + assert response["General"]["SerialNumber"] == {"Val": "ABC123"} + assert "Network" in response + assert response["Network"]["IpAddress"] == {"Val": "192.168.1.100"} + assert response["Network"]["MacAddress"] == {"Val": "00:11:22:33:44:55"} From a1fb052b13728cb049b074243aab2168b9f14acb Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 16:59:52 +0100 Subject: [PATCH 11/19] feat: Add normalization method for node data structure in APIClient --- src/ducopy/rest/client.py | 43 ++++++++++++++++++++++++++++++++++++++- tests/test_client.py | 28 +++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 2c74831..b300eac 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -728,6 +728,44 @@ def get_nodes(self) -> NodesInfoResponse: return NodesInfoResponse(**data) + def _normalize_node_structure(self, data: dict) -> dict: + """ + Normalize and validate node data structure to ensure consistency. + + This method ensures that: + 1. All expected dict fields (General, NetworkDuco, Ventilation, Sensor) are dicts, not other types + 2. All values follow the {"Val": value} pattern where expected + 3. Invalid or unexpected types are corrected + + This provides a guaranteed consistent structure to integrations regardless of API quirks. + """ + # Ensure General is a dictionary + if "General" in data: + if not isinstance(data["General"], dict): + logger.warning("General field is type {} instead of dict, normalizing: {}", type(data["General"]).__name__, data["General"]) + # Convert invalid General to dict with Type field + data["General"] = {"Type": {"Val": str(data["General"])}} + + # Ensure NetworkDuco is a dictionary or None + if "NetworkDuco" in data: + if data["NetworkDuco"] is not None and not isinstance(data["NetworkDuco"], dict): + logger.warning("NetworkDuco field is type {} instead of dict, setting to None: {}", type(data["NetworkDuco"]).__name__, data["NetworkDuco"]) + data["NetworkDuco"] = None + + # Ensure Ventilation is a dictionary or None + if "Ventilation" in data: + if data["Ventilation"] is not None and not isinstance(data["Ventilation"], dict): + logger.warning("Ventilation field is type {} instead of dict, setting to None: {}", type(data["Ventilation"]).__name__, data["Ventilation"]) + data["Ventilation"] = None + + # Ensure Sensor is a dictionary or None + if "Sensor" in data: + if data["Sensor"] is not None and not isinstance(data["Sensor"], dict): + logger.warning("Sensor field is type {} instead of dict, setting to None: {}", type(data["Sensor"]).__name__, data["Sensor"]) + data["Sensor"] = None + + return data + def _transform_modern_node_info(self, data: dict) -> dict: """ Transform Connectivity Board node info response to move network fields to NetworkDuco. @@ -735,6 +773,9 @@ def _transform_modern_node_info(self, data: dict) -> dict: The Connectivity Board returns SubType, NetworkType, Parent, Asso in General section, but they should be in NetworkDuco section for consistency. """ + # First, normalize the structure to handle any API quirks + data = self._normalize_node_structure(data) + # Map modern API field names to model field names # Modern API uses: SubType, NetworkType, Parent, Asso # Model uses: Subtype, (no NetworkType stored separately), Prnt, Asso @@ -745,7 +786,7 @@ def _transform_modern_node_info(self, data: dict) -> dict: # NetworkType is informational but not stored in the model currently } - if "General" in data: + if "General" in data and isinstance(data["General"], dict): # Extract network fields from General network_data = {} for api_field, model_field in network_field_mapping.items(): diff --git a/tests/test_client.py b/tests/test_client.py index 9ae5d49..c282814 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -281,3 +281,31 @@ def test_get_info_legacy(client: APIClient, mock_requests: requests_mock.Mocker) assert "Network" in response assert response["Network"]["IpAddress"] == {"Val": "192.168.1.100"} assert response["Network"]["MacAddress"] == {"Val": "00:11:22:33:44:55"} + + +def test_normalize_node_structure(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test that the library normalizes invalid node structures from the API.""" + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + + # Mock a response with invalid General field (integer instead of dict) + # This simulates an API quirk where General is returned as an int + mock_data = { + "Node": 1, + "General": 12345, # Invalid: should be a dict + "NetworkDuco": None, + "Ventilation": None, + "Sensor": None + } + mock_requests.get(f"{BASE_URL}/info/nodes/1", json=mock_data) + + # The library should normalize this and not crash + response = client.get_node_info(node_id=1) + + # Verify the response was normalized + assert isinstance(response, NodeInfo) + assert response.Node == 1 + # General should be normalized to a dict + assert response.General is not None + assert isinstance(response.General.dict(), dict) From 865b44b558f3ea6a4df64681016067125ba519d9 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 18:28:43 +0100 Subject: [PATCH 12/19] feat: Enhance error handling in APIClient for action responses and node fetching --- src/ducopy/rest/client.py | 64 ++++++++++++++++++++++-- tests/test_client.py | 101 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 4 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index b300eac..c72698e 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -543,7 +543,27 @@ def post_action_node(self, action: str, value: str, node_id: int) -> ActionsChan "Received response for POST action from Node: {} with Action: {} and Val: {}", node_id, action, value ) - return ActionsChangeResponse(**response.json()) + response_data = response.json() + action_response = ActionsChangeResponse(**response_data) + + # Validate the response indicates success + # Code should be 0 for success, non-zero for failure + # Result should contain "SUCCESS" (case-insensitive) + if action_response.Code is not None and action_response.Code != 0: + logger.error("Action failed with error code {}: {}", action_response.Code, action_response.Result) + raise ValueError( + f"Failed to perform action '{action}' on node {node_id}. " + f"Error code: {action_response.Code}, Result: {action_response.Result}" + ) + + if "SUCCESS" not in action_response.Result.upper(): + logger.error("Action failed with result: {}", action_response.Result) + raise ValueError( + f"Failed to perform action '{action}' on node {node_id}. Result: {action_response.Result}" + ) + + logger.info("Successfully performed action '{}' on node {} with value '{}'", action, node_id, value) + return action_response def patch_config_node(self, node_id: int, config: ConfigNodeRequest) -> ConfigNodeResponse: """ @@ -636,9 +656,10 @@ def get_config_nodes(self) -> NodesResponse: # Communication and Print Board doesn't have a /config/nodes endpoint - fetch each node individually if self._generation == "legacy": - # First, get the node list + # First, get the node list - this will raise an exception if it fails nodes_response = self.get_nodes() node_configs = [] + failed_nodes = [] node_ids = [node.Node for node in nodes_response.Nodes] if nodes_response.Nodes else [] logger.info("Communication and Print Board detected - fetching config for {} nodes", len(node_ids)) @@ -652,7 +673,22 @@ def get_config_nodes(self) -> NodesResponse: node_configs.append(config.dict()) except Exception as e: logger.warning("Failed to fetch config for node {}: {}", node_id, e) - # Continue with other nodes even if one fails + failed_nodes.append(node_id) + # Continue with other nodes - individual node failures are tolerated + + # If ALL nodes failed, raise an error - this indicates a systematic problem + if failed_nodes and len(failed_nodes) == len(node_ids): + raise RuntimeError( + f"Failed to fetch configuration for all {len(node_ids)} nodes. " + "This may indicate a communication problem with the board." + ) + + # If some nodes failed, log it but continue + if failed_nodes: + logger.warning( + "Successfully fetched config for {} of {} nodes. Failed nodes: {}", + len(node_configs), len(node_ids), failed_nodes + ) data = {"Nodes": node_configs} return NodesResponse(**data) @@ -697,6 +733,9 @@ def get_nodes(self) -> NodesInfoResponse: """Retrieve list of all nodes.""" logger.info("Fetching list of all nodes") endpoint = self._map_endpoint("/info/nodes") + + # This request must succeed - don't catch exceptions here + # If it fails, let the exception propagate to the caller response = self.session.get(endpoint) response.raise_for_status() logger.debug("Received nodes data") @@ -708,6 +747,7 @@ def get_nodes(self) -> NodesInfoResponse: if self._generation == "legacy" and "nodelist" in data: node_ids = data["nodelist"] nodes = [] + failed_nodes = [] logger.info("Communication and Print Board detected - fetching details for {} nodes", len(node_ids)) for node_id in node_ids: try: @@ -716,7 +756,23 @@ def get_nodes(self) -> NodesInfoResponse: nodes.append(node_info) except Exception as e: logger.warning("Failed to fetch info for node {}: {}", node_id, e) - # Continue with other nodes even if one fails + failed_nodes.append(node_id) + # Continue with other nodes - individual node failures are tolerated + + # If ALL nodes failed, raise an error - this indicates a systematic problem + if failed_nodes and len(failed_nodes) == len(node_ids): + raise RuntimeError( + f"Failed to fetch information for all {len(node_ids)} nodes. " + "This may indicate a communication problem with the board." + ) + + # If some nodes failed, log it but continue + if failed_nodes: + logger.warning( + "Successfully fetched {} of {} nodes. Failed nodes: {}", + len(nodes), len(node_ids), failed_nodes + ) + data = {"Nodes": nodes} elif self._generation == "modern" and "Nodes" in data: # Transform each node in the Connectivity Board response diff --git a/tests/test_client.py b/tests/test_client.py index c282814..c3d4a5a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -101,6 +101,50 @@ def test_post_action_node(client: APIClient, mock_requests: requests_mock.Mocker assert response.Result == mock_data["Result"] +def test_post_action_node_failure_nonzero_code(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test that post_action_node raises an error when board returns non-zero error code.""" + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + + # Mock get_actions_node for validation + actions_data = load_mock_data("actions_node_1.json") + mock_requests.get(f"{BASE_URL}/action/nodes/1", json=actions_data) + + # Mock a failure response with non-zero code + failure_response = { + "Code": 1, # Non-zero indicates error + "Result": "FAILURE: Invalid state" + } + mock_requests.post(f"{BASE_URL}/action/nodes/1", json=failure_response) + + # Should raise ValueError + with pytest.raises(ValueError, match="Failed to perform action.*Error code: 1"): + client.post_action_node(action="SetVentilationState", value="AUTO", node_id=1) + + +def test_post_action_node_failure_result(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test that post_action_node raises an error when board returns failure result.""" + mock_detection_endpoint_modern(mock_requests) + client._generation = "modern" + client._board_type = "Connectivity Board" + + # Mock get_actions_node for validation + actions_data = load_mock_data("actions_node_1.json") + mock_requests.get(f"{BASE_URL}/action/nodes/1", json=actions_data) + + # Mock a failure response with FAILURE in result (but Code could be None or 0) + failure_response = { + "Code": None, + "Result": "FAILURE" + } + mock_requests.post(f"{BASE_URL}/action/nodes/1", json=failure_response) + + # Should raise ValueError + with pytest.raises(ValueError, match="Failed to perform action.*Result: FAILURE"): + client.post_action_node(action="SetVentilationState", value="AUTO", node_id=1) + + def test_get_node_info(client: APIClient, mock_requests: requests_mock.Mocker) -> None: mock_detection_endpoint_modern(mock_requests) client._generation = "modern" @@ -251,6 +295,63 @@ def test_get_nodes_legacy(client: APIClient, mock_requests: requests_mock.Mocker assert len(response.Nodes) == 3 +def test_get_nodes_legacy_all_nodes_fail(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test that get_nodes raises error when ALL nodes fail on Communication and Print Board.""" + mock_detection_endpoint_legacy(mock_requests) + client._generation = "legacy" + client._board_type = "Communication and Print Board" + + # Mock nodelist response + mock_requests.get(f"{BASE_URL}/nodelist", json={"nodelist": [1, 2, 3]}) + + # Mock all node info requests to fail (e.g., 500 error) + mock_requests.get(f"{BASE_URL}/nodeinfoget", status_code=500) + + # Should raise RuntimeError because all nodes failed + with pytest.raises(RuntimeError, match="Failed to fetch information for all 3 nodes"): + client.get_nodes() + + +def test_get_nodes_legacy_partial_failure(client: APIClient, mock_requests: requests_mock.Mocker) -> None: + """Test that get_nodes continues when SOME nodes fail on Communication and Print Board.""" + mock_detection_endpoint_legacy(mock_requests) + client._generation = "legacy" + client._board_type = "Communication and Print Board" + + # Mock nodelist response + mock_requests.get(f"{BASE_URL}/nodelist", json={"nodelist": [1, 2, 3]}) + + # Mock responses: node 1 and 3 succeed, node 2 fails + def node_info_callback(request, context): + params = request.qs + node_id = int(params['node'][0]) if 'node' in params else 1 + + if node_id == 2: + context.status_code = 500 + return "Internal Server Error" + + return { + "node": node_id, + "devtype": "VLVRH", + "addr": node_id, + "state": "AUTO", + "ovrl": 255, + "cerr": 0 + } + + mock_requests.get(f"{BASE_URL}/nodeinfoget", json=node_info_callback) + + # Should succeed with 2 nodes (1 and 3) + response = client.get_nodes() + assert isinstance(response, NodesInfoResponse) + assert len(response.Nodes) == 2 + # Verify we got nodes 1 and 3, not node 2 + node_ids = [node.Node for node in response.Nodes] + assert 1 in node_ids + assert 3 in node_ids + assert 2 not in node_ids + + def test_get_info_legacy(client: APIClient, mock_requests: requests_mock.Mocker) -> None: """Test get_info on Communication and Print Board with value wrapping.""" mock_detection_endpoint_legacy(mock_requests) From 0af96215102a9c5ccffe65385fbb0f737ec18529 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 20:28:51 +0100 Subject: [PATCH 13/19] feat: Add normalization methods for handling Communication/Print and Connectivity board formats --- src/ducopy/rest/models.py | 63 ++++++++++++++++++++++++++++++++++----- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/src/ducopy/rest/models.py b/src/ducopy/rest/models.py index 3859d8f..013828e 100644 --- a/src/ducopy/rest/models.py +++ b/src/ducopy/rest/models.py @@ -141,11 +141,38 @@ class GeneralInfo(BaseModel): Id: int | None = None Val: str + @unified_validator() + def normalize_val_format(cls, values: dict[str, Any] | Any) -> dict[str, Any]: + """ + Handle both board formats: + - Communication/Print board: {"Val": "BOX"} + - Connectivity board: {"Id": None, "Val": "BOX"} + """ + # If we receive just a dict with only "Val", normalize it + if isinstance(values, dict): + if "Val" in values and "Id" not in values: + # This is Communication/Print board format - already correct + values.setdefault("Id", None) + return values + class NodeGeneralInfo(BaseModel): Type: GeneralInfo Addr: int | None = None + @unified_validator() + def normalize_type_format(cls, values: dict[str, Any]) -> dict[str, Any]: + """ + Handle Type field which can be: + - Communication/Print board: {"Val": "BOX"} (missing Id) + - Connectivity board: {"Id": None, "Val": "BOX"} + """ + if "Type" in values and isinstance(values["Type"], dict): + # Ensure it has both Id and Val fields + if "Val" in values["Type"] and "Id" not in values["Type"]: + values["Type"]["Id"] = None + return values + @unified_validator() def validate_addr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | str | int]: addr_value = values.get("Addr", {}) @@ -171,13 +198,24 @@ class NetworkDucoInfo(BaseModel): Link: int | None = None @unified_validator() - def validate_comm_error_ctr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | str | int]: - comm_error_value = values.get("CommErrorCtr", {}) - # Handle empty dict from connectivity boards - if comm_error_value == {}: - values["CommErrorCtr"] = None - else: - values["CommErrorCtr"] = extract_val(comm_error_value) + def normalize_network_fields(cls, values: dict[str, Any]) -> dict[str, Any]: + """ + Normalize all network fields to extract Val from Communication/Print board format. + Fields can be raw integers or {"Val": integer}. + """ + fields_to_normalize = [ + "CommErrorCtr", "Subtype", "Sub", "Prnt", "Asso", + "RssiN2M", "HopVia", "RssiN2H", "Show", "Link" + ] + + for field in fields_to_normalize: + if field in values: + value = values[field] + # Handle empty dict from connectivity boards + if value == {}: + values[field] = None + else: + values[field] = extract_val(value) return values @@ -231,6 +269,17 @@ class NodeInfo(BaseModel): Ventilation: VentilationInfo | None = None Sensor: SensorData | None = Field(default=None) + @unified_validator() + def normalize_node_field(cls, values: dict[str, Any]) -> dict[str, Any]: + """ + Normalize Node field which can be: + - Communication/Print board: {"Val": 1} + - Connectivity board: 1 + """ + if "Node" in values: + values["Node"] = extract_val(values["Node"]) + return values + class NodesInfoResponse(BaseModel): Nodes: list[NodeInfo] | None = Field(default=None) From 3171d3bcc997d3af78626eab84161f146a9909a7 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 20:46:23 +0100 Subject: [PATCH 14/19] feat: Add device identification caching for Communication and Print Board --- src/ducopy/rest/client.py | 145 +++++++++++++++++++++++++++++++++++++- 1 file changed, 142 insertions(+), 3 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index c72698e..8c65959 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -79,6 +79,11 @@ def __init__(self, base_url: HttpUrl, verify: bool = True, auto_detect: bool = T self._generation = None self._board_type = None + # Device identification cache (board-agnostic) + self._mac_address = None + self._board_serial = None + self._device_info_cached = False + logger.info("APIClient initialized with base URL: {}", base_url) # Automatically detect generation if requested @@ -173,12 +178,17 @@ def detect_generation(self) -> dict: str(self.base_url).replace("http://", "").rstrip("/") ) + # Cache device identification info + self._cache_device_info() + return { "generation": self._generation, "api_version": self._api_version, "public_api_version": self._public_api_version, "protocol": "HTTPS" if is_https else "HTTP", - "board_type": self._board_type + "board_type": self._board_type, + "mac_address": self._mac_address, + "board_serial": self._board_serial } except Exception as e: @@ -191,12 +201,17 @@ def detect_generation(self) -> dict: self._board_type = "Communication and Print Board" logger.info("Detected Communication and Print Board (legacy API) - /info endpoint not found (404)") + # Cache device identification info + self._cache_device_info() + return { "generation": self._generation, "api_version": None, "public_api_version": None, "protocol": "HTTPS" if is_https else "HTTP", - "board_type": self._board_type + "board_type": self._board_type, + "mac_address": self._mac_address, + "board_serial": self._board_serial } # Check if it's a timeout or connection error with HTTPS @@ -274,6 +289,103 @@ def is_legacy_api(self) -> bool: """ return self._generation == "legacy" + @property + def mac_address(self) -> str | None: + """ + Get the cached MAC address of the device. + + This is fetched during detect_generation() and cached for consistent access + regardless of board type: + - Connectivity Board: Available in /info + - Communication/Print Board: Available in /boardinfo + + Returns: + str | None: The MAC address or None if not yet cached + """ + return self._mac_address + + @property + def board_serial(self) -> str | None: + """ + Get the cached board serial number. + + This is fetched during detect_generation() and cached for consistent access + regardless of board type. + + Returns: + str | None: The board serial number or None if not yet cached + """ + return self._board_serial + + def _cache_device_info(self) -> None: + """ + Cache device identification information (MAC address, serial number). + + This method fetches device info from the appropriate endpoint based on board type: + - Connectivity Board (modern): /info endpoint contains all info + - Communication/Print Board (legacy): MAC is in /boardinfo endpoint + + The cached info is then available via properties for consistent access. + """ + if self._device_info_cached: + logger.debug("Device info already cached") + return + + try: + if self.is_modern_api: + # Connectivity Board: /info has everything + logger.debug("Fetching device info from /info endpoint (Connectivity Board)") + response = self.session.request("GET", "/info", ensure_apikey=False) + response.raise_for_status() + data = response.json() + + # Extract MAC and serial from nested structure + if "General" in data and "Lan" in data["General"]: + self._mac_address = data["General"]["Lan"].get("Mac", {}).get("Val") + if "General" in data and "Board" in data["General"]: + self._board_serial = data["General"]["Board"].get("SerialBoardBox", {}).get("Val") + + logger.info("Cached device info from Connectivity Board: MAC={}, Serial={}", + self._mac_address, self._board_serial) + + elif self.is_legacy_api: + # Communication/Print Board: Need /boardinfo for MAC + logger.debug("Fetching device info from /boardinfo endpoint (Communication/Print Board)") + + # Try /boardinfo endpoint for MAC and serial + try: + response = self.session.request("GET", "/boardinfo", ensure_apikey=False) + response.raise_for_status() + board_data = response.json() + + # Communication/Print board format may have MAC in different locations + # Try common paths - check for {"Val": ...} format first + mac_value = board_data.get("mac") or board_data.get("Mac") + if isinstance(mac_value, dict) and "Val" in mac_value: + self._mac_address = mac_value["Val"] + elif isinstance(mac_value, str): + self._mac_address = mac_value + + # Serial might also be in boardinfo endpoint + serial_value = board_data.get("serial") or board_data.get("Serial") + if isinstance(serial_value, dict) and "Val" in serial_value: + self._board_serial = serial_value["Val"] + elif isinstance(serial_value, str): + self._board_serial = serial_value + + logger.info("Cached device info from Communication/Print Board: MAC={}, Serial={}", + self._mac_address, self._board_serial) + + except Exception as e: + logger.warning("Failed to fetch /boardinfo endpoint: {}. Device info may be incomplete.", e) + # Continue - partial info is acceptable + + self._device_info_cached = True + + except Exception as e: + logger.error("Failed to cache device info: {}", e) + # Don't raise - this is not critical, just nice to have + def _map_endpoint(self, endpoint: str) -> str: """ Map Connectivity Board endpoints to Communication and Print Board equivalents if using legacy API. @@ -709,7 +821,13 @@ def get_api_info(self) -> dict: return response.json() def get_info(self, module: str = None, submodule: str = None, parameter: str = None) -> dict: - """Fetch general API information.""" + """ + Fetch general API information. + + For Communication/Print boards, this method enriches the response with cached + device identification info (MAC address, serial) that is only available via + the /boardinfo endpoint, ensuring consistent data regardless of board type. + """ params = {k: v for k, v in {"module": module, "submodule": submodule, "parameter": parameter}.items() if v} # Map endpoint for Communication and Print Board @@ -726,6 +844,27 @@ def get_info(self, module: str = None, submodule: str = None, parameter: str = N if self._generation == "legacy": logger.debug("Transforming legacy info response to modern format") data = self._transform_gen1_info(data) + + # Enrich with cached device info (MAC, serial) for legacy boards + # This info is only available in /communication endpoint on legacy boards + if self._mac_address or self._board_serial: + logger.debug("Enriching legacy board response with cached device info") + + # Ensure General structure exists + if "General" not in data: + data["General"] = {} + + # Add Lan.Mac if cached + if self._mac_address: + if "Lan" not in data["General"]: + data["General"]["Lan"] = {} + data["General"]["Lan"]["Mac"] = {"Val": self._mac_address} + + # Add Board.SerialBoardBox if cached + if self._board_serial: + if "Board" not in data["General"]: + data["General"]["Board"] = {} + data["General"]["Board"]["SerialBoardBox"] = {"Val": self._board_serial} return data From f4279368d066f7291d4db5a1ceeed3fc6e28f263 Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 21:58:28 +0100 Subject: [PATCH 15/19] feat: Enhance APIClient and NodeGeneralInfo to support software version and serial number extraction --- src/ducopy/rest/client.py | 41 +++++++++++++++++++++++++++++++-------- src/ducopy/rest/models.py | 27 ++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 8 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 8c65959..b87bd0f 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -481,7 +481,7 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: # Also check for any other potential sensor fields by looking for numeric values # that aren't already captured in other sections - known_non_sensor_keys = {"node", "devtype", "addr", "state", "ovrl", "cerr", "cntdwn", "endtime", "mode", "trgt", "actl", "snsr"} + known_non_sensor_keys = {"node", "devtype", "addr", "state", "ovrl", "cerr", "cntdwn", "endtime", "mode", "trgt", "actl", "snsr", "sw", "swver", "swversion", "serial", "serialboard", "serialnode"} known_non_sensor_keys.update(network_fields.keys()) # Exclude network fields for key, value in gen1_data.items(): if key not in known_non_sensor_keys and isinstance(value, (int, float)): @@ -492,15 +492,40 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: # Remove zero values from sensor data (check the Val inside the dict) sensor_fields = {k: v for k, v in sensor_fields.items() if v.get("Val") != 0 and v.get("Val") != 0.0} + # Extract software version (try multiple common field names) + sw_version = None + for sw_key in ["sw", "swver", "swversion"]: + if sw_key in gen1_data and gen1_data[sw_key]: + sw_version = {"Id": None, "Val": gen1_data[sw_key]} + break + + # Extract serial number (try multiple common field names) + serial_board = None + for serial_key in ["serial", "serialboard", "serialnode"]: + if serial_key in gen1_data and gen1_data[serial_key]: + serial_board = gen1_data[serial_key] + break + + # Build General section + general_info = { + "Type": { + "Id": None, + "Val": gen1_data.get("devtype", "UNKN") + }, + "Addr": {"Val": gen1_data.get("addr", 0)} if gen1_data.get("addr") is not None else None + } + + # Add SwVersion if available + if sw_version: + general_info["SwVersion"] = sw_version + + # Add SerialBoard if available + if serial_board: + general_info["SerialBoard"] = serial_board + return { "Node": gen1_data.get("node"), - "General": { - "Type": { - "Id": None, - "Val": gen1_data.get("devtype", "UNKN") - }, - "Addr": {"Val": gen1_data.get("addr", 0)} if gen1_data.get("addr") is not None else None - }, + "General": general_info, "NetworkDuco": { "CommErrorCtr": {"Val": gen1_data.get("cerr", 0)} if gen1_data.get("cerr") is not None else None, "Subtype": {"Val": gen1_data.get("subtype")} if gen1_data.get("subtype") is not None else None, diff --git a/src/ducopy/rest/models.py b/src/ducopy/rest/models.py index 013828e..eee9fd3 100644 --- a/src/ducopy/rest/models.py +++ b/src/ducopy/rest/models.py @@ -159,6 +159,8 @@ def normalize_val_format(cls, values: dict[str, Any] | Any) -> dict[str, Any]: class NodeGeneralInfo(BaseModel): Type: GeneralInfo Addr: int | None = None + SwVersion: GeneralInfo | None = None + SerialBoard: str | None = None @unified_validator() def normalize_type_format(cls, values: dict[str, Any]) -> dict[str, Any]: @@ -173,6 +175,19 @@ def normalize_type_format(cls, values: dict[str, Any]) -> dict[str, Any]: values["Type"]["Id"] = None return values + @unified_validator() + def normalize_swversion_format(cls, values: dict[str, Any]) -> dict[str, Any]: + """ + Handle SwVersion field which can be: + - Communication/Print board: {"Val": "1.0.0"} (missing Id) + - Connectivity board: {"Id": 65544, "Val": "1.0.0"} + """ + if "SwVersion" in values and isinstance(values["SwVersion"], dict): + # Ensure it has both Id and Val fields + if "Val" in values["SwVersion"] and "Id" not in values["SwVersion"]: + values["SwVersion"]["Id"] = None + return values + @unified_validator() def validate_addr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | str | int]: addr_value = values.get("Addr", {}) @@ -182,6 +197,18 @@ def validate_addr(cls, values: dict[str, dict | str | int]) -> dict[str, dict | else: values["Addr"] = extract_val(addr_value) return values + + @unified_validator() + def normalize_serial_board(cls, values: dict[str, Any]) -> dict[str, Any]: + """ + Handle SerialBoard field which can be: + - Communication/Print board: plain string + - Config endpoint: plain string + """ + if "SerialBoard" in values: + # Extract value if it's wrapped in a dict + values["SerialBoard"] = extract_val(values["SerialBoard"]) + return values class NetworkDucoInfo(BaseModel): From e4e2bc3b9adf5292ab99ddfa22793e3ee6e0354e Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 22:12:49 +0100 Subject: [PATCH 16/19] feat: Add get_board_info method to retrieve board-level information for Connectivity and Communication/Print Boards --- src/ducopy/ducopy.py | 22 ++++++++ src/ducopy/rest/client.py | 115 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) diff --git a/src/ducopy/ducopy.py b/src/ducopy/ducopy.py index ad1fb5c..b3e3a49 100644 --- a/src/ducopy/ducopy.py +++ b/src/ducopy/ducopy.py @@ -235,6 +235,28 @@ def get_logs(self) -> dict: """ return self.client.get_logs() + def get_board_info(self) -> dict: + """ + Get board-level information including MAC address, serial number, and software version. + + This method provides a normalized interface for retrieving board information + regardless of board type (Connectivity Board or Communication/Print Board). + + Returns: + dict: Board information with the following structure: + { + "Mac": str, # MAC address (e.g., "AA:BB:CC:DD:EE:FF") + "Serial": str, # Board serial number (e.g., "BOARD123456") + "SwVersion": str, # Software version (e.g., "2.0.6.0" or "16010.3.7.0") + } + + Example: + >>> facade = DucoPy("https://192.168.1.100") + >>> board_info = facade.get_board_info() + >>> print(f"MAC: {board_info['Mac']}, Version: {board_info['SwVersion']}") + """ + return self.client.get_board_info() + def close(self) -> None: """Close the HTTP session. diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index b87bd0f..7d7ac6b 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -1113,6 +1113,121 @@ def get_logs(self) -> dict: logger.debug("Received API logs") return response.json() + def get_board_info(self) -> dict: + """ + Get board-level information including MAC address, serial number, and software version. + + This method provides a normalized interface for retrieving board information + regardless of board type: + + - Connectivity Board: Queries /info endpoint (General.Board section) + - Communication/Print Board: Queries /boardinfo for MAC/serial and finds BOX node for software version + + Returns: + dict: Board information with the following structure: + { + "Mac": str, # MAC address (e.g., "AA:BB:CC:DD:EE:FF") + "Serial": str, # Board serial number (e.g., "BOARD123456") + "SwVersion": str, # Software version (e.g., "2.0.6.0" or "16010.3.7.0") + } + + Example: + >>> client = APIClient("https://192.168.1.100") + >>> board_info = client.get_board_info() + >>> print(f"MAC: {board_info['Mac']}, Serial: {board_info['Serial']}, Version: {board_info['SwVersion']}") + """ + logger.info("Fetching board information") + + if self.is_modern_api: + # Connectivity Board: Get from /info endpoint + logger.debug("Fetching board info from /info endpoint (Connectivity Board)") + info = self.get_info() + + board_info = { + "Mac": None, + "Serial": None, + "SwVersion": None + } + + # Extract MAC address + if "General" in info and "Lan" in info["General"]: + mac_data = info["General"]["Lan"].get("Mac", {}) + if isinstance(mac_data, dict) and "Val" in mac_data: + board_info["Mac"] = mac_data["Val"] + elif isinstance(mac_data, str): + board_info["Mac"] = mac_data + + # Extract serial number + if "General" in info and "Board" in info["General"]: + serial_data = info["General"]["Board"].get("SerialBoardBox", {}) + if isinstance(serial_data, dict) and "Val" in serial_data: + board_info["Serial"] = serial_data["Val"] + elif isinstance(serial_data, str): + board_info["Serial"] = serial_data + + # Extract software version from Board section + if "General" in info and "Board" in info["General"]: + sw_data = info["General"]["Board"].get("SwVersion", {}) + if isinstance(sw_data, dict) and "Val" in sw_data: + board_info["SwVersion"] = sw_data["Val"] + elif isinstance(sw_data, str): + board_info["SwVersion"] = sw_data + + logger.info("Retrieved Connectivity Board info: MAC={}, Serial={}, SwVersion={}", + board_info["Mac"], board_info["Serial"], board_info["SwVersion"]) + + return board_info + + else: + # Communication/Print Board: Get from /boardinfo + BOX node + logger.debug("Fetching board info from /boardinfo and BOX node (Communication/Print Board)") + + board_info = { + "Mac": self._mac_address, + "Serial": self._board_serial, + "SwVersion": None + } + + # If not cached yet, try to fetch from /boardinfo + if not self._device_info_cached: + self._cache_device_info() + board_info["Mac"] = self._mac_address + board_info["Serial"] = self._board_serial + + # Get software version from BOX node + # Find the BOX node in the node list + try: + nodes = self.get_nodes() + box_node = None + + # Look for a node with Type == "BOX" + if nodes.Nodes: + for node in nodes.Nodes: + if node.General and node.General.Type and node.General.Type.Val == "BOX": + box_node = node + break + + if box_node and box_node.General.SwVersion: + if isinstance(box_node.General.SwVersion, dict) and "Val" in box_node.General.SwVersion: + board_info["SwVersion"] = box_node.General.SwVersion["Val"] + elif hasattr(box_node.General.SwVersion, "Val"): + board_info["SwVersion"] = box_node.General.SwVersion.Val + else: + board_info["SwVersion"] = str(box_node.General.SwVersion) + + logger.debug("Found BOX node (ID: {}) with SwVersion: {}", + box_node.Node, board_info["SwVersion"]) + else: + logger.warning("BOX node not found or does not have SwVersion field") + + except Exception as e: + logger.warning("Failed to retrieve BOX node software version: {}", e) + + logger.info("Retrieved Communication/Print Board info: MAC={}, Serial={}, SwVersion={}", + board_info["Mac"], board_info["Serial"], board_info["SwVersion"]) + + return board_info + def close(self) -> None: """Close the HTTP session.""" logger.info("Closing the API client session") From 22bb6507959cc543c4b6d608d262f34254e3df8c Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 22:48:27 +0100 Subject: [PATCH 17/19] feat: Update APIClient to use /board_info endpoint and add support for serialnb key --- src/ducopy/rest/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 7d7ac6b..fb14dfc 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -354,7 +354,7 @@ def _cache_device_info(self) -> None: # Try /boardinfo endpoint for MAC and serial try: - response = self.session.request("GET", "/boardinfo", ensure_apikey=False) + response = self.session.request("GET", "/board_info", ensure_apikey=False) response.raise_for_status() board_data = response.json() @@ -481,7 +481,7 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: # Also check for any other potential sensor fields by looking for numeric values # that aren't already captured in other sections - known_non_sensor_keys = {"node", "devtype", "addr", "state", "ovrl", "cerr", "cntdwn", "endtime", "mode", "trgt", "actl", "snsr", "sw", "swver", "swversion", "serial", "serialboard", "serialnode"} + known_non_sensor_keys = {"node", "devtype", "addr", "state", "ovrl", "cerr", "cntdwn", "endtime", "mode", "trgt", "actl", "snsr", "sw", "swver", "swversion", "serial", "serialboard", "serialnode", "serialnb"} known_non_sensor_keys.update(network_fields.keys()) # Exclude network fields for key, value in gen1_data.items(): if key not in known_non_sensor_keys and isinstance(value, (int, float)): @@ -501,7 +501,7 @@ def _transform_gen1_node_info(self, gen1_data: dict) -> dict: # Extract serial number (try multiple common field names) serial_board = None - for serial_key in ["serial", "serialboard", "serialnode"]: + for serial_key in ["serial", "serialboard", "serialnode", "serialnb"]: if serial_key in gen1_data and gen1_data[serial_key]: serial_board = gen1_data[serial_key] break From 87cbd3f4d74c25f4d52fccc15e8d1de1e34fd08b Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Sun, 4 Jan 2026 23:24:19 +0100 Subject: [PATCH 18/19] feat: Update APIClient to handle /board_info response and cache uptime for Communication/Print Boards --- src/ducopy/rest/client.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index fb14dfc..1839eb6 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -358,23 +358,14 @@ def _cache_device_info(self) -> None: response.raise_for_status() board_data = response.json() - # Communication/Print board format may have MAC in different locations - # Try common paths - check for {"Val": ...} format first - mac_value = board_data.get("mac") or board_data.get("Mac") - if isinstance(mac_value, dict) and "Val" in mac_value: - self._mac_address = mac_value["Val"] - elif isinstance(mac_value, str): - self._mac_address = mac_value + # Communication/Print board /board_info returns plain values: + # {"serial": "PRSN21401066", "mac": "00:08:5f:35:a8:0f", "swversion": "16036.13.4.0", "uptime": 2452} + self._mac_address = board_data.get("mac") + self._board_serial = board_data.get("serial") + self._board_uptime = board_data.get("uptime") - # Serial might also be in boardinfo endpoint - serial_value = board_data.get("serial") or board_data.get("Serial") - if isinstance(serial_value, dict) and "Val" in serial_value: - self._board_serial = serial_value["Val"] - elif isinstance(serial_value, str): - self._board_serial = serial_value - - logger.info("Cached device info from Communication/Print Board: MAC={}, Serial={}", - self._mac_address, self._board_serial) + logger.info("Cached device info from Communication/Print Board: MAC={}, Serial={}, Uptime={}", + self._mac_address, self._board_serial, self._board_uptime) except Exception as e: logger.warning("Failed to fetch /boardinfo endpoint: {}. Device info may be incomplete.", e) @@ -1129,6 +1120,7 @@ def get_board_info(self) -> dict: "Mac": str, # MAC address (e.g., "AA:BB:CC:DD:EE:FF") "Serial": str, # Board serial number (e.g., "BOARD123456") "SwVersion": str, # Software version (e.g., "2.0.6.0" or "16010.3.7.0") + "Uptime": int, # Board uptime in seconds (Communication/Print boards only, None for Connectivity) } Example: @@ -1146,7 +1138,8 @@ def get_board_info(self) -> dict: board_info = { "Mac": None, "Serial": None, - "SwVersion": None + "SwVersion": None, + "Uptime": None } # Extract MAC address @@ -1185,14 +1178,16 @@ def get_board_info(self) -> dict: board_info = { "Mac": self._mac_address, "Serial": self._board_serial, - "SwVersion": None + "SwVersion": None, + "Uptime": getattr(self, '_board_uptime', None) } - # If not cached yet, try to fetch from /boardinfo + # If not cached yet, try to fetch from /board_info if not self._device_info_cached: self._cache_device_info() board_info["Mac"] = self._mac_address board_info["Serial"] = self._board_serial + board_info["Uptime"] = getattr(self, '_board_uptime', None) # Get software version from BOX node # Find the BOX node in the node list From d0cda3113f0719f4d968d31281ccd283093d103e Mon Sep 17 00:00:00 2001 From: Stuart Pearson <1926002+stuartp44@users.noreply.github.com> Date: Mon, 5 Jan 2026 00:32:34 +0100 Subject: [PATCH 19/19] feat: Normalize calibration data structure for consistent access across board types --- src/ducopy/rest/client.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/ducopy/rest/client.py b/src/ducopy/rest/client.py index 1839eb6..677c011 100644 --- a/src/ducopy/rest/client.py +++ b/src/ducopy/rest/client.py @@ -1023,6 +1023,30 @@ def _transform_modern_node_info(self, data: dict) -> dict: # Add network fields to NetworkDuco data["NetworkDuco"].update(network_data) + # Normalize calibration data: Move Ventilation.Calibration.* to Calibration.Calib* + # This ensures consistent access regardless of board type + if "Ventilation" in data and isinstance(data["Ventilation"], dict): + if "Calibration" in data["Ventilation"] and isinstance(data["Ventilation"]["Calibration"], dict): + calib_data = data["Ventilation"].pop("Calibration") + + # Map Connectivity Board calibration fields to normalized names + calib_mapping = { + "Valid": "CalibIsValid", + "State": "CalibState", + "Error": "CalibError" + } + + # Create Calibration section if it doesn't exist + if "Calibration" not in data or data["Calibration"] is None: + data["Calibration"] = {} + + # Move and rename calibration fields + for api_field, normalized_field in calib_mapping.items(): + if api_field in calib_data: + data["Calibration"][normalized_field] = calib_data[api_field] + + logger.debug("Normalized calibration data from Ventilation.Calibration to Calibration section") + return data def get_node_info(self, node_id: int) -> NodeInfo: