diff --git a/examples/configs/example_rack_config.yml b/examples/configs/example_rack_config.yml index 5fb433f..8e12cd2 100644 --- a/examples/configs/example_rack_config.yml +++ b/examples/configs/example_rack_config.yml @@ -91,15 +91,15 @@ rackConfig: # [type: "tapo", ip: "", username: "", password: "", outlet: "optional"] # [type: "hs100", ip:"", port:"optional" ] kara also supports hs100 # [type: "apc", ip:"", username:"", password:"" ] rack apc switch - # [type: "olimex", ip:"", port:"optional", relay:"" ] + # [type: "olimex", ip:"", port:"optional", relay:"" ] # [type: "SLP", ip:"", username: "", password: "", outlet_id:"", port:"optional"] # [type: "none" ] if section doesn't exist then type:none will be used - # [ hdmiCECController: optional ] - Specific hdmiCECController for the slot + # [ hdmiCECController: optional ] - Specifies hdmiCECController for the slot # supported types: # [type: "cec-client", adaptor: "/dev/ttycec"] # [type: "remote-cec-client", adaptor: "/dev/ttycec", address: "192.168.99.1", username(optional): "testuser", password(optional): "testpswd", port(optional): "22"] - + # [type: "virtual-cec-client", address: "127.0.0.1", username: "testuser", password: "testpswd", port: "5522", control_port: 8080, device_network_configuration: "path to device network configuration file" ] # [ avSyncController: optional] - Specifiec AVSyncController for the slot # supported types: # [type: "SyncOne2", port: "/dev/ttyACM0", extended_mode (optional): true|false, audio_input (optional): "AUTO|EXTERNAL|INTERNAL", speaker_distance (optional): "1.5"] diff --git a/framework/core/hdmiCECController.py b/framework/core/hdmiCECController.py index d16a0ea..9834b78 100644 --- a/framework/core/hdmiCECController.py +++ b/framework/core/hdmiCECController.py @@ -39,6 +39,7 @@ from framework.core.logModule import logModule from framework.core.streamToFile import StreamToFile from framework.core.hdmicecModules import CECClientController, RemoteCECClient, CECDeviceType +from framework.core.hdmicecModules.virtualCECController import virtualCECController class HDMICECController(): """ @@ -72,6 +73,17 @@ def __init__(self, log: logModule, config: dict): password=config.get('password',''), port=config.get('port',22), prompt=config.get('prompt', ':~')) + elif self.controllerType.lower() == 'virtual-cec-client': + self.controller = virtualCECController(self.cecAdaptor, + self._log, + self._stream, + address=config.get('address'), + username=config.get('username',''), + password=config.get('password',''), + port=config.get('port',22), + prompt=config.get('prompt', '~#'), + device_configuration=config.get('device_network_configuration',''), + control_port=config.get('control_port', 8080)) self._read_line = 0 def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> None: diff --git a/framework/core/hdmicecModules/abstractCECController.py b/framework/core/hdmicecModules/abstractCECController.py index 89e5cf3..39f0f04 100644 --- a/framework/core/hdmicecModules/abstractCECController.py +++ b/framework/core/hdmicecModules/abstractCECController.py @@ -49,7 +49,7 @@ def __init__(self, adaptor_path:str, logger:logModule, streamLogger: StreamToFil def sendMessage(cls, sourceAddress: str, destAddress: str, opCode: str, payload: list = None, deviceType: CECDeviceType=None) -> None: """ Sends an opCode from a specified source and to a specified destination. - + Args: sourceAddress (str): The logical address of the source device (0-9 or A-F). destAddress (str): The logical address of the destination device (0-9 or A-F). diff --git a/framework/core/hdmicecModules/configuration/virtual_cec_print_device_network_configuration.yaml b/framework/core/hdmicecModules/configuration/virtual_cec_print_device_network_configuration.yaml new file mode 100644 index 0000000..c5a2d5e --- /dev/null +++ b/framework/core/hdmicecModules/configuration/virtual_cec_print_device_network_configuration.yaml @@ -0,0 +1,24 @@ +#** ***************************************************************************** +# * +# * If not stated otherwise in this file or this component's LICENSE file the +# * following copyright and licenses apply: +# * +# * Copyright 2025 RDK Management +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +#** ****************************************************************************** +HdmiCec: + command: print + description: Print the current HDMI CEC device map \ No newline at end of file diff --git a/framework/core/hdmicecModules/virtualCECController.py b/framework/core/hdmicecModules/virtualCECController.py new file mode 100644 index 0000000..1f57ac7 --- /dev/null +++ b/framework/core/hdmicecModules/virtualCECController.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +#** ***************************************************************************** +# * +# * If not stated otherwise in this file or this component's LICENSE file the +# * following copyright and licenses apply: +# * +# * Copyright 2025 RDK Management +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * +# http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +#* ****************************************************************************** + +import os +import sys +import time +import re +import yaml + +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(dir_path) +sys.path.append(os.path.join(dir_path, "../")) + +from framework.core.logModule import logModule +from framework.core.streamToFile import StreamToFile +from framework.core.utPlaneController import utPlaneController +from .abstractCECController import CECInterface +from framework.core.commandModules.sshConsole import sshConsole + +HDMICEC_DEVICE_LIST_FILE = "/tmp/hdmi_cec_device_list_info.txt" +HDMICEC_PRINT_CEC_NETWORK_CONFIG_FILE = os.path.join(dir_path, "configuration", "virtual_cec_print_device_network_configuration.yaml") +HOST_CMD_EXEC_TIMEOUT = 2 + +class virtualCECController(CECInterface): + """ + HDMI CEC related utility functions + """ + def __init__(self, adaptor: str, logger: logModule, streamLogger: StreamToFile, + address: str, username: str = '', password: str = '', port: int = 22, prompt = '~#', + device_configuration:str = '', control_port:int = 8080): + """ + Initializes the virtualCECController class for HDMI CEC device communication. + + Args: + adaptor (str): The adaptor file path for the parent class. + logger (logModule): Logger module instance for logging operations. + streamLogger (StreamToFile): Stream logger for file-based logging. + address (str): IP address or hostname of the remote device. + username (str, optional): SSH username for authentication. Defaults to ''. + password (str, optional): SSH password for authentication. Defaults to ''. + port (int, optional): SSH port number for connection. Defaults to 22. + prompt (str, optional): Command prompt string for the SSH session. Defaults to '~#'. + device_configuration (str, optional): Path to the HDMI CEC device network configuration YAML file. Defaults to ''. + control_port (int, optional): Port number for ut-controller communication. Defaults to 8080. + + """ + super().__init__(adaptor, logger, streamLogger) + + self.control_port = control_port + self.commandPrompt = prompt + + try: + # Load the HDMI CEC device network configuration file + with open(device_configuration, "r") as f: + config_dict = yaml.safe_load(f) + + self.cecDeviceNetworkConfigString = yaml.dump(config_dict) + + # Load the print configuration file + with open(HDMICEC_PRINT_CEC_NETWORK_CONFIG_FILE, "r") as f: + print_dict = yaml.safe_load(f) + + self.printConfigString = yaml.dump(print_dict) + + self.session = sshConsole(self._log, address, username, password, port=port, prompt=prompt) + + self.utPlaneController = utPlaneController(self.session, port=self.control_port) + except FileNotFoundError: + self._log.critical(f"Device config file not found") + raise + except yaml.YAMLError as e: + self._log.critical(f"Invalid YAML in device config file: {e}") + raise + except Exception as e: + self._log.critical(f"Failed to load device configuration: {e}") + raise + + def loadCecDeviceNetworkConfiguration(self, configString: str): + """ + Loads the HDMI CEC device network configuration file on to the vComponent. + """ + + self.utPlaneController.sendMessage(configString) + + def readDeviceNetworkList(self) -> list: + """ + Reads the device network list from the HDMI CEC device. + + Returns: + list: A list of dictionaries representing discovered devices with details. + """ + result = self.session.read_until(self.commandPrompt) + + result = re.sub(r'\x1b\[[0-9;]*m', '', result) # remove ANSI color codes + result = result.replace('\r', '') # normalize newlines + result = re.sub(r'root@[\w\-\:\/# ]+', '', result) # remove shell prompt lines + result = re.sub(r'curl:.*?\n', '', result, flags=re.DOTALL) # remove curl noise if any + + devices = [] + + # Regex to match device lines + pattern = re.compile( + r"- Name:\s*(?P[^,]+),.*?" + r"Active Source:\s*(?P\d+),.*?" + r"Logical-1:\s*(?P-?\d+),.*?" + r"Physical:\s*(?P[\d\.]+)", + re.MULTILINE + ) + + for match in pattern.finditer(result): + devices.append({ + "name": match.group("name").strip(), + "physical address": match.group("physical"), + "logical address": int(match.group("logical1")), + "active source": int(match.group("active")), + }) + + return devices + + def listDevices(self) -> list: + """ + Lists the devices currently available on the HDMI CEC network. + + Returns: + list: A list of dictionaries representing discovered devices with details. + { + "name": "name", + "physical address": "0.0.0.0", + "logical address": 0, + "active source": 0, + } + """ + # send command to CEC network to print device configuration + self.utPlaneController.sendMessage(self.printConfigString) + + devices = self.readDeviceNetworkList() + + if devices is None or len(devices) == 0: + self.session.write("cat " + HDMICEC_DEVICE_LIST_FILE) + time.sleep(HOST_CMD_EXEC_TIMEOUT) + devices = self.readDeviceNetworkList() + + return devices + + def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> bool: + """ + This function sends a specified opCode. + + Args: + sourceAddress (str): The logical address of the source device ('0'-'F'). + destAddress (str): The logical address of the destination device ('0'-'F'). + opCode (str): Operation code to send as a hexadecimal string e.g 0x81. + payload (list): List of hexadecimal strings to be sent with the opCode. Optional. + + Returns: + bool: True if the message was sent successfully, False otherwise. + """ + # Format the payload: source, destination, opCode, and payload + msg_payload = [f"0x{sourceAddress}{destAddress}", opCode] + if payload: + msg_payload.extend(payload) + + yaml_content = ( + "HdmiCec:\n" + " command: cec_message\n" + " description: Send a CEC message\n" + " message:\n" + " user_defined: true\n" + f" payload: {msg_payload}\n" + ) + + try: + result = self.utPlaneController.sendMessage(yaml_content) + return bool(result) + except Exception as e: + self._log.critical(f"Failed to send CEC message: {e}") + return False + + def start(self): + self.loadCecDeviceNetworkConfiguration(self.cecDeviceNetworkConfigString) + + def stop(self): + pass + + def receiveMessage(self,sourceAddress: str, destAddress: str, opCode: str, timeout: int = 10, payload: list = None) -> bool: + """ + For a virtual component, the device network is only a simulation, so the controller + does not generate any logs when messages are sent or received. Therefore, it always returns expected message. + + Args: + sourceAddress (str): The logical address of the source device (0-9 or A-F). + destAddress (str): The logical address of the destination device (0-9 or A-F). + opCode (str): Operation code to send as an hexidecimal string e.g 0x81. + timeout (int): The maximum amount of time, in seconds, that the method will + wait for the message to be received. Defaults to 10. + payload (list): List of hexidecimal strings to be sent with the opCode. Optional. + + Returns: + list: list of strings containing found message. + """ + message = "Received expected message" + return message diff --git a/framework/core/utPlaneController.py b/framework/core/utPlaneController.py new file mode 100644 index 0000000..386d044 --- /dev/null +++ b/framework/core/utPlaneController.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +#** ***************************************************************************** +# * +# * If not stated otherwise in this file or this component's LICENSE file the +# * following copyright and licenses apply: +# * +# * Copyright 2025 RDK Management +# * +# * Licensed under the Apache License, Version 2.0 (the "License"); +# * you may not use this file except in compliance with the License. +# * You may obtain a copy of the License at +# * +# * +# http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# * +#* ****************************************************************************** + +import os +import sys + +dir_path = os.path.dirname(os.path.realpath(__file__)) +sys.path.append(dir_path) +sys.path.append(os.path.join(dir_path, "..", "..", "..")) + +from framework.core.logModule import logModule + +class utPlaneController(): + """ + UT Plane Controller class for managing communication with the ut-controller. + + This class provides an interface to interact with the ut-controller running on a device. + It facilitates sending commands and YAML configuration files to the controller via HTTP requests + using curl commands. The controller operates over a configurable port (default: 8080) and uses + the session object to execute commands on the target device. + + Typical usage involves: + 1. Creating an instance with an active session and optional port/log configuration + 2. Preparing YAML files with test commands or configuration + 3. Sending these files to the ut-controller using sendMessage() + + Attributes: + session (object): Active session object for device communication + port (int): Port number where ut-controller service is listening (default: 8080) + log (logModule): Logger instance for recording controller activities and debugging + + Example: + >>> controller = utPlaneController(session, port=8080) + >>> controller.sendMessage("/path/on/dut/to/test_config.yaml") + >>> controller.sendMessage("yaml string directly") + """ + + def __init__(self, session:object, port: int = 8080, log: logModule = None): + """ + Initializes UT Plane Controller class. + + Args: + session (class): The session object to communicate with the device + port (int): The port number for the controller + log (class, optional): Parent log class. Defaults to None. + """ + + # Validate session + if session is None: + raise ValueError("session cannot be None") + + self.log = log + if log is None: + self.log = logModule(self.__class__.__name__) + self.log.setLevel( self.log.INFO ) + + self.session = session + self.port = port + + def sendMessage(self, yamlInput: str, isFile: bool = False) -> bool: + """ + Sends a command to the ut-controller via curl. + + Args: + yamlInput (str): Either a YAML string or path to a YAML file. + isFile (bool): Flag indicating if yamlInput is a file path. Defaults to False. + + Returns: + bool: True if the message was sent successfully, False otherwise. + """ + try: + # Validate input + if not yamlInput or not isinstance(yamlInput, str): + self.log.error("Invalid input provided") + return False + + if isFile: + # It's a file path on target device - use --data-binary with file reference + yaml_content = yamlInput + cmd = f'curl -X POST -H "Content-Type: application/x-yaml" --data-binary @"{yaml_content}" "http://localhost:{self.port}/api/postKVP"' + else: + # It's a direct YAML string - escape quotes and send inline + yaml_content = yamlInput.replace('"', '\\"') + cmd = f'curl -X POST -H "Content-Type: application/x-yaml" --data-binary "{yaml_content}" "http://localhost:{self.port}/api/postKVP"' + + result = True + + # Send command + result = self.session.write(cmd) + + self.log.info(f"Message sent successfully to ut-controller on port {self.port}") + return result + + except Exception as e: + self.log.error(f"Failed to send message to ut-controller: {str(e)}") + return False