From aa6515c9526b19b8636e0fc49188f5f78875d6ae Mon Sep 17 00:00:00 2001 From: cagataycali Date: Fri, 13 Mar 2026 23:36:30 -0400 Subject: [PATCH 1/2] feat: dashboard, camera streaming, zenoh mesh enrichment & teleop - Add Zenoh-native web dashboard (strands_robots/dashboard/server.py) - Real-time peer discovery, joint state gauges, camera feeds - HuggingFace model search with autocomplete - Keyboard teleop (WASD), E-STOP, command dispatch - WebSocket + HTTP server with live event log - Enrich Zenoh mesh with camera streaming & event publishing - publish_event() for discrete mutations (object/robot add/remove) - publish_camera_frame() + start_camera_stream() background thread - Rich state: joints, objects, cameras, world metadata - Dashboard command routing: execute, stop, teleop_start/stop/delta - Simulation improvements - _invalidate_renderers() after model recompile (fixes stale cameras) - teleop_delta() for real-time joint nudging from dashboard - Defensive _pad4() in MJCF builder for short color/orientation lists - Auto-detect first robot when robot_name not specified - _publish_event() on every scene mutation - Robot camera streaming support - _render_camera_jpeg_b64() for real robot camera frames - start_camera_stream() over Zenoh mesh --- strands_robots/dashboard/__init__.py | 1 + strands_robots/dashboard/__main__.py | 4 + strands_robots/dashboard/server.py | 1591 +++++++++++++++++++ strands_robots/robot.py | 56 + strands_robots/simulation/_mjcf_builder.py | 19 +- strands_robots/simulation/_policy_runner.py | 21 + strands_robots/simulation/simulation.py | 176 +- strands_robots/zenoh_mesh.py | 199 ++- 8 files changed, 2054 insertions(+), 13 deletions(-) create mode 100644 strands_robots/dashboard/__init__.py create mode 100644 strands_robots/dashboard/__main__.py create mode 100644 strands_robots/dashboard/server.py diff --git a/strands_robots/dashboard/__init__.py b/strands_robots/dashboard/__init__.py new file mode 100644 index 0000000..6079f43 --- /dev/null +++ b/strands_robots/dashboard/__init__.py @@ -0,0 +1 @@ +# Dashboard package diff --git a/strands_robots/dashboard/__main__.py b/strands_robots/dashboard/__main__.py new file mode 100644 index 0000000..1165aca --- /dev/null +++ b/strands_robots/dashboard/__main__.py @@ -0,0 +1,4 @@ +"""Run the dashboard: python -m strands_robots.dashboard""" +from strands_robots.dashboard.server import main + +main() diff --git a/strands_robots/dashboard/server.py b/strands_robots/dashboard/server.py new file mode 100644 index 0000000..097f7ee --- /dev/null +++ b/strands_robots/dashboard/server.py @@ -0,0 +1,1591 @@ +#!/usr/bin/env python3 +"""Strands Robots Dashboard — Zenoh-native web dashboard for all Robot() instances. + +Every Robot("so100"), Robot("panda"), etc. auto-joins the Zenoh mesh. +This dashboard subscribes to ALL mesh traffic and renders it in a +real-time web UI: + +- Live peer discovery (robots, sims, agents) +- Joint state visualization (sparklines + gauges) +- VLA execution stream (watch policy steps live) +- Camera feeds (JPEG frames from sim/real cameras over Zenoh) +- Action vector visualization (live bar charts) +- Command dispatch (tell any robot to do something) +- Emergency stop (broadcast to all) +- Agent grid layout — switch between peers, see everything at once + +Architecture: + Robot() ──► Zenoh mesh ◄── Dashboard (this) + Robot() ──► ◄── Agent + Sim() ──► ◄── Other dashboards + +Usage: + python -m strands_robots.dashboard.server --port 7860 + # Opens http://localhost:7860 + + # Or from Python: + from strands_robots.dashboard.server import start_dashboard + start_dashboard(port=7860) +""" + +import argparse +import asyncio +import json +import logging +import os +import socket +import threading +import time +import uuid +from http.server import HTTPServer, SimpleHTTPRequestHandler +from socketserver import ThreadingMixIn +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +# State — shared between Zenoh subscribers and WebSocket clients +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +_STATE: Dict[str, Any] = { + "peers": {}, # peer_id → {type, hostname, last_seen, caps, ...} + "states": {}, # peer_id → {joints, sim_time, task, ...} + "streams": {}, # peer_id → [last N stream steps] + "cameras": {}, # peer_id → {camera_name: base64_jpeg} + "commands_sent": [], # [{target, cmd, ts}, ...] + "responses": [], # [{from, turn, result, ts}, ...] + "events": [], # [{ts, type, data}, ...] ring buffer +} +_STATE_LOCK = threading.Lock() +_WS_CLIENTS: List[Any] = [] +_WS_LOCK = threading.Lock() +_ZENOH_SESSION = None +_ZENOH_SUBS = [] +_DASHBOARD_PEER_ID = f"dashboard-{uuid.uuid4().hex[:6]}" + +MAX_EVENTS = 500 +MAX_STREAM_STEPS = 200 + + +def _add_event(event_type: str, data: dict): + """Add event to the ring buffer.""" + with _STATE_LOCK: + _STATE["events"].append({ + "ts": time.time(), + "type": event_type, + "data": data, + }) + if len(_STATE["events"]) > MAX_EVENTS: + _STATE["events"] = _STATE["events"][-MAX_EVENTS:] + _broadcast_ws({"type": "event", "event_type": event_type, "data": data, "ts": time.time()}) + + +def _broadcast_ws(msg: dict): + """Send message to all connected WebSocket clients.""" + payload = json.dumps(msg, default=str) + with _WS_LOCK: + dead = [] + for ws in _WS_CLIENTS: + try: + ws.send(payload) + except Exception: + dead.append(ws) + for ws in dead: + _WS_CLIENTS.remove(ws) + + +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +# Zenoh Mesh Subscriber — listens to everything on strands/** +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +def _start_zenoh(): + """Connect to the Zenoh mesh and subscribe to all robot traffic.""" + global _ZENOH_SESSION + + try: + import zenoh + except ImportError: + logger.warning("eclipse-zenoh not installed. Dashboard runs without live mesh data.") + logger.warning("Install: pip install eclipse-zenoh") + return False + + try: + config = zenoh.Config() + + MESH_PORT = int(os.getenv("STRANDS_MESH_PORT", "7447")) + MESH_EP = f"tcp/127.0.0.1:{MESH_PORT}" + + connect = os.getenv("ZENOH_CONNECT") + listen = os.getenv("ZENOH_LISTEN") + + if connect: + config.insert_json5("connect/endpoints", json.dumps([e.strip() for e in connect.split(",")])) + if listen: + config.insert_json5("listen/endpoints", json.dumps([e.strip() for e in listen.split(",")])) + + if not connect and not listen: + try: + cfg_try = zenoh.Config() + cfg_try.insert_json5("listen/endpoints", json.dumps([MESH_EP])) + cfg_try.insert_json5("connect/endpoints", json.dumps([MESH_EP])) + _ZENOH_SESSION = zenoh.open(cfg_try) + logger.info("Zenoh dashboard session opened (listener on %s)", MESH_EP) + except Exception: + config.insert_json5("mode", '"client"') + config.insert_json5("connect/endpoints", json.dumps([MESH_EP])) + _ZENOH_SESSION = zenoh.open(config) + logger.info("Zenoh dashboard session opened (client)") + else: + _ZENOH_SESSION = zenoh.open(config) + logger.info("Zenoh dashboard session opened") + + # Subscribe to all strands topics + _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/presence", _on_presence)) + _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/state", _on_state)) + _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/stream", _on_stream)) + _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/camera", _on_camera)) + _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/event", _on_sim_event)) + _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/broadcast", _on_broadcast)) + _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("reachy_mini/**", _on_external)) + + # Publish dashboard presence + threading.Thread(target=_dashboard_heartbeat, daemon=True).start() + + logger.info("Zenoh subscriptions active — listening for robots") + return True + + except Exception as e: + logger.error("Zenoh setup failed: %s", e) + return False + + +def _dashboard_heartbeat(): + """Publish dashboard presence on the mesh.""" + while _ZENOH_SESSION: + try: + _ZENOH_SESSION.put( + f"strands/{_DASHBOARD_PEER_ID}/presence", + json.dumps({ + "robot_id": _DASHBOARD_PEER_ID, + "robot_type": "dashboard", + "hostname": socket.gethostname(), + "timestamp": time.time(), + "ws_clients": len(_WS_CLIENTS), + }).encode(), + ) + except Exception: + pass + time.sleep(2.0) + + +def _on_presence(sample): + """Handle presence heartbeats from robots/sims.""" + try: + data = json.loads(sample.payload.to_bytes().decode()) + peer_id = data.get("robot_id", "") + if not peer_id or peer_id == _DASHBOARD_PEER_ID: + return + + with _STATE_LOCK: + is_new = peer_id not in _STATE["peers"] + _STATE["peers"][peer_id] = { + **data, + "last_seen": time.time(), + } + + if is_new: + _add_event("peer_join", {"peer_id": peer_id, **data}) + logger.info("🤖 New peer: %s (%s)", peer_id, data.get("robot_type", "?")) + + # Auto-request camera stream from sim and robot peers + role = data.get("role", data.get("robot_type", "")) + if role in ("sim", "robot"): + threading.Thread( + target=lambda pid=peer_id: ( + time.sleep(1), # Wait for peer to be fully ready + _send_command(pid, {"action": "start_camera_stream", "camera": "default", "fps": 10 if role == "sim" else 5}), + logger.info("📹 Auto-requested camera stream from %s", pid), + ), + daemon=True, + ).start() + + _broadcast_ws({"type": "peers", "peers": _get_peers()}) + + except Exception as e: + logger.debug("Presence parse error: %s", e) + + +def _on_state(sample): + """Handle state updates (joint positions, sim time, task status).""" + try: + data = json.loads(sample.payload.to_bytes().decode()) + peer_id = data.get("peer_id", "") + if not peer_id: + return + + with _STATE_LOCK: + _STATE["states"][peer_id] = { + **data, + "_received_at": time.time(), + } + + _broadcast_ws({"type": "state", "peer_id": peer_id, "state": data}) + + except Exception as e: + logger.debug("State parse error: %s", e) + + +def _on_stream(sample): + """Handle VLA execution stream steps.""" + try: + data = json.loads(sample.payload.to_bytes().decode()) + peer_id = data.get("peer_id", "") + if not peer_id: + return + + with _STATE_LOCK: + if peer_id not in _STATE["streams"]: + _STATE["streams"][peer_id] = [] + _STATE["streams"][peer_id].append(data) + if len(_STATE["streams"][peer_id]) > MAX_STREAM_STEPS: + _STATE["streams"][peer_id] = _STATE["streams"][peer_id][-MAX_STREAM_STEPS:] + + _broadcast_ws({"type": "stream_step", "peer_id": peer_id, "step": data}) + + except Exception as e: + logger.debug("Stream parse error: %s", e) + + +def _on_camera(sample): + """Handle camera frame data (base64 JPEG from sim/real cameras).""" + try: + data = json.loads(sample.payload.to_bytes().decode()) + peer_id = data.get("peer_id", "") + cam_name = data.get("camera", "default") + frame_b64 = data.get("frame", "") + if not peer_id or not frame_b64: + return + + with _STATE_LOCK: + if peer_id not in _STATE["cameras"]: + _STATE["cameras"][peer_id] = {} + _STATE["cameras"][peer_id][cam_name] = frame_b64 + + # Forward camera frame to WS clients that are watching this peer + _broadcast_ws({ + "type": "camera_frame", + "peer_id": peer_id, + "camera": cam_name, + "frame": frame_b64, + "ts": data.get("ts", time.time()), + }) + + except Exception as e: + logger.debug("Camera parse error: %s", e) + + +def _on_broadcast(sample): + """Handle broadcast messages (e.g., emergency stop).""" + try: + data = json.loads(sample.payload.to_bytes().decode()) + _add_event("broadcast", data) + except Exception: + pass + + +def _on_sim_event(sample): + """Handle discrete simulation events (object added/removed, policy started, etc.).""" + try: + data = json.loads(sample.payload.to_bytes().decode()) + peer_id = data.get("peer_id", "") + event_type = data.get("event_type", "") + event_data = data.get("data", {}) + if not peer_id or not event_type: + return + + _add_event(f"sim:{event_type}", {"peer_id": peer_id, **event_data}) + + # Forward to WS clients as a dedicated message type + _broadcast_ws({ + "type": "sim_event", + "peer_id": peer_id, + "event_type": event_type, + "data": event_data, + "ts": data.get("ts", time.time()), + }) + + except Exception as e: + logger.debug("Sim event parse error: %s", e) + + +def _on_external(sample): + """Handle external robot topics (Reachy Mini, etc.).""" + try: + key = str(sample.key_expr) + raw = sample.payload.to_bytes().decode() + try: + data = json.loads(raw) + except json.JSONDecodeError: + data = {"raw": raw} + + _add_event("external", {"topic": key, "data": data}) + except Exception: + pass + + +def _get_peers() -> list: + """Get all peers with age calculation.""" + now = time.time() + with _STATE_LOCK: + peers = [] + stale = [] + for pid, info in _STATE["peers"].items(): + age = now - info.get("last_seen", 0) + if age > 15.0: + stale.append(pid) + continue + peers.append({ + "peer_id": pid, + "type": info.get("robot_type", "unknown"), + "hostname": info.get("hostname", "?"), + "age": round(age, 1), + "task_status": info.get("task_status", ""), + "instruction": info.get("instruction", ""), + "tool_name": info.get("tool_name", ""), + "connected": info.get("connected", None), + "sim_robots": info.get("sim_robots", []), + "world": info.get("world", False), + "action_keys": info.get("action_keys", []), + }) + for pid in stale: + del _STATE["peers"][pid] + _add_event("peer_leave", {"peer_id": pid}) + return peers + + +def _send_command(target: str, command: dict): + """Send a command to a peer via Zenoh.""" + if not _ZENOH_SESSION: + return {"error": "Zenoh not connected"} + + turn_id = uuid.uuid4().hex[:8] + msg = { + "sender_id": _DASHBOARD_PEER_ID, + "turn_id": turn_id, + "command": command, + "timestamp": time.time(), + } + + try: + if target == "broadcast": + _ZENOH_SESSION.put("strands/broadcast", json.dumps(msg).encode()) + else: + _ZENOH_SESSION.put(f"strands/{target}/cmd", json.dumps(msg).encode()) + + with _STATE_LOCK: + _STATE["commands_sent"].append({ + "target": target, + "command": command, + "turn_id": turn_id, + "ts": time.time(), + }) + if len(_STATE["commands_sent"]) > 100: + _STATE["commands_sent"] = _STATE["commands_sent"][-100:] + + _add_event("command_sent", {"target": target, "command": command, "turn_id": turn_id}) + return {"status": "sent", "turn_id": turn_id} + + except Exception as e: + return {"error": str(e)} + + +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +# WebSocket Handler (using websockets library) +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +async def _ws_handler(websocket): + """Handle a single WebSocket client connection.""" + class WSWrapper: + def __init__(self, ws): + self._ws = ws + self._loop = asyncio.get_event_loop() + + def send(self, data): + asyncio.run_coroutine_threadsafe(self._ws.send(data), self._loop) + + wrapper = WSWrapper(websocket) + with _WS_LOCK: + _WS_CLIENTS.append(wrapper) + + logger.info("WebSocket client connected (%d total)", len(_WS_CLIENTS)) + + try: + # Send initial state + await websocket.send(json.dumps({ + "type": "init", + "peers": _get_peers(), + "dashboard_id": _DASHBOARD_PEER_ID, + "states": {k: v for k, v in _STATE.get("states", {}).items()}, + "events": _STATE.get("events", [])[-50:], + }, default=str)) + + async for message in websocket: + try: + msg = json.loads(message) + msg_type = msg.get("type", "") + + if msg_type == "command": + target = msg.get("target", "") + command = msg.get("command", {}) + result = _send_command(target, command) + await websocket.send(json.dumps({"type": "command_result", **result}, default=str)) + + elif msg_type == "tell": + target = msg.get("target", "") + instruction = msg.get("instruction", "") + policy = msg.get("policy_provider", "mock") + duration = msg.get("duration", 30.0) + + # Handle special commands + if instruction == "__stop__" or msg.get("stop"): + result = _send_command(target, {"action": "stop"}) + await websocket.send(json.dumps({"type": "command_result", "action": "stop", **result}, default=str)) + elif instruction == "__teleop_start__" or msg.get("teleop"): + cmd = { + "action": "teleop_start", + "policy_provider": policy, + "duration": duration, + } + for k in ("pretrained_name_or_path", "model_path", "server_address", "policy_type"): + if k in msg: + cmd[k] = msg[k] + result = _send_command(target, cmd) + await websocket.send(json.dumps({"type": "command_result", "action": "teleop_start", **result}, default=str)) + elif instruction == "__teleop_stop__" or msg.get("teleop_stop"): + result = _send_command(target, {"action": "teleop_stop"}) + await websocket.send(json.dumps({"type": "command_result", "action": "teleop_stop", **result}, default=str)) + elif instruction == "__teleop_delta__" and msg.get("teleop_delta"): + delta = msg["teleop_delta"] + result = _send_command(target, {"action": "teleop_delta", **delta}) + # Don't send ack for delta (too noisy) + else: + cmd = { + "action": "execute", + "instruction": instruction, + "policy_provider": policy, + "duration": duration, + } + # Forward extra policy kwargs + for k in ("pretrained_name_or_path", "model_path", "server_address", "policy_type"): + if k in msg: + cmd[k] = msg[k] + result = _send_command(target, cmd) + await websocket.send(json.dumps({"type": "command_result", **result}, default=str)) + + elif msg_type == "emergency_stop": + result = _send_command("broadcast", {"action": "stop"}) + await websocket.send(json.dumps({"type": "command_result", "action": "emergency_stop", **result}, default=str)) + + elif msg_type == "get_peers": + await websocket.send(json.dumps({"type": "peers", "peers": _get_peers()}, default=str)) + + elif msg_type == "get_state": + peer_id = msg.get("peer_id", "") + state = _STATE.get("states", {}).get(peer_id, {}) + await websocket.send(json.dumps({"type": "state", "peer_id": peer_id, "state": state}, default=str)) + + elif msg_type == "get_stream": + peer_id = msg.get("peer_id", "") + steps = _STATE.get("streams", {}).get(peer_id, []) + await websocket.send(json.dumps({"type": "stream_history", "peer_id": peer_id, "steps": steps[-50:]}, default=str)) + + elif msg_type == "get_camera": + peer_id = msg.get("peer_id", "") + cams = _STATE.get("cameras", {}).get(peer_id, {}) + await websocket.send(json.dumps({"type": "camera_state", "peer_id": peer_id, "cameras": cams}, default=str)) + + elif msg_type == "request_camera_stream": + # Ask a sim peer to start publishing camera frames + target = msg.get("peer_id", "") + cam = msg.get("camera", "default") + fps = msg.get("fps", 10) + result = _send_command(target, { + "action": "start_camera_stream", + "camera": cam, + "fps": fps, + }) + await websocket.send(json.dumps({"type": "command_result", **result}, default=str)) + + elif msg_type == "ping": + await websocket.send(json.dumps({"type": "pong", "ts": time.time()})) + + except json.JSONDecodeError: + await websocket.send(json.dumps({"type": "error", "message": "Invalid JSON"})) + except Exception as e: + await websocket.send(json.dumps({"type": "error", "message": str(e)})) + + except Exception: + pass + finally: + with _WS_LOCK: + if wrapper in _WS_CLIENTS: + _WS_CLIENTS.remove(wrapper) + logger.info("WebSocket client disconnected (%d remaining)", len(_WS_CLIENTS)) + + +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +# HTML Dashboard UI — Full Agent Grid +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +DASHBOARD_HTML = r""" + + + + +Strands Robots Dashboard + + + +
+ +
connecting
+
0 peers
+
+ + +
+ +
+ +
+ +
+
+ +
🤖
Create a Robot() to see it here
from strands_robots import Robot; sim = Robot("so100")
+
+
+

