Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/configs/example_rack_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ rackConfig:
# [type: "tapo", ip: "", username: "", password: "", outlet: "optional"]
# [type: "hs100", ip:"", port:"optional" ] kara also supports hs100
# [type: "apc", ip:"", username:"", password:"" ] rack apc switch
# [type: "olimex", ip:"", port:"optional", relay:"" ]
# [type: "olimex", ip:"", port:"optional", relay:"" ]
# [type: "SLP", ip:"", username: "", password: "", outlet_id:"", port:"optional"]
# [type: "none" ] if section doesn't exist then type:none will be used

# [ hdmiCECController: optional ] - Specific hdmiCECController for the slot
# [ hdmiCECController: optional ] - Specifies hdmiCECController for the slot
# supported types:
# [type: "cec-client", adaptor: "/dev/ttycec"]
# [type: "remote-cec-client", adaptor: "/dev/ttycec", address: "192.168.99.1", username(optional): "testuser", password(optional): "testpswd", port(optional): "22"]

# [type: "virtual-cec-client", address: "127.0.0.1", username: "testuser", password: "testpswd", port: "5522", control_port: 8080, device_network_configuration: "path to device network configuration file" ]
# [ avSyncController: optional] - Specifiec AVSyncController for the slot
# supported types:
# [type: "SyncOne2", port: "/dev/ttyACM0", extended_mode (optional): true|false, audio_input (optional): "AUTO|EXTERNAL|INTERNAL", speaker_distance (optional): "1.5"]
Expand Down
12 changes: 12 additions & 0 deletions framework/core/hdmiCECController.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from framework.core.logModule import logModule
from framework.core.streamToFile import StreamToFile
from framework.core.hdmicecModules import CECClientController, RemoteCECClient, CECDeviceType
from framework.core.hdmicecModules.virtualCECController import virtualCECController

class HDMICECController():
"""
Expand Down Expand Up @@ -72,6 +73,17 @@ def __init__(self, log: logModule, config: dict):
password=config.get('password',''),
port=config.get('port',22),
prompt=config.get('prompt', ':~'))
elif self.controllerType.lower() == 'virtual-cec-client':
self.controller = virtualCECController(self.cecAdaptor,
self._log,
self._stream,
address=config.get('address'),
username=config.get('username',''),
password=config.get('password',''),
port=config.get('port',22),
prompt=config.get('prompt', '~#'),
device_configuration=config.get('device_network_configuration',''),
control_port=config.get('control_port', 8080))
self._read_line = 0

def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> None:
Expand Down
2 changes: 1 addition & 1 deletion framework/core/hdmicecModules/abstractCECController.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def __init__(self, adaptor_path:str, logger:logModule, streamLogger: StreamToFil
def sendMessage(cls, sourceAddress: str, destAddress: str, opCode: str, payload: list = None, deviceType: CECDeviceType=None) -> None:
"""
Sends an opCode from a specified source and to a specified destination.

Args:
sourceAddress (str): The logical address of the source device (0-9 or A-F).
destAddress (str): The logical address of the destination device (0-9 or A-F).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#** *****************************************************************************
# *
# * If not stated otherwise in this file or this component's LICENSE file the
# * following copyright and licenses apply:
# *
# * Copyright 2025 RDK Management
# *
# * Licensed under the Apache License, Version 2.0 (the "License");
# * you may not use this file except in compliance with the License.
# * You may obtain a copy of the License at
# *
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# *
#** ******************************************************************************
HdmiCec:
command: print
description: Print the current HDMI CEC device map
222 changes: 222 additions & 0 deletions framework/core/hdmicecModules/virtualCECController.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
#!/usr/bin/env python3
#** *****************************************************************************
# *
# * If not stated otherwise in this file or this component's LICENSE file the
# * following copyright and licenses apply:
# *
# * Copyright 2025 RDK Management
# *
# * Licensed under the Apache License, Version 2.0 (the "License");
# * you may not use this file except in compliance with the License.
# * You may obtain a copy of the License at
# *
# *
# http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# *
#* ******************************************************************************

import os
import sys
import time
import re
import yaml

dir_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(dir_path)
sys.path.append(os.path.join(dir_path, "../"))

from framework.core.logModule import logModule
from framework.core.streamToFile import StreamToFile
from framework.core.utPlaneController import utPlaneController
from .abstractCECController import CECInterface
from framework.core.commandModules.sshConsole import sshConsole

HDMICEC_DEVICE_LIST_FILE = "/tmp/hdmi_cec_device_list_info.txt"
HDMICEC_PRINT_CEC_NETWORK_CONFIG_FILE = os.path.join(dir_path, "configuration", "virtual_cec_print_device_network_configuration.yaml")
HOST_CMD_EXEC_TIMEOUT = 2

class virtualCECController(CECInterface):
"""
HDMI CEC related utility functions
"""
def __init__(self, adaptor: str, logger: logModule, streamLogger: StreamToFile,
address: str, username: str = '', password: str = '', port: int = 22, prompt = '~#',
device_configuration:str = '', control_port:int = 8080):
"""
Initializes the virtualCECController class for HDMI CEC device communication.
Args:
adaptor (str): The adaptor file path for the parent class.
logger (logModule): Logger module instance for logging operations.
streamLogger (StreamToFile): Stream logger for file-based logging.
address (str): IP address or hostname of the remote device.
username (str, optional): SSH username for authentication. Defaults to ''.
password (str, optional): SSH password for authentication. Defaults to ''.
port (int, optional): SSH port number for connection. Defaults to 22.
prompt (str, optional): Command prompt string for the SSH session. Defaults to '~#'.
device_configuration (str, optional): Path to the HDMI CEC device network configuration YAML file. Defaults to ''.
control_port (int, optional): Port number for ut-controller communication. Defaults to 8080.
"""
super().__init__(adaptor, logger, streamLogger)

