Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data/.lfs/go2_china_office_indoor.mcap.tar.gz
Git LFS file not shown
34 changes: 34 additions & 0 deletions dimos/memory2/cli/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""``dimos mem`` — memory2 store commands."""

from __future__ import annotations

import typer

mem_app = typer.Typer(help="memory2 store commands", no_args_is_help=True)


@mem_app.command()
def rerun(
path: str = typer.Argument(..., help="Store to render (.mcap path/name or .db)"),
out: str = typer.Option(None, "--out", help="Output .rrd (default: alongside the source)"),
seconds: float = typer.Option(None, "--seconds", help="Only the first N seconds"),
no_gui: bool = typer.Option(False, "--no-gui", help="Write the .rrd but don't open the viewer"),
) -> None:
"""Render a memory2 store into rerun (writes a .rrd, then opens the viewer)."""
from dimos.memory2.cli.render import open_store, render_store

render_store(open_store(path), out=out, seconds=seconds, no_gui=no_gui)
126 changes: 126 additions & 0 deletions dimos/memory2/cli/render.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Render any memory2 store into rerun.

Generic: walks the store's streams and logs every observation whose payload
implements ``to_rerun()`` (the :class:`RerunConvertible` convention). Streams
whose payload has no ``to_rerun`` are skipped. Each stream becomes an entity
path; observations share one ``time`` timeline (relative to the store's earliest
observation, so streams stay aligned). Writes a ``.rrd`` and opens the viewer.
"""

from __future__ import annotations

from pathlib import Path
import shutil
import subprocess
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from dimos.memory2.store.base import Store


def open_store(path: str) -> Store:
"""Open a store by file type (``.db`` -> SqliteStore, else Go2 mcap)."""
if str(path).endswith(".db"):
from dimos.memory2.store.sqlite import SqliteStore

return SqliteStore(path=path, must_exist=True)
from dimos.robot.unitree.go2dds.store import Go2McapStore # lazy: robot-layer codec set

return Go2McapStore(path=path)


def _open_viewer(rrd: str) -> None:
exe = shutil.which("rerun")
if exe:
subprocess.Popen([exe, rrd])
print(f" opening {rrd} in rerun")
else:
print(f" rerun viewer not found on PATH; open manually:\n rerun {rrd}")


def render_store(
store: Store,
*,
out: str | None = None,
seconds: float | None = None,
no_gui: bool = False,
) -> str:
"""Render ``store`` to a ``.rrd`` and (unless ``no_gui``) open the rerun viewer.

Logs every observation (full res); ``seconds`` bounds the time window from
the start. Returns the ``.rrd`` path.
"""
import rerun as rr

from dimos.memory2.utils.progress import progress
from dimos.visualization.rerun.init import rerun_init

if out is None:
src = getattr(store.config, "path", None) or "store"
out = str(Path(src).with_suffix(".rrd"))

# Discover renderable streams (payload has a working to_rerun) + shared anchor.
renderable = []
t0: float | None = None
for name in store.list_streams():
stream = store.streams[name]
try:
first = stream.first()
except LookupError:
continue
data = first.data
if not hasattr(data, "to_rerun"):
print(f" skip {name}: {type(data).__name__} has no to_rerun()")
continue
try:
data.to_rerun()
except Exception as e:
print(f" skip {name}: to_rerun() failed ({e})")
continue
renderable.append((name, stream))
t0 = first.ts if t0 is None else min(t0, first.ts)

if t0 is None:
print("nothing renderable in this store")
return out

rerun_init("dimos mem rerun")
rr.save(out)

for name, stream in renderable:
report = progress(stream.count(), label=name)
for obs in stream:
if seconds is not None and obs.ts - t0 > seconds:
print() # terminate the windowed (sub-100%) progress line
break
if obs.data is None: # e.g. a truncated/corrupt frame that failed to decode
report(obs)
continue
rr.set_time("time", duration=obs.ts - t0)
data = obs.data.to_rerun()
if isinstance(data, list): # RerunMulti: [(subpath, archetype), ...]
for sub, arch in data:
rr.log(f"{name}/{sub}", arch)
else:
rr.log(name, data)
report(obs)

rr.rerun_shutdown() # flush + close the .rrd before opening it
print(f"wrote {out}")
if not no_gui:
_open_viewer(out)
return out
4 changes: 4 additions & 0 deletions dimos/memory2/store/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ def list_streams(self) -> list[str]:
"""Return names of all streams in this store."""
return list(self._streams.keys())

def summary(self) -> str:
"""One line per stream — name, count, ts range. See :meth:`Stream.summary`."""
return "\n".join(self.stream(name).summary() for name in self.list_streams())

