diff --git a/data/.lfs/go2_mid360_stairs.db.tar.gz b/data/.lfs/go2_mid360_stairs.db.tar.gz new file mode 100644 index 0000000000..615aad3a17 --- /dev/null +++ b/data/.lfs/go2_mid360_stairs.db.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:02d8d2194332291cf71988458965aa9f8019b8d61661ff153e93886dd72e85e9 +size 54082131 diff --git a/dimos/mapping/loop_closure/utils/map_rrd.py b/dimos/mapping/loop_closure/utils/map_rrd.py new file mode 100644 index 0000000000..f6a30e30ae --- /dev/null +++ b/dimos/mapping/loop_closure/utils/map_rrd.py @@ -0,0 +1,365 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dump a recorded dataset to .rrd: lidar point clouds + camera frames. + +Lidar clouds are assumed to be in world frame and logged directly under +their entity path (no parent transform). Entities written: + +- ``world/lidar`` — Go2 L1 per-frame point cloud +- ``world/lidar_voxels`` — accumulated voxel map of the primary lidar (``--map``) +- ``world/fastlio_lidar`` — fastlio_lidar raw cloud (if present) +- ``world/fastlio_voxels``— accumulated voxel map of fastlio_lidar (``--map``) +- ``world/fastlio`` — fastlio_odometry pose axis (if present) +- ``world/fastlio_path`` — fastlio_odometry trajectory (growing LineStrips3D) +- ``world/odom`` — Go2 onboard odom pose axis (if present) +- ``world/odom_path`` — Go2 onboard odom trajectory (growing LineStrips3D) +- ``world/camera`` — color_image camera pose (static pinhole + Transform3D) +- ``world/camera/image`` — color_image frames + +Usage: + uv run python -m dimos.mapping.loop_closure.utils.map_rrd mid360 --out map.rrd + uv run python -m dimos.mapping.loop_closure.utils.map_rrd mid360 --out map.rrd --map + rerun map.rrd +""" + +from __future__ import annotations + +from collections.abc import Callable +from pathlib import Path +import time +from typing import Any + +import rerun as rr +import typer + +from dimos.mapping.voxels import VoxelMapTransformer +from dimos.memory2.store.sqlite import SqliteStore +from dimos.memory2.stream import Stream +from dimos.memory2.transform import throttle +from dimos.memory2.type.observation import Observation +from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.nav_msgs.Odometry import Odometry +from dimos.msgs.sensor_msgs.Image import Image +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2, register_colormap_annotation +from dimos.robot.unitree.go2.connection import BASE_TO_OPTICAL, _camera_info_static +from dimos.utils.data import resolve_named_path + +TIMELINE = "ts" + + +def _progress(total: int, label: str) -> Callable[[Observation[Any]], None]: + """Matches dimos/utils/cli/map.py:progress.""" + seen = 0 + wall_start: float | None = None + last_wall: float | None = None + first_ts: float | None = None + + def tick(obs: Observation[Any]) -> None: + nonlocal seen, wall_start, last_wall, first_ts + now = time.monotonic() + if wall_start is None: + wall_start = now + first_ts = obs.ts + assert first_ts is not None + frame_ms = (now - last_wall) * 1000 if last_wall is not None else 0.0 + last_wall = now + seen += 1 + pct = 100 * seen // total if total else 100 + wall = now - wall_start + data = obs.ts - first_ts + speed = data / wall if wall > 0 else 0.0 + end = "\n" if seen >= total else "" + print( + f"\r{label} {pct:>3}% [{seen}/{total}] {data:.1f}s ({speed:.1f} x rt) {frame_ms:.0f}ms/frame", + end=end, + flush=True, + ) + + return tick + + +def _log_clouds( + label: str, + stream: Stream[PointCloud2], + entity: str, + voxel: float, + point_mode: str, + *, + total: int | None = None, +) -> None: + """Iterate a PointCloud2 stream and log each obs to ``entity``. + + ``total`` overrides the progress denominator — useful for transform + pipelines where calling :py:meth:`Stream.count` would materialize the + whole pipeline. + """ + n = total if total is not None else stream.count() + cb = _progress(n, label) + for obs in stream: + cb(obs) + rr.set_time(TIMELINE, timestamp=obs.ts) + rr.log(entity, obs.data.to_rerun(voxel_size=voxel, mode=point_mode)) + + +def _log_path( + label: str, + stream: Stream[Any], + entity: str, + color: tuple[int, int, int], + *, + emit_every: int = 10, +) -> None: + """Iterate a pose-bearing stream and log a growing :class:`LineStrips3D` to + ``entity`` every ``emit_every`` poses (and once more at the end). Frames + without a pose are skipped. + """ + n = stream.count() + cb = _progress(n, label) + points: list[tuple[float, float, float]] = [] + last_ts: float | None = None + emit_count = 0 + for obs in stream: + cb(obs) + p = obs.pose_tuple + if p is None: + continue + points.append((float(p[0]), float(p[1]), float(p[2]))) + last_ts = obs.ts + emit_count += 1 + if emit_every > 0 and emit_count % emit_every == 0 and len(points) >= 2: + rr.set_time(TIMELINE, timestamp=obs.ts) + rr.log(entity, rr.LineStrips3D([points], colors=[color])) + if ( + last_ts is not None + and len(points) >= 2 + and (emit_every <= 0 or emit_count % emit_every != 0) + ): + rr.set_time(TIMELINE, timestamp=last_ts) + rr.log(entity, rr.LineStrips3D([points], colors=[color])) + + +def main( + dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), + out: Path = typer.Option(..., "--out", help="Output .rrd path"), + voxel: float = typer.Option( + 0.05, "--voxel", help="Voxel size hint for the point cloud renderer" + ), + point_mode: str = typer.Option( + "spheres", "--point-mode", help="Render mode: 'spheres', 'boxes', or 'points'" + ), + camera_hz: float = typer.Option( + 2.0, "--camera-hz", help="Throttle color_image to at most this rate; 0 disables" + ), + map: bool = typer.Option( + False, + "--map", + help="Accumulate each lidar stream into a VoxelGrid and log only the final map", + ), + map_voxel: float = typer.Option( + 0.05, "--map-voxel", help="Voxel size for the accumulated map (m); --map only" + ), + map_device: str = typer.Option( + "CUDA:0", "--map-device", help="Open3D device for the VoxelGrid; --map only" + ), + map_emit_every: int = typer.Option( + 10, + "--map-emit-every", + help="Emit accumulated map every N frames (0 = only at end); --map only", + ), + image_pose_from: str = typer.Option( + "own", + "--image-pose-from", + help="Pose authority for color_image frames: 'own' (image pose) or 'fastlio_odom' " + "(nearest fastlio_odometry frame in time)", + ), + image_pose_tol: float = typer.Option( + 0.1, + "--image-pose-tol", + help="Max time gap (s) when matching --image-pose-from fastlio_odom", + ), +) -> None: + db_path = resolve_named_path(dataset, ".db") + cam_info = _camera_info_static() + + rr.init("dimos map_rrd", recording_id=db_path.stem) + rr.save(str(out)) + register_colormap_annotation("turbo") + + # Static pinhole on the camera entity; per-frame Transform3D goes on the + # same entity. Image is the child so it projects through the pinhole. + pinhole = cam_info.to_rerun() + assert not isinstance(pinhole, list) + rr.log("world/camera", pinhole, static=True) + + # Static axis triads as children of each moving Transform3D, so the + # transforms are actually visible in the 3D view. + axes = rr.Arrows3D( + vectors=[[0.3, 0, 0], [0, 0.3, 0], [0, 0, 0.3]], + colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]], + ) + rr.log("world/fastlio/axes", axes, static=True) + rr.log("world/odom/axes", axes, static=True) + + store = SqliteStore(path=str(db_path)) + with store: + print(store.summary()) + + lidar = store.stream("lidar", PointCloud2) + color_image = store.stream("color_image", Image) + has_livox = "fastlio_lidar" in store.streams + livox = store.stream("fastlio_lidar", PointCloud2) if has_livox else None + + _log_clouds(" lidar", lidar, "world/lidar", voxel, point_mode) + if livox is not None: + _log_clouds("fastlio_lidar", livox, "world/fastlio_lidar", voxel, point_mode) + + # Accumulated voxel maps (--map only). + # Go2 L1 forward-facing → column carving on. + # Mid360 spherical → column carving off, just aggregate. + if map: + grid_kwargs = {"voxel_size": map_voxel, "device": map_device, "show_startup_log": False} + _log_clouds( + " lidar_voxels", + lidar.transform( + VoxelMapTransformer( + emit_every=map_emit_every, carve_columns=True, **grid_kwargs + ) + ), + "world/lidar_voxels", + voxel, + point_mode, + total=max(1, lidar.count() // max(map_emit_every, 1)), + ) + if livox is not None: + _log_clouds( + "fastlio_voxels", + livox.transform( + VoxelMapTransformer( + emit_every=map_emit_every, carve_columns=False, **grid_kwargs + ) + ), + "world/fastlio_voxels", + voxel, + point_mode, + total=max(1, livox.count() // max(map_emit_every, 1)), + ) + + # fastlio pose axis + path from fastlio_odometry stream. + if "fastlio_odometry" in store.streams: + odometry = store.stream("fastlio_odometry", Odometry) + cb = _progress(odometry.count(), "fastlio_odometry") + for obs in odometry: + cb(obs) + p = obs.pose_tuple + if p is None: + continue + rr.set_time(TIMELINE, timestamp=obs.ts) + x, y, z, qx, qy, qz, qw = p + rr.log( + "world/fastlio", + rr.Transform3D( + translation=[x, y, z], + quaternion=rr.Quaternion(xyzw=[qx, qy, qz, qw]), + ), + ) + _log_path( + " fastlio_path", + store.stream("fastlio_odometry", Odometry), + "world/fastlio_path", + color=(255, 165, 0), # orange + ) + + # Go2 native odom pose axis + path. + if "odom" in store.streams: + odom = store.stream("odom", PoseStamped) + cb = _progress(odom.count(), " odom") + for pose_obs in odom: + cb(pose_obs) + p = pose_obs.pose_tuple + if p is None: + continue + rr.set_time(TIMELINE, timestamp=pose_obs.ts) + x, y, z, qx, qy, qz, qw = p + rr.log( + "world/odom", + rr.Transform3D( + translation=[x, y, z], + quaternion=rr.Quaternion(xyzw=[qx, qy, qz, qw]), + ), + ) + _log_path( + " odom_path", + store.stream("odom", PoseStamped), + "world/odom_path", + color=(0, 200, 100), # green + ) + + # Pass 2: camera pose + image per color_image. + cam_pipeline = ( + color_image.transform(throttle(1.0 / camera_hz)) if camera_hz > 0 else color_image + ) + n_img = cam_pipeline.count() + cb = _progress(n_img, " color_image") + pose_authority: Stream[Any] | None = None + if image_pose_from == "fastlio_odom": + if "fastlio_odometry" not in store.streams: + raise typer.BadParameter( + "--image-pose-from=fastlio_odom but no fastlio_odometry stream in dataset" + ) + pose_authority = store.stream("fastlio_odometry", Odometry) + elif image_pose_from != "own": + raise typer.BadParameter( + f"--image-pose-from must be 'own' or 'fastlio_odom', got {image_pose_from!r}" + ) + + for img_obs in cam_pipeline: + cb(img_obs) + rr.set_time(TIMELINE, timestamp=img_obs.ts) + if pose_authority is not None: + # fastlio_odom is body-in-world; compose with base→optical so the + # camera entity lands where the camera actually is, not the body. + matches = pose_authority.at(img_obs.ts, tolerance=image_pose_tol).to_list() + if matches: + nearest = min(matches, key=lambda o: abs(o.ts - img_obs.ts)) + ps = nearest.pose_stamped + cam_tf = ( + Transform.from_pose("world", ps) + BASE_TO_OPTICAL + if ps is not None + else None + ) + else: + cam_tf = None + else: + # The image's own recorded pose is already optical-in-world. + ps = img_obs.pose_stamped + cam_tf = Transform.from_pose("world", ps) if ps is not None else None + if cam_tf is not None: + t, q = cam_tf.translation, cam_tf.rotation + rr.log( + "world/camera", + rr.Transform3D( + translation=[t.x, t.y, t.z], + quaternion=rr.Quaternion(xyzw=[q.x, q.y, q.z, q.w]), + ), + ) + rr.log("world/camera/image", img_obs.data.to_rerun()) + + print(f"wrote {out}") + print(f"open with: rerun {out}") + + +if __name__ == "__main__": + typer.run(main) diff --git a/dimos/memory2/observationstore/sqlite.py b/dimos/memory2/observationstore/sqlite.py index c74a481cfe..88128e3c1a 100644 --- a/dimos/memory2/observationstore/sqlite.py +++ b/dimos/memory2/observationstore/sqlite.py @@ -331,6 +331,7 @@ def _ensure_tag_indexes(self, tags: dict[str, Any]) -> None: self._tag_indexes.add(key) def insert(self, obs: Observation[T]) -> int: + # Observation already normalizes pose to the storage 7-tuple — skip _decompose_pose. pose = obs.pose_tuple tags_json = json.dumps(obs.tags) if obs.tags else "{}" value = obs._data if isinstance(obs._data, (int, float)) else None diff --git a/dimos/memory2/store/base.py b/dimos/memory2/store/base.py index fb6d76379d..3c07d6ae6d 100644 --- a/dimos/memory2/store/base.py +++ b/dimos/memory2/store/base.py @@ -14,6 +14,7 @@ from __future__ import annotations +from collections.abc import Iterator from typing import TYPE_CHECKING, Any, Generic, Protocol, TypeVar, cast from dimos.core.resource import CompositeResource @@ -65,6 +66,12 @@ def __getitem__(self, name: str) -> S: raise KeyError(name) return self._container.stream(name) + def __contains__(self, name: object) -> bool: + return isinstance(name, str) and name in self._container.list_streams() + + def __iter__(self) -> Iterator[str]: + return iter(self._container.list_streams()) + def __dir__(self) -> list[str]: return self._container.list_streams() @@ -200,6 +207,10 @@ def list_streams(self) -> list[str]: """Return names of all streams in this store.""" return list(self._streams.keys()) + def summary(self) -> str: + """One line per opened stream — name, count, ts range. See :meth:`Stream.summary`.""" + return "\n".join(s.summary() for _, s in self.streams.items()) + def delete_stream(self, name: str) -> None: """Delete a stream by name (from cache and underlying storage).""" stream = self._streams.pop(name, None) diff --git a/dimos/msgs/geometry_msgs/Quaternion.py b/dimos/msgs/geometry_msgs/Quaternion.py index 39a9c83c42..95a1b1b0c7 100644 --- a/dimos/msgs/geometry_msgs/Quaternion.py +++ b/dimos/msgs/geometry_msgs/Quaternion.py @@ -235,6 +235,7 @@ def __mul__(self, other: Quaternion) -> Quaternion: return Quaternion(x, y, z, w) def dot(self, other: Quaternion) -> float: + """4D inner product. ``|q.dot(q')| == 1`` iff same orientation (up to sign).""" return float(self.x * other.x + self.y * other.y + self.z * other.z + self.w * other.w) def angle_to(self, other: Quaternion) -> float: diff --git a/dimos/msgs/sensor_msgs/PointCloud2.py b/dimos/msgs/sensor_msgs/PointCloud2.py index ae30c41711..310f73701a 100644 --- a/dimos/msgs/sensor_msgs/PointCloud2.py +++ b/dimos/msgs/sensor_msgs/PointCloud2.py @@ -727,7 +727,10 @@ def to_rerun( point_colors = colors else: z = points[:, 2] - class_ids = ((z - z.min()) / (z.max() - z.min() + 1e-8) * 255).astype(np.uint8) + + # Clip to robust percentiles so a few outliers don't squash the colormap range. + z_lo, z_hi = np.percentile(z, [1, 99]) + class_ids = (np.clip((z - z_lo) / (z_hi - z_lo + 1e-8), 0, 1) * 255).astype(np.uint8) if mode == "points": return rr.Points3D( diff --git a/dimos/msgs/sensor_msgs/test_PointCloud2.py b/dimos/msgs/sensor_msgs/test_PointCloud2.py index 56ae622a8d..797e266ed3 100644 --- a/dimos/msgs/sensor_msgs/test_PointCloud2.py +++ b/dimos/msgs/sensor_msgs/test_PointCloud2.py @@ -32,8 +32,8 @@ def test_lcm_encode_decode() -> None: decoded = PointCloud2.lcm_decode(binary_msg) # 1. Check number of points - original_points, _ = lidar_msg.as_numpy() - decoded_points, _ = decoded.as_numpy() + original_points = lidar_msg.as_numpy() + decoded_points = decoded.as_numpy() assert len(original_points) == len(decoded_points), ( f"Point count mismatch: {len(original_points)} vs {len(decoded_points)}" diff --git a/dimos/utils/data.py b/dimos/utils/data.py index 9be1ac3a75..b5f2672f4d 100644 --- a/dimos/utils/data.py +++ b/dimos/utils/data.py @@ -113,7 +113,13 @@ def get_data_dir(extra_path: str | None = None) -> Path: return get_project_root() / "data" -def resolve_named_path(name: str | Path, suffix: str = "") -> Path: +def resolve_named_path(name: str | Path, suffix: str = "", *, pull: bool = True) -> Path: + """Resolve a bare name / partial path to an absolute Path. + + If ``pull`` is True (default), fall back to :func:`get_data` (which may + download from LFS) when the file isn't found locally. Set ``pull=False`` + to fail fast with ``FileNotFoundError`` instead. + """ s = str(name) p = Path(s) if p.is_absolute() or p.exists(): @@ -126,6 +132,8 @@ def resolve_named_path(name: str | Path, suffix: str = "") -> Path: return p if (DIMOS_PROJECT_ROOT / p).exists(): return DIMOS_PROJECT_ROOT / p + if not pull: + raise FileNotFoundError(f"{name!r} not found locally (pull=False)") return get_data(p.name)