Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,17 @@ def stop(self) -> None:
class pSHMTransport(PubSubTransport[T]):
_started: bool = False

def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def]
def __init__(self, topic: str, *, default_capacity: int | None = None, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(topic)
self.shm = PickleSharedMemory(**kwargs)
self._default_capacity = default_capacity
self.shm = PickleSharedMemory(default_capacity=default_capacity, **kwargs)

def __reduce__(self): # type: ignore[no-untyped-def]
return (pSHMTransport, (self.topic,))
return (pSHMTransport, (self.topic,), {"default_capacity": self._default_capacity})

def __setstate__(self, state: dict[str, Any]) -> None: # type: ignore[no-untyped-def]
self._default_capacity = state.get("default_capacity")
self.shm = PickleSharedMemory(default_capacity=self._default_capacity)
Comment on lines 168 to +173
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Pickle round-trip allocates a wasted SHM object

With the current __reduce__ / __setstate__ split, unpickling pSHMTransport (and SHMTransport) calls __init__ first (because __reduce__ returns (pSHMTransport, (self.topic,), state)). That __init__ call constructs a PickleSharedMemory(default_capacity=None), which is immediately discarded when __setstate__ constructs the correct one. This is a minor but unnecessary allocation.

One common pattern to avoid this is to delegate to __new__ in __reduce__ and perform all initialisation in __setstate__:

def __reduce__(self):
    return (
        _reconstruct_pshm,  # module-level helper that calls __new__
        (type(self), self.topic, self._default_capacity),
    )

Or simply accept the redundant allocation as a known trade-off and add a comment. The same applies to SHMTransport (lines 203-208).


def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
if not self._started:
Expand All @@ -190,12 +195,17 @@ def stop(self) -> None:
class SHMTransport(PubSubTransport[T]):
_started: bool = False

def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def]
def __init__(self, topic: str, *, default_capacity: int | None = None, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(topic)
self.shm = BytesSharedMemory(**kwargs)
self._default_capacity = default_capacity
self.shm = BytesSharedMemory(default_capacity=default_capacity, **kwargs)

def __reduce__(self): # type: ignore[no-untyped-def]
return (SHMTransport, (self.topic,))
return (SHMTransport, (self.topic,), {"default_capacity": self._default_capacity})

def __setstate__(self, state: dict[str, Any]) -> None: # type: ignore[no-untyped-def]
self._default_capacity = state.get("default_capacity")
self.shm = BytesSharedMemory(default_capacity=self._default_capacity)

def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
if not self._started:
Expand Down
80 changes: 77 additions & 3 deletions dimos/protocol/pubsub/impl/shmpubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import struct
import threading
import time
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal
import uuid

import numpy as np
Expand Down Expand Up @@ -104,14 +104,14 @@ def __init__(
self,
*,
prefer: str = "auto",
default_capacity: int = 3686400,
default_capacity: int | None = None,
close_channels_on_stop: bool = True,
**_: Any,
) -> None:
super().__init__()
self.config = SharedMemoryConfig(
prefer=prefer,
default_capacity=default_capacity,
default_capacity=default_capacity or SharedMemoryConfig.default_capacity,
close_channels_on_stop=close_channels_on_stop,
)
self._topics: dict[str, SharedMemoryPubSubBase._TopicState] = {}
Expand Down Expand Up @@ -305,6 +305,80 @@ class PickleSharedMemory(
...


# QUALITY_LEVEL: temporary (out of [deprecated, temporary, experimental, sufficient, robust])
class ShmSubset:
"""Subscribe-all adapter for a fixed set of shared-memory topics.

Stop-gap for the Rerun bridge: SHM pubsub has no topic discovery, so the
bridge can't ``subscribe_all`` like it does with LCM. This class wraps
known SHM topics so they can be passed in the bridge's ``pubsubs`` list.
Replace once the bridge auto-discovers active transports from blueprint wiring.

Wraps PickleSharedMemory or BytesSharedMemory and exposes the
``subscribe_all`` interface so it can be used in the bridge's ``pubsubs``
list alongside LCM.

Example::

from dimos.protocol.pubsub.impl.shmpubsub import ShmSubset

bridge = RerunBridgeModule(
pubsubs=[LCM(), ShmSubset(topics=[("color_image", 6220800, "pickle")])],
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Docstring example omits required leading slash

The example topic name is "color_image" (no leading /), but the bridge now asserts topic_str.startswith("/") at bridge.py:274. A developer who copies this example verbatim will trigger an AssertionError at runtime. The example should mirror the actual usage in unitree_go2_basic.py ("/color_image").

Suggested change
pubsubs=[LCM(), ShmSubset(topics=[("color_image", 6220800, "pickle")])],
bridge = RerunBridgeModule(
pubsubs=[LCM(), ShmSubset(topics=[("/color_image", 6220800, "pickle")])],
)

)

Each topic entry is ``(name, capacity, encoding)`` where encoding is
``"pickle"`` (for pSHMTransport / PickleSharedMemory) or
``"bytes"`` (for SHMTransport / BytesSharedMemory).
"""

def __init__(self, topics: list[tuple[str, int, Literal["pickle", "bytes"]]]) -> None:
self._topic_specs = topics
self._shm_instances: list[SharedMemoryPubSubBase] = []
self._unsubs: list[Callable[[], None]] = []

def start(self) -> None:
pass # instances are started lazily in subscribe_all

def stop(self) -> None:
for unsub in self._unsubs:
unsub()
self._unsubs.clear()
for shm in self._shm_instances:
shm.stop()
self._shm_instances.clear()

def subscribe_all(self, callback: Callable[[Any, Any], Any]) -> Callable[[], None]:
try:
for topic_name, capacity, encoding in self._topic_specs:
if encoding == "pickle":
shm: SharedMemoryPubSubBase = PickleSharedMemory(default_capacity=capacity)
elif encoding == "bytes":
shm = BytesSharedMemory(default_capacity=capacity)
else:
logger.error(
f"ShmSubset: unknown encoding '{encoding}', skipping topic '{topic_name}'"
)
continue
shm.start()
self._shm_instances.append(shm)

def _cb(msg: Any, _topic: Any, _tn: str = topic_name) -> None:
callback(msg, _tn)

unsub = shm.subscribe(topic_name, _cb)
self._unsubs.append(unsub)
except Exception:
# If a later topic fails (e.g. bad capacity, SHM allocation error),
# clean up SHM instances already started for earlier topics.
self.stop()
raise

def unsubscribe_all() -> None:
self.stop()

return unsubscribe_all


class LCMSharedMemoryPubSubBase(PubSub[Topic, Any]):
"""SharedMemory pubsub that uses LCM Topic type, delegating to SharedMemoryPubSubBase."""

Expand Down
10 changes: 7 additions & 3 deletions dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dimos.core.transport import pSHMTransport
from dimos.msgs.sensor_msgs.Image import Image
from dimos.protocol.pubsub.impl.lcmpubsub import LCM
from dimos.protocol.pubsub.impl.shmpubsub import ShmSubset
from dimos.protocol.service.system_configurator.clock_sync import ClockSyncConfigurator
from dimos.robot.unitree.go2.connection import GO2Connection
from dimos.web.websocket_vis.websocket_vis_module import WebsocketVisModule
Expand All @@ -32,7 +33,7 @@
# TODO need a global transport toggle on blueprints/global config
_mac_transports: dict[tuple[str, type], pSHMTransport[Image]] = {
("color_image", Image): pSHMTransport(
"color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
"/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
),
}

Expand Down Expand Up @@ -93,11 +94,14 @@ def _go2_rerun_blueprint() -> Any:
)


rerun_config = {
rerun_config: dict[str, Any] = {
"blueprint": _go2_rerun_blueprint,
# any pubsub that supports subscribe_all and topic that supports str(topic)
# is acceptable here
"pubsubs": [LCM()],
"pubsubs": [
LCM(),
ShmSubset(topics=[("/color_image", DEFAULT_CAPACITY_COLOR_IMAGE, "pickle")]),
],
# Custom converters for specific rerun entity paths
# Normally all these would be specified in their respectative modules
# Until this is implemented we have central overrides here
Expand Down
2 changes: 2 additions & 0 deletions dimos/visualization/rerun/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ def _get_entity_path(self, topic: Any) -> str:
topic_str = getattr(topic, "name", None) or str(topic)
# Strip everything after # (LCM topic suffix)
topic_str = topic_str.split("#")[0]
# Ensure / separator between prefix and topic
assert topic_str.startswith("/"), f"{topic_str} doesn't start with slash"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Assert used for runtime input validation

assert is silently stripped when Python runs with the -O (optimise) flag, so this guard will disappear in any optimised deployment. If a topic arrives without a leading slash, the bridge will silently produce a malformed entity path instead of surfacing an error. A proper ValueError (or at minimum a logged warning) is the right tool here.

Suggested change
assert topic_str.startswith("/"), f"{topic_str} doesn't start with slash"
if not topic_str.startswith("/"):
raise ValueError(f"{topic_str!r} doesn't start with slash; SHM topic names must begin with '/'")

return f"{self.config.entity_prefix}{topic_str}"

def _on_message(self, msg: Any, topic: Any) -> None:
Expand Down