self.control_port = control_port
self.commandPrompt = prompt

try:
# Load the HDMI CEC device network configuration file
with open(device_configuration, "r") as f:
config_dict = yaml.safe_load(f)

self.cecDeviceNetworkConfigString = yaml.dump(config_dict)

# Load the print configuration file
with open(HDMICEC_PRINT_CEC_NETWORK_CONFIG_FILE, "r") as f:
print_dict = yaml.safe_load(f)

self.printConfigString = yaml.dump(print_dict)

self.session = sshConsole(self._log, address, username, password, port=port, prompt=prompt)

self.utPlaneController = utPlaneController(self.session, port=self.control_port)
except FileNotFoundError:
self._log.critical(f"Device config file not found")
raise
except yaml.YAMLError as e:
self._log.critical(f"Invalid YAML in device config file: {e}")
raise
except Exception as e:
self._log.critical(f"Failed to load device configuration: {e}")
raise

def loadCecDeviceNetworkConfiguration(self, configString: str):
"""
Loads the HDMI CEC device network configuration file on to the vComponent.
"""

self.utPlaneController.sendMessage(configString)

def readDeviceNetworkList(self) -> list:
"""
Reads the device network list from the HDMI CEC device.
Returns:
list: A list of dictionaries representing discovered devices with details.
"""
result = self.session.read_until(self.commandPrompt)

result = re.sub(r'\x1b\[[0-9;]*m', '', result) # remove ANSI color codes
result = result.replace('\r', '') # normalize newlines
result = re.sub(r'root@[\w\-\:\/# ]+', '', result) # remove shell prompt lines
result = re.sub(r'curl:.*?\n', '', result, flags=re.DOTALL) # remove curl noise if any

devices = []

# Regex to match device lines
pattern = re.compile(
r"- Name:\s*(?P<name>[^,]+),.*?"
r"Active Source:\s*(?P<active>\d+),.*?"
r"Logical-1:\s*(?P<logical1>-?\d+),.*?"
r"Physical:\s*(?P<physical>[\d\.]+)",
re.MULTILINE
)

for match in pattern.finditer(result):
devices.append({
"name": match.group("name").strip(),
"physical address": match.group("physical"),
"logical address": int(match.group("logical1")),
"active source": int(match.group("active")),
})

return devices

def listDevices(self) -> list:
"""
Lists the devices currently available on the HDMI CEC network.
Returns:
list: A list of dictionaries representing discovered devices with details.
{
"name": "name",
"physical address": "0.0.0.0",
"logical address": 0,
"active source": 0,
}
"""
# send command to CEC network to print device configuration
self.utPlaneController.sendMessage(self.printConfigString)

devices = self.readDeviceNetworkList()

if devices is None or len(devices) == 0:
self.session.write("cat " + HDMICEC_DEVICE_LIST_FILE)
time.sleep(HOST_CMD_EXEC_TIMEOUT)
devices = self.readDeviceNetworkList()

return devices

def sendMessage(self, sourceAddress: str, destAddress: str, opCode: str, payload: list = None) -> bool:
"""
This function sends a specified opCode.
Args:
sourceAddress (str): The logical address of the source device ('0'-'F').
destAddress (str): The logical address of the destination device ('0'-'F').
opCode (str): Operation code to send as a hexadecimal string e.g 0x81.
payload (list): List of hexadecimal strings to be sent with the opCode. Optional.
Returns:
bool: True if the message was sent successfully, False otherwise.
"""
# Format the payload: source, destination, opCode, and payload
msg_payload = [f"0x{sourceAddress}{destAddress}", opCode]
if payload:
msg_payload.extend(payload)

yaml_content = (
"HdmiCec:\n"
" command: cec_message\n"
" description: Send a CEC message\n"
" message:\n"
" user_defined: true\n"
f" payload: {msg_payload}\n"
)

try:
result = self.utPlaneController.sendMessage(yaml_content)
return bool(result)
except Exception as e:
self._log.critical(f"Failed to send CEC message: {e}")
return False

def start(self):
self.loadCecDeviceNetworkConfiguration(self.cecDeviceNetworkConfigString)

def stop(self):
pass

def receiveMessage(self,sourceAddress: str, destAddress: str, opCode: str, timeout: int = 10, payload: list = None) -> bool:
"""
For a virtual component, the device network is only a simulation, so the controller
does not generate any logs when messages are sent or received. Therefore, it always returns expected message.
Args:
sourceAddress (str): The logical address of the source device (0-9 or A-F).
destAddress (str): The logical address of the destination device (0-9 or A-F).
opCode (str): Operation code to send as an hexidecimal string e.g 0x81.
timeout (int): The maximum amount of time, in seconds, that the method will
wait for the message to be received. Defaults to 10.
payload (list): List of hexidecimal strings to be sent with the opCode. Optional.
Returns:
list: list of strings containing found message.
"""
message = "Received expected message"
return message
Loading