Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dimos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3
# Default depth image size: 1280x720 frame * 4 (float32 size)
DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4
# Fixed-capacity SHM channels must be sized before the first message arrives.
# These defaults cover current Go2 replay and navigation payloads while keeping
# large local streams off UDP multicast.
DEFAULT_CAPACITY_POINTCLOUD = 64 * 1024 * 1024
DEFAULT_CAPACITY_OCCUPANCY_GRID = 16 * 1024 * 1024

# From https://github.com/lcm-proj/lcm.git
LCM_MAX_CHANNEL_NAME_LENGTH = 63
Expand Down
37 changes: 36 additions & 1 deletion dimos/core/coordination/module_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@
from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.module import ModuleBase, ModuleSpec
from dimos.core.resource import Resource
from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport
from dimos.core.transport import (
JpegShmTransport,
LCMTransport,
PubSubTransport,
SHMTransport,
pLCMTransport,
pSHMTransport,
)
from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance
from dimos.utils.generic import short_id
from dimos.utils.logging_config import setup_logger
Expand Down Expand Up @@ -279,6 +286,9 @@ def _connect_streams(self, blueprint: Blueprint) -> None:
module=module.__name__,
transport=transport.__class__.__name__,
)
# SHM streams are concrete transport objects, not LCM topics. Forward
# them to Rerun after stream wiring has resolved the transport registry.
_configure_rerun_bridge_visual_transports(self)

@classmethod
def build(
Expand Down Expand Up @@ -584,6 +594,31 @@ def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> Pu
return transport


def _configure_rerun_bridge_visual_transports(coordinator: ModuleCoordinator) -> None:
"""Send resolved SHM transports to an active Rerun bridge.

RerunBridgeModule subscribes to configured pubsubs directly. For SHM
streams, the coordinator forwards the concrete transport objects after
stream wiring has selected them.
"""
from dimos.visualization.rerun.bridge import RerunBridgeModule

if RerunBridgeModule not in coordinator._deployed_modules:
return

# LCM transports are already visible through RerunBridgeModule.config.pubsubs.
transports = [
transport
for transport in coordinator._transport_registry.values()
if isinstance(transport, SHMTransport | pSHMTransport | JpegShmTransport)
]
if not transports:
return

bridge = coordinator.get_instance(RerunBridgeModule)
bridge.set_visual_transports(transports)
Comment on lines +597 to +619
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Visualization-layer import inside core coordination. _configure_rerun_bridge_visual_transports does a deferred from dimos.visualization.rerun.bridge import RerunBridgeModule inside the core coordinator module. This creates a layering dependency: the coordination layer now has knowledge of—and a runtime dependency on—the visualization layer. The runtime guard (if RerunBridgeModule not in coordinator._deployed_modules) keeps headless deployments safe, but the right long-term home for this wiring is likely a post-wire hook registered by the bridge itself, keeping core coordination decoupled from visualization concerns.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!



def _verify_no_name_conflicts(blueprint: Blueprint) -> None:
name_to_types: dict[Any, set[type]] = defaultdict(set)
name_to_modules: dict[Any, list[tuple[type, type]]] = defaultdict(list)
Expand Down
36 changes: 33 additions & 3 deletions dimos/core/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,23 @@ def stop(self) -> None:


class pSHMTransport(PubSubTransport[T]):
"""Pickled shared-memory transport for local Python object streams."""

_started: bool = False

def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(topic)
self._kwargs = kwargs
self.shm = PickleSharedMemory(**kwargs)

def __reduce__(self): # type: ignore[no-untyped-def]
return (pSHMTransport, (self.topic,))
# Preserve sizing options such as default_capacity when the coordinator
# sends this transport to workers or to Rerun.
return (pSHMTransport, (self.topic,), self._kwargs)

def __setstate__(self, state: dict[str, Any]) -> None:
self._kwargs = state
self.shm = PickleSharedMemory(**state)
Comment on lines 175 to +182
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Redundant SHM object created during unpickling. When pickle reconstructs a pSHMTransport it first calls pSHMTransport(topic) (which runs __init__ and creates PickleSharedMemory() with the default 3.5 MiB capacity), then immediately calls __setstate__ which creates a second PickleSharedMemory(**state) with the correct capacity and replaces self.shm. The first object is abandoned and GC'd. While no OS-level shared memory is allocated until .start(), the unnecessary construction can be avoided by having __reduce__ return a factory callable that bypasses __init__.

Suggested change
def __reduce__(self): # type: ignore[no-untyped-def]
return (pSHMTransport, (self.topic,))
# Preserve sizing options such as default_capacity when the coordinator
# sends this transport to workers or to Rerun.
return (pSHMTransport, (self.topic,), self._kwargs)
def __setstate__(self, state: dict[str, Any]) -> None:
self._kwargs = state
self.shm = PickleSharedMemory(**state)
def __reduce__(self): # type: ignore[no-untyped-def]
# Preserve sizing options such as default_capacity when the coordinator
# sends this transport to workers or to Rerun.
return (_rebuild_pshm, (self.topic, self._kwargs))
def __setstate__(self, state: dict[str, Any]) -> None:
self._kwargs = state
self.shm = PickleSharedMemory(**state)
def _rebuild_pshm(topic: str, kwargs: dict) -> "pSHMTransport":
"""Pickle helper that constructs pSHMTransport without a double-init."""
obj = object.__new__(pSHMTransport)
pSHMTransport.__init__(obj, topic, **kwargs)
return obj


def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
if not self._started:
Expand All @@ -193,14 +202,23 @@ def stop(self) -> None:


class SHMTransport(PubSubTransport[T]):
"""Raw bytes shared-memory transport for local fixed-size payloads."""

_started: bool = False

def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def]
super().__init__(topic)
self._kwargs = kwargs
self.shm = BytesSharedMemory(**kwargs)

