diff --git a/dimos/constants.py b/dimos/constants.py index d849f4aaf3..5b83bce636 100644 --- a/dimos/constants.py +++ b/dimos/constants.py @@ -45,6 +45,11 @@ DEFAULT_CAPACITY_COLOR_IMAGE = 1920 * 1080 * 3 # Default depth image size: 1280x720 frame * 4 (float32 size) DEFAULT_CAPACITY_DEPTH_IMAGE = 1280 * 720 * 4 +# Fixed-capacity SHM channels must be sized before the first message arrives. +# These defaults cover current Go2 replay and navigation payloads while keeping +# large local streams off UDP multicast. +DEFAULT_CAPACITY_POINTCLOUD = 64 * 1024 * 1024 +DEFAULT_CAPACITY_OCCUPANCY_GRID = 16 * 1024 * 1024 # From https://github.com/lcm-proj/lcm.git LCM_MAX_CHANNEL_NAME_LENGTH = 63 diff --git a/dimos/core/coordination/module_coordinator.py b/dimos/core/coordination/module_coordinator.py index 60c41a6ba1..d0593b8e02 100644 --- a/dimos/core/coordination/module_coordinator.py +++ b/dimos/core/coordination/module_coordinator.py @@ -30,7 +30,14 @@ from dimos.core.global_config import GlobalConfig, global_config from dimos.core.module import ModuleBase, ModuleSpec from dimos.core.resource import Resource -from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport +from dimos.core.transport import ( + JpegShmTransport, + LCMTransport, + PubSubTransport, + SHMTransport, + pLCMTransport, + pSHMTransport, +) from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance from dimos.utils.generic import short_id from dimos.utils.logging_config import setup_logger @@ -279,6 +286,9 @@ def _connect_streams(self, blueprint: Blueprint) -> None: module=module.__name__, transport=transport.__class__.__name__, ) + # SHM streams are concrete transport objects, not LCM topics. Forward + # them to Rerun after stream wiring has resolved the transport registry. + _configure_rerun_bridge_visual_transports(self) @classmethod def build( @@ -584,6 +594,31 @@ def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> Pu return transport +def _configure_rerun_bridge_visual_transports(coordinator: ModuleCoordinator) -> None: + """Send resolved SHM transports to an active Rerun bridge. + + RerunBridgeModule subscribes to configured pubsubs directly. For SHM + streams, the coordinator forwards the concrete transport objects after + stream wiring has selected them. + """ + from dimos.visualization.rerun.bridge import RerunBridgeModule + + if RerunBridgeModule not in coordinator._deployed_modules: + return + + # LCM transports are already visible through RerunBridgeModule.config.pubsubs. + transports = [ + transport + for transport in coordinator._transport_registry.values() + if isinstance(transport, SHMTransport | pSHMTransport | JpegShmTransport) + ] + if not transports: + return + + bridge = coordinator.get_instance(RerunBridgeModule) + bridge.set_visual_transports(transports) + + def _verify_no_name_conflicts(blueprint: Blueprint) -> None: name_to_types: dict[Any, set[type]] = defaultdict(set) name_to_modules: dict[Any, list[tuple[type, type]]] = defaultdict(list) diff --git a/dimos/core/transport.py b/dimos/core/transport.py index 6435003758..de2ced9cbb 100644 --- a/dimos/core/transport.py +++ b/dimos/core/transport.py @@ -163,14 +163,23 @@ def stop(self) -> None: class pSHMTransport(PubSubTransport[T]): + """Pickled shared-memory transport for local Python object streams.""" + _started: bool = False def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(topic) + self._kwargs = kwargs self.shm = PickleSharedMemory(**kwargs) def __reduce__(self): # type: ignore[no-untyped-def] - return (pSHMTransport, (self.topic,)) + # Preserve sizing options such as default_capacity when the coordinator + # sends this transport to workers or to Rerun. + return (pSHMTransport, (self.topic,), self._kwargs) + + def __setstate__(self, state: dict[str, Any]) -> None: + self._kwargs = state + self.shm = PickleSharedMemory(**state) def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] if not self._started: @@ -193,14 +202,23 @@ def stop(self) -> None: class SHMTransport(PubSubTransport[T]): + """Raw bytes shared-memory transport for local fixed-size payloads.""" + _started: bool = False def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def] super().__init__(topic) + self._kwargs = kwargs self.shm = BytesSharedMemory(**kwargs) def __reduce__(self): # type: ignore[no-untyped-def] - return (SHMTransport, (self.topic,)) + # Preserve sizing options such as default_capacity when the coordinator + # sends this transport to workers or to Rerun. + return (SHMTransport, (self.topic,), self._kwargs) + + def __setstate__(self, state: dict[str, Any]) -> None: + self._kwargs = state + self.shm = BytesSharedMemory(**state) def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] if not self._started: @@ -223,6 +241,8 @@ def stop(self) -> None: class JpegShmTransport(PubSubTransport[T]): + """JPEG-compressed shared-memory transport for local image streams.""" + _started: bool = False def __init__(self, topic: str, quality: int = 75, **kwargs) -> None: # type: ignore[no-untyped-def] @@ -233,9 +253,19 @@ def __init__(self, topic: str, quality: int = 75, **kwargs) -> None: # type: ig self.shm = JpegSharedMemory(quality=quality, **kwargs) self.quality = quality + self._kwargs = kwargs def __reduce__(self): # type: ignore[no-untyped-def] - return (JpegShmTransport, (self.topic, self.quality)) + # Preserve quality and sizing options when crossing worker boundaries. + return (JpegShmTransport, (self.topic, self.quality), self._kwargs) + + def __setstate__(self, state: dict[str, Any]) -> None: + from dimos.protocol.pubsub.impl.jpeg_shm import ( + JpegSharedMemory, + ) # deferred to avoid pulling in Image/cv2/rerun + + self._kwargs = state + self.shm = JpegSharedMemory(quality=self.quality, **state) def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def] if not self._started: diff --git a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py index 96a291163d..ea315119c5 100644 --- a/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py +++ b/dimos/robot/unitree/go2/blueprints/basic/unitree_go2_basic.py @@ -14,29 +14,47 @@ # See the License for the specific language governing permissions and # limitations under the License. -import platform from typing import Any -from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE +from dimos.constants import ( + DEFAULT_CAPACITY_COLOR_IMAGE, + DEFAULT_CAPACITY_OCCUPANCY_GRID, + DEFAULT_CAPACITY_POINTCLOUD, +) from dimos.core.coordination.blueprints import autoconnect from dimos.core.global_config import global_config from dimos.core.transport import pSHMTransport +from dimos.msgs.nav_msgs.OccupancyGrid import OccupancyGrid from dimos.msgs.sensor_msgs.Image import Image +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.robot.unitree.go2.connection import GO2Connection from dimos.visualization.vis_module import vis_module -# Mac has some issue with high bandwidth UDP, so we use pSHMTransport for color_image -# actually we can use pSHMTransport for all platforms, and for all streams -# TODO need a global transport toggle on blueprints/global config -_mac_transports: dict[tuple[str, type], pSHMTransport[Image]] = { +# Route large local replay and mapping streams through SHM on every platform. +# Small control/status streams continue to use the default LCM transport. +_local_high_bandwidth_transports: dict[tuple[str, type], pSHMTransport[Any]] = { ("color_image", Image): pSHMTransport( - "color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + "/color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ), + ("lidar", PointCloud2): pSHMTransport("/lidar", default_capacity=DEFAULT_CAPACITY_POINTCLOUD), + ("pointcloud", PointCloud2): pSHMTransport( + "/pointcloud", default_capacity=DEFAULT_CAPACITY_POINTCLOUD + ), + ("global_map", PointCloud2): pSHMTransport( + "/global_map", default_capacity=DEFAULT_CAPACITY_POINTCLOUD + ), + ("merged_map", PointCloud2): pSHMTransport( + "/merged_map", default_capacity=DEFAULT_CAPACITY_POINTCLOUD + ), + ("global_costmap", OccupancyGrid): pSHMTransport( + "/global_costmap", default_capacity=DEFAULT_CAPACITY_OCCUPANCY_GRID + ), + ("navigation_costmap", OccupancyGrid): pSHMTransport( + "/navigation_costmap", default_capacity=DEFAULT_CAPACITY_OCCUPANCY_GRID ), } -_transports_base = ( - autoconnect() if platform.system() == "Linux" else autoconnect().transports(_mac_transports) -) +_transports_base = autoconnect().transports(_local_high_bandwidth_transports) def _convert_camera_info(camera_info: Any) -> Any: diff --git a/dimos/visualization/rerun/bridge.py b/dimos/visualization/rerun/bridge.py index 2f5fb1efa9..eb00f23730 100644 --- a/dimos/visualization/rerun/bridge.py +++ b/dimos/visualization/rerun/bridge.py @@ -43,6 +43,7 @@ from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig +from dimos.core.transport import PubSubTransport from dimos.protocol.pubsub.impl.lcmpubsub import LCM from dimos.protocol.pubsub.patterns import Glob, pattern_matches from dimos.protocol.pubsub.spec import SubscribeAllCapable @@ -164,7 +165,10 @@ def _default_blueprint() -> Blueprint: class Config(ModuleConfig): + # Pubsubs cover discoverable sources such as LCM. visual_transports is + # populated by the coordinator for concrete local streams such as SHM. pubsubs: list[SubscribeAllCapable[Any, Any]] = field(default_factory=lambda: [LCM()]) + visual_transports: list[PubSubTransport[Any]] = field(default_factory=list) visual_override: dict[Glob | str, Callable[[Any], Archetype] | None] = field( default_factory=dict @@ -186,10 +190,12 @@ class Config(ModuleConfig): class RerunBridgeModule(Module): - """Bridge that logs messages from pubsubs to Rerun. + """Bridge that logs transport messages to Rerun. - Spawns its own Rerun viewer and subscribes to all topics on each provided - pubsub. Any message that has a to_rerun() method is automatically logged. + Spawns its own Rerun viewer and subscribes to configured pubsubs and + explicit visual transports. Pubsubs cover discoverable transports such as + LCM; visual_transports covers concrete local transports such as SHM. + Any message that has a to_rerun() method is automatically logged. Example: from dimos.protocol.pubsub.impl.lcmpubsub import LCM @@ -215,6 +221,8 @@ def __init__(self, **kwargs: Any) -> None: self._last_log = {} self._override_cache: dict[str, Callable[[Any], RerunData | None]] = {} self._frame_attached: dict[str, str] = {} + self._subscribed_visual_transport_topics: set[str] = set() + self._started = False @property def host(self) -> str: @@ -265,13 +273,56 @@ def composed(msg: Any) -> RerunData | None: return composed def _get_entity_path(self, topic: Any) -> str: + """Map a transport topic to a Rerun entity path. + + LCM topics usually already include a leading slash and a type suffix. + SHM topics are plain strings. Normalize both forms so visual overrides + such as "world/color_image" match consistently. + """ if self.config.topic_to_entity: return self.config.topic_to_entity(topic) topic_str = getattr(topic, "name", None) or str(topic) topic_str = topic_str.split("#")[0] # strip LCM topic suffix + if not topic_str.startswith("/"): + topic_str = f"/{topic_str}" return f"{self.config.entity_prefix}{topic_str}" + @rpc + def set_visual_transports(self, transports: list[PubSubTransport[Any]]) -> None: + """Replace explicit visual transports and subscribe when running. + + The coordinator calls this after stream wiring and after loading + additional blueprints into an existing coordinator. + """ + self.config.visual_transports = transports + if self._started: + self._subscribe_visual_transports() + + def _subscribe_visual_transports(self) -> None: + """Attach to configured SHM streams once per topic.""" + for transport in self.config.visual_transports: + topic = str(getattr(transport, "topic", "")) + if not topic or topic in self._subscribed_visual_transport_topics: + continue + self._subscribed_visual_transport_topics.add(topic) + if hasattr(transport, "start"): + transport.start() + # If subscribe raises, the bridge still owns cleanup for the transport it started. + self.register_disposable(Disposable(transport.stop)) + transport_topic = getattr(transport, "topic", topic) + + def on_visual_message(msg: Any, transport_topic: Any = transport_topic) -> None: + self._on_message(msg, transport_topic) + + unsub = transport.subscribe( + # Capture the current topic so callbacks keep the correct + # entity path even as this loop advances to the next transport. + on_visual_message + ) + if unsub is not None: + self.register_disposable(Disposable(unsub)) + def _on_message(self, msg: Any, topic: Any) -> None: """Handle incoming message - log to rerun.""" @@ -306,6 +357,7 @@ def _on_message(self, msg: Any, topic: Any) -> None: @rpc def start(self) -> None: super().start() + self._started = True logger.info("Rerun bridge starting") @@ -397,6 +449,8 @@ def start(self) -> None: unsub = pubsub.subscribe_all(self._on_message) self.register_disposable(Disposable(unsub)) + self._subscribe_visual_transports() + for pubsub in self.config.pubsubs: if hasattr(pubsub, "stop"): self.register_disposable(Disposable(pubsub.stop)) # type: ignore[union-attr] @@ -506,8 +560,10 @@ def log_blueprint_graph(self, dot_code: str, module_names: list[str]) -> None: @rpc def stop(self) -> None: + self._started = False self._override_cache.clear() self._frame_attached.clear() + self._subscribed_visual_transport_topics.clear() super().stop() diff --git a/hackathon/pirate/README.md b/hackathon/pirate/README.md new file mode 100644 index 0000000000..35ee8cf7a9 --- /dev/null +++ b/hackathon/pirate/README.md @@ -0,0 +1,86 @@ +# Pirate — Go2 Vision-Guided Object Interaction + +Hackathon submission from 0xmandy. + +## Summary + +Pirate is a vision-servoing stack for Unitree Go2 that makes the robot hunt down +objects like a pirate hunting treasure. The robot scans with its onboard camera, +runs YOLO detection to find a target (bottle or ball), closes in with bearing-locked +visual control, pushes the object, then celebrates with sport gestures. + +The core loop is: + +```text +camera frame → YOLO detection → bearing + distance estimate + → approach burst → realign → push → Hello → backup → Scrape +``` + +This submission keeps DimOS as the robot/runtime layer and adds a vision-servoing +control layer with replayable per-run decision traces on top. + +## Links + +- Project repo: https://github.com/0xmandy/pirate +- Final demo video: https://github.com/0xmandy/pirate/blob/main/artifacts/showcase/final_demo.mp4 +- Decision traces: https://github.com/0xmandy/pirate/tree/main/artifacts/decision_traces +- Sensor stream profile: https://github.com/0xmandy/pirate/blob/main/artifacts/sensor_stream_profile.json + +## What We Built + +- YOLO visual servoing: detect bottle/ball → extract bearing → drive toward it +- Closed-loop realign: if yaw error > 8° at standoff, back off and re-approach +- Push phase: forward burst 0.35 m/s × 0.5 s, displacement confirmed via ball pose +- Full gesture sequence: approach → push → Hello (api_id=1016) → backup → Scrape (api_id=1029) +- MuJoCo sim proof: ball freejoint physics, push displacement 0.118–0.196 m confirmed +- Real hardware smoke test: odom ~19 Hz, camera ~10–12 Hz, lidar ~7–8 Hz verified +- Per-run decision trace JSON: yaw error, realign decision, push outcome, ball displacement + +## Why It Fits DimOS + +DimOS already exposes everything needed: streams, camera frames, odometry, and +Go2 control skills. Pirate adds a thin vision-and-action evidence layer on top: + +```text +DimOS observes and executes +YOLO guides the approach bearing +decision traces explain each run +``` + +That boundary keeps DimOS as the hardware/control system while making robot +decisions inspectable and reproducible. + +## Results + +| Metric | Value | +|---|---| +| Push displacement — run 001 | **0.118 m** (yaw 11.5° → realign → 7.1°) | +| Push displacement — run 003 | **0.196 m** (yaw 9.6° → realign → 1.5°) | +| Vision approach success rate | 100% (2/2 valid runs) | +| Premature contact abort rate | ~33% (1/3); fix: offset approach target_y by −0.15 m | +| Real odom rate | ~19 Hz | +| Real camera rate | ~10–12 Hz | +| Confirmed gestures on hardware | Hello ✅ Scrape ✅ | + +## Repro + +Clone the project repo and run: + +```bash +# Syntax check all scripts +make check + +# Mujoco sim — ball scene (no robot needed) +make sim-run + +# Real robot — water bottle +python examples/go2_bottle_demo_sequence.py \ + --backend webrtc-ap --ball-distance 0.8 \ + --execute --i-understand-this-is-real-robot +``` + +## Submission Note + +This PR intentionally does not vendor the full project into DimOS. The full source, +generated artifacts, decision traces, and demo video live in the project repo +linked above. DimOS core files are not modified by this submission.