def delete_stream(self, name: str) -> None:
"""Delete a stream by name (from cache and underlying storage)."""
stream = self._streams.pop(name, None)
Expand Down
181 changes: 181 additions & 0 deletions dimos/memory2/store/mcap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Read-only memory2 store backed by an mcap file.
Generic and codec-injected — it knows nothing about any robot. The caller
supplies ``codecs`` (DDS/wire topic -> codec that decodes a message's stored
bytes) and an optional ``streams`` map (friendly stream name -> topic). See
``dimos.robot.unitree.go2dds.store.Go2McapStore`` for the Go2 wiring.
Read-only: no append, blobs, vectors, or embeddings. Payloads decode lazily on
``obs.data``; ts and counts are cheap (counts come from the mcap index).
"""

from __future__ import annotations

from collections.abc import Iterator, Mapping
from dataclasses import replace
from functools import partial
from typing import Any, Protocol, runtime_checkable

from dimos.memory2.backend import Backend
from dimos.memory2.codecs.base import codec_for
from dimos.memory2.notifier.subject import SubjectNotifier
from dimos.memory2.observationstore.base import ObservationStore, ObservationStoreConfig
from dimos.memory2.store.base import Store, StoreConfig
from dimos.memory2.type.filter import StreamQuery
from dimos.memory2.type.observation import Observation


@runtime_checkable
class StreamCodec(Protocol):
"""What the store needs to turn a channel's stored bytes into a payload."""

@property
def payload_type(self) -> type: ...

def decode(self, data: bytes) -> Any: ...


def _slug(topic: str) -> str:
"""Auto stream name from a topic: drop the ``rt/`` prefix and ``/`` -> ``_``.
``rt/`` is the ROS2-over-DDS topic prefix; ``removeprefix`` only strips it
where present (e.g. app-level ``control_log`` is left alone).
"""
return topic.removeprefix("rt/").replace("/", "_")


class McapObservationStoreConfig(ObservationStoreConfig):
name: str = "<mcap>"


class McapObservationStore(ObservationStore[Any]):
"""Read-only metadata/query over one mcap channel. Payloads load lazily."""

config: McapObservationStoreConfig

def __init__(self, *, name: str, path: str, topic: str, codec: StreamCodec, count: int) -> None:
super().__init__(name=name)
self._path = path
self._topic = topic
self._codec = codec
self._count = count

@property
def name(self) -> str:
return self.config.name

def _iter(self, reverse: bool = False) -> Iterator[Observation[Any]]:
from mcap.reader import make_reader # optional dep (go2/unitree extra)

decode, dtype, n = self._codec.decode, self._codec.payload_type, self._count
with open(self._path, "rb") as f:
msgs = make_reader(f).iter_messages(topics=[self._topic], reverse=reverse)
for i, (_s, _c, m) in enumerate(msgs):
yield Observation(
id=(n - 1 - i) if reverse else i,
ts=m.log_time / 1e9,
data_type=dtype,
_loader=partial(decode, m.data),
)

def query(self, q: StreamQuery) -> Iterator[Observation[Any]]:
# mcap is natively log-time ordered (== ts == our id), so serve ts/id
# ordering by iterating forward/reverse instead of materializing + sorting.
if q.order_field in ("ts", "id"):
it = self._iter(reverse=q.order_desc)
q = replace(q, order_field=None, order_desc=False)
return q.apply(it)
return q.apply(self._iter())

def count(self, q: StreamQuery) -> int:
if not q.filters and q.search_text is None and q.search_vec is None:
n = self._count
if q.offset_val:
n = max(0, n - q.offset_val)
if q.limit_val is not None:
n = min(n, q.limit_val)
return n
return sum(1 for _ in self.query(q))
Comment thread
leshy marked this conversation as resolved.

def fetch_by_ids(self, ids: list[int]) -> list[Observation[Any]]:
want = set(ids)
return [o for o in self._iter() if o.id in want]

def insert(self, obs: Observation[Any]) -> int:
raise NotImplementedError("McapStore is read-only")


class McapStoreConfig(StoreConfig):
path: str = ""


class McapStore(Store):
"""A memory2 store backed by an mcap file (read-only).
Every channel present in the file with a codec is exposed. Names default to
the slugified topic (see :func:`_slug`); ``streams`` (friendly name -> topic)
overrides the name for specific topics.
"""

config: McapStoreConfig

def __init__(
self,
*,
codecs: Mapping[str, StreamCodec],
streams: dict[str, str] | None = None,
**kwargs: Any,
) -> None:
from mcap.reader import make_reader # optional dep (go2/unitree extra)

super().__init__(**kwargs)
self._codecs = codecs
name_of = {topic: name for name, topic in (streams or {}).items()} # topic -> override
with open(self.config.path, "rb") as f:
summary = make_reader(f).get_summary()
self._stream_topic: dict[str, str] = {} # stream name -> topic
self._available: dict[str, int] = {} # stream name -> message count
if summary is not None and summary.statistics is not None:
for cid, ch in summary.channels.items():
if ch.topic not in self._codecs:
continue
name = name_of.get(ch.topic) or _slug(ch.topic)
self._stream_topic[name] = ch.topic
self._available[name] = summary.statistics.channel_message_counts.get(cid, 0)

def list_streams(self) -> list[str]:
return sorted(set(self._available) | set(self._streams))

def _create_backend(
self, name: str, payload_type: type | None = None, **config: Any
) -> Backend[Any]:
if name not in self._available:
raise KeyError(f"No stream {name!r}. Available: {sorted(self._available)}")
topic = self._stream_topic[name]
codec = self._codecs[topic]
ptype = codec.payload_type
obs = McapObservationStore(
name=name, path=self.config.path, topic=topic, codec=codec, count=self._available[name]
)
return Backend(
metadata_store=obs,
codec=codec_for(ptype), # storage codec, unused (blob_store=None)
data_type=ptype,
blob_store=None,
vector_store=None,
notifier=SubjectNotifier(),
)
Loading
Loading