def __reduce__(self): # type: ignore[no-untyped-def]
return (SHMTransport, (self.topic,))
# Preserve sizing options such as default_capacity when the coordinator
# sends this transport to workers or to Rerun.
return (SHMTransport, (self.topic,), self._kwargs)

def __setstate__(self, state: dict[str, Any]) -> None:
self._kwargs = state
self.shm = BytesSharedMemory(**state)

def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
if not self._started:
Expand All @@ -223,6 +241,8 @@ def stop(self) -> None:


class JpegShmTransport(PubSubTransport[T]):
"""JPEG-compressed shared-memory transport for local image streams."""

_started: bool = False

def __init__(self, topic: str, quality: int = 75, **kwargs) -> None: # type: ignore[no-untyped-def]
Expand All @@ -233,9 +253,19 @@ def __init__(self, topic: str, quality: int = 75, **kwargs) -> None: # type: ig

self.shm = JpegSharedMemory(quality=quality, **kwargs)
self.quality = quality
self._kwargs = kwargs

def __reduce__(self): # type: ignore[no-untyped-def]
return (JpegShmTransport, (self.topic, self.quality))
# Preserve quality and sizing options when crossing worker boundaries.
return (JpegShmTransport, (self.topic, self.quality), self._kwargs)

def __setstate__(self, state: dict[str, Any]) -> None:
from dimos.protocol.pubsub.impl.jpeg_shm import (
JpegSharedMemory,
) # deferred to avoid pulling in Image/cv2/rerun

self._kwargs = state
self.shm = JpegSharedMemory(quality=self.quality, **state)
Comment on lines +262 to +268
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 self.quality is implicitly required before __setstate__ runs. __setstate__ references self.quality (line 268), which is only guaranteed to be set because pickle always calls __init__ (via the args tuple in __reduce__) before calling __setstate__. If the reconstruction path ever changes (e.g., __new__ + __setstate__ directly, as in some frameworks), this would raise AttributeError. Storing quality in the state dict alongside _kwargs would make the dependency explicit and the reconstruction self-contained.


def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
if not self._started:
Expand Down
38 changes: 28 additions & 10 deletions dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,47 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import platform
from typing import Any

from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE
from dimos.constants import (
DEFAULT_CAPACITY_COLOR_IMAGE,
DEFAULT_CAPACITY_OCCUPANCY_GRID,
DEFAULT_CAPACITY_POINTCLOUD,
)
from dimos.core.coordination.blueprints import autoconnect
from dimos.core.global_config import global_config
from dimos.core.transport import pSHMTransport
from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid
from dimos.msgs.sensor_msgs.Image import Image
from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2
from dimos.robot.unitree.go2.connection import GO2Connection
from dimos.visualization.vis_module import vis_module

# Mac has some issue with high bandwidth UDP, so we use pSHMTransport for color_image
# actually we can use pSHMTransport for all platforms, and for all streams
# TODO need a global transport toggle on blueprints/global config
_mac_transports: dict[tuple[str, type], pSHMTransport[Image]] = {
# Route large local replay and mapping streams through SHM on every platform.
# Small control/status streams continue to use the default LCM transport.
_local_high_bandwidth_transports: dict[tuple[str, type], pSHMTransport[Any]] = {
("color_image", Image): pSHMTransport(
"color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
"/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
),
("lidar", PointCloud2): pSHMTransport("/lidar", default_capacity=DEFAULT_CAPACITY_POINTCLOUD),
("pointcloud", PointCloud2): pSHMTransport(
"/pointcloud", default_capacity=DEFAULT_CAPACITY_POINTCLOUD
),
("global_map", PointCloud2): pSHMTransport(
"/global_map", default_capacity=DEFAULT_CAPACITY_POINTCLOUD
),
("merged_map", PointCloud2): pSHMTransport(
"/merged_map", default_capacity=DEFAULT_CAPACITY_POINTCLOUD
),
("global_costmap", OccupancyGrid): pSHMTransport(
"/global_costmap", default_capacity=DEFAULT_CAPACITY_OCCUPANCY_GRID
),
("navigation_costmap", OccupancyGrid): pSHMTransport(
"/navigation_costmap", default_capacity=DEFAULT_CAPACITY_OCCUPANCY_GRID
),
}

_transports_base = (
autoconnect() if platform.system() == "Linux" else autoconnect().transports(_mac_transports)
)
_transports_base = autoconnect().transports(_local_high_bandwidth_transports)


def _convert_camera_info(camera_info: Any) -> Any:
Expand Down
62 changes: 59 additions & 3 deletions dimos/visualization/rerun/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

from dimos.core.core import rpc
from dimos.core.module import Module, ModuleConfig
from dimos.core.transport import PubSubTransport
from dimos.protocol.pubsub.impl.lcmpubsub import LCM
from dimos.protocol.pubsub.patterns import Glob, pattern_matches
from dimos.protocol.pubsub.spec import SubscribeAllCapable
Expand Down Expand Up @@ -164,7 +165,10 @@ def _default_blueprint() -> Blueprint:


class Config(ModuleConfig):
# Pubsubs cover discoverable sources such as LCM. visual_transports is
# populated by the coordinator for concrete local streams such as SHM.
pubsubs: list[SubscribeAllCapable[Any, Any]] = field(default_factory=lambda: [LCM()])
visual_transports: list[PubSubTransport[Any]] = field(default_factory=list)

visual_override: dict[Glob | str, Callable[[Any], Archetype] | None] = field(
default_factory=dict
Expand All @@ -186,10 +190,12 @@ class Config(ModuleConfig):


class RerunBridgeModule(Module):
"""Bridge that logs messages from pubsubs to Rerun.
"""Bridge that logs transport messages to Rerun.

Spawns its own Rerun viewer and subscribes to all topics on each provided
pubsub. Any message that has a to_rerun() method is automatically logged.
Spawns its own Rerun viewer and subscribes to configured pubsubs and
explicit visual transports. Pubsubs cover discoverable transports such as
LCM; visual_transports covers concrete local transports such as SHM.
Any message that has a to_rerun() method is automatically logged.

Example:
from dimos.protocol.pubsub.impl.lcmpubsub import LCM
Expand All @@ -215,6 +221,8 @@ def __init__(self, **kwargs: Any) -> None:
self._last_log = {}
self._override_cache: dict[str, Callable[[Any], RerunData | None]] = {}
self._frame_attached: dict[str, str] = {}
self._subscribed_visual_transport_topics: set[str] = set()
self._started = False

@property
def host(self) -> str:
Expand Down Expand Up @@ -265,13 +273,56 @@ def composed(msg: Any) -> RerunData | None:
return composed

def _get_entity_path(self, topic: Any) -> str:
"""Map a transport topic to a Rerun entity path.

LCM topics usually already include a leading slash and a type suffix.
SHM topics are plain strings. Normalize both forms so visual overrides
such as "world/color_image" match consistently.
"""
if self.config.topic_to_entity:
return self.config.topic_to_entity(topic)

topic_str = getattr(topic, "name", None) or str(topic)
topic_str = topic_str.split("#")[0] # strip LCM topic suffix
if not topic_str.startswith("/"):
topic_str = f"/{topic_str}"
return f"{self.config.entity_prefix}{topic_str}"

@rpc
def set_visual_transports(self, transports: list[PubSubTransport[Any]]) -> None:
"""Replace explicit visual transports and subscribe when running.

The coordinator calls this after stream wiring and after loading
additional blueprints into an existing coordinator.
"""
self.config.visual_transports = transports
if self._started:
self._subscribe_visual_transports()

def _subscribe_visual_transports(self) -> None:
"""Attach to configured SHM streams once per topic."""
for transport in self.config.visual_transports:
topic = str(getattr(transport, "topic", ""))
if not topic or topic in self._subscribed_visual_transport_topics:
continue
self._subscribed_visual_transport_topics.add(topic)
if hasattr(transport, "start"):
transport.start()
# If subscribe raises, the bridge still owns cleanup for the transport it started.
self.register_disposable(Disposable(transport.stop))
transport_topic = getattr(transport, "topic", topic)

def on_visual_message(msg: Any, transport_topic: Any = transport_topic) -> None:
self._on_message(msg, transport_topic)

unsub = transport.subscribe(
# Capture the current topic so callbacks keep the correct
# entity path even as this loop advances to the next transport.
on_visual_message
)
if unsub is not None:
self.register_disposable(Disposable(unsub))

def _on_message(self, msg: Any, topic: Any) -> None:
"""Handle incoming message - log to rerun."""

Expand Down Expand Up @@ -306,6 +357,7 @@ def _on_message(self, msg: Any, topic: Any) -> None:
@rpc
def start(self) -> None:
super().start()
self._started = True

logger.info("Rerun bridge starting")

Expand Down Expand Up @@ -397,6 +449,8 @@ def start(self) -> None:
unsub = pubsub.subscribe_all(self._on_message)
self.register_disposable(Disposable(unsub))

self._subscribe_visual_transports()

for pubsub in self.config.pubsubs:
if hasattr(pubsub, "stop"):
self.register_disposable(Disposable(pubsub.stop)) # type: ignore[union-attr]
Expand Down Expand Up @@ -506,8 +560,10 @@ def log_blueprint_graph(self, dot_code: str, module_names: list[str]) -> None:

@rpc
def stop(self) -> None:
self._started = False
self._override_cache.clear()
self._frame_attached.clear()
self._subscribed_visual_transport_topics.clear()
super().stop()


Expand Down
Loading