diff --git a/pyproject.toml b/pyproject.toml
index 3ef1792..d14e723 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -159,6 +159,11 @@ dependencies = [
"msgpack>=1.0.0",
]
+
+[project.scripts]
+robots = "strands_robots.dashboard:main"
+robots-dashboard = "strands_robots.dashboard:main"
+
[tool.hatch.envs.default.scripts]
test = "pytest {args:tests}"
lint = "ruff check strands_robots tests"
diff --git a/strands_robots/dashboard/__init__.py b/strands_robots/dashboard/__init__.py
new file mode 100644
index 0000000..5fdd543
--- /dev/null
+++ b/strands_robots/dashboard/__init__.py
@@ -0,0 +1,4 @@
+# Dashboard package
+from strands_robots.dashboard.server import main
+
+__all__ = ["main"]
diff --git a/strands_robots/dashboard/__main__.py b/strands_robots/dashboard/__main__.py
new file mode 100644
index 0000000..1165aca
--- /dev/null
+++ b/strands_robots/dashboard/__main__.py
@@ -0,0 +1,4 @@
+"""Run the dashboard: python -m strands_robots.dashboard"""
+from strands_robots.dashboard.server import main
+
+main()
diff --git a/strands_robots/dashboard/server.py b/strands_robots/dashboard/server.py
new file mode 100644
index 0000000..097f7ee
--- /dev/null
+++ b/strands_robots/dashboard/server.py
@@ -0,0 +1,1591 @@
+#!/usr/bin/env python3
+"""Strands Robots Dashboard — Zenoh-native web dashboard for all Robot() instances.
+
+Every Robot("so100"), Robot("panda"), etc. auto-joins the Zenoh mesh.
+This dashboard subscribes to ALL mesh traffic and renders it in a
+real-time web UI:
+
+- Live peer discovery (robots, sims, agents)
+- Joint state visualization (sparklines + gauges)
+- VLA execution stream (watch policy steps live)
+- Camera feeds (JPEG frames from sim/real cameras over Zenoh)
+- Action vector visualization (live bar charts)
+- Command dispatch (tell any robot to do something)
+- Emergency stop (broadcast to all)
+- Agent grid layout — switch between peers, see everything at once
+
+Architecture:
+ Robot() ──► Zenoh mesh ◄── Dashboard (this)
+ Robot() ──► ◄── Agent
+ Sim() ──► ◄── Other dashboards
+
+Usage:
+ python -m strands_robots.dashboard.server --port 7860
+ # Opens http://localhost:7860
+
+ # Or from Python:
+ from strands_robots.dashboard.server import start_dashboard
+ start_dashboard(port=7860)
+"""
+
+import argparse
+import asyncio
+import json
+import logging
+import os
+import socket
+import threading
+import time
+import uuid
+from http.server import HTTPServer, SimpleHTTPRequestHandler
+from socketserver import ThreadingMixIn
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+logger = logging.getLogger(__name__)
+
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+# State — shared between Zenoh subscribers and WebSocket clients
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+
+_STATE: Dict[str, Any] = {
+ "peers": {}, # peer_id → {type, hostname, last_seen, caps, ...}
+ "states": {}, # peer_id → {joints, sim_time, task, ...}
+ "streams": {}, # peer_id → [last N stream steps]
+ "cameras": {}, # peer_id → {camera_name: base64_jpeg}
+ "commands_sent": [], # [{target, cmd, ts}, ...]
+ "responses": [], # [{from, turn, result, ts}, ...]
+ "events": [], # [{ts, type, data}, ...] ring buffer
+}
+_STATE_LOCK = threading.Lock()
+_WS_CLIENTS: List[Any] = []
+_WS_LOCK = threading.Lock()
+_ZENOH_SESSION = None
+_ZENOH_SUBS = []
+_DASHBOARD_PEER_ID = f"dashboard-{uuid.uuid4().hex[:6]}"
+
+MAX_EVENTS = 500
+MAX_STREAM_STEPS = 200
+
+
+def _add_event(event_type: str, data: dict):
+ """Add event to the ring buffer."""
+ with _STATE_LOCK:
+ _STATE["events"].append({
+ "ts": time.time(),
+ "type": event_type,
+ "data": data,
+ })
+ if len(_STATE["events"]) > MAX_EVENTS:
+ _STATE["events"] = _STATE["events"][-MAX_EVENTS:]
+ _broadcast_ws({"type": "event", "event_type": event_type, "data": data, "ts": time.time()})
+
+
+def _broadcast_ws(msg: dict):
+ """Send message to all connected WebSocket clients."""
+ payload = json.dumps(msg, default=str)
+ with _WS_LOCK:
+ dead = []
+ for ws in _WS_CLIENTS:
+ try:
+ ws.send(payload)
+ except Exception:
+ dead.append(ws)
+ for ws in dead:
+ _WS_CLIENTS.remove(ws)
+
+
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+# Zenoh Mesh Subscriber — listens to everything on strands/**
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+
+def _start_zenoh():
+ """Connect to the Zenoh mesh and subscribe to all robot traffic."""
+ global _ZENOH_SESSION
+
+ try:
+ import zenoh
+ except ImportError:
+ logger.warning("eclipse-zenoh not installed. Dashboard runs without live mesh data.")
+ logger.warning("Install: pip install eclipse-zenoh")
+ return False
+
+ try:
+ config = zenoh.Config()
+
+ MESH_PORT = int(os.getenv("STRANDS_MESH_PORT", "7447"))
+ MESH_EP = f"tcp/127.0.0.1:{MESH_PORT}"
+
+ connect = os.getenv("ZENOH_CONNECT")
+ listen = os.getenv("ZENOH_LISTEN")
+
+ if connect:
+ config.insert_json5("connect/endpoints", json.dumps([e.strip() for e in connect.split(",")]))
+ if listen:
+ config.insert_json5("listen/endpoints", json.dumps([e.strip() for e in listen.split(",")]))
+
+ if not connect and not listen:
+ try:
+ cfg_try = zenoh.Config()
+ cfg_try.insert_json5("listen/endpoints", json.dumps([MESH_EP]))
+ cfg_try.insert_json5("connect/endpoints", json.dumps([MESH_EP]))
+ _ZENOH_SESSION = zenoh.open(cfg_try)
+ logger.info("Zenoh dashboard session opened (listener on %s)", MESH_EP)
+ except Exception:
+ config.insert_json5("mode", '"client"')
+ config.insert_json5("connect/endpoints", json.dumps([MESH_EP]))
+ _ZENOH_SESSION = zenoh.open(config)
+ logger.info("Zenoh dashboard session opened (client)")
+ else:
+ _ZENOH_SESSION = zenoh.open(config)
+ logger.info("Zenoh dashboard session opened")
+
+ # Subscribe to all strands topics
+ _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/presence", _on_presence))
+ _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/state", _on_state))
+ _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/stream", _on_stream))
+ _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/camera", _on_camera))
+ _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/*/event", _on_sim_event))
+ _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("strands/broadcast", _on_broadcast))
+ _ZENOH_SUBS.append(_ZENOH_SESSION.declare_subscriber("reachy_mini/**", _on_external))
+
+ # Publish dashboard presence
+ threading.Thread(target=_dashboard_heartbeat, daemon=True).start()
+
+ logger.info("Zenoh subscriptions active — listening for robots")
+ return True
+
+ except Exception as e:
+ logger.error("Zenoh setup failed: %s", e)
+ return False
+
+
+def _dashboard_heartbeat():
+ """Publish dashboard presence on the mesh."""
+ while _ZENOH_SESSION:
+ try:
+ _ZENOH_SESSION.put(
+ f"strands/{_DASHBOARD_PEER_ID}/presence",
+ json.dumps({
+ "robot_id": _DASHBOARD_PEER_ID,
+ "robot_type": "dashboard",
+ "hostname": socket.gethostname(),
+ "timestamp": time.time(),
+ "ws_clients": len(_WS_CLIENTS),
+ }).encode(),
+ )
+ except Exception:
+ pass
+ time.sleep(2.0)
+
+
+def _on_presence(sample):
+ """Handle presence heartbeats from robots/sims."""
+ try:
+ data = json.loads(sample.payload.to_bytes().decode())
+ peer_id = data.get("robot_id", "")
+ if not peer_id or peer_id == _DASHBOARD_PEER_ID:
+ return
+
+ with _STATE_LOCK:
+ is_new = peer_id not in _STATE["peers"]
+ _STATE["peers"][peer_id] = {
+ **data,
+ "last_seen": time.time(),
+ }
+
+ if is_new:
+ _add_event("peer_join", {"peer_id": peer_id, **data})
+ logger.info("🤖 New peer: %s (%s)", peer_id, data.get("robot_type", "?"))
+
+ # Auto-request camera stream from sim and robot peers
+ role = data.get("role", data.get("robot_type", ""))
+ if role in ("sim", "robot"):
+ threading.Thread(
+ target=lambda pid=peer_id: (
+ time.sleep(1), # Wait for peer to be fully ready
+ _send_command(pid, {"action": "start_camera_stream", "camera": "default", "fps": 10 if role == "sim" else 5}),
+ logger.info("📹 Auto-requested camera stream from %s", pid),
+ ),
+ daemon=True,
+ ).start()
+
+ _broadcast_ws({"type": "peers", "peers": _get_peers()})
+
+ except Exception as e:
+ logger.debug("Presence parse error: %s", e)
+
+
+def _on_state(sample):
+ """Handle state updates (joint positions, sim time, task status)."""
+ try:
+ data = json.loads(sample.payload.to_bytes().decode())
+ peer_id = data.get("peer_id", "")
+ if not peer_id:
+ return
+
+ with _STATE_LOCK:
+ _STATE["states"][peer_id] = {
+ **data,
+ "_received_at": time.time(),
+ }
+
+ _broadcast_ws({"type": "state", "peer_id": peer_id, "state": data})
+
+ except Exception as e:
+ logger.debug("State parse error: %s", e)
+
+
+def _on_stream(sample):
+ """Handle VLA execution stream steps."""
+ try:
+ data = json.loads(sample.payload.to_bytes().decode())
+ peer_id = data.get("peer_id", "")
+ if not peer_id:
+ return
+
+ with _STATE_LOCK:
+ if peer_id not in _STATE["streams"]:
+ _STATE["streams"][peer_id] = []
+ _STATE["streams"][peer_id].append(data)
+ if len(_STATE["streams"][peer_id]) > MAX_STREAM_STEPS:
+ _STATE["streams"][peer_id] = _STATE["streams"][peer_id][-MAX_STREAM_STEPS:]
+
+ _broadcast_ws({"type": "stream_step", "peer_id": peer_id, "step": data})
+
+ except Exception as e:
+ logger.debug("Stream parse error: %s", e)
+
+
+def _on_camera(sample):
+ """Handle camera frame data (base64 JPEG from sim/real cameras)."""
+ try:
+ data = json.loads(sample.payload.to_bytes().decode())
+ peer_id = data.get("peer_id", "")
+ cam_name = data.get("camera", "default")
+ frame_b64 = data.get("frame", "")
+ if not peer_id or not frame_b64:
+ return
+
+ with _STATE_LOCK:
+ if peer_id not in _STATE["cameras"]:
+ _STATE["cameras"][peer_id] = {}
+ _STATE["cameras"][peer_id][cam_name] = frame_b64
+
+ # Forward camera frame to WS clients that are watching this peer
+ _broadcast_ws({
+ "type": "camera_frame",
+ "peer_id": peer_id,
+ "camera": cam_name,
+ "frame": frame_b64,
+ "ts": data.get("ts", time.time()),
+ })
+
+ except Exception as e:
+ logger.debug("Camera parse error: %s", e)
+
+
+def _on_broadcast(sample):
+ """Handle broadcast messages (e.g., emergency stop)."""
+ try:
+ data = json.loads(sample.payload.to_bytes().decode())
+ _add_event("broadcast", data)
+ except Exception:
+ pass
+
+
+def _on_sim_event(sample):
+ """Handle discrete simulation events (object added/removed, policy started, etc.)."""
+ try:
+ data = json.loads(sample.payload.to_bytes().decode())
+ peer_id = data.get("peer_id", "")
+ event_type = data.get("event_type", "")
+ event_data = data.get("data", {})
+ if not peer_id or not event_type:
+ return
+
+ _add_event(f"sim:{event_type}", {"peer_id": peer_id, **event_data})
+
+ # Forward to WS clients as a dedicated message type
+ _broadcast_ws({
+ "type": "sim_event",
+ "peer_id": peer_id,
+ "event_type": event_type,
+ "data": event_data,
+ "ts": data.get("ts", time.time()),
+ })
+
+ except Exception as e:
+ logger.debug("Sim event parse error: %s", e)
+
+
+def _on_external(sample):
+ """Handle external robot topics (Reachy Mini, etc.)."""
+ try:
+ key = str(sample.key_expr)
+ raw = sample.payload.to_bytes().decode()
+ try:
+ data = json.loads(raw)
+ except json.JSONDecodeError:
+ data = {"raw": raw}
+
+ _add_event("external", {"topic": key, "data": data})
+ except Exception:
+ pass
+
+
+def _get_peers() -> list:
+ """Get all peers with age calculation."""
+ now = time.time()
+ with _STATE_LOCK:
+ peers = []
+ stale = []
+ for pid, info in _STATE["peers"].items():
+ age = now - info.get("last_seen", 0)
+ if age > 15.0:
+ stale.append(pid)
+ continue
+ peers.append({
+ "peer_id": pid,
+ "type": info.get("robot_type", "unknown"),
+ "hostname": info.get("hostname", "?"),
+ "age": round(age, 1),
+ "task_status": info.get("task_status", ""),
+ "instruction": info.get("instruction", ""),
+ "tool_name": info.get("tool_name", ""),
+ "connected": info.get("connected", None),
+ "sim_robots": info.get("sim_robots", []),
+ "world": info.get("world", False),
+ "action_keys": info.get("action_keys", []),
+ })
+ for pid in stale:
+ del _STATE["peers"][pid]
+ _add_event("peer_leave", {"peer_id": pid})
+ return peers
+
+
+def _send_command(target: str, command: dict):
+ """Send a command to a peer via Zenoh."""
+ if not _ZENOH_SESSION:
+ return {"error": "Zenoh not connected"}
+
+ turn_id = uuid.uuid4().hex[:8]
+ msg = {
+ "sender_id": _DASHBOARD_PEER_ID,
+ "turn_id": turn_id,
+ "command": command,
+ "timestamp": time.time(),
+ }
+
+ try:
+ if target == "broadcast":
+ _ZENOH_SESSION.put("strands/broadcast", json.dumps(msg).encode())
+ else:
+ _ZENOH_SESSION.put(f"strands/{target}/cmd", json.dumps(msg).encode())
+
+ with _STATE_LOCK:
+ _STATE["commands_sent"].append({
+ "target": target,
+ "command": command,
+ "turn_id": turn_id,
+ "ts": time.time(),
+ })
+ if len(_STATE["commands_sent"]) > 100:
+ _STATE["commands_sent"] = _STATE["commands_sent"][-100:]
+
+ _add_event("command_sent", {"target": target, "command": command, "turn_id": turn_id})
+ return {"status": "sent", "turn_id": turn_id}
+
+ except Exception as e:
+ return {"error": str(e)}
+
+
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+# WebSocket Handler (using websockets library)
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+
+async def _ws_handler(websocket):
+ """Handle a single WebSocket client connection."""
+ class WSWrapper:
+ def __init__(self, ws):
+ self._ws = ws
+ self._loop = asyncio.get_event_loop()
+
+ def send(self, data):
+ asyncio.run_coroutine_threadsafe(self._ws.send(data), self._loop)
+
+ wrapper = WSWrapper(websocket)
+ with _WS_LOCK:
+ _WS_CLIENTS.append(wrapper)
+
+ logger.info("WebSocket client connected (%d total)", len(_WS_CLIENTS))
+
+ try:
+ # Send initial state
+ await websocket.send(json.dumps({
+ "type": "init",
+ "peers": _get_peers(),
+ "dashboard_id": _DASHBOARD_PEER_ID,
+ "states": {k: v for k, v in _STATE.get("states", {}).items()},
+ "events": _STATE.get("events", [])[-50:],
+ }, default=str))
+
+ async for message in websocket:
+ try:
+ msg = json.loads(message)
+ msg_type = msg.get("type", "")
+
+ if msg_type == "command":
+ target = msg.get("target", "")
+ command = msg.get("command", {})
+ result = _send_command(target, command)
+ await websocket.send(json.dumps({"type": "command_result", **result}, default=str))
+
+ elif msg_type == "tell":
+ target = msg.get("target", "")
+ instruction = msg.get("instruction", "")
+ policy = msg.get("policy_provider", "mock")
+ duration = msg.get("duration", 30.0)
+
+ # Handle special commands
+ if instruction == "__stop__" or msg.get("stop"):
+ result = _send_command(target, {"action": "stop"})
+ await websocket.send(json.dumps({"type": "command_result", "action": "stop", **result}, default=str))
+ elif instruction == "__teleop_start__" or msg.get("teleop"):
+ cmd = {
+ "action": "teleop_start",
+ "policy_provider": policy,
+ "duration": duration,
+ }
+ for k in ("pretrained_name_or_path", "model_path", "server_address", "policy_type"):
+ if k in msg:
+ cmd[k] = msg[k]
+ result = _send_command(target, cmd)
+ await websocket.send(json.dumps({"type": "command_result", "action": "teleop_start", **result}, default=str))
+ elif instruction == "__teleop_stop__" or msg.get("teleop_stop"):
+ result = _send_command(target, {"action": "teleop_stop"})
+ await websocket.send(json.dumps({"type": "command_result", "action": "teleop_stop", **result}, default=str))
+ elif instruction == "__teleop_delta__" and msg.get("teleop_delta"):
+ delta = msg["teleop_delta"]
+ result = _send_command(target, {"action": "teleop_delta", **delta})
+ # Don't send ack for delta (too noisy)
+ else:
+ cmd = {
+ "action": "execute",
+ "instruction": instruction,
+ "policy_provider": policy,
+ "duration": duration,
+ }
+ # Forward extra policy kwargs
+ for k in ("pretrained_name_or_path", "model_path", "server_address", "policy_type"):
+ if k in msg:
+ cmd[k] = msg[k]
+ result = _send_command(target, cmd)
+ await websocket.send(json.dumps({"type": "command_result", **result}, default=str))
+
+ elif msg_type == "emergency_stop":
+ result = _send_command("broadcast", {"action": "stop"})
+ await websocket.send(json.dumps({"type": "command_result", "action": "emergency_stop", **result}, default=str))
+
+ elif msg_type == "get_peers":
+ await websocket.send(json.dumps({"type": "peers", "peers": _get_peers()}, default=str))
+
+ elif msg_type == "get_state":
+ peer_id = msg.get("peer_id", "")
+ state = _STATE.get("states", {}).get(peer_id, {})
+ await websocket.send(json.dumps({"type": "state", "peer_id": peer_id, "state": state}, default=str))
+
+ elif msg_type == "get_stream":
+ peer_id = msg.get("peer_id", "")
+ steps = _STATE.get("streams", {}).get(peer_id, [])
+ await websocket.send(json.dumps({"type": "stream_history", "peer_id": peer_id, "steps": steps[-50:]}, default=str))
+
+ elif msg_type == "get_camera":
+ peer_id = msg.get("peer_id", "")
+ cams = _STATE.get("cameras", {}).get(peer_id, {})
+ await websocket.send(json.dumps({"type": "camera_state", "peer_id": peer_id, "cameras": cams}, default=str))
+
+ elif msg_type == "request_camera_stream":
+ # Ask a sim peer to start publishing camera frames
+ target = msg.get("peer_id", "")
+ cam = msg.get("camera", "default")
+ fps = msg.get("fps", 10)
+ result = _send_command(target, {
+ "action": "start_camera_stream",
+ "camera": cam,
+ "fps": fps,
+ })
+ await websocket.send(json.dumps({"type": "command_result", **result}, default=str))
+
+ elif msg_type == "ping":
+ await websocket.send(json.dumps({"type": "pong", "ts": time.time()}))
+
+ except json.JSONDecodeError:
+ await websocket.send(json.dumps({"type": "error", "message": "Invalid JSON"}))
+ except Exception as e:
+ await websocket.send(json.dumps({"type": "error", "message": str(e)}))
+
+ except Exception:
+ pass
+ finally:
+ with _WS_LOCK:
+ if wrapper in _WS_CLIENTS:
+ _WS_CLIENTS.remove(wrapper)
+ logger.info("WebSocket client disconnected (%d remaining)", len(_WS_CLIENTS))
+
+
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+# HTML Dashboard UI — Full Agent Grid
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+
+DASHBOARD_HTML = r"""
+
+
+
+
+Strands Robots Dashboard
+
+
+
+
+
+
+
+
+
+
+
🤖
Create a Robot() to see it here
from strands_robots import Robot; sim = Robot("so100")
+
+
+
+
+
+
+"""
+
+
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+# HTTP + WebSocket Server
+# ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+
+class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
+ """HTTPServer that handles each request in a separate thread."""
+ daemon_threads = True
+
+
+class DashboardHTTPHandler(SimpleHTTPRequestHandler):
+ """Serve the dashboard HTML and API endpoints."""
+
+ def do_GET(self):
+ if self.path == "/" or self.path == "/index.html":
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html; charset=utf-8")
+ self.end_headers()
+ self.wfile.write(DASHBOARD_HTML.encode("utf-8"))
+ elif self.path == "/api/peers":
+ self._json_response(_get_peers())
+ elif self.path == "/api/state":
+ self._json_response(_STATE.get("states", {}))
+ elif self.path == "/api/events":
+ self._json_response(_STATE.get("events", [])[-100:])
+ elif self.path.startswith("/api/state/"):
+ peer_id = self.path.split("/api/state/")[1]
+ self._json_response(_STATE.get("states", {}).get(peer_id, {}))
+ elif self.path.startswith("/api/cameras/"):
+ peer_id = self.path.split("/api/cameras/")[1]
+ self._json_response(_STATE.get("cameras", {}).get(peer_id, {}))
+ else:
+ self.send_response(404)
+ self.end_headers()
+
+ def _json_response(self, data):
+ self.send_response(200)
+ self.send_header("Content-Type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.end_headers()
+ self.wfile.write(json.dumps(data, default=str).encode())
+
+ def log_message(self, format, *args):
+ pass
+
+
+def start_dashboard(port: int = 7860, open_browser: bool = True):
+ """Start the Strands Robots Dashboard.
+
+ Args:
+ port: HTTP port (WebSocket will be port+1)
+ open_browser: Auto-open browser
+ """
+ ws_port = port + 1
+
+ print(f"🤖 Strands Robots Dashboard")
+ print(f"=" * 50)
+
+ zenoh_ok = _start_zenoh()
+ if zenoh_ok:
+ print(f"✅ Zenoh mesh connected (peer: {_DASHBOARD_PEER_ID})")
+ else:
+ print(f"⚠️ Zenoh not available — dashboard will show no live data")
+ print(f" Install: pip install eclipse-zenoh")
+
+ async def _ws_server():
+ import websockets
+ async with websockets.serve(_ws_handler, "0.0.0.0", ws_port):
+ logger.info("WebSocket server on :%d", ws_port)
+ await asyncio.Future()
+
+ ws_thread = threading.Thread(
+ target=lambda: asyncio.new_event_loop().run_until_complete(_ws_server()),
+ daemon=True,
+ )
+ ws_thread.start()
+ print(f"✅ WebSocket server: ws://localhost:{ws_port}")
+
+ httpd = ThreadingHTTPServer(("0.0.0.0", port), DashboardHTTPHandler)
+ print(f"✅ Dashboard: http://localhost:{port}")
+ print(f"=" * 50)
+ print(f"📡 Listening for Robot() instances on Zenoh mesh...")
+ print(f" Create a robot to see it here: Robot('so100')")
+ print()
+
+ if open_browser:
+ try:
+ import webbrowser
+ webbrowser.open(f"http://localhost:{port}")
+ except Exception:
+ pass
+
+ try:
+ httpd.serve_forever()
+ except KeyboardInterrupt:
+ print("\n🛑 Dashboard stopped")
+ httpd.shutdown()
+
+
+def main():
+ """CLI entry point."""
+ parser = argparse.ArgumentParser(description="Strands Robots Dashboard")
+ parser.add_argument("--port", "-p", type=int, default=7860, help="HTTP port (default: 7860)")
+ parser.add_argument("--no-browser", action="store_true", help="Don't auto-open browser")
+ args = parser.parse_args()
+ start_dashboard(port=args.port, open_browser=not args.no_browser)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/strands_robots/robot.py b/strands_robots/robot.py
index a0c07eb..83487eb 100644
--- a/strands_robots/robot.py
+++ b/strands_robots/robot.py
@@ -1037,6 +1037,62 @@ def get_features(self) -> Dict[str, Any]:
],
}
+ # --- Camera streaming for dashboard ---
+
+ def _render_camera_jpeg_b64(self, camera_name: str = "default") -> str | None:
+ """Render a camera frame from the real robot as base64 JPEG for mesh streaming.
+
+ Grabs the latest observation and encodes the first camera image.
+ """
+ try:
+ if not getattr(self.robot, "is_connected", False):
+ return None
+
+ observation = self.robot.get_observation()
+
+ # Find camera keys
+ camera_keys = []
+ if hasattr(self.robot, "config") and hasattr(self.robot.config, "cameras"):
+ camera_keys = list(self.robot.config.cameras.keys())
+
+ # If specific camera requested, use it; otherwise use first available
+ cam_key = camera_name if camera_name in observation else (camera_keys[0] if camera_keys else None)
+ if not cam_key or cam_key not in observation:
+ return None
+
+ img = observation[cam_key]
+ if not hasattr(img, "shape") or len(img.shape) < 2:
+ return None
+
+ import base64
+ import io
+ from PIL import Image
+
+ # Convert numpy array to JPEG
+ if hasattr(img, "numpy"):
+ img = img.numpy()
+ pil_img = Image.fromarray(img)
+ buffer = io.BytesIO()
+ pil_img.save(buffer, format="JPEG", quality=60)
+ return base64.b64encode(buffer.getvalue()).decode("ascii")
+
+ except Exception as e:
+ logger.debug("Robot camera render failed: %s", e)
+ return None
+
+ def start_camera_stream(self, camera: str = "default", fps: int = 5) -> Dict[str, Any]:
+ """Start streaming camera frames from real robot over Zenoh mesh.
+
+ Args:
+ camera: Camera name (default: first available)
+ fps: Frames per second (default: 5, lower than sim for bandwidth)
+ """
+ if not self.mesh or not self.mesh.alive:
+ return {"status": "error", "content": [{"text": "❌ Mesh not enabled"}]}
+
+ self.mesh.start_camera_stream(self._render_camera_jpeg_b64, camera=camera, fps=fps)
+ return {"status": "success", "content": [{"text": f"📹 Camera stream started: {camera} @ {fps}fps"}]}
+
def tool_name(self) -> str:
return self.tool_name_str
diff --git a/strands_robots/simulation/_mjcf_builder.py b/strands_robots/simulation/_mjcf_builder.py
index ec54173..3ce7909 100644
--- a/strands_robots/simulation/_mjcf_builder.py
+++ b/strands_robots/simulation/_mjcf_builder.py
@@ -63,13 +63,26 @@ def build_objects_only(world: SimWorld) -> str:
return "\n".join(parts)
+ @staticmethod
+ def _pad4(val, default):
+ """Pad a list to exactly 4 elements (for quaternion/RGBA)."""
+ if val is None:
+ return list(default)
+ v = list(val)
+ while len(v) < 4:
+ v.append(default[len(v)] if len(v) < len(default) else 0.0)
+ return v[:4]
+
@staticmethod
def _object_xml(obj: SimObject, indent: int = 4) -> str:
"""Generate MJCF XML for a single object."""
pad = " " * indent
- px, py, pz = obj.position
- qw, qx, qy, qz = obj.orientation
- r, g, b, a = obj.color
+ pos = list(obj.position) + [0.0, 0.0, 0.0]
+ px, py, pz = pos[0], pos[1], pos[2]
+ ori = MJCFBuilder._pad4(obj.orientation, [1.0, 0.0, 0.0, 0.0])
+ qw, qx, qy, qz = ori[0], ori[1], ori[2], ori[3]
+ col = MJCFBuilder._pad4(obj.color, [0.5, 0.5, 0.5, 1.0])
+ r, g, b, a = col[0], col[1], col[2], col[3]
lines = []
lines.append(f'{pad}')
diff --git a/strands_robots/simulation/_policy_runner.py b/strands_robots/simulation/_policy_runner.py
index 8aa7bad..61de574 100644
--- a/strands_robots/simulation/_policy_runner.py
+++ b/strands_robots/simulation/_policy_runner.py
@@ -77,6 +77,10 @@ def run_policy(
robot.policy_steps = 0
next_frame_step = 0.0
+ # Notify dashboard
+ if hasattr(self, '_publish_event'):
+ self._publish_event("policy_started", {"robot": robot_name, "policy": policy_provider, "instruction": instruction})
+
sim_duration = duration * control_frequency # target number of control steps
start_time = time.time()
action_sleep = 1.0 / control_frequency
@@ -112,6 +116,19 @@ def run_policy(
self._apply_sim_action(robot_name, action_dict)
robot.policy_steps += 1
+ # Stream step to mesh for dashboard live monitoring
+ if hasattr(self, "mesh") and self.mesh and self.mesh.alive:
+ try:
+ self.mesh.publish_step(
+ step=robot.policy_steps,
+ observation=observation,
+ action=action_dict,
+ instruction=instruction,
+ policy=policy_provider,
+ )
+ except Exception:
+ pass # Never let mesh errors break the control loop
+
if writer and robot.policy_steps >= next_frame_step:
renderer = self._get_renderer(video_width, video_height)
if cam_id >= 0:
@@ -128,6 +145,10 @@ def run_policy(
elapsed = time.time() - start_time
robot.policy_running = False
+ # Notify dashboard
+ if hasattr(self, '_publish_event'):
+ self._publish_event("policy_completed", {"robot": robot_name, "steps": robot.policy_steps, "elapsed": round(elapsed, 1)})
+
result_text = (
f"✅ Policy complete on '{robot_name}'\n"
f"🧠 {policy_provider} | 🎯 {instruction}\n"
diff --git a/strands_robots/simulation/simulation.py b/strands_robots/simulation/simulation.py
index 4994ba6..fc2e228 100644
--- a/strands_robots/simulation/simulation.py
+++ b/strands_robots/simulation/simulation.py
@@ -219,10 +219,39 @@ def _compile_world(self):
def _recompile_world(self) -> Dict[str, Any]:
try:
self._compile_world()
+ self._invalidate_renderers()
return {"status": "success"}
except Exception as e:
return {"status": "error", "content": [{"text": f"❌ Recompile failed: {e}"}]}
+ def _invalidate_renderers(self):
+ """Invalidate all cached renderers after model recompile.
+
+ Called after any operation that changes the MuJoCo model (add_object,
+ remove_object, add_robot, add_camera, etc.) so camera streams and
+ render() produce frames from the new model.
+
+ NOTE: We only set references to None — we do NOT call .close() here
+ because the camera stream thread may still be using the renderer.
+ The old renderer will be garbage collected when no longer referenced.
+ New renderers are lazily created on next use.
+ """
+ # Set to None so next render creates a fresh renderer for the new model.
+ # The old renderer object will be GC'd when the camera thread moves on.
+ self._cam_stream_renderer = None
+
+ # Clear all cached renderers — same pattern: just drop references
+ self._renderers.clear()
+ self._renderer_model = None
+
+ def _publish_event(self, event_type: str, data: dict = None):
+ """Publish a sim mutation event to the Zenoh mesh."""
+ if hasattr(self, "mesh") and self.mesh and self.mesh.alive:
+ try:
+ self.mesh.publish_event(event_type, data or {})
+ except Exception:
+ pass
+
# --- Robot Management ---
@staticmethod
@@ -358,6 +387,8 @@ def add_robot(
mj.mj_step(model, data)
source = f"data_config='{data_config}'" if data_config else os.path.basename(resolved_path)
+ self._invalidate_renderers()
+ self._publish_event("robot_added", {"name": name, "joints": len(robot.joint_names), "source": source})
return {
"status": "success",
"content": [{"text": (
@@ -385,6 +416,7 @@ def remove_robot(self, name: str) -> Dict[str, Any]:
pass
del self._policy_threads[name]
del self._world.robots[name]
+ self._publish_event("robot_removed", {"name": name})
return {"status": "success", "content": [{"text": f"🗑️ Robot '{name}' removed."}]}
def list_robots(self) -> Dict[str, Any]:
@@ -440,12 +472,13 @@ def add_object(self, name: str, shape: str = "box", position: List[float] = None
if name in self._world.objects:
return {"status": "error", "content": [{"text": f"❌ Object '{name}' exists."}]}
+ from strands_robots.simulation._mjcf_builder import MJCFBuilder
obj = SimObject(
name=name, shape=shape,
- position=position or [0.0, 0.0, 0.0],
- orientation=orientation or [1.0, 0.0, 0.0, 0.0],
- size=size or [0.05, 0.05, 0.05],
- color=color or [0.5, 0.5, 0.5, 1.0],
+ position=list(position or [0.0, 0.0, 0.0])[:3],
+ orientation=MJCFBuilder._pad4(orientation, [1.0, 0.0, 0.0, 0.0]),
+ size=list(size or [0.05, 0.05, 0.05]),
+ color=MJCFBuilder._pad4(color, [0.5, 0.5, 0.5, 1.0]),
mass=mass, mesh_path=mesh_path, is_static=is_static,
)
self._world.objects[name] = obj
@@ -453,14 +486,18 @@ def add_object(self, name: str, shape: str = "box", position: List[float] = None
if self._world.robots:
try:
result = inject_object_into_scene(self._world, obj)
+ self._invalidate_renderers()
if result:
+ self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position})
return {"status": "success", "content": [{"text": f"📦 '{name}' spawned: {shape} at {obj.position}"}]}
+ self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position})
return {"status": "success", "content": [{"text": (
f"📦 '{name}' registered: {shape} at {obj.position}\n"
"⚠️ Robot scene loaded — object is tracked but not physically spawned."
)}]}
except Exception as e:
logger.warning("Object injection failed, tracking metadata only: %s", e)
+ self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position, "warning": str(e)})
return {"status": "success", "content": [{"text": f"📦 '{name}' registered: {shape} at {obj.position}\n⚠️ Injection failed ({e})"}]}
result = self._recompile_world()
@@ -468,6 +505,7 @@ def add_object(self, name: str, shape: str = "box", position: List[float] = None
del self._world.objects[name]
return result
+ self._publish_event("object_added", {"name": name, "shape": shape, "position": obj.position})
return {"status": "success", "content": [{"text": f"📦 '{name}' added: {shape} at {obj.position}, size={obj.size}, {'static' if is_static else f'{mass}kg'}"}]}
def remove_object(self, name: str) -> Dict[str, Any]:
@@ -476,8 +514,10 @@ def remove_object(self, name: str) -> Dict[str, Any]:
del self._world.objects[name]
if self._world.robots:
eject_body_from_scene(self._world, name)
+ self._invalidate_renderers()
else:
self._recompile_world()
+ self._publish_event("object_removed", {"name": name})
return {"status": "success", "content": [{"text": f"🗑️ '{name}' removed."}]}
def move_object(self, name: str, position: List[float] = None, orientation: List[float] = None) -> Dict[str, Any]:
@@ -500,6 +540,7 @@ def move_object(self, name: str, position: List[float] = None, orientation: List
self._world.objects[name].orientation = orientation
mj.mj_forward(model, data)
+ self._publish_event("object_moved", {"name": name, "position": position, "orientation": orientation})
return {"status": "success", "content": [{"text": f"📍 '{name}' moved to {position or 'same'}"}]}
def list_objects(self) -> Dict[str, Any]:
@@ -564,6 +605,7 @@ def reset(self) -> Dict[str, Any]:
for r in self._world.robots.values():
r.policy_running = False
r.policy_steps = 0
+ self._publish_event("reset", {})
return {"status": "success", "content": [{"text": "🔄 Reset to initial state."}]}
def get_state(self) -> Dict[str, Any]:
@@ -749,12 +791,17 @@ def _dispatch_action(self, action: str, d: Dict[str, Any]) -> Dict[str, Any]:
"list_urdfs": "list_urdfs_action",
"register_urdf": "register_urdf_action",
"stop_policy": "_stop_policy",
+ "execute": "run_policy",
+ "stop": "_stop_policy",
+ "teleop_start": "run_policy",
+ "teleop_stop": "_stop_policy",
+ "teleop_delta": "teleop_delta",
}
method_name = _ALIASES.get(action, action)
method = getattr(self, method_name, None)
- if method is None or action.startswith("_"):
+ if method is None or (action.startswith("_") and action not in _ALIASES):
return {"status": "error", "content": [{"text": f"❌ Unknown action: {action}"}]}
# Build kwargs from input dict, excluding 'action' itself
@@ -771,6 +818,10 @@ def _dispatch_action(self, action: str, d: Dict[str, Any]) -> Dict[str, Any]:
kwargs["name"] = d["robot_name"]
elif param_name == "robot_name" and "robot_name" not in d and "name" in d:
kwargs["robot_name"] = d["name"]
+ elif param_name == "robot_name" and "robot_name" not in d and "name" not in d:
+ # Auto-detect first robot in world
+ if self._world and self._world.robots:
+ kwargs["robot_name"] = next(iter(self._world.robots))
elif param_name in d:
kwargs[param_name] = d[param_name]
# Forward policy kwargs
@@ -782,15 +833,130 @@ def _dispatch_action(self, action: str, d: Dict[str, Any]) -> Dict[str, Any]:
return method(**kwargs)
+ def teleop_delta(self, joint: int = 0, delta: float = 0.0, robot_name: str = "", **kwargs) -> Dict[str, Any]:
+ """Apply a delta to a specific joint and step the simulation.
+
+ Used by the dashboard keyboard teleop (WASD keys) to nudge joints.
+ The joint is identified by index into the robot's joint_names list.
+
+ Args:
+ joint: Joint index (0-based into robot's joint list)
+ delta: Position delta to apply (radians)
+ robot_name: Robot name (auto-detected if empty)
+ """
+ if self._world is None or self._world._model is None:
+ return {"status": "error", "content": [{"text": "❌ No simulation."}]}
+
+ mj = _ensure_mujoco()
+ model, data = self._world._model, self._world._data
+
+ if not robot_name and self._world.robots:
+ robot_name = next(iter(self._world.robots))
+ robot = self._world.robots.get(robot_name)
+ if not robot:
+ return {"status": "error", "content": [{"text": f"❌ Robot '{robot_name}' not found."}]}
+
+ if joint < 0 or joint >= len(robot.joint_names):
+ return {"status": "error", "content": [{"text": f"❌ Joint index {joint} out of range (0-{len(robot.joint_names)-1})"}]}
+
+ jnt_name = robot.joint_names[joint]
+ jnt_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_JOINT, jnt_name)
+ if jnt_id < 0:
+ return {"status": "error", "content": [{"text": f"❌ Joint '{jnt_name}' not found in model"}]}
+
+ # Apply delta to joint position
+ qpos_adr = model.jnt_qposadr[jnt_id]
+ data.qpos[qpos_adr] += delta
+
+ # Also set ctrl if there's a matching actuator
+ act_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_ACTUATOR, jnt_name)
+ if act_id >= 0:
+ data.ctrl[act_id] = float(data.qpos[qpos_adr])
+
+ # Step the simulation to apply
+ for _ in range(5): # small burst of steps for smooth motion
+ mj.mj_step(model, data)
+ self._world.sim_time = data.time
+ self._world.step_count += 5
+
+ if hasattr(self, '_viewer_handle') and self._viewer_handle is not None:
+ self._viewer_handle.sync()
+
+ return {"status": "success"}
+
def _stop_policy(self, robot_name: str = "", **kwargs) -> Dict[str, Any]:
+ # Auto-detect robot if not specified
+ if not robot_name and self._world and self._world.robots:
+ robot_name = next(iter(self._world.robots))
if self._world and robot_name in self._world.robots:
self._world.robots[robot_name].policy_running = False
return {"status": "success", "content": [{"text": f"🛑 Stopped on '{robot_name}'"}]}
+ # Stop ALL robots if name not found
+ if self._world:
+ for rn, robj in self._world.robots.items():
+ robj.policy_running = False
+ return {"status": "success", "content": [{"text": "🛑 All policies stopped"}]}
return {"status": "error", "content": [{"text": f"❌ '{robot_name}' not found."}]}
+ # --- Camera Streaming ---
+
+ def _render_camera_jpeg_b64(self, camera_name: str = "default") -> Optional[str]:
+ """Render a camera view and return as base64 JPEG string for mesh streaming.
+
+ Thread-safe: if model changes between calls, the old renderer ref is
+ dropped by _invalidate_renderers() and a new one is created here.
+ """
+ if self._world is None or self._world._model is None:
+ return None
+ try:
+ mj = _ensure_mujoco()
+ w, h = 320, 240
+ model = self._world._model
+ data = self._world._data
+ renderer = getattr(self, "_cam_stream_renderer", None)
+ if renderer is None:
+ renderer = mj.Renderer(model, height=h, width=w)
+ self._cam_stream_renderer = renderer
+ cam_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_CAMERA, camera_name)
+ if cam_id >= 0:
+ renderer.update_scene(data, camera=cam_id)
+ else:
+ renderer.update_scene(data)
+ img = renderer.render().copy()
+ from PIL import Image
+ import io
+ import base64
+ pil_img = Image.fromarray(img)
+ buffer = io.BytesIO()
+ pil_img.save(buffer, format="JPEG", quality=60)
+ return base64.b64encode(buffer.getvalue()).decode("ascii")
+ except Exception as e:
+ # If renderer is stale after model change, discard and retry next frame
+ self._cam_stream_renderer = None
+ logger.debug("Camera JPEG render failed: %s", e)
+ return None
+
+ def start_camera_stream(self, camera: str = "default", fps: int = 10, **kwargs) -> Dict[str, Any]:
+ """Start streaming camera frames over Zenoh mesh."""
+ if not hasattr(self, "mesh") or not self.mesh:
+ return {"status": "error", "content": [{"text": "❌ Mesh not enabled"}]}
+ self.mesh.start_camera_stream(self._render_camera_jpeg_b64, camera=camera, fps=fps)
+ return {"status": "success", "content": [{"text": f"📹 Camera stream started: {camera} @ {fps}fps"}]}
+
+ def stop_camera_stream(self, **kwargs) -> Dict[str, Any]:
+ """Stop camera stream."""
+ return {"status": "success", "content": [{"text": "📹 Camera stream will stop on cleanup"}]}
+
# --- Cleanup ---
def cleanup(self):
+ # Close dedicated camera stream renderer
+ if hasattr(self, "_cam_stream_renderer") and self._cam_stream_renderer:
+ try:
+ self._cam_stream_renderer.close()
+ except Exception:
+ pass
+ self._cam_stream_renderer = None
if hasattr(self, "mesh") and self.mesh:
self.mesh.stop()
if self._world:
diff --git a/strands_robots/zenoh_mesh.py b/strands_robots/zenoh_mesh.py
index c60027f..8c34a2d 100644
--- a/strands_robots/zenoh_mesh.py
+++ b/strands_robots/zenoh_mesh.py
@@ -348,7 +348,11 @@ def _state_loop(self):
time.sleep(1.0 / STATE_HZ)
def _read_state(self) -> Optional[dict]:
- """Read current robot/sim state for publishing."""
+ """Read current robot/sim state for publishing.
+
+ Includes joints, task status, simulation world metadata (objects,
+ cameras, robots) so the dashboard can render the full scene.
+ """
r = self.robot
s: dict = {"peer_id": self.peer_id, "t": time.time()}
try:
@@ -371,16 +375,67 @@ def _read_state(self) -> Optional[dict]:
"duration": ts.duration,
}
- # Simulation
+ # Simulation world state
if hasattr(r, "_world") and r._world is not None:
w = r._world
if hasattr(w, "_data") and w._data is not None:
s["sim_time"] = float(w._data.time)
elif hasattr(r, "_data") and r._data is not None:
s["sim_time"] = float(r._data.time)
- if hasattr(r, "_world") and r._world is not None and hasattr(r._world, "robots"):
- for name in r._world.robots:
- s.setdefault("robots", {})[name] = {"active": True}
+
+ # Sim joint positions (for dashboard joint gauges)
+ if "joints" not in s and hasattr(w, "robots") and w.robots and hasattr(w, "_model") and w._model is not None:
+ try:
+ import importlib
+ mj = importlib.import_module("mujoco")
+ model, data = w._model, w._data
+ if data is not None:
+ joints = {}
+ for rname, rob in w.robots.items():
+ for jnt_name in getattr(rob, "joint_names", []):
+ jnt_id = mj.mj_name2id(model, mj.mjtObj.mjOBJ_JOINT, jnt_name)
+ if jnt_id >= 0:
+ joints[jnt_name] = float(data.qpos[model.jnt_qposadr[jnt_id]])
+ if joints:
+ s["joints"] = joints
+ except Exception:
+ pass
+
+ # Robots in world
+ if hasattr(w, "robots"):
+ robots_info = {}
+ for name, rob in w.robots.items():
+ robots_info[name] = {
+ "active": True,
+ "policy_running": getattr(rob, "policy_running", False),
+ "joints": len(getattr(rob, "joint_names", [])),
+ }
+ s["robots"] = robots_info
+
+ # Objects in world (scene composition)
+ if hasattr(w, "objects") and w.objects:
+ s["objects"] = {
+ name: {
+ "shape": getattr(obj, "shape", "?"),
+ "position": getattr(obj, "position", []),
+ "color": getattr(obj, "color", [])[:3] if getattr(obj, "color", None) else [],
+ "is_static": getattr(obj, "is_static", False),
+ }
+ for name, obj in w.objects.items()
+ }
+
+ # Cameras in world
+ if hasattr(w, "cameras") and w.cameras:
+ s["cameras"] = list(w.cameras.keys())
+
+ # World metadata
+ if hasattr(w, "_model") and w._model is not None:
+ s["world"] = {
+ "bodies": w._model.nbody,
+ "joints": w._model.njnt,
+ "actuators": w._model.nu,
+ "step_count": getattr(w, "step_count", 0),
+ }
except Exception:
pass
@@ -444,6 +499,11 @@ def _dispatch(self, cmd: dict) -> dict:
return {"status": getattr(getattr(r, "_task_state", None), "status", "unknown")}
if action == "stop":
+ # Simulation: stop policy on all robots
+ if hasattr(r, "_world") and r._world:
+ for rname, robj in r._world.robots.items():
+ robj.policy_running = False
+ return {"status": "stopped", "content": [{"text": "🛑 Policy stopped"}]}
return r.stop_task() if hasattr(r, "stop_task") else {"ok": True}
if action == "features":
@@ -470,6 +530,32 @@ def _dispatch(self, cmd: dict) -> dict:
)
if k in cmd
}
+
+ # Simulation backend — route to run_policy / start_policy
+ if hasattr(r, "run_policy") and hasattr(r, "_world"):
+ robot_name = None
+ if r._world and r._world.robots:
+ robot_name = cmd.get("robot_name") or next(iter(r._world.robots))
+ if not robot_name:
+ return {"error": "no robots in simulation"}
+ if action == "execute":
+ return r.run_policy(
+ robot_name=robot_name,
+ policy_provider=pp,
+ instruction=instr,
+ duration=dur,
+ **kw,
+ )
+ else: # start
+ return r.start_policy(
+ robot_name=robot_name,
+ policy_provider=pp,
+ instruction=instr,
+ duration=dur,
+ **kw,
+ )
+
+ # Hardware robot backend
if action == "execute" and hasattr(r, "_execute_task_sync"):
return r._execute_task_sync(instr, pp, port, host, dur, **kw)
if action == "start" and hasattr(r, "start_task"):
@@ -481,6 +567,23 @@ def _dispatch(self, cmd: dict) -> dict:
if action == "reset" and hasattr(r, "reset"):
return r.reset()
+ # Camera stream — dispatch to simulation's handler or robot's
+ if action == "start_camera_stream":
+ if hasattr(r, "_dispatch_action"):
+ return r._dispatch_action("start_camera_stream", cmd)
+ elif hasattr(r, "start_camera_stream"):
+ return r.start_camera_stream(
+ camera=cmd.get("camera", "default"),
+ fps=cmd.get("fps", 10),
+ )
+ return {"error": "start_camera_stream not supported by this peer"}
+
+ # Teleop delta — nudge a joint in simulation
+ if action == "teleop_delta":
+ if hasattr(r, "_dispatch_action"):
+ return r._dispatch_action("teleop_delta", cmd)
+ return {"error": "teleop_delta requires Simulation backend"}
+
return {"error": f"unknown action: {action}"}
def _on_response(self, sample):
@@ -577,8 +680,94 @@ def unsubscribe(self, name: str):
pass
self.inbox.pop(name, None)
+ # ── Event publishing (discrete mutations) ─────────────────
+
+ def publish_event(self, event_type: str, data: dict = None):
+ """Publish a discrete event to the mesh.
+
+ Used for mutations like object_added, object_removed, robot_added,
+ policy_started, etc. Dashboard subscribes to strands/*/event.
+
+ Args:
+ event_type: Event type string (e.g. "object_added", "policy_started")
+ data: Event payload dict
+ """
+ _put(
+ f"strands/{self.peer_id}/event",
+ {
+ "peer_id": self.peer_id,
+ "event_type": event_type,
+ "data": data or {},
+ "ts": time.time(),
+ },
+ )
+
# ── VLA execution stream ───────────────────────────────────
+ # ── Camera frame publishing ──────────────────────────────
+
+ def publish_camera_frame(
+ self,
+ frame_b64: str,
+ camera: str = "default",
+ ):
+ """Publish a camera frame (base64 JPEG) to the mesh.
+
+ Dashboard and other peers subscribe to strands/{peer_id}/camera.
+ Frames are JPEG-encoded and base64'd for JSON transport.
+
+ Args:
+ frame_b64: Base64-encoded JPEG image data
+ camera: Camera name (default: "default")
+ """
+ _put(
+ f"strands/{self.peer_id}/camera",
+ {
+ "peer_id": self.peer_id,
+ "camera": camera,
+ "frame": frame_b64,
+ "ts": time.time(),
+ },
+ )
+
+ def start_camera_stream(
+ self,
+ render_fn,
+ camera: str = "default",
+ fps: float = 10.0,
+ ):
+ """Start a background thread that publishes camera frames.
+
+ Args:
+ render_fn: Callable that returns base64 JPEG string.
+ Signature: render_fn(camera_name: str) -> str
+ camera: Camera name
+ fps: Frames per second (default: 10)
+ """
+ if not hasattr(self, "_camera_threads"):
+ self._camera_threads = {}
+
+ key = camera
+ if key in self._camera_threads and self._camera_threads[key].is_alive():
+ return # Already streaming
+
+ def _stream_loop():
+ interval = 1.0 / fps
+ while self._running:
+ try:
+ frame_b64 = render_fn(camera)
+ if frame_b64:
+ self.publish_camera_frame(frame_b64, camera)
+ except Exception as e:
+ logger.debug("Camera stream error: %s", e)
+ time.sleep(interval)
+
+ t = threading.Thread(target=_stream_loop, daemon=True, name=f"cam-{self.peer_id}-{camera}")
+ t.start()
+ self._camera_threads[key] = t
+ logger.info("📹 %s camera stream started: %s @ %.0f fps", self.peer_id, camera, fps)
+
+
def publish_step(
self,
step: int,