diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 2300122c52..2eef686b2e 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -160,12 +160,17 @@ def stop(self) -> None: class pSHMTransport(PubSubTransport[T]): _started: bool = False - def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def] + def __init__(self, topic: str, *, default_capacity: int | None = None, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(topic) - self.shm = PickleSharedMemory(**kwargs) + self._default_capacity = default_capacity + self.shm = PickleSharedMemory(default_capacity=default_capacity, **kwargs) def __reduce__(self): # type: ignore[no-untyped-def] - return (pSHMTransport, (self.topic,)) + return (pSHMTransport, (self.topic,), {"default_capacity": self._default_capacity}) + + def __setstate__(self, state: dict[str, Any]) -> None: # type: ignore[no-untyped-def] + self._default_capacity = state.get("default_capacity") + self.shm = PickleSharedMemory(default_capacity=self._default_capacity) def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] if not self._started: @@ -190,12 +195,17 @@ def stop(self) -> None: class SHMTransport(PubSubTransport[T]): _started: bool = False - def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def] + def __init__(self, topic: str, *, default_capacity: int | None = None, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(topic) - self.shm = BytesSharedMemory(**kwargs) + self._default_capacity = default_capacity + self.shm = BytesSharedMemory(default_capacity=default_capacity, **kwargs) def __reduce__(self): # type: ignore[no-untyped-def] - return (SHMTransport, (self.topic,)) + return (SHMTransport, (self.topic,), {"default_capacity": self._default_capacity}) + + def __setstate__(self, state: dict[str, Any]) -> None: # type: ignore[no-untyped-def] + self._default_capacity = state.get("default_capacity") + self.shm = BytesSharedMemory(default_capacity=self._default_capacity) def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] if not self._started: diff --git a/dimos/protocol/pubsub/impl/shmpubsub.py b/dimos/protocol/pubsub/impl/shmpubsub.py index 883afcdcc0..5389dedc4d 100644 --- a/dimos/protocol/pubsub/impl/shmpubsub.py +++ b/dimos/protocol/pubsub/impl/shmpubsub.py @@ -23,7 +23,7 @@ import struct import threading import time -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Literal import uuid import numpy as np @@ -104,14 +104,14 @@ def __init__( self, *, prefer: str = "auto", - default_capacity: int = 3686400, + default_capacity: int | None = None, close_channels_on_stop: bool = True, **_: Any, ) -> None: super().__init__() self.config = SharedMemoryConfig( prefer=prefer, - default_capacity=default_capacity, + default_capacity=default_capacity or SharedMemoryConfig.default_capacity, close_channels_on_stop=close_channels_on_stop, ) self._topics: dict[str, SharedMemoryPubSubBase._TopicState] = {} @@ -305,6 +305,80 @@ class PickleSharedMemory( ... +# QUALITY_LEVEL: temporary (out of [deprecated, temporary, experimental, sufficient, robust]) +class ShmSubset: + """Subscribe-all adapter for a fixed set of shared-memory topics. + + Stop-gap for the Rerun bridge: SHM pubsub has no topic discovery, so the + bridge can't ``subscribe_all`` like it does with LCM. This class wraps + known SHM topics so they can be passed in the bridge's ``pubsubs`` list. + Replace once the bridge auto-discovers active transports from blueprint wiring. + + Wraps PickleSharedMemory or BytesSharedMemory and exposes the + ``subscribe_all`` interface so it can be used in the bridge's ``pubsubs`` + list alongside LCM. + + Example:: + + from dimos.protocol.pubsub.impl.shmpubsub import ShmSubset + + bridge = RerunBridgeModule( + pubsubs=[LCM(), ShmSubset(topics=[("color_image", 6220800, "pickle")])], + ) + + Each topic entry is ``(name, capacity, encoding)`` where encoding is + ``"pickle"`` (for pSHMTransport / PickleSharedMemory) or + ``"bytes"`` (for SHMTransport / BytesSharedMemory). + """ + + def __init__(self, topics: list[tuple[str, int, Literal["pickle", "bytes"]]]) -> None: + self._topic_specs = topics + self._shm_instances: list[SharedMemoryPubSubBase] = [] + self._unsubs: list[Callable[[], None]] = [] + + def start(self) -> None: + pass # instances are started lazily in subscribe_all + + def stop(self) -> None: + for unsub in self._unsubs: + unsub() + self._unsubs.clear() + for shm in self._shm_instances: + shm.stop() + self._shm_instances.clear() + + def subscribe_all(self, callback: Callable[[Any, Any], Any]) -> Callable[[], None]: + try: + for topic_name, capacity, encoding in self._topic_specs: + if encoding == "pickle": + shm: SharedMemoryPubSubBase = PickleSharedMemory(default_capacity=capacity) + elif encoding == "bytes": + shm = BytesSharedMemory(default_capacity=capacity) + else: + logger.error( + f"ShmSubset: unknown encoding '{encoding}', skipping topic '{topic_name}'" + ) + continue + shm.start() + self._shm_instances.append(shm) + + def _cb(msg: Any, _topic: Any, _tn: str = topic_name) -> None: + callback(msg, _tn) + + unsub = shm.subscribe(topic_name, _cb) + self._unsubs.append(unsub) + except Exception: + # If a later topic fails (e.g. bad capacity, SHM allocation error), + # clean up SHM instances already started for earlier topics. + self.stop() + raise + + def unsubscribe_all() -> None: + self.stop() + + return unsubscribe_all + + class LCMSharedMemoryPubSubBase(PubSub[Topic, Any]): """SharedMemory pubsub that uses LCM Topic type, delegating to SharedMemoryPubSubBase.""" diff --git a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py index f32561e11d..b886f28b1b 100644 --- a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py +++ b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py @@ -23,6 +23,7 @@ from dimos.core.transport import pSHMTransport from dimos.msgs.sensor_msgs.Image import Image from dimos.protocol.pubsub.impl.lcmpubsub import LCM +from dimos.protocol.pubsub.impl.shmpubsub import ShmSubset from dimos.protocol.service.system_configurator.clock_sync import ClockSyncConfigurator from dimos.robot.unitree.go2.connection import GO2Connection from dimos.web.websocket_vis.websocket_vis_module import WebsocketVisModule @@ -32,7 +33,7 @@ # TODO need a global transport toggle on blueprints/global config _mac_transports: dict[tuple[str, type], pSHMTransport[Image]] = { ("color_image", Image): pSHMTransport( - "color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + "/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE ), } @@ -93,11 +94,14 @@ def _go2_rerun_blueprint() -> Any: ) -rerun_config = { +rerun_config: dict[str, Any] = { "blueprint": _go2_rerun_blueprint, # any pubsub that supports subscribe_all and topic that supports str(topic) # is acceptable here - "pubsubs": [LCM()], + "pubsubs": [ + LCM(), + ShmSubset(topics=[("/color_image", DEFAULT_CAPACITY_COLOR_IMAGE, "pickle")]), + ], # Custom converters for specific rerun entity paths # Normally all these would be specified in their respectative modules # Until this is implemented we have central overrides here diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index de89c5d347..afb45142da 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -270,6 +270,8 @@ def _get_entity_path(self, topic: Any) -> str: topic_str = getattr(topic, "name", None) or str(topic) # Strip everything after # (LCM topic suffix) topic_str = topic_str.split("#")[0] + # Ensure / separator between prefix and topic + assert topic_str.startswith("/"), f"{topic_str} doesn't start with slash" return f"{self.config.entity_prefix}{topic_str}" def _on_message(self, msg: Any, topic: Any) -> None: