-
Notifications
You must be signed in to change notification settings - Fork 666
GO2 MCAP/DDS integration as memory2 backend #2314
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
leshy
wants to merge
21
commits into
main
Choose a base branch
from
feat/ivan/go2dds
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,109
−45
Open
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
40ca0b6
first pass on go2 dds decoder
leshy 0ccf7ed
mem2 mcap store backend
leshy 9d7e611
mcap memory2 store, go2 custom dds msg defs
leshy d1a613a
type fixes
leshy 076c573
lidar rendering
leshy 13754b9
integrating IMU, global lidar voxelizer
leshy 08a777b
initial sketch renderer wrap
leshy 01546e9
cleanup
leshy 3aa8e62
dimos mem rerun cli + cdr type fix
leshy 67d6489
Merge remote-tracking branch 'origin/main' into feat/ivan/go2dds
leshy 1108b01
deps: add mcap to the unitree extra
leshy 8bd60d8
mcap as unitree-dds dep; run go2dds tests on self-hosted
leshy 47da9b3
strip duplicate license headers in go2dds files
leshy 3f81927
removed go2dds_data1
leshy 8120d6d
ci: use mcap (not unitree-dds) in tests-self-hosted
leshy aa34701
address PR review: None-frame guard, lidar annotation, cdr end check
leshy aa225c6
go2dds render: --image camera pipeline + world_lidar
leshy d9ea9e7
small notes on ros.py
leshy 4d4451b
Merge remote-tracking branch 'origin/main' into feat/ivan/go2dds
leshy 47f37f2
Imu.to_rerun: render orientation as TransformAxes3D
leshy 99ca629
Merge remote-tracking branch 'origin/main' into feat/ivan/go2dds
leshy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Git LFS file not shown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| # Copyright 2026 Dimensional Inc. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """``dimos mem`` — memory2 store commands.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import typer | ||
|
|
||
| mem_app = typer.Typer(help="memory2 store commands", no_args_is_help=True) | ||
|
|
||
|
|
||
| @mem_app.command() | ||
| def rerun( | ||
| path: str = typer.Argument(..., help="Store to render (.mcap path/name or .db)"), | ||
| out: str = typer.Option(None, "--out", help="Output .rrd (default: alongside the source)"), | ||
| seconds: float = typer.Option(None, "--seconds", help="Only the first N seconds"), | ||
| no_gui: bool = typer.Option(False, "--no-gui", help="Write the .rrd but don't open the viewer"), | ||
| ) -> None: | ||
| """Render a memory2 store into rerun (writes a .rrd, then opens the viewer).""" | ||
| from dimos.memory2.cli.render import open_store, render_store | ||
|
|
||
| render_store(open_store(path), out=out, seconds=seconds, no_gui=no_gui) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| # Copyright 2026 Dimensional Inc. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Render any memory2 store into rerun. | ||
|
|
||
| Generic: walks the store's streams and logs every observation whose payload | ||
| implements ``to_rerun()`` (the :class:`RerunConvertible` convention). Streams | ||
| whose payload has no ``to_rerun`` are skipped. Each stream becomes an entity | ||
| path; observations share one ``time`` timeline (relative to the store's earliest | ||
| observation, so streams stay aligned). Writes a ``.rrd`` and opens the viewer. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from pathlib import Path | ||
| import shutil | ||
| import subprocess | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| if TYPE_CHECKING: | ||
| from dimos.memory2.store.base import Store | ||
|
|
||
|
|
||
| def open_store(path: str) -> Store: | ||
| """Open a store by file type (``.db`` -> SqliteStore, else Go2 mcap).""" | ||
| if str(path).endswith(".db"): | ||
| from dimos.memory2.store.sqlite import SqliteStore | ||
|
|
||
| return SqliteStore(path=path, must_exist=True) | ||
| from dimos.robot.unitree.go2dds.store import Go2McapStore # lazy: robot-layer codec set | ||
|
|
||
| return Go2McapStore(path=path) | ||
|
|
||
|
|
||
| def _open_viewer(rrd: str) -> None: | ||
| exe = shutil.which("rerun") | ||
| if exe: | ||
| subprocess.Popen([exe, rrd]) | ||
| print(f" opening {rrd} in rerun") | ||
| else: | ||
| print(f" rerun viewer not found on PATH; open manually:\n rerun {rrd}") | ||
|
|
||
|
|
||
| def render_store( | ||
| store: Store, | ||
| *, | ||
| out: str | None = None, | ||
| seconds: float | None = None, | ||
| no_gui: bool = False, | ||
| ) -> str: | ||
| """Render ``store`` to a ``.rrd`` and (unless ``no_gui``) open the rerun viewer. | ||
|
|
||
| Logs every observation (full res); ``seconds`` bounds the time window from | ||
| the start. Returns the ``.rrd`` path. | ||
| """ | ||
| import rerun as rr | ||
|
|
||
| from dimos.memory2.utils.progress import progress | ||
| from dimos.visualization.rerun.init import rerun_init | ||
|
|
||
| if out is None: | ||
| src = getattr(store.config, "path", None) or "store" | ||
| out = str(Path(src).with_suffix(".rrd")) | ||
|
|
||
| # Discover renderable streams (payload has a working to_rerun) + shared anchor. | ||
| renderable = [] | ||
| t0: float | None = None | ||
| for name in store.list_streams(): | ||
| stream = store.streams[name] | ||
| try: | ||
| first = stream.first() | ||
| except LookupError: | ||
| continue | ||
| data = first.data | ||
| if not hasattr(data, "to_rerun"): | ||
| print(f" skip {name}: {type(data).__name__} has no to_rerun()") | ||
| continue | ||
| try: | ||
| data.to_rerun() | ||
| except Exception as e: | ||
| print(f" skip {name}: to_rerun() failed ({e})") | ||
| continue | ||
| renderable.append((name, stream)) | ||
| t0 = first.ts if t0 is None else min(t0, first.ts) | ||
|
|
||
| if t0 is None: | ||
| print("nothing renderable in this store") | ||
| return out | ||
|
|
||
| rerun_init("dimos mem rerun") | ||
| rr.save(out) | ||
|
|
||
| for name, stream in renderable: | ||
| report = progress(stream.count(), label=name) | ||
| for obs in stream: | ||
| if seconds is not None and obs.ts - t0 > seconds: | ||
| print() # terminate the windowed (sub-100%) progress line | ||
| break | ||
| if obs.data is None: # e.g. a truncated/corrupt frame that failed to decode | ||
| report(obs) | ||
| continue | ||
| rr.set_time("time", duration=obs.ts - t0) | ||
| data = obs.data.to_rerun() | ||
| if isinstance(data, list): # RerunMulti: [(subpath, archetype), ...] | ||
| for sub, arch in data: | ||
| rr.log(f"{name}/{sub}", arch) | ||
| else: | ||
| rr.log(name, data) | ||
| report(obs) | ||
|
|
||
| rr.rerun_shutdown() # flush + close the .rrd before opening it | ||
| print(f"wrote {out}") | ||
| if not no_gui: | ||
| _open_viewer(out) | ||
| return out |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,181 @@ | ||
| # Copyright 2026 Dimensional Inc. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| """Read-only memory2 store backed by an mcap file. | ||
| Generic and codec-injected — it knows nothing about any robot. The caller | ||
| supplies ``codecs`` (DDS/wire topic -> codec that decodes a message's stored | ||
| bytes) and an optional ``streams`` map (friendly stream name -> topic). See | ||
| ``dimos.robot.unitree.go2dds.store.Go2McapStore`` for the Go2 wiring. | ||
| Read-only: no append, blobs, vectors, or embeddings. Payloads decode lazily on | ||
| ``obs.data``; ts and counts are cheap (counts come from the mcap index). | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from collections.abc import Iterator, Mapping | ||
| from dataclasses import replace | ||
| from functools import partial | ||
| from typing import Any, Protocol, runtime_checkable | ||
|
|
||
| from dimos.memory2.backend import Backend | ||
| from dimos.memory2.codecs.base import codec_for | ||
| from dimos.memory2.notifier.subject import SubjectNotifier | ||
| from dimos.memory2.observationstore.base import ObservationStore, ObservationStoreConfig | ||
| from dimos.memory2.store.base import Store, StoreConfig | ||
| from dimos.memory2.type.filter import StreamQuery | ||
| from dimos.memory2.type.observation import Observation | ||
|
|
||
|
|
||
| @runtime_checkable | ||
| class StreamCodec(Protocol): | ||
| """What the store needs to turn a channel's stored bytes into a payload.""" | ||
|
|
||
| @property | ||
| def payload_type(self) -> type: ... | ||
|
|
||
| def decode(self, data: bytes) -> Any: ... | ||
|
|
||
|
|
||
| def _slug(topic: str) -> str: | ||
| """Auto stream name from a topic: drop the ``rt/`` prefix and ``/`` -> ``_``. | ||
| ``rt/`` is the ROS2-over-DDS topic prefix; ``removeprefix`` only strips it | ||
| where present (e.g. app-level ``control_log`` is left alone). | ||
| """ | ||
| return topic.removeprefix("rt/").replace("/", "_") | ||
|
|
||
|
|
||
| class McapObservationStoreConfig(ObservationStoreConfig): | ||
| name: str = "<mcap>" | ||
|
|
||
|
|
||
| class McapObservationStore(ObservationStore[Any]): | ||
| """Read-only metadata/query over one mcap channel. Payloads load lazily.""" | ||
|
|
||
| config: McapObservationStoreConfig | ||
|
|
||
| def __init__(self, *, name: str, path: str, topic: str, codec: StreamCodec, count: int) -> None: | ||
| super().__init__(name=name) | ||
| self._path = path | ||
| self._topic = topic | ||
| self._codec = codec | ||
| self._count = count | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| return self.config.name | ||
|
|
||
| def _iter(self, reverse: bool = False) -> Iterator[Observation[Any]]: | ||
| from mcap.reader import make_reader # optional dep (go2/unitree extra) | ||
|
|
||
| decode, dtype, n = self._codec.decode, self._codec.payload_type, self._count | ||
| with open(self._path, "rb") as f: | ||
| msgs = make_reader(f).iter_messages(topics=[self._topic], reverse=reverse) | ||
| for i, (_s, _c, m) in enumerate(msgs): | ||
| yield Observation( | ||
| id=(n - 1 - i) if reverse else i, | ||
| ts=m.log_time / 1e9, | ||
| data_type=dtype, | ||
| _loader=partial(decode, m.data), | ||
| ) | ||
|
|
||
| def query(self, q: StreamQuery) -> Iterator[Observation[Any]]: | ||
| # mcap is natively log-time ordered (== ts == our id), so serve ts/id | ||
| # ordering by iterating forward/reverse instead of materializing + sorting. | ||
| if q.order_field in ("ts", "id"): | ||
| it = self._iter(reverse=q.order_desc) | ||
| q = replace(q, order_field=None, order_desc=False) | ||
| return q.apply(it) | ||
| return q.apply(self._iter()) | ||
|
|
||
| def count(self, q: StreamQuery) -> int: | ||
| if not q.filters and q.search_text is None and q.search_vec is None: | ||
| n = self._count | ||
| if q.offset_val: | ||
| n = max(0, n - q.offset_val) | ||
| if q.limit_val is not None: | ||
| n = min(n, q.limit_val) | ||
| return n | ||
| return sum(1 for _ in self.query(q)) | ||
|
|
||
| def fetch_by_ids(self, ids: list[int]) -> list[Observation[Any]]: | ||
| want = set(ids) | ||
| return [o for o in self._iter() if o.id in want] | ||
|
|
||
| def insert(self, obs: Observation[Any]) -> int: | ||
| raise NotImplementedError("McapStore is read-only") | ||
|
|
||
|
|
||
| class McapStoreConfig(StoreConfig): | ||
| path: str = "" | ||
|
|
||
|
|
||
| class McapStore(Store): | ||
| """A memory2 store backed by an mcap file (read-only). | ||
| Every channel present in the file with a codec is exposed. Names default to | ||
| the slugified topic (see :func:`_slug`); ``streams`` (friendly name -> topic) | ||
| overrides the name for specific topics. | ||
| """ | ||
|
|
||
| config: McapStoreConfig | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| codecs: Mapping[str, StreamCodec], | ||
| streams: dict[str, str] | None = None, | ||
| **kwargs: Any, | ||
| ) -> None: | ||
| from mcap.reader import make_reader # optional dep (go2/unitree extra) | ||
|
|
||
| super().__init__(**kwargs) | ||
| self._codecs = codecs | ||
| name_of = {topic: name for name, topic in (streams or {}).items()} # topic -> override | ||
| with open(self.config.path, "rb") as f: | ||
| summary = make_reader(f).get_summary() | ||
| self._stream_topic: dict[str, str] = {} # stream name -> topic | ||
| self._available: dict[str, int] = {} # stream name -> message count | ||
| if summary is not None and summary.statistics is not None: | ||
| for cid, ch in summary.channels.items(): | ||
| if ch.topic not in self._codecs: | ||
| continue | ||
| name = name_of.get(ch.topic) or _slug(ch.topic) | ||
| self._stream_topic[name] = ch.topic | ||
| self._available[name] = summary.statistics.channel_message_counts.get(cid, 0) | ||
|
|
||
| def list_streams(self) -> list[str]: | ||
| return sorted(set(self._available) | set(self._streams)) | ||
|
|
||
| def _create_backend( | ||
| self, name: str, payload_type: type | None = None, **config: Any | ||
| ) -> Backend[Any]: | ||
| if name not in self._available: | ||
| raise KeyError(f"No stream {name!r}. Available: {sorted(self._available)}") | ||
| topic = self._stream_topic[name] | ||
| codec = self._codecs[topic] | ||
| ptype = codec.payload_type | ||
| obs = McapObservationStore( | ||
| name=name, path=self.config.path, topic=topic, codec=codec, count=self._available[name] | ||
| ) | ||
| return Backend( | ||
| metadata_store=obs, | ||
| codec=codec_for(ptype), # storage codec, unused (blob_store=None) | ||
| data_type=ptype, | ||
| blob_store=None, | ||
| vector_store=None, | ||
| notifier=SubjectNotifier(), | ||
| ) | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.