diff --git a/data/.lfs/go2_china_office_indoor.mcap.tar.gz b/data/.lfs/go2_china_office_indoor.mcap.tar.gz new file mode 100644 index 0000000000..ea42727902 --- /dev/null +++ b/data/.lfs/go2_china_office_indoor.mcap.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:569b8401f15745ef68663dc3ac5fba0dcbdc7ed4ac9f599b3a9db895f63bf2a4 +size 1670777651 diff --git a/dimos/memory2/cli/app.py b/dimos/memory2/cli/app.py new file mode 100644 index 0000000000..2424c600df --- /dev/null +++ b/dimos/memory2/cli/app.py @@ -0,0 +1,34 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""``dimos mem`` — memory2 store commands.""" + +from __future__ import annotations + +import typer + +mem_app = typer.Typer(help="memory2 store commands", no_args_is_help=True) + + +@mem_app.command() +def rerun( + path: str = typer.Argument(..., help="Store to render (.mcap path/name or .db)"), + out: str = typer.Option(None, "--out", help="Output .rrd (default: alongside the source)"), + seconds: float = typer.Option(None, "--seconds", help="Only the first N seconds"), + no_gui: bool = typer.Option(False, "--no-gui", help="Write the .rrd but don't open the viewer"), +) -> None: + """Render a memory2 store into rerun (writes a .rrd, then opens the viewer).""" + from dimos.memory2.cli.render import open_store, render_store + + render_store(open_store(path), out=out, seconds=seconds, no_gui=no_gui) diff --git a/dimos/memory2/cli/render.py b/dimos/memory2/cli/render.py new file mode 100644 index 0000000000..0e025af52f --- /dev/null +++ b/dimos/memory2/cli/render.py @@ -0,0 +1,126 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Render any memory2 store into rerun. + +Generic: walks the store's streams and logs every observation whose payload +implements ``to_rerun()`` (the :class:`RerunConvertible` convention). Streams +whose payload has no ``to_rerun`` are skipped. Each stream becomes an entity +path; observations share one ``time`` timeline (relative to the store's earliest +observation, so streams stay aligned). Writes a ``.rrd`` and opens the viewer. +""" + +from __future__ import annotations + +from pathlib import Path +import shutil +import subprocess +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from dimos.memory2.store.base import Store + + +def open_store(path: str) -> Store: + """Open a store by file type (``.db`` -> SqliteStore, else Go2 mcap).""" + if str(path).endswith(".db"): + from dimos.memory2.store.sqlite import SqliteStore + + return SqliteStore(path=path, must_exist=True) + from dimos.robot.unitree.go2dds.store import Go2McapStore # lazy: robot-layer codec set + + return Go2McapStore(path=path) + + +def _open_viewer(rrd: str) -> None: + exe = shutil.which("rerun") + if exe: + subprocess.Popen([exe, rrd]) + print(f" opening {rrd} in rerun") + else: + print(f" rerun viewer not found on PATH; open manually:\n rerun {rrd}") + + +def render_store( + store: Store, + *, + out: str | None = None, + seconds: float | None = None, + no_gui: bool = False, +) -> str: + """Render ``store`` to a ``.rrd`` and (unless ``no_gui``) open the rerun viewer. + + Logs every observation (full res); ``seconds`` bounds the time window from + the start. Returns the ``.rrd`` path. + """ + import rerun as rr + + from dimos.memory2.utils.progress import progress + from dimos.visualization.rerun.init import rerun_init + + if out is None: + src = getattr(store.config, "path", None) or "store" + out = str(Path(src).with_suffix(".rrd")) + + # Discover renderable streams (payload has a working to_rerun) + shared anchor. + renderable = [] + t0: float | None = None + for name in store.list_streams(): + stream = store.streams[name] + try: + first = stream.first() + except LookupError: + continue + data = first.data + if not hasattr(data, "to_rerun"): + print(f" skip {name}: {type(data).__name__} has no to_rerun()") + continue + try: + data.to_rerun() + except Exception as e: + print(f" skip {name}: to_rerun() failed ({e})") + continue + renderable.append((name, stream)) + t0 = first.ts if t0 is None else min(t0, first.ts) + + if t0 is None: + print("nothing renderable in this store") + return out + + rerun_init("dimos mem rerun") + rr.save(out) + + for name, stream in renderable: + report = progress(stream.count(), label=name) + for obs in stream: + if seconds is not None and obs.ts - t0 > seconds: + print() # terminate the windowed (sub-100%) progress line + break + if obs.data is None: # e.g. a truncated/corrupt frame that failed to decode + report(obs) + continue + rr.set_time("time", duration=obs.ts - t0) + data = obs.data.to_rerun() + if isinstance(data, list): # RerunMulti: [(subpath, archetype), ...] + for sub, arch in data: + rr.log(f"{name}/{sub}", arch) + else: + rr.log(name, data) + report(obs) + + rr.rerun_shutdown() # flush + close the .rrd before opening it + print(f"wrote {out}") + if not no_gui: + _open_viewer(out) + return out diff --git a/dimos/memory2/store/base.py b/dimos/memory2/store/base.py index fb6d76379d..74f94b6581 100644 --- a/dimos/memory2/store/base.py +++ b/dimos/memory2/store/base.py @@ -200,6 +200,10 @@ def list_streams(self) -> list[str]: """Return names of all streams in this store.""" return list(self._streams.keys()) + def summary(self) -> str: + """One line per stream — name, count, ts range. See :meth:`Stream.summary`.""" + return "\n".join(self.stream(name).summary() for name in self.list_streams()) + def delete_stream(self, name: str) -> None: """Delete a stream by name (from cache and underlying storage).""" stream = self._streams.pop(name, None) diff --git a/dimos/memory2/store/mcap.py b/dimos/memory2/store/mcap.py new file mode 100644 index 0000000000..06744c3ec2 --- /dev/null +++ b/dimos/memory2/store/mcap.py @@ -0,0 +1,181 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Read-only memory2 store backed by an mcap file. + +Generic and codec-injected — it knows nothing about any robot. The caller +supplies ``codecs`` (DDS/wire topic -> codec that decodes a message's stored +bytes) and an optional ``streams`` map (friendly stream name -> topic). See +``dimos.robot.unitree.go2dds.store.Go2McapStore`` for the Go2 wiring. + +Read-only: no append, blobs, vectors, or embeddings. Payloads decode lazily on +``obs.data``; ts and counts are cheap (counts come from the mcap index). +""" + +from __future__ import annotations + +from collections.abc import Iterator, Mapping +from dataclasses import replace +from functools import partial +from typing import Any, Protocol, runtime_checkable + +from dimos.memory2.backend import Backend +from dimos.memory2.codecs.base import codec_for +from dimos.memory2.notifier.subject import SubjectNotifier +from dimos.memory2.observationstore.base import ObservationStore, ObservationStoreConfig +from dimos.memory2.store.base import Store, StoreConfig +from dimos.memory2.type.filter import StreamQuery +from dimos.memory2.type.observation import Observation + + +@runtime_checkable +class StreamCodec(Protocol): + """What the store needs to turn a channel's stored bytes into a payload.""" + + @property + def payload_type(self) -> type: ... + + def decode(self, data: bytes) -> Any: ... + + +def _slug(topic: str) -> str: + """Auto stream name from a topic: drop the ``rt/`` prefix and ``/`` -> ``_``. + + ``rt/`` is the ROS2-over-DDS topic prefix; ``removeprefix`` only strips it + where present (e.g. app-level ``control_log`` is left alone). + """ + return topic.removeprefix("rt/").replace("/", "_") + + +class McapObservationStoreConfig(ObservationStoreConfig): + name: str = "" + + +class McapObservationStore(ObservationStore[Any]): + """Read-only metadata/query over one mcap channel. Payloads load lazily.""" + + config: McapObservationStoreConfig + + def __init__(self, *, name: str, path: str, topic: str, codec: StreamCodec, count: int) -> None: + super().__init__(name=name) + self._path = path + self._topic = topic + self._codec = codec + self._count = count + + @property + def name(self) -> str: + return self.config.name + + def _iter(self, reverse: bool = False) -> Iterator[Observation[Any]]: + from mcap.reader import make_reader # optional dep (go2/unitree extra) + + decode, dtype, n = self._codec.decode, self._codec.payload_type, self._count + with open(self._path, "rb") as f: + msgs = make_reader(f).iter_messages(topics=[self._topic], reverse=reverse) + for i, (_s, _c, m) in enumerate(msgs): + yield Observation( + id=(n - 1 - i) if reverse else i, + ts=m.log_time / 1e9, + data_type=dtype, + _loader=partial(decode, m.data), + ) + + def query(self, q: StreamQuery) -> Iterator[Observation[Any]]: + # mcap is natively log-time ordered (== ts == our id), so serve ts/id + # ordering by iterating forward/reverse instead of materializing + sorting. + if q.order_field in ("ts", "id"): + it = self._iter(reverse=q.order_desc) + q = replace(q, order_field=None, order_desc=False) + return q.apply(it) + return q.apply(self._iter()) + + def count(self, q: StreamQuery) -> int: + if not q.filters and q.search_text is None and q.search_vec is None: + n = self._count + if q.offset_val: + n = max(0, n - q.offset_val) + if q.limit_val is not None: + n = min(n, q.limit_val) + return n + return sum(1 for _ in self.query(q)) + + def fetch_by_ids(self, ids: list[int]) -> list[Observation[Any]]: + want = set(ids) + return [o for o in self._iter() if o.id in want] + + def insert(self, obs: Observation[Any]) -> int: + raise NotImplementedError("McapStore is read-only") + + +class McapStoreConfig(StoreConfig): + path: str = "" + + +class McapStore(Store): + """A memory2 store backed by an mcap file (read-only). + + Every channel present in the file with a codec is exposed. Names default to + the slugified topic (see :func:`_slug`); ``streams`` (friendly name -> topic) + overrides the name for specific topics. + """ + + config: McapStoreConfig + + def __init__( + self, + *, + codecs: Mapping[str, StreamCodec], + streams: dict[str, str] | None = None, + **kwargs: Any, + ) -> None: + from mcap.reader import make_reader # optional dep (go2/unitree extra) + + super().__init__(**kwargs) + self._codecs = codecs + name_of = {topic: name for name, topic in (streams or {}).items()} # topic -> override + with open(self.config.path, "rb") as f: + summary = make_reader(f).get_summary() + self._stream_topic: dict[str, str] = {} # stream name -> topic + self._available: dict[str, int] = {} # stream name -> message count + if summary is not None and summary.statistics is not None: + for cid, ch in summary.channels.items(): + if ch.topic not in self._codecs: + continue + name = name_of.get(ch.topic) or _slug(ch.topic) + self._stream_topic[name] = ch.topic + self._available[name] = summary.statistics.channel_message_counts.get(cid, 0) + + def list_streams(self) -> list[str]: + return sorted(set(self._available) | set(self._streams)) + + def _create_backend( + self, name: str, payload_type: type | None = None, **config: Any + ) -> Backend[Any]: + if name not in self._available: + raise KeyError(f"No stream {name!r}. Available: {sorted(self._available)}") + topic = self._stream_topic[name] + codec = self._codecs[topic] + ptype = codec.payload_type + obs = McapObservationStore( + name=name, path=self.config.path, topic=topic, codec=codec, count=self._available[name] + ) + return Backend( + metadata_store=obs, + codec=codec_for(ptype), # storage codec, unused (blob_store=None) + data_type=ptype, + blob_store=None, + vector_store=None, + notifier=SubjectNotifier(), + ) diff --git a/dimos/memory2/stream.py b/dimos/memory2/stream.py index d0f4f14ca0..e2fb5c5b0a 100644 --- a/dimos/memory2/stream.py +++ b/dimos/memory2/stream.py @@ -196,8 +196,22 @@ def limit(self, k: int) -> Stream[T, O]: def offset(self, n: int) -> Stream[T, O]: return self._replace_query(offset_val=n) - # Time windowing — None means unbounded on that side. ``*_time`` is relative - # to the stream's first observation; ``*_timestamp`` is absolute epoch seconds. + # Windowing helpers — None on either bound means unbounded on that side. + # Index helpers (``*_seek``) count observations; time helpers (``*_time``) + # are relative to the first observation; ``*_timestamp`` is absolute epoch. + def from_seek(self, i: int | None) -> Stream[T, O]: + """Window by index: drop the first ``i`` observations.""" + return self if i is None else self.offset(i) + + def to_seek(self, i: int | None) -> Stream[T, O]: + """Window by index: keep the first ``i`` observations.""" + return self if i is None else self.limit(i) + + def range_seek(self, start: int | None, stop: int | None) -> Stream[T, O]: + """Window by index: observations ``[start, stop)``.""" + s = self if start is None else self.offset(start) + return s if stop is None else s.limit(stop - (start or 0)) + def from_time(self, seconds: float | None) -> Stream[T, O]: """Keep observations from ``seconds`` after the first (relative).""" if seconds is None: @@ -218,6 +232,17 @@ def to_time(self, seconds: float | None) -> Stream[T, O]: return self return self.before(t0 + seconds) + def range_time(self, start: float | None, stop: float | None) -> Stream[T, O]: + """Window by time: ``[start, stop)`` seconds from the first observation.""" + if start is None and stop is None: + return self + try: + t0 = self.first().ts + except LookupError: + return self + s = self if start is None else self.after(t0 + start) + return s if stop is None else s.before(t0 + stop) + def from_timestamp(self, ts: float | None) -> Stream[T, O]: """Keep observations after absolute epoch ``ts``.""" return self if ts is None else self.after(ts) @@ -388,20 +413,25 @@ def get_time_range(self) -> tuple[float, float]: return (first.ts, last.ts) def summary(self) -> str: - """Return a short human-readable summary: count, time range, duration.""" + """Return a short human-readable summary: count, time range, avg frequency. + + Relies on ``count()`` and ``get_time_range()`` (``first()`` + ``last()``), + all of which backends are expected to serve cheaply — SQL via ORDER BY, + the mcap store via forward/reverse iteration. + """ from datetime import datetime, timezone n = self.count() if n == 0: return f"{self}: empty" - (t0, t1) = self.get_time_range() - + t0, t1 = self.get_time_range() + dur = t1 - t0 + hz = (n - 1) / dur if n > 1 and dur > 0 else 0.0 fmt = "%Y-%m-%d %H:%M:%S" dt0 = datetime.fromtimestamp(t0, tz=timezone.utc).strftime(fmt) dt1 = datetime.fromtimestamp(t1, tz=timezone.utc).strftime(fmt) - dur = t1 - t0 - return f"{self}: {n} items, {dt0} — {dt1} ({dur:.1f}s)" + return f"{self}: {n} items, {dt0} — {dt1} ({dur:.1f}s, {hz:.1f} Hz)" def materialize(self) -> Stream[T, O]: """Materialize into memory and return a replayable stream. @@ -416,6 +446,9 @@ def materialize(self) -> Stream[T, O]: self.save(target).drain() return target + def run(self) -> int: + return self.drain() + def drain(self) -> int: """Consume all observations, discarding results. Returns count consumed. diff --git a/dimos/memory2/test_stream.py b/dimos/memory2/test_stream.py index fe92650a12..e5d691f88c 100644 --- a/dimos/memory2/test_stream.py +++ b/dimos/memory2/test_stream.py @@ -783,6 +783,38 @@ def consumer(): assert results == ["b", "d"] +class TestWindowing: + """Index (*_seek) and time (*_time) windowing; None = unbounded that side.""" + + def test_to_seek_keeps_first_i(self, make_stream): + assert [o.data for o in make_stream(10).to_seek(3)] == [0, 10, 20] + + def test_from_seek_drops_first_i(self, make_stream): + assert [o.data for o in make_stream(10).from_seek(7)] == [70, 80, 90] + + def test_range_seek(self, make_stream): + assert [o.data for o in make_stream(10).range_seek(2, 5)] == [20, 30, 40] + + def test_to_time_keeps_first_seconds(self, make_stream): + # ts == index here; to_time(s) keeps ts < first_ts + s + assert [o.data for o in make_stream(10).to_time(2.0)] == [0, 10] + + def test_from_time(self, make_stream): + assert [o.data for o in make_stream(10).from_time(7.0)] == [80, 90] + + def test_range_time_uses_one_anchor(self, make_stream): + # anchor is the first observation, not the post-filter first + assert [o.data for o in make_stream(10, start_ts=100.0).range_time(2.0, 5.0)] == [30, 40] + + def test_none_is_noop(self, make_stream): + s = make_stream(10) + assert s.to_seek(None).count() == 10 + assert s.from_seek(None).count() == 10 + assert s.range_seek(None, None).count() == 10 + assert s.to_time(None).count() == 10 + assert s.range_time(None, None).count() == 10 + + class TestTimeWindowing: """``*_time`` is relative to the first observation, ``*_timestamp`` is absolute. diff --git a/dimos/memory2/utils/progress.py b/dimos/memory2/utils/progress.py new file mode 100644 index 0000000000..29aaf8d4e4 --- /dev/null +++ b/dimos/memory2/utils/progress.py @@ -0,0 +1,62 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A per-observation progress callback, shaped for ``Stream.tap``. + + stream.tap(progress(stream.count(), "render")).drain() + +Prints a single rewritten line: percent, count, data-seconds covered, speed +relative to wall-clock (``x rt``), and per-frame latency. +""" + +from __future__ import annotations + +from collections.abc import Callable +import time +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from dimos.memory2.type.observation import Observation + + +def progress(total: int, label: str = "") -> Callable[[Observation[Any]], None]: + """Return a callback that prints streaming progress, one call per observation.""" + seen = 0 + wall_start: float | None = None + last_wall: float | None = None + first_ts: float | None = None + + def _progress(obs: Observation[Any]) -> None: + nonlocal seen, wall_start, last_wall, first_ts + now = time.monotonic() + if wall_start is None: + wall_start = now + first_ts = obs.ts + assert first_ts is not None # narrowed by the same `if` above + frame_ms = (now - last_wall) * 1000 if last_wall is not None else 0.0 + last_wall = now + seen += 1 + pct = 100 * seen // total if total else 100 + wall = now - wall_start + data = obs.ts - first_ts + speed = data / wall if wall > 0 else 0.0 + end = "\n" if seen >= total else "" + prefix = f"{label} " if label else "" + print( + f"\r{prefix}{pct:>3}% [{seen}/{total}] {data:.1f}s ({speed:.1f} x rt) {frame_ms:.0f}ms/frame", + end=end, + flush=True, + ) + + return _progress diff --git a/dimos/msgs/geometry_msgs/Transform.py b/dimos/msgs/geometry_msgs/Transform.py index 44bd6b2440..232708f36e 100644 --- a/dimos/msgs/geometry_msgs/Transform.py +++ b/dimos/msgs/geometry_msgs/Transform.py @@ -308,17 +308,24 @@ def lcm_decode(cls, data: bytes | BinaryIO) -> Transform: ts=ts, ) - def to_rerun(self) -> rr.Transform3D: - """Convert to rerun Transform3D format with frame IDs. + def to_rerun(self, frameless: bool = False) -> rr.Transform3D: + """Convert to a rerun Transform3D. - Returns: - rr.Transform3D archetype for logging to rerun with parent/child frames + Args: + frameless: omit ``parent_frame``/``child_frame``. By default the + transform carries its frame IDs (resolved via rerun's tf-graph); + set this when positioning by entity-path hierarchy instead, where + the named frames would not compose. """ import rerun as rr + translation = [self.translation.x, self.translation.y, self.translation.z] + rotation = self.rotation.to_rerun() + if frameless: + return rr.Transform3D(translation=translation, rotation=rotation) return rr.Transform3D( - translation=[self.translation.x, self.translation.y, self.translation.z], - rotation=self.rotation.to_rerun(), + translation=translation, + rotation=rotation, parent_frame="tf#/" + self.frame_id, child_frame="tf#/" + self.child_frame_id, ) diff --git a/dimos/msgs/sensor_msgs/Imu.py b/dimos/msgs/sensor_msgs/Imu.py index dd80e819ff..e1cd1f6b62 100644 --- a/dimos/msgs/sensor_msgs/Imu.py +++ b/dimos/msgs/sensor_msgs/Imu.py @@ -117,3 +117,9 @@ def __repr__(self) -> str: f"linear_acceleration={self.linear_acceleration}, " f"orientation={self.orientation})" ) + + def to_rerun(self, axis_length: float = 0.2): # type: ignore[no-untyped-def] + """Orientation axes for rerun.""" + import rerun as rr + + return rr.TransformAxes3D(axis_length=axis_length) diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 209c30c3e8..f237f75069 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -675,6 +675,10 @@ def send( main.command(name="map")(_map_main) +from dimos.memory2.cli.app import mem_app + +main.add_typer(mem_app, name="mem") + @main.command() def cameracalibrate( diff --git a/dimos/robot/unitree/go2dds/.gitignore b/dimos/robot/unitree/go2dds/.gitignore new file mode 100644 index 0000000000..98d8a5a630 --- /dev/null +++ b/dimos/robot/unitree/go2dds/.gitignore @@ -0,0 +1 @@ +logs diff --git a/dimos/robot/unitree/go2dds/cdr.py b/dimos/robot/unitree/go2dds/cdr.py new file mode 100644 index 0000000000..581086596c --- /dev/null +++ b/dimos/robot/unitree/go2dds/cdr.py @@ -0,0 +1,135 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Generic little-endian CDR (XCDR1) decoder driven by a field spec. + +A spec class declares ``__cdr_fields__`` — an ordered list of ``(name, type)``. +``type`` is a primitive code (``"u8"``, ``"f32"`` …), ``"string"``, a nested spec +class, or ``("array", elem, n)`` / ``("seq", elem)``. ``decode(buf, Cls)`` walks +the body with CDR alignment and returns a populated instance. + +This replaces per-message hand-rolled decoders: the wire layout lives in the +spec, not in code. The same spec also generates the IDL we embed into mcaps. +""" + +from __future__ import annotations + +import struct +from typing import Any + +import numpy as np + +# code -> (struct char, byte size, numpy dtype) +_PRIM: dict[str, tuple[str, int, str]] = { + "i8": ("b", 1, " int: + """CDR alignment (bytes) of a field type.""" + if isinstance(t, str): + if t == "string": + return 4 # u32 length prefix + return _PRIM[t][1] + if isinstance(t, tuple): + if t[0] == "array": + return _align_of(t[1]) + if t[0] == "seq": + return 4 # u32 length prefix + if isinstance(t, type): + return _struct_align(t) + raise TypeError(f"unknown field type {t!r}") + + +def _struct_align(cls: Any) -> int: + a = getattr(cls, "__cdr_align__", None) + if a is None: + a = max((_align_of(t) for _, t in cls.__cdr_fields__), default=1) + cls.__cdr_align__ = a + return a + + +class Cursor: + """Body-relative cursor; offset 0 is just after the 4-byte encapsulation header.""" + + __slots__ = ("b", "p") + + def __init__(self, b: bytes) -> None: + self.b = b + self.p = 4 # skip CDR encapsulation header + + def align(self, n: int) -> None: + m = (self.p - 4) % n + if m: + self.p += n - m + + def prim(self, code: str) -> Any: + ch, sz, _ = _PRIM[code] + self.align(sz) + v = struct.unpack_from("<" + ch, self.b, self.p)[0] + self.p += sz + return v + + def prim_array(self, code: str, n: int) -> np.ndarray: + _, sz, dt = _PRIM[code] + self.align(sz) + a = np.frombuffer(self.b, dt, n, self.p).copy() + self.p += sz * n + return a + + def string(self) -> str: + n = self.prim("u32") + v = self.b[self.p : self.p + max(0, n - 1)].decode("ascii", "replace") + self.p += n + return v + + +def _read(cur: Cursor, t: Any) -> Any: + if isinstance(t, str): + return cur.string() if t == "string" else cur.prim(t) + if isinstance(t, tuple): + kind, elem = t[0], t[1] + n = t[2] if kind == "array" else cur.prim("u32") + if isinstance(elem, str) and elem in _PRIM: + return cur.prim_array(elem, n) + return [_read(cur, elem) for _ in range(n)] + if isinstance(t, type): + cur.align(_struct_align(t)) + return _read_struct(cur, t) + raise TypeError(f"unknown field type {t!r}") + + +def _read_struct(cur: Cursor, cls: Any) -> Any: + return cls(**{name: _read(cur, t) for name, t in cls.__cdr_fields__}) + + +def decode(buf: bytes, cls: Any) -> tuple[Any, int]: + """Decode ``buf`` as a CDR ``cls``. Returns ``(instance, end_offset)``. + + ``end_offset`` should equal ``len(buf)`` for a fixed-layout message — the + cheapest correctness check against a real recording. + """ + cur = Cursor(buf) + cur.align(_struct_align(cls)) + return _read_struct(cur, cls), cur.p diff --git a/dimos/robot/unitree/go2dds/cli/render.py b/dimos/robot/unitree/go2dds/cli/render.py new file mode 100644 index 0000000000..142e1355c2 --- /dev/null +++ b/dimos/robot/unitree/go2dds/cli/render.py @@ -0,0 +1,318 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Render Go2 odom sources to rerun — memory2 store pipelines (standalone). + +Each *pipeline* is a function ``(store, seconds) -> None`` composed from +reusable stream transforms over standard dimos messages. ``leg_odom`` logs both +the per-frame pose (Transform3D) and the accumulated trajectory (nav_msgs/Path):: + + sportmodestate.map_data(pose).tap(log_pose) # moving frame, full rate + .transform(throttle(0.1)).transform(accumulate_path).tap(log_path) # growing path + +Standalone — not wired into the dimos CLI: + + uv run python -m dimos.robot.unitree.go2dds.cli.render \ + go2_china_office_indoor.mcap --seconds 120 +""" + +from __future__ import annotations + +from collections.abc import Callable +import shutil +import subprocess +from typing import TYPE_CHECKING, Any, cast + +import numpy as np +import rerun as rr +import typer + +from dimos.memory2.transform import throttle +from dimos.memory2.utils.progress import progress +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.nav_msgs.Path import Path +from dimos.robot.unitree.go2dds.extrinsics import LIDAR_TO_BASE +from dimos.robot.unitree.go2dds.msgs.SportModeState import SportModeState +from dimos.robot.unitree.go2dds.store import Go2McapStore + +if TYPE_CHECKING: + from collections.abc import Iterator + + from dimos.memory2.type.observation import Observation + from dimos.msgs.sensor_msgs.Image import Image + from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 + +WORLD = "world" + + +def world_accel(obs: Observation[Any]) -> np.ndarray: + """IMU linear acceleration rotated into the world frame (includes gravity).""" + la = obs.data.linear_acceleration + a = obs.data.orientation.rotate_vector(Vector3(la.x, la.y, la.z)) + return np.array([float(a.x), float(a.y), float(a.z)]) + + +def gravity_bias( + store: Go2McapStore, window: float = 500.0, thr: float = 0.02, min_static: float = 0.5 +) -> np.ndarray: + """Calibrate gravity + accel bias from the middles of leg-odom-stationary windows. + + Over the first ``window`` s, where sportmode speed < ``thr`` for >= ``min_static`` s + the robot is still. Average the IMU world accel over the *middle half* of each such + stretch (skipping the accel/decel edges). That mean is gravity + bias; subtracting + it (vs a hardcoded [0,0,9.81]) zeroes the residual while stationary. + """ + sm = store.streams.sportmodestate.to_time(window).to_list() + ts = np.array([o.ts for o in sm]) + speed = np.array([float(np.linalg.norm(o.data.velocity)) for o in sm]) + + mids: list[tuple[float, float]] = [] # middle-half of each static stretch + i, n = 0, len(speed) + while i < n: + if speed[i] >= thr: + i += 1 + continue + j = i + while j < n and speed[j] < thr: + j += 1 + t0, t1 = float(ts[i]), float(ts[j - 1]) + if t1 - t0 >= min_static: + center, half = (t0 + t1) / 2, (t1 - t0) / 4 + mids.append((center - half, center + half)) + i = j + + accs = [ + world_accel(o) + for o in store.streams.imu.to_time(window) + if any(lo <= o.ts <= hi for lo, hi in mids) + ] + if not accs: + return np.array([0.0, 0.0, 9.81]) + return cast("np.ndarray", np.mean(accs, axis=0)) + + +def sportmode_pose(obs: Observation[SportModeState]) -> PoseStamped: + """map_data: SportModeState -> PoseStamped (leg-inertial pose).""" + sm = obs.data + w, x, y, z = (float(v) for v in sm.imu_state.quaternion) # Unitree order: wxyz + return PoseStamped( + ts=obs.ts, + frame_id=WORLD, + position=[float(v) for v in sm.position], + orientation=[x, y, z, w], + ) + + +def integrate_velocity( + gravity: np.ndarray, +) -> Callable[[Any, Observation[Any]], tuple[Any, TwistStamped]]: + """scan_data factory: Imu -> TwistStamped, integrating (world accel - gravity).""" + + def step(state: Any, obs: Observation[Any]) -> tuple[Any, TwistStamped]: + vel, prev = state + a_world = world_accel(obs) - gravity + if prev is not None: + vel = vel + a_world * (obs.ts - prev) + twist = TwistStamped( + ts=obs.ts, frame_id=WORLD, linear=vel.tolist(), angular=[0.0, 0.0, 0.0] + ) + return (vel, obs.ts), twist + + return step + + +def integrate_position(state: Any, obs: Observation[Any]) -> tuple[Any, PoseStamped]: + """scan_data: TwistStamped -> PoseStamped (velocity integrated to position).""" + pos, prev = state + v = obs.data.linear + if prev is not None: + pos = pos + np.array([v.x, v.y, v.z]) * (obs.ts - prev) + pose = PoseStamped( + ts=obs.ts, frame_id=WORLD, position=pos.tolist(), orientation=[0.0, 0.0, 0.0, 1.0] + ) + return (pos, obs.ts), pose + + +def accumulate_path(upstream: Iterator[Observation[PoseStamped]]) -> Iterator[Observation[Path]]: + """transform: yield the growing nav_msgs/Path as each pose streams in.""" + path = Path(frame_id=WORLD) + for obs in upstream: + path = path.push(obs.data) + yield obs.derive(data=path) + + +def leg_odom(store: Go2McapStore, seconds: float | None) -> None: + """Leg-inertial odometry — pose stream (Transform3D) + accumulated Path line.""" + + def log_pose(obs: Observation[PoseStamped]) -> None: + rr.set_time("time", timestamp=obs.ts) + rr.log("world/leg_odom", obs.data.to_rerun(), rr.TransformAxes3D(axis_length=0.2)) + + def log_path(obs: Observation[Path]) -> None: + rr.set_time("time", timestamp=obs.ts) + rr.log("world/leg_odom_path", obs.data.to_rerun()) + + src = store.streams.sportmodestate.to_time(seconds) + ( + src.tap(progress(src.count(), "leg_odom")) + .map_data(sportmode_pose) + .tap(log_pose) + .transform(throttle(0.1)) # reduce_rate: thin the path to ~10 Hz + .transform(accumulate_path) # yield the growing path each step + .tap(log_path) + .drain() + ) + + +def imu_odom(store: Go2McapStore, seconds: float | None) -> None: + """Dead-reckoned IMU odometry — accel -> velocity -> position -> growing Path (drifts).""" + + def log_path(obs: Observation[Path]) -> None: + rr.set_time("time", timestamp=obs.ts) + rr.log("world/imu_odom_path", obs.data.to_rerun(color=(220, 90, 90))) + + gravity = gravity_bias(store) # calibrate gravity+bias on the stationary start + src = store.streams.imu.to_time(seconds) + ( + src.tap(progress(src.count(), "imu_odom")) + .scan_data((np.zeros(3), None), integrate_velocity(gravity)) # -> velocity (TwistStamped) + .scan_data((np.zeros(3), None), integrate_position) # -> position (PoseStamped) + .transform(throttle(0.1)) # thin the path after integrating at full IMU rate + .transform(accumulate_path) + .tap(log_path) + .drain() + ) + + +def lidar(store: Go2McapStore, seconds: float | None) -> None: + """Lidar point cloud, under the leg_odom transform (lidar -> base -> world).""" + + def log_lidar(obs: Observation[PointCloud2]) -> None: + rr.set_time("time", timestamp=obs.ts) + rr.log("world/leg_odom/lidar", obs.data.to_rerun()) + + src = store.streams.lidar.to_time(seconds) + rr.log("world/leg_odom/lidar", LIDAR_TO_BASE.to_rerun(frameless=True), static=True) + (src.tap(progress(src.count(), "lidar")).tap(log_lidar).drain()) + + +def _interp_pose( + tt: np.ndarray, pos: np.ndarray, quat: np.ndarray, t: float +) -> tuple[np.ndarray, np.ndarray]: + """LERP position + NLERP quaternion (xyzw) of a trajectory at scalar time t.""" + i = int(np.clip(np.searchsorted(tt, t), 1, len(tt) - 1)) + t0, t1 = tt[i - 1], tt[i] + f = 0.0 if t1 == t0 else float(np.clip((t - t0) / (t1 - t0), 0.0, 1.0)) + p = pos[i - 1] * (1 - f) + pos[i] * f + q0, q1 = quat[i - 1], quat[i].copy() + if float(q0 @ q1) < 0: + q1 = -q1 + q = q0 * (1 - f) + q1 * f + return p, q / np.linalg.norm(q) + + +def world_lidar(store: Go2McapStore, seconds: float | None) -> None: + from dimos.mapping.voxels import VoxelMapTransformer + + ext = LIDAR_TO_BASE # lidar -> base (standard Transform from extrinsics) + + # pre-load the leg-odom trajectory for per-cloud pose interpolation + odom = store.streams.odom.to_time(seconds).to_list() + tt = np.array([o.ts for o in odom]) + poses = [o.data.pose.pose for o in odom] + pos = np.array([[p.position.x, p.position.y, p.position.z] for p in poses]) + quat = np.array( + [[p.orientation.x, p.orientation.y, p.orientation.z, p.orientation.w] for p in poses] + ) + + def to_world(obs: Observation[PointCloud2]) -> PointCloud2: + p, q = _interp_pose(tt, pos, quat, obs.ts) + b2w = Transform.from_pose( + WORLD, + PoseStamped(ts=obs.ts, frame_id=WORLD, position=p.tolist(), orientation=q.tolist()), + ) + return obs.data.transform(b2w.apply(ext)) # lidar -> base -> world + + def log_voxels(obs: Observation[PointCloud2]) -> None: + rr.set_time("time", timestamp=obs.ts) + rr.log("world/world_lidar", obs.data.to_rerun()) + + src = store.streams.lidar.to_time(seconds) + ( + src.tap(progress(src.count(), "world_lidar")) + .map_data(to_world) # lidar cloud -> world-frame cloud + .transform(VoxelMapTransformer(emit_every=10, voxel_size=0.1)) # global voxel map + .tap(log_voxels) + .drain() + ) + + +def camera(store: Go2McapStore, seconds: float | None, hz: float) -> None: + """Front camera frames, throttled to ``hz`` (off by default; enable with --image). + + Throttling runs before ``obs.data``, so thinned frames never pay the jpeg + decode; frames that fail to decode (``None``) are skipped. + """ + + def log_image(obs: Observation[Image]) -> None: + if obs.data is None: # truncated/corrupt frame + return + rr.set_time("time", timestamp=obs.ts) + rr.log("world/camera", obs.data.to_rerun()) + + src = store.streams.color_image.to_time(seconds) + (src.tap(progress(src.count(), "camera")).transform(throttle(1.0 / hz)).tap(log_image).drain()) + + +# Add a source: write a (store, seconds) -> None function and append it. +PIPELINES: list[Callable[[Go2McapStore, float | None], None]] = [ + leg_odom, + imu_odom, + lidar, + world_lidar, +] + + +def main( + mcap: str = typer.Argument(..., help="Go2 .mcap (path or data-dir name)"), + out: str = typer.Option("go2_odom.rrd", "--out", help="Output .rrd"), + seconds: float = typer.Option(None, "--seconds", help="Only the first N seconds"), + image: bool = typer.Option(False, "--image", help="Also render the front camera"), + image_hz: float = typer.Option(10.0, "--image-hz", help="Camera frame rate when --image"), + no_gui: bool = typer.Option(False, "--no-gui", help="Write the .rrd but don't open the viewer"), +) -> None: + from dimos.visualization.rerun.init import rerun_init + + store = Go2McapStore(path=mcap) + rerun_init("go2_odom") # registers the turbo height colormap for PointCloud2.to_rerun + rr.save(out) + for pipeline in PIPELINES: + pipeline(store, seconds) + if image: + camera(store, seconds, image_hz) + rr.rerun_shutdown() + print(f"wrote {out}") + if not no_gui: + exe = shutil.which("rerun") + if exe: + subprocess.Popen([exe, out]) + else: + print(f" rerun viewer not on PATH; open manually:\n rerun {out}") + + +if __name__ == "__main__": + typer.run(main) diff --git a/dimos/robot/unitree/go2dds/codec.py b/dimos/robot/unitree/go2dds/codec.py new file mode 100644 index 0000000000..ebda8ca352 --- /dev/null +++ b/dimos/robot/unitree/go2dds/codec.py @@ -0,0 +1,117 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""DDS wire codecs: CDR bytes <-> message, keyed by DDS topic. + +A :class:`DdsCodec` is the bytes<->payload pair for one DDS message type. The +same codec decodes a recorded mcap message and a live DDS sample (both are CDR), +and its ``encode`` half publishes back to the wire — so this is shared by the +reader, :class:`~dimos.robot.unitree.go2dds.store.Go2McapStore`, and (later) a live +DDS bridge. It is distinct from memory2's storage codecs (pickle/lcm/jpeg); +they only coincide when an mcap is opened as a store. + +``GO2_CODECS`` is the Go2 channel set — the default registry today. +""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass, fields +import json +from typing import Any, Protocol, runtime_checkable + +from dimos.msgs.nav_msgs.Odometry import Odometry +from dimos.msgs.sensor_msgs.Image import Image +from dimos.msgs.sensor_msgs.Imu import Imu +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.robot.unitree.go2dds import cdr, ros +from dimos.robot.unitree.go2dds.msgs.ControlEvent import ControlEvent +from dimos.robot.unitree.go2dds.msgs.LowState import LowState +from dimos.robot.unitree.go2dds.msgs.SportModeState import SportModeState +from dimos.robot.unitree.go2dds.msgs.Telemetry import Telemetry + + +@runtime_checkable +class DdsCodec(Protocol): + """Codec between DDS wire bytes (CDR) and a payload message.""" + + @property + def payload_type(self) -> type: ... + + def decode(self, data: bytes) -> Any: ... + def encode(self, msg: Any) -> bytes: ... + + +@dataclass(frozen=True) +class CdrStructCodec: + """Codec for a fixed CDR struct spec (e.g. the Unitree custom msgs).""" + + payload_type: type # the spec dataclass; also the decoded payload type + + def decode(self, data: bytes) -> Any: + msg, end = cdr.decode(data, self.payload_type) + # Fixed-layout struct: leftover bytes mean the spec is wrong — fail loud. + assert end == len(data), f"{self.payload_type.__name__}: {end} != {len(data)} bytes" + return msg + + def encode(self, msg: Any) -> bytes: + raise NotImplementedError("CDR struct encode not implemented yet") + + +@dataclass(frozen=True) +class FnCodec: + """Codec wrapping a decode function (e.g. ROS wire -> dimos msg).""" + + payload_type: type + decoder: Callable[[bytes], Any] + + def decode(self, data: bytes) -> Any: + return self.decoder(data) + + def encode(self, msg: Any) -> bytes: + raise NotImplementedError(f"encode not implemented for {self.payload_type.__name__}") + + +@dataclass(frozen=True) +class JsonCodec: + """Codec for app-level JSON channels -> a dataclass. + + Keys absent from ``payload_type`` are dropped, so heterogeneous event logs + (e.g. ``control_log``) and future fields decode without error. + """ + + payload_type: type + + def decode(self, data: bytes) -> Any: + d = json.loads(data) + names = {f.name for f in fields(self.payload_type)} + return self.payload_type(**{k: v for k, v in d.items() if k in names}) + + def encode(self, msg: Any) -> bytes: + from dataclasses import asdict + + return json.dumps(asdict(msg)).encode() + + +# Go2 channel topic -> codec. The default registry (only platform we have today). +GO2_CODECS: dict[str, DdsCodec] = { + "rt/utlidar/cloud": FnCodec(PointCloud2, ros.decode_pointcloud2), + "rt/utlidar/imu": FnCodec(Imu, ros.decode_imu), + "rt/utlidar/robot_odom": FnCodec(Odometry, ros.decode_odometry), + "rt/frontvideo": FnCodec(Image, ros.decode_compressed_image), + "rt/lowstate": CdrStructCodec(LowState), + "rt/sportmodestate": CdrStructCodec(SportModeState), + "telemetry": JsonCodec(Telemetry), + "control_log": JsonCodec(ControlEvent), +} diff --git a/dimos/robot/unitree/go2dds/extrinsics.py b/dimos/robot/unitree/go2dds/extrinsics.py new file mode 100644 index 0000000000..8db4f79753 --- /dev/null +++ b/dimos/robot/unitree/go2dds/extrinsics.py @@ -0,0 +1,61 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Go2 L1 lidar extrinsic (base_link <- lidar) and camera mount. + +The L1 is mounted nearly upside-down: raw lidar +z points at the floor. The +official Unitree config (go2_l1_lidar.yaml) encodes this as base->lidar +``[0.28216, 0, -0.02467, roll=0, pitch=2.88, yaw=0]`` plus a separate +``rotate_yaw_bias`` (~-123 deg, calibrated to the front-leg position). EXT_R below +is that flip, leveled to the averaged ground normal over several stationary windows +(floor tilt ~1-2 deg, floor below the robot) and yawed so the map heading matches +the trajectory. Validated against the official "default imu reading" +[yaw -57.9, pitch -8.1, roll -167.3] (agrees to a few degrees). +""" + +import numpy as np + +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.Vector3 import Vector3 + +# base_link <- lidar rotation (lidar points -> base frame: p_base = EXT_R @ p_lidar + EXT_T) +EXT_R = np.array( + [ + [0.504486, -0.843018, 0.186588], + [-0.853668, -0.519391, -0.038544], + [0.129405, -0.139840, -0.981682], + ], + dtype=np.float64, +) +EXT_T = np.array([0.28216, 0.0, -0.02467], dtype=np.float64) + +# base_link -> camera_optical (from dimos GO2 connection BASE_TO_OPTICAL): +# translate 0.3m forward, then rotate into the optical frame. +CAM_T = np.array([0.30, 0.0, 0.0], dtype=np.float64) +CAM_Q = np.array([-0.5, 0.5, -0.5, 0.5], dtype=np.float64) # xyzw + +# Same mounts as standard Transform msgs (typed; carry frame ids; have to_rerun). +LIDAR_TO_BASE = Transform( + translation=Vector3(EXT_T), + rotation=Quaternion.from_rotation_matrix(EXT_R), + frame_id="base_link", + child_frame_id="lidar", +) +BASE_TO_CAMERA = Transform( + translation=Vector3(CAM_T), + rotation=Quaternion(CAM_Q), + frame_id="base_link", + child_frame_id="camera_optical", +) diff --git a/dimos/robot/unitree/go2dds/msgs/BmsState.py b/dimos/robot/unitree/go2dds/msgs/BmsState.py new file mode 100644 index 0000000000..070c075cd3 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/BmsState.py @@ -0,0 +1,46 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::BmsState_ (battery management system)""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class BmsState: + version_high: int + version_low: int + status: int + soc: int # state of charge, % + current: int + cycle: int + bq_ntc: np.ndarray # u8[2], °C + mcu_ntc: np.ndarray # u8[2], °C + cell_vol: np.ndarray # u16[15], mV + + __cdr_fields__ = [ + ("version_high", "u8"), + ("version_low", "u8"), + ("status", "u8"), + ("soc", "u8"), + ("current", "i32"), + ("cycle", "u16"), + ("bq_ntc", ("array", "u8", 2)), + ("mcu_ntc", ("array", "u8", 2)), + ("cell_vol", ("array", "u16", 15)), + ] diff --git a/dimos/robot/unitree/go2dds/msgs/ControlEvent.py b/dimos/robot/unitree/go2dds/msgs/ControlEvent.py new file mode 100644 index 0000000000..90b9a2bef7 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/ControlEvent.py @@ -0,0 +1,36 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""go2/ControlEvent — app-level JSON control-log events (topic ``control_log``). + +A tagged event stream keyed by ``type`` (e.g. ``velocity_input`` carries +``lx/ly/az``; ``brightness`` carries ``level``). Fields beyond ``type`` are +optional; :class:`~dimos.robot.unitree.go2dds.codec.JsonCodec` drops keys it +doesn't recognise, so new event shapes won't break decoding. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from dimos.robot.unitree.go2dds.msgs.base import PrettyMsg + + +@dataclass(repr=False) +class ControlEvent(PrettyMsg): + type: str + lx: float | None = None # velocity_input + ly: float | None = None + az: float | None = None + level: int | None = None # brightness diff --git a/dimos/robot/unitree/go2dds/msgs/IMUState.py b/dimos/robot/unitree/go2dds/msgs/IMUState.py new file mode 100644 index 0000000000..99b01699bc --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/IMUState.py @@ -0,0 +1,38 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::IMUState_""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class IMUState: + quaternion: np.ndarray # f32[4], w x y z + gyroscope: np.ndarray # f32[3] + accelerometer: np.ndarray # f32[3] + rpy: np.ndarray # f32[3] + temperature: int + + __cdr_fields__ = [ + ("quaternion", ("array", "f32", 4)), + ("gyroscope", ("array", "f32", 3)), + ("accelerometer", ("array", "f32", 3)), + ("rpy", ("array", "f32", 3)), + ("temperature", "u8"), + ] diff --git a/dimos/robot/unitree/go2dds/msgs/LowState.py b/dimos/robot/unitree/go2dds/msgs/LowState.py new file mode 100644 index 0000000000..efa9e6d020 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/LowState.py @@ -0,0 +1,77 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::LowState_ — full low-level robot state (rt/lowstate).""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + +from dimos.robot.unitree.go2dds.msgs.base import PrettyMsg +from dimos.robot.unitree.go2dds.msgs.BmsState import BmsState +from dimos.robot.unitree.go2dds.msgs.IMUState import IMUState +from dimos.robot.unitree.go2dds.msgs.MotorState import MotorState + + +@dataclass(repr=False) +class LowState(PrettyMsg): + head: np.ndarray # u8[2] + level_flag: int + frame_reserve: int + sn: np.ndarray # u32[2] + version: np.ndarray # u32[2] + bandwidth: int + imu_state: IMUState + motor_state: list[MotorState] # [20] + bms_state: BmsState + foot_force: np.ndarray # i16[4] + foot_force_est: np.ndarray # i16[4] + tick: int + wireless_remote: np.ndarray # u8[40] + bit_flag: int + adc_reel: float + temperature_ntc1: int + temperature_ntc2: int + power_v: float + power_a: float + fan_frequency: np.ndarray # u16[4] + reserve: int + # NOTE: the SDK's trailing `crc` (uint32) is absent on this Go2's firmware + # wire format — verified against the recording (body ends after `reserve`). + + __cdr_fields__ = [ + ("head", ("array", "u8", 2)), + ("level_flag", "u8"), + ("frame_reserve", "u8"), + ("sn", ("array", "u32", 2)), + ("version", ("array", "u32", 2)), + ("bandwidth", "u16"), + ("imu_state", IMUState), + ("motor_state", ("array", MotorState, 20)), + ("bms_state", BmsState), + ("foot_force", ("array", "i16", 4)), + ("foot_force_est", ("array", "i16", 4)), + ("tick", "u32"), + ("wireless_remote", ("array", "u8", 40)), + ("bit_flag", "u8"), + ("adc_reel", "f32"), + ("temperature_ntc1", "u8"), + ("temperature_ntc2", "u8"), + ("power_v", "f32"), + ("power_a", "f32"), + ("fan_frequency", ("array", "u16", 4)), + ("reserve", "u32"), + ] diff --git a/dimos/robot/unitree/go2dds/msgs/MotorState.py b/dimos/robot/unitree/go2dds/msgs/MotorState.py new file mode 100644 index 0000000000..55f851bf67 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/MotorState.py @@ -0,0 +1,50 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::MotorState_""" + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class MotorState: + mode: int + q: float + dq: float + ddq: float + tau_est: float + q_raw: float + dq_raw: float + ddq_raw: float + temperature: int + lost: int + reserve: np.ndarray # u32[2] + + __cdr_fields__ = [ + ("mode", "u8"), + ("q", "f32"), + ("dq", "f32"), + ("ddq", "f32"), + ("tau_est", "f32"), + ("q_raw", "f32"), + ("dq_raw", "f32"), + ("ddq_raw", "f32"), + ("temperature", "u8"), + ("lost", "u32"), + ("reserve", ("array", "u32", 2)), + ] diff --git a/dimos/robot/unitree/go2dds/msgs/PathPoint.py b/dimos/robot/unitree/go2dds/msgs/PathPoint.py new file mode 100644 index 0000000000..0541a68988 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/PathPoint.py @@ -0,0 +1,40 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::PathPoint_""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class PathPoint: + t_from_start: float + x: float + y: float + yaw: float + vx: float + vy: float + vyaw: float + + __cdr_fields__ = [ + ("t_from_start", "f32"), + ("x", "f32"), + ("y", "f32"), + ("yaw", "f32"), + ("vx", "f32"), + ("vy", "f32"), + ("vyaw", "f32"), + ] diff --git a/dimos/robot/unitree/go2dds/msgs/SportModeState.py b/dimos/robot/unitree/go2dds/msgs/SportModeState.py new file mode 100644 index 0000000000..1074c87f87 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/SportModeState.py @@ -0,0 +1,76 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::SportModeState_ — high-level sport state (rt/sportmodestate).""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +import numpy as np + +from dimos.robot.unitree.go2dds.msgs.base import PrettyMsg +from dimos.robot.unitree.go2dds.msgs.IMUState import IMUState +from dimos.robot.unitree.go2dds.msgs.PathPoint import PathPoint +from dimos.robot.unitree.go2dds.msgs.TimeSpec import TimeSpec + + +@dataclass(repr=False) +class SportModeState(PrettyMsg): + stamp: TimeSpec + error_code: int + imu_state: IMUState + mode: int + progress: float + gait_type: int + foot_raise_height: float + position: np.ndarray # f32[3] + body_height: float + velocity: np.ndarray # f32[3] + yaw_speed: float + range_obstacle: np.ndarray # f32[4] + foot_force: np.ndarray # i16[4] + foot_position_body: np.ndarray # f32[12] + foot_speed_body: np.ndarray # f32[12] + path_point: list[PathPoint] # [10] + + __cdr_fields__ = [ + ("stamp", TimeSpec), + ("error_code", "u32"), + ("imu_state", IMUState), + ("mode", "u8"), + ("progress", "f32"), + ("gait_type", "u8"), + ("foot_raise_height", "f32"), + ("position", ("array", "f32", 3)), + ("body_height", "f32"), + ("velocity", ("array", "f32", 3)), + ("yaw_speed", "f32"), + ("range_obstacle", ("array", "f32", 4)), + ("foot_force", ("array", "i16", 4)), + ("foot_position_body", ("array", "f32", 12)), + ("foot_speed_body", ("array", "f32", 12)), + ("path_point", ("array", PathPoint, 10)), + ] + + def to_rerun(self) -> Any: + """Sport-mode pose as a rerun Transform3D (position + body orientation).""" + import rerun as rr + + w, x, y, z = (float(v) for v in self.imu_state.quaternion) # Unitree order: wxyz + return rr.Transform3D( + translation=[float(v) for v in self.position], + rotation=rr.Quaternion(xyzw=[x, y, z, w]), + ) diff --git a/dimos/robot/unitree/go2dds/msgs/Telemetry.py b/dimos/robot/unitree/go2dds/msgs/Telemetry.py new file mode 100644 index 0000000000..c76b28bad0 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/Telemetry.py @@ -0,0 +1,44 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""go2/Telemetry — app-level JSON status packet (topic ``telemetry``).""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any + +from dimos.robot.unitree.go2dds.msgs.base import PrettyMsg + + +@dataclass(repr=False) +class Telemetry(PrettyMsg): + type: str + battery: float # fraction 0..1 + body_h: float + current_a: float + imu_hz: float + lidar: bool + lidar_hz: float + lowstate_hz: float + mode: int + obstacle: bool + odom_hz: float + points_per_s: float + rage: bool + recording: dict[str, Any] # {active, bytes, duration_s, file} + rss_mb: float + sportmode_hz: float + vel: list[float] # [vx, vy] + yaw: float diff --git a/dimos/robot/unitree/go2dds/msgs/TimeSpec.py b/dimos/robot/unitree/go2dds/msgs/TimeSpec.py new file mode 100644 index 0000000000..388f2d8583 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/TimeSpec.py @@ -0,0 +1,27 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""unitree_go::msg::dds_::TimeSpec_""" + +from __future__ import annotations + +from dataclasses import dataclass + + +@dataclass +class TimeSpec: + sec: int + nanosec: int + + __cdr_fields__ = [("sec", "i32"), ("nanosec", "u32")] diff --git a/dimos/robot/unitree/go2dds/msgs/base.py b/dimos/robot/unitree/go2dds/msgs/base.py new file mode 100644 index 0000000000..d5f7231f47 --- /dev/null +++ b/dimos/robot/unitree/go2dds/msgs/base.py @@ -0,0 +1,52 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pretty multi-line ``__repr__`` for the Go2 message dataclasses. + +Mixin for top-level messages; the formatter recurses through nested dataclasses, +lists, and numpy arrays, so nested types need not inherit it. +""" + +from __future__ import annotations + +from dataclasses import fields, is_dataclass +from typing import Any + +import numpy as np + + +def _fmt(v: Any, indent: int) -> str: + pad, close = " " * (indent + 1), " " * indent + if is_dataclass(v) and not isinstance(v, type): + body = "".join( + f"\n{pad}{f.name}={_fmt(getattr(v, f.name), indent + 1)}," for f in fields(v) + ) + return f"{type(v).__name__}({body}\n{close})" + if isinstance(v, np.ndarray): + a = np.round(v, 4) if v.dtype.kind == "f" else v + return np.array2string(a, separator=", ", max_line_width=100) + if isinstance(v, list): + if v and is_dataclass(v[0]): + return "[" + "".join(f"\n{pad}{_fmt(x, indent + 1)}," for x in v) + f"\n{close}]" + return repr(v) + if isinstance(v, float): + return f"{v:.4f}" + return repr(v) + + +class PrettyMsg: + """Mixin giving a readable multi-line ``__repr__``. Use with ``@dataclass(repr=False)``.""" + + def __repr__(self) -> str: + return _fmt(self, 0) diff --git a/dimos/robot/unitree/go2dds/ros.py b/dimos/robot/unitree/go2dds/ros.py new file mode 100644 index 0000000000..759b56ed72 --- /dev/null +++ b/dimos/robot/unitree/go2dds/ros.py @@ -0,0 +1,259 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Decode the standard-ROS DDS channels into ``dimos.msgs`` types. + +The wire layouts are declared as ``__cdr_fields__`` specs and walked by the +generic :mod:`cdr` decoder (no hand-rolled byte cursors). A thin per-type +adapter maps the decoded wire struct into the dimos message — only the parts +that genuinely differ from a field copy (point-buffer reinterpretation, jpeg +decode, pose nesting) live there. +""" + +# TODO this file needs to go away, dimos/msgs are structurally the same as +# these messages here so we will write an automatic translator, temporary so +# we can iterate on go2 dds research, see if it's viable at all +# +# TODO pointcloud has timestamps and intensities, we drop those on LCM round trip +# and our pointcloud2 message doesn't support arbitrary fields per point, we need +# to implement those + +from __future__ import annotations + +from dataclasses import dataclass + +import numpy as np + +from dimos.msgs.geometry_msgs.Pose import Pose +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.nav_msgs.Odometry import Odometry +from dimos.msgs.sensor_msgs.Image import Image, ImageFormat +from dimos.msgs.sensor_msgs.Imu import Imu +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.robot.unitree.go2dds import cdr + + +# Shared wire specs (Header/Time) reused by the per-message layouts below. +@dataclass +class _Time: + sec: int + nanosec: int + + __cdr_fields__ = [("sec", "i32"), ("nanosec", "u32")] + + +@dataclass +class _Header: + stamp: _Time + frame_id: str + + __cdr_fields__ = [("stamp", _Time), ("frame_id", "string")] + + +def _ts(h: _Header) -> float: + return h.stamp.sec + h.stamp.nanosec * 1e-9 + + +# sensor_msgs/Imu +@dataclass +class _ImuWire: + header: _Header + orientation: np.ndarray # f64[4] xyzw + orientation_covariance: np.ndarray # f64[9] + angular_velocity: np.ndarray # f64[3] + angular_velocity_covariance: np.ndarray + linear_acceleration: np.ndarray # f64[3] + linear_acceleration_covariance: np.ndarray + + __cdr_fields__ = [ + ("header", _Header), + ("orientation", ("array", "f64", 4)), + ("orientation_covariance", ("array", "f64", 9)), + ("angular_velocity", ("array", "f64", 3)), + ("angular_velocity_covariance", ("array", "f64", 9)), + ("linear_acceleration", ("array", "f64", 3)), + ("linear_acceleration_covariance", ("array", "f64", 9)), + ] + + +def decode_imu(buf: bytes) -> Imu: + w: _ImuWire = cdr.decode(buf, _ImuWire)[0] + # Unitree fills orientation wxyz even in this sensor_msgs/Imu — reorder to xyzw + # (verified: rotating accel by this lands gravity on +z in leg-odom-stationary windows). + qw, qx, qy, qz = (float(v) for v in w.orientation) + return Imu( + orientation=Quaternion(qx, qy, qz, qw), + angular_velocity=Vector3(w.angular_velocity.tolist()), + linear_acceleration=Vector3(w.linear_acceleration.tolist()), + orientation_covariance=w.orientation_covariance.tolist(), + angular_velocity_covariance=w.angular_velocity_covariance.tolist(), + linear_acceleration_covariance=w.linear_acceleration_covariance.tolist(), + frame_id=w.header.frame_id, + ts=_ts(w.header), + ) + + +# nav_msgs/Odometry +@dataclass +class _PoseWire: + position: np.ndarray # f64[3] + orientation: np.ndarray # f64[4] xyzw + + __cdr_fields__ = [("position", ("array", "f64", 3)), ("orientation", ("array", "f64", 4))] + + +@dataclass +class _PoseWithCov: + pose: _PoseWire + covariance: np.ndarray # f64[36] + + __cdr_fields__ = [("pose", _PoseWire), ("covariance", ("array", "f64", 36))] + + +@dataclass +class _TwistWire: + linear: np.ndarray # f64[3] + angular: np.ndarray # f64[3] + + __cdr_fields__ = [("linear", ("array", "f64", 3)), ("angular", ("array", "f64", 3))] + + +@dataclass +class _TwistWithCov: + twist: _TwistWire + covariance: np.ndarray # f64[36] + + __cdr_fields__ = [("twist", _TwistWire), ("covariance", ("array", "f64", 36))] + + +@dataclass +class _OdomWire: + header: _Header + child_frame_id: str + pose: _PoseWithCov + twist: _TwistWithCov + + __cdr_fields__ = [ + ("header", _Header), + ("child_frame_id", "string"), + ("pose", _PoseWithCov), + ("twist", _TwistWithCov), + ] + + +def decode_odometry(buf: bytes) -> Odometry: + w: _OdomWire = cdr.decode(buf, _OdomWire)[0] + pose = Pose() + pose.position = Vector3(w.pose.pose.position.tolist()) + pose.orientation = Quaternion(*w.pose.pose.orientation.tolist()) + twist = Twist() + twist.linear = Vector3(w.twist.twist.linear.tolist()) + twist.angular = Vector3(w.twist.twist.angular.tolist()) + return Odometry( + ts=_ts(w.header), + frame_id=w.header.frame_id, + child_frame_id=w.child_frame_id, + pose=pose, + twist=twist, + ) + + +# sensor_msgs/PointCloud2 +@dataclass +class _PointField: + name: str + offset: int + datatype: int + count: int + + __cdr_fields__ = [ + ("name", "string"), + ("offset", "u32"), + ("datatype", "u8"), + ("count", "u32"), + ] + + +@dataclass +class _Pc2Wire: + header: _Header + height: int + width: int + fields: list[_PointField] + is_bigendian: int + point_step: int + row_step: int + data: np.ndarray # u8[] + is_dense: int + + __cdr_fields__ = [ + ("header", _Header), + ("height", "u32"), + ("width", "u32"), + ("fields", ("seq", _PointField)), + ("is_bigendian", "u8"), + ("point_step", "u32"), + ("row_step", "u32"), + ("data", ("seq", "u8")), + ("is_dense", "u8"), + ] + + +# ROS PointField datatype code -> numpy dtype +_PF_DT = {1: " PointCloud2: + w: _Pc2Wire = cdr.decode(buf, _Pc2Wire)[0] + ts, frame = _ts(w.header), w.header.frame_id + if w.point_step == 0 or w.data.size < w.point_step: + return PointCloud2.from_numpy(np.empty((0, 3), np.float32), frame, ts) + dt = np.dtype( + { + "names": [f.name for f in w.fields], + "formats": [_PF_DT[f.datatype] for f in w.fields], + "offsets": [f.offset for f in w.fields], + "itemsize": w.point_step, + } + ) + arr = w.data.view(dt) + xyz = np.stack([arr["x"], arr["y"], arr["z"]], axis=-1).astype(np.float32) + inten = arr["intensity"].astype(np.float32) if dt.names and "intensity" in dt.names else None + return PointCloud2.from_numpy(xyz, frame, ts, inten) + + +# sensor_msgs/CompressedImage +@dataclass +class _CImgWire: + header: _Header + format: str + data: np.ndarray # u8[] + + __cdr_fields__ = [ + ("header", _Header), + ("format", "string"), + ("data", ("seq", "u8")), + ] + + +def decode_compressed_image(buf: bytes) -> Image | None: + import cv2 + + w: _CImgWire = cdr.decode(buf, _CImgWire)[0] + bgr = cv2.imdecode(w.data, cv2.IMREAD_COLOR) + if bgr is None: + return None + return Image.from_numpy(bgr, ImageFormat.BGR, w.header.frame_id, _ts(w.header)) diff --git a/dimos/robot/unitree/go2dds/store.py b/dimos/robot/unitree/go2dds/store.py new file mode 100644 index 0000000000..991e2fae1b --- /dev/null +++ b/dimos/robot/unitree/go2dds/store.py @@ -0,0 +1,57 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Open a Go2 DDS mcap directly as a read-only memory2 store. + + from dimos.robot.unitree.go2dds.store import Go2McapStore + + store = Go2McapStore(path="go2_china_office_indoor.mcap") + print(store.list_streams()) + for obs in store.streams.lidar.limit(5): + print(obs.ts, obs.data) # obs.data is a dimos PointCloud2 + +Thin Go2 wiring over the generic :class:`dimos.memory2.store.mcap.McapStore`: +supplies the Go2 codec set and stream-name map, and resolves the path through +the repo data dir / LFS. +""" + +from __future__ import annotations + +from typing import Any + +from dimos.memory2.store.mcap import McapStore +from dimos.robot.unitree.go2dds.codec import GO2_CODECS +from dimos.utils.data import resolve_named_path + +# memory2 stream name -> Go2 DDS topic. +STREAMS: dict[str, str] = { + "lidar": "rt/utlidar/cloud", + "imu": "rt/utlidar/imu", + "odom": "rt/utlidar/robot_odom", + "color_image": "rt/frontvideo", + "lowstate": "rt/lowstate", + "sportmodestate": "rt/sportmodestate", +} + + +class Go2McapStore(McapStore): + """``McapStore`` preset with the Go2 codecs, stream names, and path resolution.""" + + def __init__(self, *, path: str, **kwargs: Any) -> None: + super().__init__( + path=str(resolve_named_path(path, ".mcap")), + codecs=GO2_CODECS, + streams=STREAMS, + **kwargs, + ) diff --git a/dimos/robot/unitree/go2dds/test_store.py b/dimos/robot/unitree/go2dds/test_store.py new file mode 100644 index 0000000000..450d5d1812 --- /dev/null +++ b/dimos/robot/unitree/go2dds/test_store.py @@ -0,0 +1,145 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Open a real Go2 DDS mcap as a memory2 store (LFS-backed). + +One test per message type — each prints a sample and checks the decode. +""" + +from __future__ import annotations + +import importlib.util + +import numpy as np +import pytest + +from dimos.msgs.nav_msgs.Odometry import Odometry +from dimos.msgs.sensor_msgs.Image import Image +from dimos.msgs.sensor_msgs.Imu import Imu +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.robot.unitree.go2dds.msgs.ControlEvent import ControlEvent +from dimos.robot.unitree.go2dds.msgs.LowState import LowState +from dimos.robot.unitree.go2dds.msgs.SportModeState import SportModeState +from dimos.robot.unitree.go2dds.msgs.Telemetry import Telemetry +from dimos.robot.unitree.go2dds.store import Go2McapStore + +pytestmark = [ + pytest.mark.self_hosted, + pytest.mark.skipif(importlib.util.find_spec("mcap") is None, reason="mcap not installed"), +] + + +@pytest.fixture(scope="module") +def store() -> Go2McapStore: + return Go2McapStore(path="go2_china_office_indoor.mcap") + + +def test_lists_streams(store: Go2McapStore) -> None: + """Every decodable channel present in the file is listed (CDR + JSON).""" + print("\n" + store.summary()) + assert set(store.list_streams()) == { + "lidar", + "imu", + "odom", + "color_image", + "lowstate", + "sportmodestate", + "control_log", + "telemetry", + } + assert store.streams.lowstate.count() > 0 + + +def test_lidar(store: Go2McapStore) -> None: + pc = store.streams.lidar.first().data + xyz = pc.points_f32() + print( + f"\nPointCloud2: {xyz.shape[0]} pts frame={pc.frame_id!r} " + f"range=[{np.linalg.norm(xyz[:, :3], axis=1).min():.2f}, " + f"{np.linalg.norm(xyz[:, :3], axis=1).max():.2f}] m" + ) + assert isinstance(pc, PointCloud2) + assert xyz.shape[1] == 3 and len(xyz) > 0 + + +def test_imu(store: Go2McapStore) -> None: + imu = store.streams.imu.first().data + q = imu.orientation + print( + f"\nImu: |q|={np.linalg.norm([q.x, q.y, q.z, q.w]):.4f} " + f"acc=({imu.linear_acceleration.x:.2f}, {imu.linear_acceleration.y:.2f}, " + f"{imu.linear_acceleration.z:.2f}) frame={imu.frame_id!r}" + ) + assert isinstance(imu, Imu) + assert abs(imu.linear_acceleration.z) == pytest.approx(9.8, abs=0.5) # gravity + + +def test_odom(store: Go2McapStore) -> None: + odom = store.streams.odom.first().data + p = odom.pose.pose.position + print( + f"\nOdometry: pos=({p.x:.2f}, {p.y:.2f}, {p.z:.2f}) " + f"{odom.frame_id!r} -> {odom.child_frame_id!r}" + ) + assert isinstance(odom, Odometry) + assert odom.child_frame_id == "base_link" + + +def test_color_image(store: Go2McapStore) -> None: + img = store.streams.color_image.first().data + arr = img.as_numpy() + print(f"\nImage: {arr.shape} frame={img.frame_id!r}") + assert isinstance(img, Image) + assert arr.ndim == 3 + + +def test_lowstate(store: Go2McapStore) -> None: + ls = store.streams.lowstate.first().data + print(f"\n{ls}") + assert isinstance(ls, LowState) + assert len(ls.motor_state) == 20 + assert np.isclose(np.linalg.norm(ls.imu_state.quaternion), 1.0, atol=1e-2) + + +def test_sportmodestate(store: Go2McapStore) -> None: + sm = store.streams.sportmodestate.first().data + print(f"\n{sm}") + assert isinstance(sm, SportModeState) + assert sm.body_height > 0 + + +def test_control_log(store: Go2McapStore) -> None: + ev = store.streams.control_log.first().data + print(f"\n{ev}") + assert isinstance(ev, ControlEvent) + assert isinstance(ev.type, str) + + +def test_telemetry(store: Go2McapStore) -> None: + t = store.streams.telemetry.first().data + print(f"\n{t}") + assert isinstance(t, Telemetry) + assert 0.0 <= t.battery <= 1.0 + + +def test_read_contract(store: Go2McapStore) -> None: + """count / first / last / limit / offset / order_by / time filter.""" + s = store.streams.odom + first, last = s.first(), s.last() + assert first.ts < last.ts + assert len(s.limit(3).to_list()) == 3 + assert [o.id for o in s.offset(2).limit(2).to_list()] == [2, 3] + # mcap is ts-ascending, so desc just reverses — top item is the last obs + assert s.order_by("ts", desc=True).first().ts == last.ts + assert s.after(first.ts + 10).first().ts > first.ts + 10 diff --git a/dimos/utils/cli/map.py b/dimos/utils/cli/map.py index c162a015ef..7236544f6b 100644 --- a/dimos/utils/cli/map.py +++ b/dimos/utils/cli/map.py @@ -17,7 +17,6 @@ from collections.abc import Callable, Iterable import math from pathlib import Path -import time from typing import TYPE_CHECKING, Any import rerun as rr @@ -129,37 +128,6 @@ def prepared() -> Iterable[Observation[PointCloud2]]: return result.data if result is not None else None -def progress(total: int, label: str = "") -> Callable[[Observation[Any]], None]: - seen = 0 - wall_start: float | None = None - last_wall: float | None = None - first_ts: float | None = None - - def _progress(obs: Observation[Any]) -> None: - nonlocal seen, wall_start, last_wall, first_ts - now = time.monotonic() - if wall_start is None: - wall_start = now - first_ts = obs.ts - assert first_ts is not None # narrowed by the same `if` above - frame_ms = (now - last_wall) * 1000 if last_wall is not None else 0.0 - last_wall = now - seen += 1 - pct = 100 * seen // total if total else 100 - wall = now - wall_start - data = obs.ts - first_ts - speed = data / wall if wall > 0 else 0.0 - end = "\n" if seen >= total else "" - prefix = f"{label} " if label else "" - print( - f"\r{prefix}{pct:>3}% [{seen}/{total}] {data:.1f}s ({speed:.1f} x rt) {frame_ms:.0f}ms/frame", - end=end, - flush=True, - ) - - return _progress - - def main( dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), voxel: float = typer.Option(0.05, "--voxel", help="Voxel size for the rebuild"), @@ -228,6 +196,7 @@ def main( from dimos.mapping.loop_closure.pgo import PGO from dimos.memory2.store.sqlite import SqliteStore from dimos.memory2.transform import QualityWindow, SpeedLimit + from dimos.memory2.utils.progress import progress from dimos.memory2.vis.color import Color from dimos.msgs.geometry_msgs.Transform import Transform from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo diff --git a/pyproject.toml b/pyproject.toml index 5cd748fbfa..a1574cc5bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -239,6 +239,7 @@ unitree-dds = [ "dimos[unitree]", "unitree-sdk2py-dimos>=1.0.2", "cyclonedds>=0.10.5", + "mcap>=1.2.0", # decode Go2 DDS mcap recordings (go2dds store) ] manipulation = [ @@ -445,6 +446,10 @@ lint = [ tests-self-hosted = [ {include-group = "tests"}, "dimos[agents,perception,manipulation,sim,unitree,misc]", + # go2dds store tests decode mcap (pure-Python). The full `unitree-dds` extra + # pulls `cyclonedds`, whose wheel needs a CycloneDDS C lib the ros-dev image + # doesn't expose to the build — and these tests never open a live DDS link. + "mcap>=1.2.0", # Needed to compile the in-tree extensions. "pybind11>=2.12", ] @@ -535,6 +540,8 @@ module = [ "faster_whisper", "geometry_msgs.*", "lazy_loader", + "mcap", + "mcap.*", "mujoco", "mujoco_playground.*", "nav_msgs.*", diff --git a/uv.lock b/uv.lock index 01b754e0b9..7644b4ff30 100644 --- a/uv.lock +++ b/uv.lock @@ -2187,6 +2187,7 @@ unitree-dds = [ { name = "langchain-openai" }, { name = "langchain-text-splitters" }, { name = "lap" }, + { name = "mcap" }, { name = "moondream" }, { name = "ollama" }, { name = "omegaconf" }, @@ -2330,6 +2331,7 @@ tests-self-hosted = [ { name = "langchain-core" }, { name = "langchain-openai" }, { name = "lap" }, + { name = "mcap" }, { name = "md-babel-py" }, { name = "moondream" }, { name = "mujoco" }, @@ -2415,6 +2417,7 @@ requires-dist = [ { name = "llvmlite", specifier = ">=0.42.0" }, { name = "lz4", specifier = ">=4.4.5" }, { name = "matplotlib", marker = "extra == 'manipulation'", specifier = ">=3.7.1" }, + { name = "mcap", marker = "extra == 'unitree-dds'", specifier = ">=1.2.0" }, { name = "moondream", marker = "extra == 'perception'" }, { name = "mujoco", marker = "extra == 'sim'", specifier = ">=3.3.4" }, { name = "numba", specifier = ">=0.60.0" }, @@ -2620,6 +2623,7 @@ tests-self-hosted = [ { name = "langchain-core", specifier = "==1.3.3" }, { name = "langchain-openai", specifier = ">=1,<2" }, { name = "lap", specifier = ">=0.5.12" }, + { name = "mcap", specifier = ">=1.2.0" }, { name = "md-babel-py", specifier = ">=1.2.0" }, { name = "moondream" }, { name = "mujoco", specifier = ">=3.3.4" }, @@ -5441,6 +5445,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/af/33/ee4519fa02ed11a94aef9559552f3b17bb863f2ecfe1a35dc7f548cde231/matplotlib_inline-0.2.1-py3-none-any.whl", hash = "sha256:d56ce5156ba6085e00a9d54fead6ed29a9c47e215cd1bba2e976ef39f5710a76", size = 9516, upload-time = "2025-10-23T09:00:20.675Z" }, ] +[[package]] +name = "mcap" +version = "1.3.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "lz4" }, + { name = "zstandard" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/8b/38/8bd73953b9c37dd7a2c590e72ab4fa4682a43864fe1d55f7209f2536fa64/mcap-1.3.1.tar.gz", hash = "sha256:2878879a786021aa7f7f36319276396a778717ccd013b2191fe94d37572d7551", size = 21676, upload-time = "2025-12-24T21:31:35.476Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/63/22/dad47e86047344f110a9d589d90de49d9c54db7a7ea4f0ef91fb3e8e9f3f/mcap-1.3.1-py3-none-any.whl", hash = "sha256:9098685d67288a8087166504cf4adf617cfa8639bb60e936af113f62c11c293f", size = 20678, upload-time = "2025-12-24T21:31:33.959Z" }, +] + [[package]] name = "mccabe" version = "0.7.0"