feat(mesh): Zenoh-based Robot Mesh — fleet coordination primitive #98

@cagataycali

Summary

Add a Zenoh-backed Mesh primitive to strands-robots so every Robot() instance automatically joins a peer-to-peer network, discovers other robots via multicast, and exposes high-level methods for agent-to-agent communication across mixed sim+real fleets.

This is the v0.4.0 headliner. The Zenoh dashboard (#48) and Device Connect integration (#52) both depend on this primitive landing first.


Motivation

Before v0.4.0: Robot("so100") is a solo actor — one process, one robot.

After v0.4.0:

from strands_robots import Robot

striker = Robot("unitree_g1", peer_id="striker")
mid_1   = Robot("unitree_g1", peer_id="mid_1")

# Peers auto-discover on multicast — no config, no broker
print(striker.mesh.peers)
# → [{'peer_id': 'mid_1', 'hostname': '...', 'last_seen': ...}]

# Agent-to-agent tool call
striker.mesh.tell("mid_1", "pass the ball to me")
# → {'status': 'done', 'responder_id': 'mid_1', 'result': ...}

# Safety — always broadcasts, always wins
striker.mesh.emergency_stop()

This unlocks:

  • Fleet orchestration — planner robot tells executor robot what to do
  • Mixed sim+real — sim robot plans, real robot executes
  • Live observability — dashboard subscribes to any peer with zero config
  • Emergency stop — every robot on the network halts in ≤100 ms
  • Multi-humanoid demos — coordinated-play scenarios

Design

Topic layout (contract)

strands/{peer_id}/presence        # 2 Hz heartbeat + capabilities
strands/{peer_id}/state           # 10–50 Hz joint/obs publishing
strands/{peer_id}/cmd             # Receive addressable commands
strands/{peer_id}/response/{turn} # Response correlation
strands/{peer_id}/stream          # Media stream (camera/audio)
strands/broadcast                 # Fan-out to all peers

PRs #48 (dashboard) and #52 (Device Connect) already subscribe to these topic names, so the contract must stay stable for them to rebase cleanly.
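
One way to keep the topic contract in a single place is a pair of small helpers that build the key expressions listed above. This is only a sketch: the names `peer_topics`, `response_topic`, and `BROADCAST_TOPIC` are hypothetical and not part of the proposed API, and the peer_id validation is an assumption.

```python
BROADCAST_TOPIC = "strands/broadcast"


def peer_topics(peer_id: str) -> dict[str, str]:
    """Return the fixed per-peer topics from the layout above."""
    # Assumption: a peer_id must be a single, non-wildcard topic segment.
    if not peer_id or "/" in peer_id or "*" in peer_id:
        raise ValueError(f"invalid peer_id: {peer_id!r}")
    base = f"strands/{peer_id}"
    return {
        "presence": f"{base}/presence",
        "state": f"{base}/state",
        "cmd": f"{base}/cmd",
        "stream": f"{base}/stream",
    }


def response_topic(peer_id: str, turn: int) -> str:
    """Return the per-turn response topic used for reply correlation."""
    return f"strands/{peer_id}/response/{turn}"
```

Dashboards and integrations could import these instead of hard-coding strings, which would make an accidental rename show up as a single diff.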

Directory layout

strands_robots/
├── robot.py               # hook mesh into __init__
├── zenoh_mesh.py          # Mesh class + PeerInfo
├── mesh_session.py        # ref-counted singleton over zenoh.Session
└── mesh/
    ├── __init__.py
    ├── dispatcher.py      # maps {action: execute|status|stop} → robot
    ├── presence.py        # heartbeat + capability discovery
    ├── streaming.py       # media streaming helpers
    └── tests/
        ├── test_unit.py
        ├── test_two_peer_local.py
        └── test_soak.py   # 10 robots × 10 min (nightly)
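
The reference-counted session in mesh_session.py could look roughly like the sketch below. `SessionHolder` and its `opener` argument are hypothetical names; the opener is injected so the sketch runs without Zenoh installed. In the real module the opener would presumably wrap `zenoh.open(...)`, which is an assumption here.

```python
import threading
from typing import Any, Callable, Optional


class SessionHolder:
    """Share one session per process; close it when the last user releases."""

    def __init__(self, opener: Callable[[], Any]):
        self._opener = opener          # assumption: e.g. a wrapper around zenoh.open
        self._lock = threading.Lock()
        self._session: Optional[Any] = None
        self._refs = 0

    def acquire(self) -> Any:
        with self._lock:
            if self._refs == 0:
                # First user opens the session; a failure leaves refs at 0.
                self._session = self._opener()
            self._refs += 1
            return self._session

    def release(self) -> None:
        with self._lock:
            if self._refs == 0:
                return                 # unbalanced release is ignored
            self._refs -= 1
            if self._refs == 0:
                session, self._session = self._session, None
                close = getattr(session, "close", None)
                if callable(close):
                    close()
```

Each Mesh.start() would call acquire() and each Mesh.stop() would call release(), so several Robot() instances in one process share a single session.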

Integration with Robot()

class Robot:
    def __init__(self, tool_name, robot, peer_id=None, mesh_enabled=True, ...):
        # ... existing init ...
        self.mesh = None
        if mesh_enabled:
            from strands_robots.zenoh_mesh import Mesh
            self.mesh = Mesh(
                robot=self,
                peer_id=peer_id or _generate_peer_id(tool_name),
                peer_type="robot",
            )
            self.mesh.start()

Mesh class surface

from typing import Callable, Optional

class Mesh:
    def __init__(self, robot, peer_id: str, peer_type: str = "robot"): ...

    # Lifecycle
    def start(self) -> None: ...
    def stop(self) -> None: ...
    @property
    def alive(self) -> bool: ...         # read as robot.mesh.alive

    # Discovery
    @property
    def peers(self) -> list[dict]: ...   # read as robot.mesh.peers

    # Core comms: agent-to-agent
    def tell(self, target: str, instruction: str, **kw) -> dict: ...
    def send(self, target: str, cmd: dict, timeout: float = 30.0) -> dict: ...
    def broadcast(self, cmd: dict, timeout: float = 5.0) -> list[dict]: ...
    def emergency_stop(self) -> list[dict]: ...

    # Pub/Sub: dashboards, loggers, coordination
    def subscribe(self, topic: str, callback: Optional[Callable] = None, name: Optional[str] = None): ...
    def unsubscribe(self, name: str) -> None: ...
    def publish_step(self, step: int, observation: dict, action: dict, **kw) -> None: ...
    def on_stream(self, peer_id: str, callback: Optional[Callable] = None): ...

Dispatcher contract

Incoming command shape:

{
    "action": "execute" | "status" | "stop" | "emergency_stop" | "peers",
    "instruction": "pick up the red cube",      # for 'execute'
    "payload": {...},                           # for structured 'send'
    "sender": "striker",
    "turn": 42,
    "reply_to": "strands/striker/response/42",
    "timeout": 30.0,
}

Response shape:

{
    "status": "done" | "error" | "timeout" | "refused",
    "responder_id": "mid_1",
    "turn": 42,
    "result": ...,
    "error": "...",    # only when status != done
}
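
The two shapes above suggest a dispatcher that looks up a handler by action and always answers in the response shape. The sketch below assumes a plain dict of handlers; `dispatch` and `Handler` are hypothetical names, not the actual contents of dispatcher.py.

```python
from typing import Any, Callable

Handler = Callable[[dict], Any]


def dispatch(cmd: dict, handlers: dict[str, Handler], responder_id: str) -> dict:
    """Route one incoming command and wrap the outcome in the response shape."""
    reply = {"responder_id": responder_id, "turn": cmd.get("turn")}
    action = cmd.get("action")
    handler = handlers.get(action) if isinstance(action, str) else None
    if handler is None:
        # Unknown or malformed action: refuse rather than raise.
        return {**reply, "status": "refused", "error": f"unknown action: {action!r}"}
    try:
        return {**reply, "status": "done", "result": handler(cmd)}
    except Exception as exc:
        # Handler failures become an error response, not a crash.
        return {**reply, "status": "error", "error": str(exc)}
```

The caller would publish the returned dict to the topic named in the command's reply_to field.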

Definition of Done

Functional

  • Robot("so100") auto-starts mesh in __init__ (opt-out via mesh_enabled=False)
  • robot.mesh.alive is True after start
  • robot.mesh.peer_id is stable and unique on the network
  • robot.mesh.peers returns a fresh list with ~1 s TTL
  • robot.mesh.tell(target, instruction) works for NL instructions
  • robot.mesh.send(target, {action, payload}) works for structured commands
  • robot.mesh.broadcast({...}) returns responses from all peers within timeout
  • robot.mesh.emergency_stop() halts every peer on the network in ≤100 ms

Auto-discovery

  • Multicast peer discovery works on local LAN without config
  • Zenoh connect="tcp/host:port" override works for WAN
  • Two separate processes on the same host auto-discover
  • Mac ↔ Linux (Jetson) discovery works on the same Wi-Fi
  • Docker --net=host works; a workaround for --net=bridge is documented
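
For the WAN and bridge-network cases, the connect override could be read from an environment variable. The sketch below assumes a hypothetical variable named STRANDS_MESH_CONNECT holding a comma-separated endpoint list, and it only builds the override values. Applying them (for example with Zenoh's `Config.insert_json5`) is left out and should be checked against the pinned Zenoh version.

```python
import json
import os
from typing import Mapping, Optional


def zenoh_overrides(env: Optional[Mapping[str, str]] = None) -> dict[str, str]:
    """Map STRANDS_MESH_CONNECT (hypothetical) to Zenoh config overrides."""
    env = os.environ if env is None else env
    raw = env.get("STRANDS_MESH_CONNECT", "")
    endpoints = [e.strip() for e in raw.split(",") if e.strip()]
    if not endpoints:
        return {}                      # no override: rely on multicast scouting
    # Values are JSON strings, keyed by Zenoh config path.
    return {"connect/endpoints": json.dumps(endpoints)}
```

With STRANDS_MESH_CONNECT="tcp/10.0.0.5:7447" this returns a single "connect/endpoints" entry; with the variable unset it returns an empty dict and discovery stays on multicast.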

Pub/Sub

  • mesh.subscribe("strands/*/state", cb) fires on any peer's state publish
  • mesh.publish_step(...) publishes at the robot's control frequency
  • mesh.on_stream(peer_id, cb) fires on media stream
  • Response correlation works correctly under concurrent send calls
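
Response correlation under concurrent send calls can be sketched with a thread-safe table of pending futures keyed by turn. `Correlator` is a hypothetical name, and the timeout response below is an assumption modelled on the response shape in the dispatcher contract.

```python
import itertools
import threading
from concurrent.futures import Future, TimeoutError as FutureTimeout


class Correlator:
    """Match responses to in-flight requests by their turn number."""

    def __init__(self) -> None:
        self._turns = itertools.count(1)
        self._lock = threading.Lock()
        self._pending: dict[int, Future] = {}

    def open(self) -> tuple[int, Future]:
        """Allocate a unique turn and the future its response will fill."""
        fut: Future = Future()
        with self._lock:
            turn = next(self._turns)
            self._pending[turn] = fut
        return turn, fut

    def resolve(self, response: dict) -> bool:
        """Deliver a response; return False if no request is waiting on it."""
        with self._lock:
            fut = self._pending.pop(response.get("turn"), None)
        if fut is None:
            return False               # late or unknown turn is dropped
        fut.set_result(response)
        return True

    def wait(self, turn: int, fut: Future, timeout: float) -> dict:
        """Block for the response, or synthesize a timeout response."""
        try:
            return fut.result(timeout=timeout)
        except FutureTimeout:
            with self._lock:
                self._pending.pop(turn, None)
            return {"status": "timeout", "turn": turn}
```

A send() built on this would call open(), publish the command with that turn, and then wait(); the subscriber on the response topic would call resolve() for each incoming message.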

Reliability

  • Soak test: 10 robots × 10 minutes × 0 dropped messages (nightly CI)
  • Peer-dead detection in ≤5 s (no heartbeat → alive=False)
  • Graceful reconnect when network flaps (no process crash)
  • Single zenoh.Session per process, reference-counted
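
Peer-dead detection can be sketched as a table of last-heartbeat timestamps with an expiry threshold. `PeerTable` is a hypothetical name, and the 2 s default (four missed 2 Hz heartbeats) is an assumption chosen to stay well inside the 5 s target.

```python
import time
from typing import Callable


class PeerTable:
    """Track heartbeats and report which peers are still considered alive."""

    def __init__(self, dead_after: float = 2.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._dead_after = dead_after  # assumption: 4 missed 2 Hz heartbeats
        self._clock = clock            # injectable so tests need no sleeping
        self._last_seen: dict[str, float] = {}

    def heartbeat(self, peer_id: str) -> None:
        """Record a presence message from peer_id."""
        self._last_seen[peer_id] = self._clock()

    def alive(self, peer_id: str) -> bool:
        seen = self._last_seen.get(peer_id)
        return seen is not None and self._clock() - seen <= self._dead_after

    def live_peers(self) -> list[str]:
        return sorted(p for p in self._last_seen if self.alive(p))
```

A monotonic clock is used so that wall-clock adjustments do not make live peers look dead.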

Safety

  • emergency_stop() always broadcasts even if queue is full
  • Receiving emergency_stop overrides any in-flight action
  • Receipt logged with millisecond timestamp for audit
  • Works even when sender's robot is dead (session isolation)
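
One way to guarantee that emergency_stop is sent even when the queue is full is a separate, unbounded urgent lane that is always drained first. This is a single-threaded sketch with a hypothetical `Outbox` class; a real implementation would need locking and would probably publish the stop immediately rather than queue it.

```python
from collections import deque
from typing import Optional


class Outbox:
    """Bounded queue for normal commands plus an unbounded urgent lane."""

    def __init__(self, maxlen: int = 64) -> None:
        self._normal: deque = deque()
        self._urgent: deque = deque()
        self._maxlen = maxlen

    def put(self, msg: dict) -> bool:
        """Enqueue msg; only non-urgent messages can be rejected."""
        if msg.get("action") == "emergency_stop":
            self._urgent.append(msg)   # never dropped, ignores maxlen
            return True
        if len(self._normal) >= self._maxlen:
            return False
        self._normal.append(msg)
        return True

    def get(self) -> Optional[dict]:
        """Return the next message, urgent lane first, or None if empty."""
        if self._urgent:
            return self._urgent.popleft()
        if self._normal:
            return self._normal.popleft()
        return None
```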

Quality

  • pip install strands-robots[mesh] extras resolves (eclipse-zenoh>=0.11)
  • Lazy imports — import strands_robots does NOT trigger zenoh load
  • Unit tests mock zenoh (no network required)
  • Local-two-peer integration test in CI (subprocess-based)
  • Nightly soak test workflow
  • NumPy-style docstrings + type hints on every public method
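
The lazy-import requirement can be met by deferring the import until the mesh is first used. The helper below is a generic sketch; `lazy_module` is a hypothetical name, and the install hint assumes the [mesh] extra described above.

```python
import importlib
from types import ModuleType

_cache: dict[str, ModuleType] = {}


def lazy_module(name: str, extra: str = "mesh") -> ModuleType:
    """Import a module on first use and point to the extra if it is missing."""
    if name not in _cache:
        try:
            _cache[name] = importlib.import_module(name)
        except ImportError as exc:
            raise ImportError(
                f"{name!r} is required; install with: "
                f"pip install 'strands-robots[{extra}]'"
            ) from exc
    return _cache[name]
```

Mesh.start() would then call lazy_module("zenoh") internally, so a plain import strands_robots never touches Zenoh.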

Documentation

  • docs/mesh/quickstart.md — your first fleet in 5 lines
  • docs/mesh/architecture.md — topic design, session lifecycle
  • docs/mesh/safety.md — emergency_stop semantics + audit logging
  • examples/fleet_orchestration.py — planner + executor demo
  • examples/live_stream.py — dashboard subscribing to all peers
  • examples/emergency_stop.py — safety demo

Compatibility with in-flight PRs


Non-goals

  • End-to-end encryption (use Zenoh's built-in TLS/DTLS when needed)
  • Authentication / authorization (v0.5; trust-the-LAN for now)
  • Bandwidth throttling per peer (post-v0.4)
  • Cloud relay / zenohd behind ingress (post-v0.4; currently rely on LAN multicast)
  • Custom serialization formats (JSON for commands, CBOR only for state streams)

Implementation plan (6 PRs)

  1. feat(mesh): Session singleton + connection config (mesh_session.py with reference-counted session management). ~150 LOC.
  2. feat(mesh): Mesh class + presence + peer discovery — join mesh, heartbeat, peers(). ~250 LOC.
  3. feat(mesh): tell / send / broadcast + response correlation — core RPC path. ~250 LOC.
  4. feat(mesh): publish_step + subscribe + on_stream — pub/sub, state streaming. ~200 LOC.
  5. feat(mesh): emergency_stop + safety audit log — E-STOP path with ≤100 ms SLA. ~150 LOC.
  6. feat(robot): wire mesh into Robot.__init__ (mesh_enabled=True default, configurable). ~50 LOC + examples.

Total estimated: ~1.0K LOC core + ~1.5K LOC tests + ~1K docs.


Risks & mitigations

| Risk | Severity | Mitigation |
|---|---|---|
| Zenoh version churn (0.11 → 0.12) | MEDIUM | Pin in [mesh] extras; multi-version test matrix |
| Multicast blocked on some networks | MEDIUM | Document manual connect="tcp/..." fallback |
| Docker --net=bridge blocks discovery | MEDIUM | Document --net=host; add connect env var |
| Session leak on process kill | LOW | atexit cleanup + __del__ best-effort |
| JSON payloads too large for big state | LOW | CBOR for state stream; cap size per message |

Acceptance test (two terminals on same LAN)

# Terminal 1
python -c "
from strands_robots import Robot
import time
picker = Robot('so100', peer_id='picker')
while len(picker.mesh.peers) < 1:
    time.sleep(0.1)
print('found:', picker.mesh.peers)
resp = picker.mesh.tell('placer', 'open gripper')
print('response:', resp)
"

# Terminal 2
python -c "
from strands_robots import Robot
import time
placer = Robot('so100', peer_id='placer')
time.sleep(60)   # stay alive to respond
"

Pass criteria:

  • Terminal 1 finds placer in peers within 3 s
  • tell() returns {status: done, ...} within 30 s
  • Terminal 2's robot executes the instruction (or logs why it refused)

Related

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request), mesh (Zenoh mesh networking / fleet coordination)
Status: In progress