Event Log

+
+
+
+ + + +""" + + +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ +# HTTP + WebSocket Server +# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + """HTTPServer that handles each request in a separate thread.""" + daemon_threads = True + + +class DashboardHTTPHandler(SimpleHTTPRequestHandler): + """Serve the dashboard HTML and API endpoints.""" + + def do_GET(self): + if self.path == "/" or self.path == "/index.html": + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + self.wfile.write(DASHBOARD_HTML.encode("utf-8")) + elif self.path == "/api/peers": + self._json_response(_get_peers()) + elif self.path == "/api/state": + self._json_response(_STATE.get("states", {})) + elif self.path == "/api/events": + self._json_response(_STATE.get("events", [])[-100:]) + elif self.path.startswith("/api/state/"): + peer_id = self.path.split("/api/state/")[1] + self._json_response(_STATE.get("states", {}).get(peer_id, {})) + elif self.path.startswith("/api/cameras/"): + peer_id = self.path.split("/api/cameras/")[1] + self._json_response(_STATE.get("cameras", {}).get(peer_id, {})) + else: + self.send_response(404) + self.end_headers() + + def _json_response(self, data): + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(json.dumps(data, default=str).encode()) + + def log_message(self, format, *args): + pass + + +def start_dashboard(port: int = 7860, open_browser: bool = True): + """Start the Strands Robots Dashboard. + + Args: + port: HTTP port (WebSocket will be port+1) + open_browser: Auto-open browser + """ + ws_port = port + 1 + + print(f"🤖 Strands Robots Dashboard") + print(f"=" * 50) + + zenoh_ok = _start_zenoh() + if zenoh_ok: + print(f"✅ Zenoh mesh connected (peer: {_DASHBOARD_PEER_ID})") + else: + print(f"⚠️ Zenoh not available — dashboard will show no live data") + print(f" Install: pip install eclipse-zenoh") + + async def _ws_server(): + import websockets + async with websockets.serve(_ws_handler, "0.0.0.0", ws_port): + logger.info("WebSocket server on :%d", ws_port) + await asyncio.Future() + + ws_thread = threading.Thread( + target=lambda: asyncio.new_event_loop().run_until_complete(_ws_server()), + daemon=True, + ) + ws_thread.start() + print(f"✅ WebSocket server: ws://localhost:{ws_port}") + + httpd = ThreadingHTTPServer(("0.0.0.0", port), DashboardHTTPHandler) + print(f"✅ Dashboard: http://localhost:{port}") + print(f"=" * 50) + print(f"📡 Listening for Robot() instances on Zenoh mesh...") + print(f" Create a robot to see it here: Robot('so100')") + print() + + if open_browser: + try: + import webbrowser + webbrowser.open(f"http://localhost:{port}") + except Exception: + pass + + try: + httpd.serve_forever() + except KeyboardInterrupt: + print("\n🛑 Dashboard stopped") + httpd.shutdown() + + +def main(): + """CLI entry point.""" + parser = argparse.ArgumentParser(description="Strands Robots Dashboard") + parser.add_argument("--port", "-p", type=int, default=7860, help="HTTP port (default: 7860)") + parser.add_argument("--no-browser", action="store_true", help="Don't auto-open browser") + args = parser.parse_args() + start_dashboard(port=args.port, open_browser=not args.no_browser) + + +if __name__ == "__main__": + main() diff --git a/strands_robots/robot.py b/strands_robots/robot.py index a0c07eb..83487eb 100644 --- a/strands_robots/robot.py +++ b/strands_robots/robot.py @@ -1037,6 +1037,62 @@ def get_features(self) -> Dict[str, Any]: ], } + # --- Camera streaming for dashboard --- + + def _render_camera_jpeg_b64(self, camera_name: str = "default") -> str | None: + """Render a camera frame from the real robot as base64 JPEG for mesh streaming. + + Grabs the latest observation and encodes the first camera image. + """ + try: + if not getattr(self.robot, "is_connected", False): + return None + + observation = self.robot.get_observation() + + # Find camera keys + camera_keys = [] + if hasattr(self.robot, "config") and hasattr(self.robot.config, "cameras"): + camera_keys = list(self.robot.config.cameras.keys()) + + # If specific camera requested, use it; otherwise use first available + cam_key = camera_name if camera_name in observation else (camera_keys[0] if camera_keys else None) + if not cam_key or cam_key not in observation: + return None + + img = observation[cam_key] + if not hasattr(img, "shape") or len(img.shape) < 2: + return None + + import base64 + import io + from PIL import Image + + # Convert numpy array to JPEG + if hasattr(img, "numpy"): + img = img.numpy() + pil_img = Image.fromarray(img) + buffer = io.BytesIO() + pil_img.save(buffer, format="JPEG", quality=60) + return base64.b64encode(buffer.getvalue()).decode("ascii") + + except Exception as e: + logger.debug("Robot camera render failed: %s", e) + return None + + def start_camera_stream(self, camera: str = "default", fps: int = 5) -> Dict[str, Any]: + """Start streaming camera frames from real robot over Zenoh mesh. + + Args: + camera: Camera name (default: first available) + fps: Frames per second (default: 5, lower than sim for bandwidth) + """ + if not self.mesh or not self.mesh.alive: + return {"status": "error", "content": [{"text": "❌ Mesh not enabled"}]} + + self.mesh.start_camera_stream(self._render_camera_jpeg_b64, camera=camera, fps=fps) + return {"status": "success", "content": [{"text": f"📹 Camera stream started: {camera} @ {fps}fps"}]} + def tool_name(self) -> str: return self.tool_name_str diff --git a/strands_robots/simulation/_mjcf_builder.py b/strands_robots/simulation/_mjcf_builder.py index ec54173..3ce7909 100644 --- a/strands_robots/simulation/_mjcf_builder.py +++ b/strands_robots/simulation/_mjcf_builder.py @@ -63,13 +63,26 @@ def build_objects_only(world: SimWorld) -> str: return "\n".join(parts) + @staticmethod + def _pad4(val, default): + """Pad a list to exactly 4 elements (for quaternion/RGBA).""" + if val is None: + return list(default) + v = list(val) + while len(v) < 4: + v.append(default[len(v)] if len(v) < len(default) else 0.0) + return v[:4] + @staticmethod def _object_xml(obj: SimObject, indent: int = 4) -> str: """Generate MJCF XML for a single object.""" pad = " " * indent - px, py, pz = obj.position - qw, qx, qy, qz = obj.orientation - r, g, b, a = obj.color + pos = list(obj.position) + [0.0, 0.0, 0.0] + px, py, pz = pos[0], pos[1], pos[2] + ori = MJCFBuilder._pad4(obj.orientation, [1.0, 0.0, 0.0, 0.0]) + qw, qx, qy, qz = ori[0], ori[1], ori[2], ori[3] + col = MJCFBuilder._pad4(obj.color, [0.5, 0.5, 0.5, 1.0]) + r, g, b, a = col[0], col[1], col[2], col[3] lines = [] lines.append(f'{pad}') diff --git a/strands_robots/simulation/_policy_runner.py b/strands_robots/simulation/_policy_runner.py index 8aa7bad..61de574 100644 --- a/strands_robots/simulation/_policy_runner.py +++ b/strands_robots/simulation/_policy_runner.py @@ -77,6 +77,10 @@ def run_policy( robot.policy_steps = 0 next_frame_step = 0.0 + # Notify dashboard + if hasattr(self, '_publish_event'): + self._publish_event("policy_started", {"robot": robot_name, "policy": policy_provider, "instruction": instruction}) + sim_duration = duration * control_frequency # target number of control steps start_time = time.time() action_sleep = 1.0 / control_frequency @@ -112,6 +116,19 @@ def run_policy( self._apply_sim_action(robot_name, action_dict) robot.policy_steps += 1 + # Stream step to mesh for dashboard live monitoring + if hasattr(self, "mesh") and self.mesh and self.mesh.alive: + try: + self.mesh.publish_step( + step=robot.policy_steps, + observation=observation, + action=action_dict, + instruction=instruction, + policy=policy_provider, + ) + except Exception: + pass # Never let mesh errors break the control loop + if writer and robot.policy_steps >= next_frame_step: renderer = self._get_renderer(video_width, video_height) if cam_id >= 0: @@ -128,6 +145,10 @@ def run_policy( elapsed = time.time() - start_time robot.policy_running = False + # Notify dashboard + if hasattr(self, '_publish_event'): + self._publish_event("policy_completed", {"robot": robot_name, "steps": robot.policy_steps, "elapsed": round(elapsed, 1)}) + result_text = ( f"✅ Policy complete on '{robot_name}'\n" f"🧠 {policy_provider} | 🎯 {instruction}\n" diff --git a/strands_robots/simulation/simulation.py b/strands_robots/simulation/simulation.py index 4994ba6..fc2e228 100644 --- a/strands_robots/simulation/simulation.py +++ b/strands_robots/simulation/simulation.py @@ -219,10 +219,39 @@ def _compile_world(self): def _recompile_world(self) -> Dict[str, Any]: try: self._compile_world() + self._invalidate_renderers() return {"status": "success"} except Exception as e: return {"status": "error", "content": [{"text": f"❌ Recompile failed: {e}"}]} + def _invalidate_renderers(self): + """Invalidate all cached renderers after model recompile. + + Called after any operation that changes the MuJoCo model (add_object, + remove_object, add_robot, add_camera, etc.) so camera streams and + render() produce frames from the new model. + + NOTE: We only set references to None — we do NOT call .close() here + because the camera stream thread may still be using the renderer. + The old renderer will be garbage collected when no longer referenced. + New renderers are lazily created on next use. + """ + # Set to None so next render creates a fresh renderer for the new model. + # The old renderer object will be GC'd when the camera thread moves on. + self._cam_stream_renderer = None + + # Clear all cached renderers — same pattern: just drop references + self._renderers.clear() + self._renderer_model = None + + def _publish_event(self, event_type: str, data: dict = None): + """Publish a sim mutation event to the Zenoh mesh.""" + if hasattr(self, "mesh") and self.mesh and self.mesh.alive: + try: + self.mesh.publish_event(event_type, data or {}) + except Exception: + pass + # --- Robot Management --- @staticmethod @@ -358,6 +387,8 @@ def add_robot( mj.mj_step(model, data) source = f"data_config='{data_config}'" if data_config else os.path.basename(resolved_path) + self._invalidate_renderers() + self._publish_event("robot_added", {"name": name, "joints": len(robot.joint_names), "source": source}) return { "status": "success", "content": [{"text": ( @@ -385,6 +416,7 @@ def remove_robot(self, name: str) -> Dict[str, Any]: pass del self._policy_threads[name] del self._world.robots[name] + self._publish_event("robot_removed", {"name": name}) return {"status": "success", "content": [{"text": f"🗑️ Robot '{name}' removed."}]} def list_robots(self) -> Dict[str, Any]: @@ -440,12 +472,13 @@ def add_object(self, name: str, shape: str = "box", position: List[float] = None if name in self._world.objects: return {"status": "error", "content": [{"text": f"❌ Object '{name}' exists."}]} + from strands_robots.simulation._mjcf_builder import MJCFBuilder obj = SimObject( name=name, shape=shape, - position=position or [0.0, 0.0, 0.0], - orientation=orientation or [1.0, 0.0, 0.0, 0.0], - size=size or [0.05, 0.05, 0.05], - color=color or [0.5, 0.5, 0.5, 1.0], + position=list(position or [0.0, 0.0, 0.0])[:3], + orientation=MJCFBuilder._pad4(orientation, [1.0, 0.0, 0.0, 0.0]), + size=list(size or [0.05, 0.05, 0.05]), + color=MJCFBuilder._pad4(color, [0.5, 0.5, 0.5, 1.0]), mass=mass, mesh_path=mesh_path, is_static=is_static, ) self._world.objects[name] = obj @@ -453,14 +486,18 @@ def add_object(self, name: str, shape: str = "box", position: List[float] = None if self._world.robots: try: result = inject_object_into_scene(self._world, obj) + self._invalidate_renderers() if result: + self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position}) return {"status": "success", "content": [{"text": f"📦 '{name}' spawned: {shape} at {obj.position}"}]} + self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position}) return {"status": "success", "content": [{"text": ( f"📦 '{name}' registered: {shape} at {obj.position}\n" "⚠️ Robot scene loaded — object is tracked but not physically spawned." )}]} except Exception as e: logger.warning("Object injection failed, tracking metadata only: %s", e) + self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position, "warning": str(e)}) return {"status": "success", "content": [{"text": f"📦 '{name}' registered: {shape} at {obj.position}\n⚠️ Injection failed ({e})"}]} result = self._recompile_world() @@ -468,6 +505,7 @@ def add_object(self, name: str, shape: str = "box", position: List[float] = None del self._world.objects[name] return result + self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position}) return {"status": "success", "content": [{"text": f"📦 '{name}' added: {shape} at {obj.position}, size={obj.size}, {'static' if is_static else f'{mass}kg'}"}]} def remove_object(self, name: str) -> Dict[str, Any]: @@ -476,8 +514,10 @@ def remove_object(self, name: str) -> Dict[str, Any]: del self._world.objects[name] if self._world.robots: eject_body_from_scene(self._world, name) + self._invalidate_renderers() else: self._recompile_world() + self._publish_event("object_removed", {"name": name}) return {"status": "success", "content": [{"text": f"🗑️ '{name}' removed."}]} def move_object(self, name: str, position: List[float] = None, orientation: List[float] = None) -> Dict[str, Any]: @@ -500,6 +540,7 @@ def move_object(self, name: str, position: List[float] = None, orientation: List self._world.objects[name].orientation = orientation mj.mj_forward(model, data) + self._publish_event("object_moved", {"name": name, "position": position, "orientation": orientation}) return {"status": "success", "content": [{"text": f"📍 '{name}' moved to {position or 'same'}"}]} def list_objects(self) -> Dict[str, Any]: @@ -564,6 +605,7 @@ def reset(self) -> Dict[str, Any]: for r in self._world.robots.values(): r.policy_running = False r.policy_steps = 0 + self._publish_event("reset", {}) return {"status": "success", "content": [{"text": "🔄 Reset to initial state."}]} def get_state(self) -> Dict[str, Any]: @@ -749,12 +791,17 @@ def _dispatch_action(self, action: str, d: Dict[str, Any]) -> Dict[str, Any]: "list_urdfs": "list_urdfs_action", "register_urdf": "register_urdf_action", "stop_policy": "_stop_policy", + "execute": "run_policy", + "stop": "_stop_policy", + "teleop_start": "run_policy", + "teleop_stop": "_stop_policy", + "teleop_delta": "teleop_delta", } method_name = _ALIASES.get(action, action) method = getattr(self, method_name, None) - if method is None or action.startswith("_"): + if method is None or (action.startswith("_") and action not in _ALIASES): return {"status": "error", "content": [{"text": f"❌ Unknown action: {action}"}]} # Build kwargs from input dict, excluding 'action' itself @@ -771,6 +818,10 @@ def _dispatch_action(self, action: str, d: Dict[str, Any]) -> Dict[str, Any]: kwargs["name"] = d["robot_name"] elif param_name == "robot_name" and "robot_name" not in d and "name" in d: kwargs["robot_name"] = d["name"] + elif param_name == "robot_name" and "robot_name" not in d and "name" not in d: + # Auto-detect first robot in world + if self._world and self._world.robots: + kwargs["robot_name"] = next(iter(self._world.robots)) elif param_name in d: kwargs[param_name] = d[param_name] # Forward policy kwargs @@ -782,15 +833,130 @@ def _dispatch_action(self, action: str, d: Dict[str, Any]) -> Dict[str, Any]: return method(**kwargs) + def teleop_delta(self, joint: int = 0, delta: float = 0.0, robot_name: str = "", **kwargs) -> Dict[str, Any]: + """Apply a delta to a specific joint and step the simulation. + + Used by the dashboard keyboard teleop (WASD keys) to nudge joints. + The joint is identified by index into the robot's joint_names list. + + Args: + joint: Joint index (0-based into robot's joint list) + delta: Position delta to apply (radians) + robot_name: Robot name (auto-detected if empty) + """ + if self._world is None or self._world._model is None: + return {"status": "error", "content": [{"text": "❌ No simulation."}]} + + mj = _ensure_mujoco() + model, data = self._world._model, self._world._data + + if not robot_name and self._world.robots: + robot_name = next(iter(self._world.robots)) + robot = self._world.robots.get(robot_name) + if not robot: + return {"status": "error", "content": [{"text": f"❌ Robot '{robot_name}' not found."}]} + + if joint < 0 or joint >= len(robot.joint_names): + return {"status": "error", "content": [{"text": f"❌ Joint index {joint} out of range (0-{len(robot.joint_names)-1})"}]} + + jnt_name = robot.joint_names[joint] + jnt_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_JOINT, jnt_name) + if jnt_id < 0: + return {"status": "error", "content": [{"text": f"❌ Joint '{jnt_name}' not found in model"}]} + + # Apply delta to joint position + qpos_adr = model.jnt_qposadr[jnt_id] + data.qpos[qpos_adr] += delta + + # Also set ctrl if there's a matching actuator + act_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_ACTUATOR, jnt_name) + if act_id >= 0: + data.ctrl[act_id] = float(data.qpos[qpos_adr]) + + # Step the simulation to apply + for _ in range(5): # small burst of steps for smooth motion + mj.mj_step(model, data) + self._world.sim_time = data.time + self._world.step_count += 5 + + if hasattr(self, '_viewer_handle') and self._viewer_handle is not None: + self._viewer_handle.sync() + + return {"status": "success"} + def _stop_policy(self, robot_name: str = "", **kwargs) -> Dict[str, Any]: + # Auto-detect robot if not specified + if not robot_name and self._world and self._world.robots: + robot_name = next(iter(self._world.robots)) if self._world and robot_name in self._world.robots: self._world.robots[robot_name].policy_running = False return {"status": "success", "content": [{"text": f"🛑 Stopped on '{robot_name}'"}]} + # Stop ALL robots if name not found + if self._world: + for rn, robj in self._world.robots.items(): + robj.policy_running = False + return {"status": "success", "content": [{"text": "🛑 All policies stopped"}]} return {"status": "error", "content": [{"text": f"❌ '{robot_name}' not found."}]} + # --- Camera Streaming --- + + def _render_camera_jpeg_b64(self, camera_name: str = "default") -> Optional[str]: + """Render a camera view and return as base64 JPEG string for mesh streaming. + + Thread-safe: if model changes between calls, the old renderer ref is + dropped by _invalidate_renderers() and a new one is created here. + """ + if self._world is None or self._world._model is None: + return None + try: + mj = _ensure_mujoco() + w, h = 320, 240 + model = self._world._model + data = self._world._data + renderer = getattr(self, "_cam_stream_renderer", None) + if renderer is None: + renderer = mj.Renderer(model, height=h, width=w) + self._cam_stream_renderer = renderer + cam_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_CAMERA, camera_name) + if cam_id >= 0: + renderer.update_scene(data, camera=cam_id) + else: + renderer.update_scene(data) + img = renderer.render().copy() + from PIL import Image + import io + import base64 + pil_img = Image.fromarray(img) + buffer = io.BytesIO() + pil_img.save(buffer, format="JPEG", quality=60) + return base64.b64encode(buffer.getvalue()).decode("ascii") + except Exception as e: + # If renderer is stale after model change, discard and retry next frame + self._cam_stream_renderer = None + logger.debug("Camera JPEG render failed: %s", e) + return None + + def start_camera_stream(self, camera: str = "default", fps: int = 10, **kwargs) -> Dict[str, Any]: + """Start streaming camera frames over Zenoh mesh.""" + if not hasattr(self, "mesh") or not self.mesh: + return {"status": "error", "content": [{"text": "❌ Mesh not enabled"}]} + self.mesh.start_camera_stream(self._render_camera_jpeg_b64, camera=camera, fps=fps) + return {"status": "success", "content": [{"text": f"📹 Camera stream started: {camera} @ {fps}fps"}]} + + def stop_camera_stream(self, **kwargs) -> Dict[str, Any]: + """Stop camera stream.""" + return {"status": "success", "content": [{"text": "📹 Camera stream will stop on cleanup"}]} + # --- Cleanup --- def cleanup(self): + # Close dedicated camera stream renderer + if hasattr(self, "_cam_stream_renderer") and self._cam_stream_renderer: + try: + self._cam_stream_renderer.close() + except Exception: + pass + self._cam_stream_renderer = None if hasattr(self, "mesh") and self.mesh: self.mesh.stop() if self._world: diff --git a/strands_robots/zenoh_mesh.py b/strands_robots/zenoh_mesh.py index c60027f..8c34a2d 100644 --- a/strands_robots/zenoh_mesh.py +++ b/strands_robots/zenoh_mesh.py @@ -348,7 +348,11 @@ def _state_loop(self): time.sleep(1.0 / STATE_HZ) def _read_state(self) -> Optional[dict]: - """Read current robot/sim state for publishing.""" + """Read current robot/sim state for publishing. + + Includes joints, task status, simulation world metadata (objects, + cameras, robots) so the dashboard can render the full scene. + """ r = self.robot s: dict = {"peer_id": self.peer_id, "t": time.time()} try: @@ -371,16 +375,67 @@ def _read_state(self) -> Optional[dict]: "duration": ts.duration, } - # Simulation + # Simulation world state if hasattr(r, "_world") and r._world is not None: w = r._world if hasattr(w, "_data") and w._data is not None: s["sim_time"] = float(w._data.time) elif hasattr(r, "_data") and r._data is not None: s["sim_time"] = float(r._data.time) - if hasattr(r, "_world") and r._world is not None and hasattr(r._world, "robots"): - for name in r._world.robots: - s.setdefault("robots", {})[name] = {"active": True} + + # Sim joint positions (for dashboard joint gauges) + if "joints" not in s and hasattr(w, "robots") and w.robots and hasattr(w, "_model") and w._model is not None: + try: + import importlib + mj = importlib.import_module("mujoco") + model, data = w._model, w._data + if data is not None: + joints = {} + for rname, rob in w.robots.items(): + for jnt_name in getattr(rob, "joint_names", []): + jnt_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_JOINT, jnt_name) + if jnt_id >= 0: + joints[jnt_name] = float(data.qpos[model.jnt_qposadr[jnt_id]]) + if joints: + s["joints"] = joints + except Exception: + pass + + # Robots in world + if hasattr(w, "robots"): + robots_info = {} + for name, rob in w.robots.items(): + robots_info[name] = { + "active": True, + "policy_running": getattr(rob, "policy_running", False), + "joints": len(getattr(rob, "joint_names", [])), + } + s["robots"] = robots_info + + # Objects in world (scene composition) + if hasattr(w, "objects") and w.objects: + s["objects"] = { + name: { + "shape": getattr(obj, "shape", "?"), + "position": getattr(obj, "position", []), + "color": getattr(obj, "color", [])[:3] if getattr(obj, "color", None) else [], + "is_static": getattr(obj, "is_static", False), + } + for name, obj in w.objects.items() + } + + # Cameras in world + if hasattr(w, "cameras") and w.cameras: + s["cameras"] = list(w.cameras.keys()) + + # World metadata + if hasattr(w, "_model") and w._model is not None: + s["world"] = { + "bodies": w._model.nbody, + "joints": w._model.njnt, + "actuators": w._model.nu, + "step_count": getattr(w, "step_count", 0), + } except Exception: pass @@ -444,6 +499,11 @@ def _dispatch(self, cmd: dict) -> dict: return {"status": getattr(getattr(r, "_task_state", None), "status", "unknown")} if action == "stop": + # Simulation: stop policy on all robots + if hasattr(r, "_world") and r._world: + for rname, robj in r._world.robots.items(): + robj.policy_running = False + return {"status": "stopped", "content": [{"text": "🛑 Policy stopped"}]} return r.stop_task() if hasattr(r, "stop_task") else {"ok": True} if action == "features": @@ -470,6 +530,32 @@ def _dispatch(self, cmd: dict) -> dict: ) if k in cmd } + + # Simulation backend — route to run_policy / start_policy + if hasattr(r, "run_policy") and hasattr(r, "_world"): + robot_name = None + if r._world and r._world.robots: + robot_name = cmd.get("robot_name") or next(iter(r._world.robots)) + if not robot_name: + return {"error": "no robots in simulation"} + if action == "execute": + return r.run_policy( + robot_name=robot_name, + policy_provider=pp, + instruction=instr, + duration=dur, + **kw, + ) + else: # start + return r.start_policy( + robot_name=robot_name, + policy_provider=pp, + instruction=instr, + duration=dur, + **kw, + ) + + # Hardware robot backend if action == "execute" and hasattr(r, "_execute_task_sync"): return r._execute_task_sync(instr, pp, port, host, dur, **kw) if action == "start" and hasattr(r, "start_task"): @@ -481,6 +567,23 @@ def _dispatch(self, cmd: dict) -> dict: if action == "reset" and hasattr(r, "reset"): return r.reset() + # Camera stream — dispatch to simulation's handler or robot's + if action == "start_camera_stream": + if hasattr(r, "_dispatch_action"): + return r._dispatch_action("start_camera_stream", cmd) + elif hasattr(r, "start_camera_stream"): + return r.start_camera_stream( + camera=cmd.get("camera", "default"), + fps=cmd.get("fps", 10), + ) + return {"error": "start_camera_stream not supported by this peer"} + + # Teleop delta — nudge a joint in simulation + if action == "teleop_delta": + if hasattr(r, "_dispatch_action"): + return r._dispatch_action("teleop_delta", cmd) + return {"error": "teleop_delta requires Simulation backend"} + return {"error": f"unknown action: {action}"} def _on_response(self, sample): @@ -577,8 +680,94 @@ def unsubscribe(self, name: str): pass self.inbox.pop(name, None) + # ── Event publishing (discrete mutations) ───────────────── + + def publish_event(self, event_type: str, data: dict = None): + """Publish a discrete event to the mesh. + + Used for mutations like object_added, object_removed, robot_added, + policy_started, etc. Dashboard subscribes to strands/*/event. + + Args: + event_type: Event type string (e.g. "object_added", "policy_started") + data: Event payload dict + """ + _put( + f"strands/{self.peer_id}/event", + { + "peer_id": self.peer_id, + "event_type": event_type, + "data": data or {}, + "ts": time.time(), + }, + ) + # ── VLA execution stream ─────────────────────────────────── + # ── Camera frame publishing ────────────────────────────── + + def publish_camera_frame( + self, + frame_b64: str, + camera: str = "default", + ): + """Publish a camera frame (base64 JPEG) to the mesh. + + Dashboard and other peers subscribe to strands/{peer_id}/camera. + Frames are JPEG-encoded and base64'd for JSON transport. + + Args: + frame_b64: Base64-encoded JPEG image data + camera: Camera name (default: "default") + """ + _put( + f"strands/{self.peer_id}/camera", + { + "peer_id": self.peer_id, + "camera": camera, + "frame": frame_b64, + "ts": time.time(), + }, + ) + + def start_camera_stream( + self, + render_fn, + camera: str = "default", + fps: float = 10.0, + ): + """Start a background thread that publishes camera frames. + + Args: + render_fn: Callable that returns base64 JPEG string. + Signature: render_fn(camera_name: str) -> str + camera: Camera name + fps: Frames per second (default: 10) + """ + if not hasattr(self, "_camera_threads"): + self._camera_threads = {} + + key = camera + if key in self._camera_threads and self._camera_threads[key].is_alive(): + return # Already streaming + + def _stream_loop(): + interval = 1.0 / fps + while self._running: + try: + frame_b64 = render_fn(camera) + if frame_b64: + self.publish_camera_frame(frame_b64, camera) + except Exception as e: + logger.debug("Camera stream error: %s", e) + time.sleep(interval) + + t = threading.Thread(target=_stream_loop, daemon=True, name=f"cam-{self.peer_id}-{camera}") + t.start() + self._camera_threads[key] = t + logger.info("📹 %s camera stream started: %s @ %.0f fps", self.peer_id, camera, fps) + + def publish_step( self, step: int, From 265865347710a00e33a0f2ede5924dc759491b03 Mon Sep 17 00:00:00 2001 From: cagataycali Date: Fri, 13 Mar 2026 23:46:36 -0400 Subject: [PATCH 2/2] feat: add robots/robots-dashboard CLI entry points --- pyproject.toml | 5 +++++ strands_robots/dashboard/__init__.py | 3 +++ 2 files changed, 8 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 3ef1792..d14e723 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -159,6 +159,11 @@ dependencies = [ "msgpack>=1.0.0", ] + +[project.scripts] +robots = "strands_robots.dashboard:main" +robots-dashboard = "strands_robots.dashboard:main" + [tool.hatch.envs.default.scripts] test = "pytest {args:tests}" lint = "ruff check strands_robots tests" diff --git a/strands_robots/dashboard/__init__.py b/strands_robots/dashboard/__init__.py index 6079f43..5fdd543 100644 --- a/strands_robots/dashboard/__init__.py +++ b/strands_robots/dashboard/__init__.py @@ -1 +1,4 @@ # Dashboard package +from strands_robots.dashboard.server import main + +__all__ = ["main"]