diff --git a/host/min.py b/host/min.py index 31a4ead..4974429 100644 --- a/host/min.py +++ b/host/min.py @@ -560,7 +560,7 @@ def _rx_bytes(self, data: bytes): if self._rx_frame_checksum != computed_checksum: min_logger.warning( - "CRC mismatch (0x%08X vs 0x%08X), frame dropped", + "CRC mismatch (0x%08X received vs 0x%08X computed), frame dropped", self._rx_frame_checksum, computed_checksum, ) @@ -793,11 +793,32 @@ def _serial_read_all(self): def _serial_close(self): self._serial.close() - def __init__(self, port, baudrate=9600, loglevel=ERROR): + def __init__( + self, + port, + baudrate=9600, + window_size=8, + rx_window_size=16, + transport_fifo_size=100, + idle_timeout_ms=3000, + ack_retransmit_timeout_ms=25, + frame_retransmit_timeout_ms=50, + loglevel=ERROR, + ): """ Open MIN connection on a given port. :param port: serial port - :param debug: + :param baudrate: baud rate + :param window_size: Number of outstanding unacknowledged frames + permitted to send + :param rx_window_size: Number of outstanding unacknowledged frames + that can be received + :param transport_fifo_size: Maximum number of outstanding frames to send + :param idle_timeout_ms: Time before connection assumed to have been lost and + retransmissions stopped + :param ack_retransmit_timeout_ms: Time before ACK frames are resent + :param frame_retransmit_timeout_ms: Time before frames are resent + :param loglevel: set the logging desired """ self.fake_errors = False try: @@ -806,7 +827,13 @@ def __init__(self, port, baudrate=9600, loglevel=ERROR): self._serial.reset_output_buffer() except SerialException: raise MINConnectionError(f"Transport MIN cannot open port '{port}'") - super().__init__(loglevel=loglevel) + super().__init__(window_size=window_size, + rx_window_size=rx_window_size, + transport_fifo_size=transport_fifo_size, + idle_timeout_ms=idle_timeout_ms, + ack_retransmit_timeout_ms=ack_retransmit_timeout_ms, + frame_retransmit_timeout_ms=frame_retransmit_timeout_ms, + loglevel=loglevel) class ThreadsafeTransportMINSerialHandler(MINTransportSerial): @@ -819,55 +846,75 @@ class ThreadsafeTransportMINSerialHandler(MINTransportSerial): The application can send directly and pick up incoming frames from the queue. """ - def __init__(self, port, loglevel=ERROR): - super().__init__(port=port, loglevel=loglevel) + def __init__( + self, + port, + baudrate=9600, + window_size=8, + rx_window_size=16, + transport_fifo_size=100, + idle_timeout_ms=3000, + ack_retransmit_timeout_ms=25, + frame_retransmit_timeout_ms=50, + loglevel=ERROR, + ): + """ + Open MIN connection on a given port. + :param port: serial port + :param baudrate: baud rate + :param window_size: Number of outstanding unacknowledged frames + permitted to send + :param rx_window_size: Number of outstanding unacknowledged frames + that can be received + :param transport_fifo_size: Maximum number of outstanding frames to send + :param idle_timeout_ms: Time before connection assumed to have been lost and + retransmissions stopped + :param ack_retransmit_timeout_ms: Time before ACK frames are resent + :param frame_retransmit_timeout_ms: Time before frames are resent + :param loglevel: set the logging desired + """ + super().__init__(port=port, + baudrate=baudrate, + window_size=window_size, + rx_window_size=rx_window_size, + transport_fifo_size=transport_fifo_size, + idle_timeout_ms=idle_timeout_ms, + ack_retransmit_timeout_ms=ack_retransmit_timeout_ms, + frame_retransmit_timeout_ms=frame_retransmit_timeout_ms, + loglevel=loglevel) self._thread_lock = Lock() - - def close(self): + + def _with_lock(self, func): + """Execute a function with the thread lock acquired. + + Args: + func: Function to execute with the lock + + Returns: + The result of the function + """ self._thread_lock.acquire() try: - super().close() - except Exception as e: + return func() + finally: self._thread_lock.release() - raise e - self._thread_lock.release() - def transport_stats(self): - self._thread_lock.acquire() - try: - result = super().transport_stats() - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() + def close(self): + self._with_lock(super().close) - return result + def transport_stats(self): + return self._with_lock(super().transport_stats) + + def transport_reset(self): + self._with_lock(super().transport_reset) def send_frame(self, min_id: int, payload: bytes): - self._thread_lock.acquire() - try: - super().send_frame(min_id=min_id, payload=payload) - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() + parent_send_frame = super().send_frame + self._with_lock(lambda: parent_send_frame(min_id=min_id, payload=payload)) def queue_frame(self, min_id: int, payload: bytes): - self._thread_lock.acquire() - try: - super().queue_frame(min_id=min_id, payload=payload) - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() + parent_queue_frame = super().queue_frame + self._with_lock(lambda: parent_queue_frame(min_id=min_id, payload=payload)) def poll(self): - self._thread_lock.acquire() - try: - result = super().poll() - except Exception as e: - self._thread_lock.release() - raise e - self._thread_lock.release() - - return result + return self._with_lock(super().poll) diff --git a/host/min_terminal.py b/host/min_terminal.py new file mode 100644 index 0000000..575dbcc --- /dev/null +++ b/host/min_terminal.py @@ -0,0 +1,353 @@ +""" +Interactive terminal program for sending and receiving MIN frames. +Supports both hex and string input modes. +""" +import argparse +from struct import unpack +import sys +from time import sleep +import threading +import logging +import serial.tools.list_ports +from typing import List, Optional + +from min import ThreadsafeTransportMINSerialHandler + +# Set up logger for this module +logger = logging.getLogger(__name__) + + +# Module-level defaults for CLI arguments. Can be modified at runtime via +# set_default_args() so that embedding applications can change +# defaults without altering user-provided values. +DEFAULT_BAUDRATE = 9600 +DEFAULT_MIN_ID = 0x01 +DEFAULT_HEX_MODE = False +DEFAULT_LOG_LEVEL_NAME = "error" + + +def set_default_args( + *, + baudrate: Optional[int] = None, + min_id: Optional[int] = None, + hex_mode: Optional[bool] = None, + log_level_name: Optional[str] = None, +) -> None: + """Set module-level default CLI values used when arguments are omitted. + + Args: + baudrate: Default baudrate for serial communication. + min_id: Default MIN ID to use for sending frames (0-63). + hex_mode: Default input mode for interactive terminal (True for hex). + log_level_name: Default log level name (debug, info, warning, error, critical). + + Returns: + None + """ + global DEFAULT_BAUDRATE, DEFAULT_MIN_ID, DEFAULT_HEX_MODE, DEFAULT_LOG_LEVEL_NAME + if baudrate is not None: + DEFAULT_BAUDRATE = baudrate + if min_id is not None: + DEFAULT_MIN_ID = min_id + if hex_mode is not None: + DEFAULT_HEX_MODE = hex_mode + if log_level_name is not None: + DEFAULT_LOG_LEVEL_NAME = log_level_name + + +def bytes_to_int32(data: bytes, big_endian=True) -> int: + """Convert 4 bytes to a 32-bit integer.""" + if len(data) != 4: + raise ValueError("int32 should be exactly 4 bytes") + if big_endian: + return unpack('>I', data)[0] + else: + return unpack(' bytes: + """Convert a hex string to bytes, handling spaces and 0x prefixes.""" + # Remove spaces and 0x prefixes + hex_str = hex_str.replace(' ', '').replace('0x', '') + return bytes.fromhex(hex_str) + + +def log_and_print(message, level=logging.INFO, reprint_input_prompt=False, hex_mode=False): + """Print message to console and log it.""" + print(("\n" if reprint_input_prompt else "") + message) + logger.log(level, message) + if reprint_input_prompt: + if hex_mode: + print("Enter hex payload: ", end='', flush=True) + else: + print("Enter string payload: ", end='', flush=True) + + + +def receive_frames_thread( + min_handler, hex_mode: bool, stop_event: threading.Event, + callback=None, print_prompt=True +): + """Thread function to continuously poll for and display received frames.""" + while not stop_event.is_set(): + frames = min_handler.poll() + for frame in frames: + # Run callback if provided before standard handling + if callback and callback(frame): + # Skip standard handling if callback returns True + continue + + if hex_mode: + data = frame.payload.hex() + else: + try: + data = frame.payload.decode('ascii') + except UnicodeDecodeError: + data = frame.payload.hex() + msg = "Frame received: min ID={0} {1}".format(frame.min_id, data) + log_and_print(msg, reprint_input_prompt=True, hex_mode=hex_mode) + + sleep(0.05) # Small delay to prevent CPU hogging + + +log_levels_map = { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL + } + + +def parse_log_level(level_name: str) -> int: + """Convert a log level name to the corresponding logging level.""" + + level_name = level_name.lower() + if level_name not in log_levels_map: + raise ValueError(f"Invalid log level: {level_name}") + return log_levels_map[level_name] + + +def setup_min_handler(port, baudrate, loglevel=logging.ERROR): + """Set up and return a MIN handler with the given parameters.""" + # Set up logging configuration + log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + logging.basicConfig(level=loglevel, format=log_format) + + min_handler = ThreadsafeTransportMINSerialHandler( + port=port, + baudrate=baudrate, + loglevel=loglevel + ) + return min_handler + + +def start_min_terminal(min_handler, hex_mode=False, min_id=0x00, frame_callback=None): + """Start an interactive MIN terminal.""" + log_and_print(f"Mode: {'Hex' if hex_mode else 'String'}") + log_and_print(f"Using MIN ID: 0x{min_id:02X}") + log_and_print("Press Ctrl+C to exit") + if hex_mode: + log_and_print("Input format: '0x01 02 03' or '01 02 03'") + + # Create a stop event and a thread for receiving frames + stop_event = threading.Event() + receive_thread = threading.Thread( + target=receive_frames_thread, + args=(min_handler, hex_mode, stop_event, frame_callback), + daemon=True + ) + receive_thread.start() + + min_handler.transport_reset() + + try: + while True: + # Get input from user + if hex_mode: + user_input = input("Enter hex payload: ") + try: + payload = parse_hex_input(user_input) + except ValueError as e: + log_and_print(f"Invalid hex input: {e}", logging.ERROR) + continue + else: + user_input = input("Enter string payload: ") + payload = user_input.encode('ascii') + + # Send the frame using the specified MIN ID + min_handler.queue_frame(min_id=min_id, payload=payload) + + except KeyboardInterrupt: + log_and_print("\nTerminating...") + stop_event.set() # Signal the thread to stop + return stop_event + except Exception as e: + log_and_print(f"Error: {e}", logging.ERROR) + return stop_event + + +def get_available_ports() -> List[str]: + """Get list of available serial ports. + + Returns: + List[str]: List of available port names + """ + return [port.device for port in serial.tools.list_ports.comports()] + + +def select_port(port: Optional[str] = None) -> str: + """Select a serial port interactively if none specified. + + If there is only one port, it will be returned directly. + + Args: + port: Optional port name to use directly + + Returns: + str: Selected port name + + Raises: + RuntimeError: If no ports are available or selection fails + """ + if port: + return port + + ports = get_available_ports() + if not ports: + raise RuntimeError("No serial ports found") + + if len(ports) == 1: + return ports[0] + + print("\nAvailable ports:") + for i, port_name in enumerate(ports, 1): + print(f"{i}. {port_name}") + + while True: + try: + choice = input("\nSelect port number: ") + index = int(choice) - 1 + if 0 <= index < len(ports): + return ports[index] + print("Invalid selection") + except ValueError: + print("Please enter a number") + except KeyboardInterrupt: + print("Exiting...") + exit(0) + + +def add_port_arg(parser): + parser.add_argument( + '--port', '-p', + help='Serial port (e.g., /dev/tty.usbmodem1421)' + ) + + +def add_min_id_arg(parser): + parser.add_argument( + '--min-id', + type=lambda x: int(x, 0), # Allows for hex (0x01) or decimal input + default=DEFAULT_MIN_ID, + help='MIN ID to use when sending frames (default: 0x01)' + ) + + +def add_baudrate_arg(parser): + parser.add_argument( + '--baudrate', + type=int, + default=DEFAULT_BAUDRATE, + help='Baudrate for serial communication (default: 9600)' + ) + + +def add_log_level_arg(parser): + parser.add_argument( + '--log-level', + type=str, + default=DEFAULT_LOG_LEVEL_NAME, + choices=log_levels_map.keys(), + help='Set logging level: debug, info, warning, error, critical ' + '(default: error)' + ) + + +def add_hex_arg(parser): + parser.add_argument('--hex', dest='hex', action='store_true', help='Use hex input mode') + parser.add_argument('--no-hex', dest='hex', action='store_false', help='Use string input mode') + parser.set_defaults(hex=DEFAULT_HEX_MODE) + + +def parse_args(parser,add_port=True, add_log_level=True, add_hex=True, add_min_id=True, add_baudrate=True): + """Parse command line arguments for MIN terminal functionality.""" + + if add_port: + add_port_arg(parser) + if add_log_level: + add_log_level_arg(parser) + if add_hex: + add_hex_arg(parser) + if add_min_id: + add_min_id_arg(parser) + if add_baudrate: + add_baudrate_arg(parser) + args = parser.parse_args() + + if add_port: + args.port = select_port(args.port) + if add_log_level: + args.log_level = parse_log_level(args.log_level) + if not add_hex: + args.hex = DEFAULT_HEX_MODE if DEFAULT_HEX_MODE is not None else True + if add_min_id: + # Validate MIN ID range (0-63 as per the spec) + if args.min_id not in range(64): + parser.error("MIN ID must be in range 0-63") + if not add_baudrate: + args.baudrate = DEFAULT_BAUDRATE + + return args + + +def main(): + """Run the MIN terminal.""" + parser = argparse.ArgumentParser(description="MIN terminal") + try: + args = parse_args(parser=parser) + except Exception as e: + print(f"Error: {e}") + sys.exit(1) + + # Set up and connect MIN handler + min_handler = setup_min_handler( + port=args.port, + baudrate=args.baudrate, + loglevel=args.log_level + ) + + log_and_print(f"Connected to {args.port} at {args.baudrate} baud") + # Use a dictionary to map logging levels to their names + level_names = { + logging.DEBUG: 'DEBUG', + logging.INFO: 'INFO', + logging.WARNING: 'WARNING', + logging.ERROR: 'ERROR', + logging.CRITICAL: 'CRITICAL' + } + log_and_print(f"Log level: {level_names.get(args.log_level, 'UNKNOWN')}") + + # Start the interactive terminal + stop_event = start_min_terminal( + min_handler=min_handler, + hex_mode=args.hex, + min_id=args.min_id + ) + + # Cleanup if the terminal exits + stop_event.set() + + +if __name__ == "__main__": + main()