From 55547bd52ffdeb2fc7bab40a5f07677265dee08e Mon Sep 17 00:00:00 2001 From: Nick Brook Date: Thu, 27 Mar 2025 14:25:19 +0000 Subject: [PATCH 1/7] python: Accept baudrate parameter to threadsafe wrapper Signed-off-by: Nick Brook --- host/min.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/host/min.py b/host/min.py index 31a4ead..9bb1ff1 100644 --- a/host/min.py +++ b/host/min.py @@ -819,8 +819,8 @@ class ThreadsafeTransportMINSerialHandler(MINTransportSerial): The application can send directly and pick up incoming frames from the queue. """ - def __init__(self, port, loglevel=ERROR): - super().__init__(port=port, loglevel=loglevel) + def __init__(self, port, baudrate=9600, loglevel=ERROR): + super().__init__(port=port, baudrate=baudrate, loglevel=loglevel) self._thread_lock = Lock() def close(self): From 3bbc94d43216f7c882c6e82f7520c457f4f99ac0 Mon Sep 17 00:00:00 2001 From: Nick Brook Date: Thu, 27 Mar 2025 14:25:37 +0000 Subject: [PATCH 2/7] python: Add interactive terminal Signed-off-by: Nick Brook --- host/min_terminal.py | 268 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 268 insertions(+) create mode 100644 host/min_terminal.py diff --git a/host/min_terminal.py b/host/min_terminal.py new file mode 100644 index 0000000..12ec80f --- /dev/null +++ b/host/min_terminal.py @@ -0,0 +1,268 @@ +""" +Interactive terminal program for sending and receiving MIN frames. +Supports both hex and string input modes. +""" +import argparse +from struct import unpack +from time import sleep +import threading +import logging +import serial.tools.list_ports +from typing import List, Optional + +from min import ThreadsafeTransportMINSerialHandler + +# Set up logger for this module +logger = logging.getLogger(__name__) + + +def bytes_to_int32(data: bytes, big_endian=True) -> int: + """Convert 4 bytes to a 32-bit integer.""" + if len(data) != 4: + raise ValueError("int32 should be exactly 4 bytes") + if big_endian: + return unpack('>I', data)[0] + else: + return unpack(' bytes: + """Convert a hex string to bytes, handling spaces and 0x prefixes.""" + # Remove spaces and 0x prefixes + hex_str = hex_str.replace(' ', '').replace('0x', '') + return bytes.fromhex(hex_str) + + +def log_and_print(message, level=logging.INFO, reprint_input_prompt=False, hex_mode=False): + """Print message to console and log it.""" + print(("\n" if reprint_input_prompt else "") + message) + logger.log(level, message) + if reprint_input_prompt: + if hex_mode: + print("Enter hex payload: ", end='', flush=True) + else: + print("Enter string payload: ", end='', flush=True) + + + +def receive_frames_thread( + min_handler, hex_mode: bool, stop_event: threading.Event, + callback=None, print_prompt=True +): + """Thread function to continuously poll for and display received frames.""" + while not stop_event.is_set(): + frames = min_handler.poll() + for frame in frames: + # Run callback if provided before standard handling + if callback and callback(frame): + # Skip standard handling if callback returns True + continue + + if hex_mode: + data = frame.payload.hex() + else: + try: + data = frame.payload.decode('ascii') + except UnicodeDecodeError: + data = frame.payload.hex() + msg = "Frame received: min ID={0} {1}".format(frame.min_id, data) + log_and_print(msg, reprint_input_prompt=True, hex_mode=hex_mode) + + sleep(0.05) # Small delay to prevent CPU hogging + + +def parse_log_level(level_name): + """Convert a log level name to the corresponding logging level.""" + levels = { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL + } + level_name = level_name.lower() + if level_name not in levels: + raise ValueError(f"Invalid log level: {level_name}") + return levels[level_name] + + +def setup_min_handler(port, baudrate, loglevel=logging.ERROR): + """Set up and return a MIN handler with the given parameters.""" + # Set up logging configuration + log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + logging.basicConfig(level=loglevel, format=log_format) + + min_handler = ThreadsafeTransportMINSerialHandler( + port=port, + baudrate=baudrate, + loglevel=loglevel + ) + return min_handler + + +def start_min_terminal(min_handler, hex_mode=False, min_id=0x00, frame_callback=None): + """Start an interactive MIN terminal.""" + log_and_print(f"Mode: {'Hex' if hex_mode else 'String'}") + log_and_print(f"Using MIN ID: 0x{min_id:02X}") + log_and_print("Press Ctrl+C to exit") + if hex_mode: + log_and_print("Input format: '0x01 02 03' or '01 02 03'") + + # Create a stop event and a thread for receiving frames + stop_event = threading.Event() + receive_thread = threading.Thread( + target=receive_frames_thread, + args=(min_handler, hex_mode, stop_event, frame_callback), + daemon=True + ) + receive_thread.start() + + try: + while True: + # Get input from user + if hex_mode: + user_input = input("Enter hex payload: ") + try: + payload = parse_hex_input(user_input) + except ValueError as e: + log_and_print(f"Invalid hex input: {e}", logging.ERROR) + continue + else: + user_input = input("Enter string payload: ") + payload = user_input.encode('ascii') + + # Send the frame using the specified MIN ID + min_handler.queue_frame(min_id=min_id, payload=payload) + + except KeyboardInterrupt: + log_and_print("\nTerminating...") + stop_event.set() # Signal the thread to stop + return stop_event + except Exception as e: + log_and_print(f"Error: {e}", logging.ERROR) + return stop_event + + +def get_available_ports() -> List[str]: + """Get list of available serial ports. + + Returns: + List[str]: List of available port names + """ + return [port.device for port in serial.tools.list_ports.comports()] + + +def select_port(port: Optional[str] = None) -> str: + """Select a serial port interactively if none specified. + + Args: + port: Optional port name to use directly + + Returns: + str: Selected port name + + Raises: + RuntimeError: If no ports are available or selection fails + """ + if port: + return port + + ports = get_available_ports() + if not ports: + raise RuntimeError("No serial ports found") + + print("\nAvailable ports:") + for i, port_name in enumerate(ports, 1): + print(f"{i}. {port_name}") + + while True: + try: + choice = input("\nSelect port number: ") + index = int(choice) - 1 + if 0 <= index < len(ports): + return ports[index] + print("Invalid selection") + except ValueError: + print("Please enter a number") + except KeyboardInterrupt: + print("Exiting...") + exit(0) + + +def parse_args(): + """Parse command line arguments for MIN terminal functionality.""" + parser = argparse.ArgumentParser(description='Interactive MIN terminal') + parser.add_argument( + '--port', '-p', + help='Serial port (e.g., /dev/tty.usbmodem1421)' + ) + parser.add_argument( + '--hex', + action='store_true', + help='Use hex input mode' + ) + parser.add_argument( + '--min-id', + type=lambda x: int(x, 0), # Allows for hex (0x01) or decimal input + default=0x01, + help='MIN ID to use when sending frames (default: 0x01)' + ) + parser.add_argument( + '--baudrate', + type=int, + default=9600, + help='Baudrate for serial communication (default: 9600)' + ) + parser.add_argument( + '--log-level', + type=parse_log_level, + default=logging.ERROR, + help='Set logging level: debug, info, warning, error, critical ' + '(default: error)' + ) + args = parser.parse_args() + + args.port = select_port(args.port) + + # Validate MIN ID range (0-63 as per the spec) + if args.min_id not in range(64): + parser.error("MIN ID must be in range 0-63") + + return args + + +def main(): + """Run the MIN terminal.""" + args = parse_args() + + # Set up and connect MIN handler + min_handler = setup_min_handler( + port=args.port, + baudrate=args.baudrate, + loglevel=args.log_level + ) + + log_and_print(f"Connected to {args.port} at {args.baudrate} baud") + # Use a dictionary to map logging levels to their names + level_names = { + logging.DEBUG: 'DEBUG', + logging.INFO: 'INFO', + logging.WARNING: 'WARNING', + logging.ERROR: 'ERROR', + logging.CRITICAL: 'CRITICAL' + } + log_and_print(f"Log level: {level_names.get(args.log_level, 'UNKNOWN')}") + + # Start the interactive terminal + stop_event = start_min_terminal( + min_handler=min_handler, + hex_mode=args.hex, + min_id=args.min_id + ) + + # Cleanup if the terminal exits + stop_event.set() + + +if __name__ == "__main__": + main() From 9e9df49cb5dcc89108b8f5bf9d7957b5d8cd38e2 Mon Sep 17 00:00:00 2001 From: Nick Brook Date: Wed, 9 Apr 2025 20:22:25 +0100 Subject: [PATCH 3/7] Improved log message Signed-off-by: Nick Brook --- host/min.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/host/min.py b/host/min.py index 9bb1ff1..ccea00b 100644 --- a/host/min.py +++ b/host/min.py @@ -560,7 +560,7 @@ def _rx_bytes(self, data: bytes): if self._rx_frame_checksum != computed_checksum: min_logger.warning( - "CRC mismatch (0x%08X vs 0x%08X), frame dropped", + "CRC mismatch (0x%08X received vs 0x%08X computed), frame dropped", self._rx_frame_checksum, computed_checksum, ) From 4a993f704c470b560250c208a6952299845cf049 Mon Sep 17 00:00:00 2001 From: Nick Brook Date: Wed, 9 Apr 2025 20:24:25 +0100 Subject: [PATCH 4/7] Add all arguments to subclass init Signed-off-by: Nick Brook --- host/min.py | 71 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 66 insertions(+), 5 deletions(-) diff --git a/host/min.py b/host/min.py index ccea00b..277b007 100644 --- a/host/min.py +++ b/host/min.py @@ -793,11 +793,32 @@ def _serial_read_all(self): def _serial_close(self): self._serial.close() - def __init__(self, port, baudrate=9600, loglevel=ERROR): + def __init__( + self, + port, + baudrate=9600, + window_size=8, + rx_window_size=16, + transport_fifo_size=100, + idle_timeout_ms=3000, + ack_retransmit_timeout_ms=25, + frame_retransmit_timeout_ms=50, + loglevel=ERROR, + ): """ Open MIN connection on a given port. :param port: serial port - :param debug: + :param baudrate: baud rate + :param window_size: Number of outstanding unacknowledged frames + permitted to send + :param rx_window_size: Number of outstanding unacknowledged frames + that can be received + :param transport_fifo_size: Maximum number of outstanding frames to send + :param idle_timeout_ms: Time before connection assumed to have been lost and + retransmissions stopped + :param ack_retransmit_timeout_ms: Time before ACK frames are resent + :param frame_retransmit_timeout_ms: Time before frames are resent + :param loglevel: set the logging desired """ self.fake_errors = False try: @@ -806,7 +827,13 @@ def __init__(self, port, baudrate=9600, loglevel=ERROR): self._serial.reset_output_buffer() except SerialException: raise MINConnectionError(f"Transport MIN cannot open port '{port}'") - super().__init__(loglevel=loglevel) + super().__init__(window_size=window_size, + rx_window_size=rx_window_size, + transport_fifo_size=transport_fifo_size, + idle_timeout_ms=idle_timeout_ms, + ack_retransmit_timeout_ms=ack_retransmit_timeout_ms, + frame_retransmit_timeout_ms=frame_retransmit_timeout_ms, + loglevel=loglevel) class ThreadsafeTransportMINSerialHandler(MINTransportSerial): @@ -819,8 +846,42 @@ class ThreadsafeTransportMINSerialHandler(MINTransportSerial): The application can send directly and pick up incoming frames from the queue. """ - def __init__(self, port, baudrate=9600, loglevel=ERROR): - super().__init__(port=port, baudrate=baudrate, loglevel=loglevel) + def __init__( + self, + port, + baudrate=9600, + window_size=8, + rx_window_size=16, + transport_fifo_size=100, + idle_timeout_ms=3000, + ack_retransmit_timeout_ms=25, + frame_retransmit_timeout_ms=50, + loglevel=ERROR, + ): + """ + Open MIN connection on a given port. + :param port: serial port + :param baudrate: baud rate + :param window_size: Number of outstanding unacknowledged frames + permitted to send + :param rx_window_size: Number of outstanding unacknowledged frames + that can be received + :param transport_fifo_size: Maximum number of outstanding frames to send + :param idle_timeout_ms: Time before connection assumed to have been lost and + retransmissions stopped + :param ack_retransmit_timeout_ms: Time before ACK frames are resent + :param frame_retransmit_timeout_ms: Time before frames are resent + :param loglevel: set the logging desired + """ + super().__init__(port=port, + baudrate=baudrate, + window_size=window_size, + rx_window_size=rx_window_size, + transport_fifo_size=transport_fifo_size, + idle_timeout_ms=idle_timeout_ms, + ack_retransmit_timeout_ms=ack_retransmit_timeout_ms, + frame_retransmit_timeout_ms=frame_retransmit_timeout_ms, + loglevel=loglevel) self._thread_lock = Lock() def close(self): From f15079373b6efce548b940ab0739f0cc54606eb4 Mon Sep 17 00:00:00 2001 From: Nick Brook Date: Wed, 9 Apr 2025 20:24:42 +0100 Subject: [PATCH 5/7] Improve threadsafe wrapper Signed-off-by: Nick Brook --- host/min.py | 62 +++++++++++++++++++++-------------------------------- 1 file changed, 24 insertions(+), 38 deletions(-) diff --git a/host/min.py b/host/min.py index 277b007..4974429 100644 --- a/host/min.py +++ b/host/min.py @@ -883,52 +883,38 @@ def __init__( frame_retransmit_timeout_ms=frame_retransmit_timeout_ms, loglevel=loglevel) self._thread_lock = Lock() - - def close(self): + + def _with_lock(self, func): + """Execute a function with the thread lock acquired. + + Args: + func: Function to execute with the lock + + Returns: + The result of the function + """ self._thread_lock.acquire() try: - super().close() - except Exception as e: + return func() + finally: self._thread_lock.release() - raise e - self._thread_lock.release() - def transport_stats(self): - self._thread_lock.acquire() - try: - result = super().transport_stats() - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() + def close(self): + self._with_lock(super().close) - return result + def transport_stats(self): + return self._with_lock(super().transport_stats) + + def transport_reset(self): + self._with_lock(super().transport_reset) def send_frame(self, min_id: int, payload: bytes): - self._thread_lock.acquire() - try: - super().send_frame(min_id=min_id, payload=payload) - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() + parent_send_frame = super().send_frame + self._with_lock(lambda: parent_send_frame(min_id=min_id, payload=payload)) def queue_frame(self, min_id: int, payload: bytes): - self._thread_lock.acquire() - try: - super().queue_frame(min_id=min_id, payload=payload) - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() + parent_queue_frame = super().queue_frame + self._with_lock(lambda: parent_queue_frame(min_id=min_id, payload=payload)) def poll(self): - self._thread_lock.acquire() - try: - result = super().poll() - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() - - return result + return self._with_lock(super().poll) From f8527dd43edc332bf4703c4060388a7e11affa31 Mon Sep 17 00:00:00 2001 From: Nick Brook Date: Wed, 9 Apr 2025 20:24:59 +0100 Subject: [PATCH 6/7] Improve min terminal Signed-off-by: Nick Brook --- host/min_terminal.py | 81 ++++++++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/host/min_terminal.py b/host/min_terminal.py index 12ec80f..877befc 100644 --- a/host/min_terminal.py +++ b/host/min_terminal.py @@ -71,19 +71,22 @@ def receive_frames_thread( sleep(0.05) # Small delay to prevent CPU hogging -def parse_log_level(level_name): - """Convert a log level name to the corresponding logging level.""" - levels = { +log_levels_map = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, 'critical': logging.CRITICAL } + + +def parse_log_level(level_name: str) -> int: + """Convert a log level name to the corresponding logging level.""" + level_name = level_name.lower() - if level_name not in levels: + if level_name not in log_levels_map: raise ValueError(f"Invalid log level: {level_name}") - return levels[level_name] + return log_levels_map[level_name] def setup_min_handler(port, baudrate, loglevel=logging.ERROR): @@ -117,6 +120,8 @@ def start_min_terminal(min_handler, hex_mode=False, min_id=0x00, frame_callback= ) receive_thread.start() + min_handler.transport_reset() + try: while True: # Get input from user @@ -189,51 +194,85 @@ def select_port(port: Optional[str] = None) -> str: exit(0) -def parse_args(): - """Parse command line arguments for MIN terminal functionality.""" - parser = argparse.ArgumentParser(description='Interactive MIN terminal') +def add_port_arg(parser): parser.add_argument( '--port', '-p', help='Serial port (e.g., /dev/tty.usbmodem1421)' ) - parser.add_argument( - '--hex', - action='store_true', - help='Use hex input mode' - ) + + +def add_min_id_arg(parser): parser.add_argument( '--min-id', type=lambda x: int(x, 0), # Allows for hex (0x01) or decimal input default=0x01, help='MIN ID to use when sending frames (default: 0x01)' ) + + +def add_baudrate_arg(parser): parser.add_argument( '--baudrate', type=int, default=9600, help='Baudrate for serial communication (default: 9600)' ) + + +def add_log_level_arg(parser): parser.add_argument( '--log-level', - type=parse_log_level, - default=logging.ERROR, + type=str, + default="error", + choices=log_levels_map.keys(), help='Set logging level: debug, info, warning, error, critical ' '(default: error)' ) + + +def add_hex_arg(parser): + parser.add_argument( + '--hex', + action='store_true', + help='Use hex input mode' + ) + + +def parse_args(parser,add_port=True, add_log_level=True, add_hex=True, add_min_id=True, add_baudrate=True): + """Parse command line arguments for MIN terminal functionality.""" + + if add_port: + add_port_arg(parser) + if add_log_level: + add_log_level_arg(parser) + if add_hex: + add_hex_arg(parser) + if add_min_id: + add_min_id_arg(parser) + if add_baudrate: + add_baudrate_arg(parser) args = parser.parse_args() - args.port = select_port(args.port) + if add_port: + args.port = select_port(args.port) + if add_log_level: + args.log_level = parse_log_level(args.log_level) + if not add_hex: + args.hex = True + if add_min_id: + # Validate MIN ID range (0-63 as per the spec) + if args.min_id not in range(64): + parser.error("MIN ID must be in range 0-63") + if not add_baudrate: + args.baudrate = 9600 - # Validate MIN ID range (0-63 as per the spec) - if args.min_id not in range(64): - parser.error("MIN ID must be in range 0-63") - return args def main(): """Run the MIN terminal.""" - args = parse_args() + parser = argparse.ArgumentParser(description="MIN terminal") + args = parse_args(parser=parser) # Set up and connect MIN handler min_handler = setup_min_handler( From f6d7a2660edec7f15d96d98d5fb6d6f47695a651 Mon Sep 17 00:00:00 2001 From: Nick Brook Date: Thu, 4 Sep 2025 08:37:25 +0100 Subject: [PATCH 7/7] Allow setting min terminal defaults from external scripts Signed-off-by: Nick Brook --- host/min_terminal.py | 68 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 11 deletions(-) diff --git a/host/min_terminal.py b/host/min_terminal.py index 877befc..575dbcc 100644 --- a/host/min_terminal.py +++ b/host/min_terminal.py @@ -4,6 +4,7 @@ """ import argparse from struct import unpack +import sys from time import sleep import threading import logging @@ -16,6 +17,44 @@ logger = logging.getLogger(__name__) +# Module-level defaults for CLI arguments. Can be modified at runtime via +# set_default_args() so that embedding applications can change +# defaults without altering user-provided values. +DEFAULT_BAUDRATE = 9600 +DEFAULT_MIN_ID = 0x01 +DEFAULT_HEX_MODE = False +DEFAULT_LOG_LEVEL_NAME = "error" + + +def set_default_args( + *, + baudrate: Optional[int] = None, + min_id: Optional[int] = None, + hex_mode: Optional[bool] = None, + log_level_name: Optional[str] = None, +) -> None: + """Set module-level default CLI values used when arguments are omitted. + + Args: + baudrate: Default baudrate for serial communication. + min_id: Default MIN ID to use for sending frames (0-63). + hex_mode: Default input mode for interactive terminal (True for hex). + log_level_name: Default log level name (debug, info, warning, error, critical). + + Returns: + None + """ + global DEFAULT_BAUDRATE, DEFAULT_MIN_ID, DEFAULT_HEX_MODE, DEFAULT_LOG_LEVEL_NAME + if baudrate is not None: + DEFAULT_BAUDRATE = baudrate + if min_id is not None: + DEFAULT_MIN_ID = min_id + if hex_mode is not None: + DEFAULT_HEX_MODE = hex_mode + if log_level_name is not None: + DEFAULT_LOG_LEVEL_NAME = log_level_name + + def bytes_to_int32(data: bytes, big_endian=True) -> int: """Convert 4 bytes to a 32-bit integer.""" if len(data) != 4: @@ -160,6 +199,8 @@ def get_available_ports() -> List[str]: def select_port(port: Optional[str] = None) -> str: """Select a serial port interactively if none specified. + If there is only one port, it will be returned directly. + Args: port: Optional port name to use directly @@ -176,6 +217,9 @@ def select_port(port: Optional[str] = None) -> str: if not ports: raise RuntimeError("No serial ports found") + if len(ports) == 1: + return ports[0] + print("\nAvailable ports:") for i, port_name in enumerate(ports, 1): print(f"{i}. {port_name}") @@ -205,7 +249,7 @@ def add_min_id_arg(parser): parser.add_argument( '--min-id', type=lambda x: int(x, 0), # Allows for hex (0x01) or decimal input - default=0x01, + default=DEFAULT_MIN_ID, help='MIN ID to use when sending frames (default: 0x01)' ) @@ -214,7 +258,7 @@ def add_baudrate_arg(parser): parser.add_argument( '--baudrate', type=int, - default=9600, + default=DEFAULT_BAUDRATE, help='Baudrate for serial communication (default: 9600)' ) @@ -223,7 +267,7 @@ def add_log_level_arg(parser): parser.add_argument( '--log-level', type=str, - default="error", + default=DEFAULT_LOG_LEVEL_NAME, choices=log_levels_map.keys(), help='Set logging level: debug, info, warning, error, critical ' '(default: error)' @@ -231,11 +275,9 @@ def add_log_level_arg(parser): def add_hex_arg(parser): - parser.add_argument( - '--hex', - action='store_true', - help='Use hex input mode' - ) + parser.add_argument('--hex', dest='hex', action='store_true', help='Use hex input mode') + parser.add_argument('--no-hex', dest='hex', action='store_false', help='Use string input mode') + parser.set_defaults(hex=DEFAULT_HEX_MODE) def parse_args(parser,add_port=True, add_log_level=True, add_hex=True, add_min_id=True, add_baudrate=True): @@ -258,13 +300,13 @@ def parse_args(parser,add_port=True, add_log_level=True, add_hex=True, add_min_i if add_log_level: args.log_level = parse_log_level(args.log_level) if not add_hex: - args.hex = True + args.hex = DEFAULT_HEX_MODE if DEFAULT_HEX_MODE is not None else True if add_min_id: # Validate MIN ID range (0-63 as per the spec) if args.min_id not in range(64): parser.error("MIN ID must be in range 0-63") if not add_baudrate: - args.baudrate = 9600 + args.baudrate = DEFAULT_BAUDRATE return args @@ -272,7 +314,11 @@ def parse_args(parser,add_port=True, add_log_level=True, add_hex=True, add_min_i def main(): """Run the MIN terminal.""" parser = argparse.ArgumentParser(description="MIN terminal") - args = parse_args(parser=parser) + try: + args = parse_args(parser=parser) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) # Set up and connect MIN handler min_handler = setup_min_handler(