diff --git a/doc/conf.py b/doc/conf.py index 2a0ac4e..88e624b 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -45,6 +45,7 @@ ("py:class", r"dotbot.models.Annotated"), ("py:class", r"Query"), ("py:class", r"PydanticUndefined"), + ("py:class", r"queue.Queue"), ] # -- Options for HTML output ------------------------------------------------- diff --git a/dotbot/controller.py b/dotbot/controller.py index 7bfc3ac..e30b37c 100644 --- a/dotbot/controller.py +++ b/dotbot/controller.py @@ -20,6 +20,7 @@ from typing import Dict, List import serial +import starlette import uvicorn import websockets from dotbot_utils.protocol import Frame, Payload @@ -294,7 +295,7 @@ def handle_received_frame( else: # reload if a new dotbot comes in logger.info("New robot") - notification_cmd = DotBotNotificationCommand.RELOAD + notification_cmd = DotBotNotificationCommand.NEW_DOTBOT if frame.packet.payload_type == PayloadType.ADVERTISEMENT: logger = logger.bind( @@ -425,6 +426,8 @@ def handle_received_frame( if self.settings.verbose is True: print(frame) self.dotbots.update({dotbot.address: dotbot}) + if notification_cmd != DotBotNotificationCommand.NEW_DOTBOT: + notification.data = DotBotModel(**dotbot.model_dump(exclude_none=True)) if notification_cmd != DotBotNotificationCommand.NONE: asyncio.create_task(self.notify_clients(notification)) @@ -432,8 +435,17 @@ async def _ws_send_safe(self, websocket: WebSocket, msg: str): """Safely send a message to a websocket client.""" try: await websocket.send_text(msg) - except websockets.exceptions.ConnectionClosedError: - await asyncio.sleep(0.1) + except ( + websockets.exceptions.ConnectionClosedError, + RuntimeError, + starlette.websockets.WebSocketDisconnect, + ) as exc: + self.logger.warning( + "Failed to send message to websocket client", + error=str(exc), + ) + if websocket in self.websockets: + self.websockets.remove(websocket) async def notify_clients(self, notification): """Send a message to all clients connected.""" diff --git a/dotbot/dotbot_simulator.py b/dotbot/dotbot_simulator.py index 756746c..212003b 100644 --- a/dotbot/dotbot_simulator.py +++ b/dotbot/dotbot_simulator.py @@ -6,9 +6,9 @@ """Dotbot simulator for the DotBot project.""" +import queue import random import threading -import time from binascii import hexlify from dataclasses import dataclass from enum import Enum @@ -31,6 +31,9 @@ INITIAL_BATTERY_VOLTAGE = 3000 # mV MAX_BATTERY_DURATION = 300 # 5 minutes +ADVERTISEMENT_INTERVAL_S = 0.5 +SIMULATOR_UPDATE_INTERVAL_S = 0.1 + def battery_discharge_model(time_elapsed_s: float) -> int: """Simple battery discharge model.""" @@ -76,11 +79,10 @@ class InitStateToml(BaseModel): network: SimulatedNetworkSettings = SimulatedNetworkSettings() -class DotBotSimulator(threading.Thread): +class DotBotSimulator: """Simulator class for the dotbot.""" - def __init__(self, settings: SimulatedDotBotSettings): - super().__init__(daemon=True) + def __init__(self, settings: SimulatedDotBotSettings, tx_queue: queue.Queue): self.address = settings.address.lower() self.pos_x = settings.pos_x self.pos_y = settings.pos_y @@ -97,10 +99,18 @@ def __init__(self, settings: SimulatedDotBotSettings): self.waypoints = [] self.waypoint_index = 0 + self._lock = threading.Lock() + self.tx_queue = tx_queue + self.queue = queue.Queue() + self.advertise_thread = threading.Thread(target=self.advertise, daemon=True) + self.rx_thread = threading.Thread(target=self.rx_frame, daemon=True) + self.main_thread = threading.Thread(target=self.run, daemon=True) self.controller_mode: DotBotSimulatorMode = DotBotSimulatorMode.MANUAL self.logger = LOGGER.bind(context=__name__, address=self.address) - self.running = True - self.start() + self._stop_event = threading.Event() + self.rx_thread.start() + self.advertise_thread.start() + self.main_thread.start() @property def header(self): @@ -187,98 +197,111 @@ def update(self, dt: float): "DotBot simulator update", x=self.pos_x, y=self.pos_y, theta=self.theta ) self.time_elapsed_s += dt - time.sleep(dt) def advertise(self): """Send an advertisement message to the gateway.""" - payload = Frame( - header=self.header, - packet=Packet.from_payload( - PayloadDotBotAdvertisement( - calibrated=self.calibrated, - direction=int(self.theta * 180 / pi + 90), - pos_x=int(self.pos_x) if self.pos_x >= 0 else 0, - pos_y=int(self.pos_y) if self.pos_y >= 0 else 0, - pos_z=0, - battery=battery_discharge_model(self.time_elapsed_s), - ) - ), - ) - return payload + while self._stop_event.is_set() is False: + payload = Frame( + header=self.header, + packet=Packet.from_payload( + PayloadDotBotAdvertisement( + calibrated=self.calibrated, + direction=int(self.theta * 180 / pi + 90), + pos_x=int(self.pos_x) if self.pos_x >= 0 else 0, + pos_y=int(self.pos_y) if self.pos_y >= 0 else 0, + pos_z=0, + battery=battery_discharge_model(self.time_elapsed_s), + ) + ), + ) + self.tx_queue.put_nowait(payload) + is_stopped = self._stop_event.wait(ADVERTISEMENT_INTERVAL_S) + if is_stopped: + break - def handle_frame(self, frame: Frame): + def rx_frame(self): """Decode the serial input received from the gateway.""" - if self.address == hex(frame.header.destination)[2:]: - if frame.payload_type == PayloadType.CMD_MOVE_RAW: - self.controller_mode = DotBotSimulatorMode.MANUAL - self.v_left = frame.packet.payload.left_y - self.v_right = frame.packet.payload.right_y - - if self.v_left > 127: - self.v_left = self.v_left - 256 - if self.v_right > 127: - self.v_right = self.v_right - 256 - - elif frame.payload_type == PayloadType.LH2_WAYPOINTS: - self.v_left = 0 - self.v_right = 0 - self.controller_mode = DotBotSimulatorMode.MANUAL - self.waypoint_threshold = frame.packet.payload.threshold - self.waypoints = frame.packet.payload.waypoints - if self.waypoints: - self.controller_mode = DotBotSimulatorMode.AUTOMATIC - - def stop(self): - self.logger.info("Stopping DotBot simulator...") - self.running = False - self.join() + while self._stop_event.is_set() is False: + frame = self.queue.get() + if frame is None: + break + with self._lock: + if self.address == hex(frame.header.destination)[2:]: + if frame.payload_type == PayloadType.CMD_MOVE_RAW: + self.controller_mode = DotBotSimulatorMode.MANUAL + self.v_left = frame.packet.payload.left_y + self.v_right = frame.packet.payload.right_y + + if self.v_left > 127: + self.v_left = self.v_left - 256 + if self.v_right > 127: + self.v_right = self.v_right - 256 + + elif frame.payload_type == PayloadType.LH2_WAYPOINTS: + self.v_left = 0 + self.v_right = 0 + self.controller_mode = DotBotSimulatorMode.MANUAL + self.waypoint_threshold = frame.packet.payload.threshold + self.waypoints = frame.packet.payload.waypoints + if self.waypoints: + self.controller_mode = DotBotSimulatorMode.AUTOMATIC def run(self): """Update the state of the dotbot simulator.""" - while self.running is True: - self.update(0.1) + while True: + with self._lock: + self.update(SIMULATOR_UPDATE_INTERVAL_S) + is_stopped = self._stop_event.wait(SIMULATOR_UPDATE_INTERVAL_S) + if is_stopped: + break + + def stop(self): + self.logger.info(f"Stopping DotBot {self.address} simulator...") + self._stop_event.set() + self.queue.put_nowait(None) # unblock the rx_thread if waiting on the queue + self.advertise_thread.join() + self.rx_thread.join() + self.main_thread.join() -class DotBotSimulatorCommunicationInterface(threading.Thread): +class DotBotSimulatorCommunicationInterface: """Bidirectional serial interface to control simulated robots""" def __init__(self, on_frame_received: Callable, simulator_init_state_path: str): + self.queue = queue.Queue() self.on_frame_received = on_frame_received - self.running = True - super().__init__(daemon=True) + self._stp_event = threading.Event() + self.main_thread = threading.Thread(target=self.run, daemon=True) init_state = InitStateToml(**toml.load(simulator_init_state_path)) self.network_pdr = init_state.network.pdr self.dotbots = [ DotBotSimulator( settings=dotbot_settings, + tx_queue=self.queue, ) for dotbot_settings in init_state.dotbots ] - self.start() + self.main_thread.start() self.logger = LOGGER.bind(context=__name__) self.logger.info("DotBot Simulation Started") def run(self): """Listen continuously at each byte received on the fake serial interface.""" - for dotbot in self.dotbots: - self.on_frame_received(dotbot.advertise()) - time.sleep(0.1) - - while self.running: - for dotbot in self.dotbots: - self.handle_dotbot_frame(dotbot.advertise()) - time.sleep( - 0.5 - PayloadDotBotAdvertisement().size * len(self.dotbots) * 0.000001 - ) + while self._stp_event.is_set() is False: + frame = self.queue.get() + if frame is None: + break + self.handle_dotbot_frame(frame) def stop(self): self.logger.info("Stopping DotBot Simulation...") - self.running = False + self._stp_event.set() + self.queue.put_nowait(None) # unblock the run thread if waiting on the queue for dotbot in self.dotbots: dotbot.stop() - self.join() + self.main_thread.join() def flush(self): """Flush fake serial output.""" @@ -304,4 +327,4 @@ def write(self, bytes_): f"Packet to DotBot {dotbot.address} lost in simulation" ) continue - dotbot.handle_frame(Frame.from_bytes(bytes_)) + dotbot.queue.put_nowait(Frame.from_bytes(bytes_)) diff --git a/dotbot/frontend/src/DotBots.js b/dotbot/frontend/src/DotBots.js index 219811b..ec85651 100644 --- a/dotbot/frontend/src/DotBots.js +++ b/dotbot/frontend/src/DotBots.js @@ -164,6 +164,9 @@ const DotBots = ({ dotbots, areaSize, updateDotbots, publishCommand, publish }) ]); let needDotBotMap = dotbots.filter(dotbot => dotbot.application === ApplicationType.DotBot).some((dotbot) => dotbot.calibrated > 0x00); + let dotbotCount = dotbots.filter(dotbot => dotbot.application === ApplicationType.DotBot).length; + let sailbotCount = dotbots.filter(dotbot => dotbot.application === ApplicationType.SailBot).length; + let xgoCount = dotbots.filter(dotbot => dotbot.application === ApplicationType.XGO).length; return ( <> @@ -185,11 +188,11 @@ const DotBots = ({ dotbots, areaSize, updateDotbots, publishCommand, publish })