diff --git a/data/.lfs/go2_mid360_stairs.db.tar.gz b/data/.lfs/go2_mid360_stairs.db.tar.gz new file mode 100644 index 0000000000..615aad3a17 --- /dev/null +++ b/data/.lfs/go2_mid360_stairs.db.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:02d8d2194332291cf71988458965aa9f8019b8d61661ff153e93886dd72e85e9 +size 54082131 diff --git a/data/.lfs/go2dds_data1.tar.gz b/data/.lfs/go2dds_data1.tar.gz new file mode 100644 index 0000000000..f44d478ad5 --- /dev/null +++ b/data/.lfs/go2dds_data1.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dfd237ce49cef1f96074ba02576b9a8094db6d6e0dc9ca24b63e5fdaa84acaf2 +size 224684671 diff --git a/data/.lfs/recording_go2_mid360_2026-05-29_4-45pm-PST.db.tar.gz b/data/.lfs/recording_go2_mid360_2026-05-29_4-45pm-PST.db.tar.gz new file mode 100644 index 0000000000..d30906b058 --- /dev/null +++ b/data/.lfs/recording_go2_mid360_2026-05-29_4-45pm-PST.db.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:4b89c53b5a32c9b262b821904031a66143132464e23413ff3f77fad98d363790 +size 1384605823 diff --git a/data/.lfs/recording_go2_mid360_2026-05-29_4-45pm-PST_corrected.db.tar.gz b/data/.lfs/recording_go2_mid360_2026-05-29_4-45pm-PST_corrected.db.tar.gz new file mode 100644 index 0000000000..a91571ea98 --- /dev/null +++ b/data/.lfs/recording_go2_mid360_2026-05-29_4-45pm-PST_corrected.db.tar.gz @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:e913ca4c640c1bfcce523d67fffbac80b3220d2ed94289cbb6bd966e1c68a78d +size 1365230944 diff --git a/dimos/mapping/loop_closure/pgo_auto.py b/dimos/mapping/loop_closure/pgo_auto.py new file mode 100644 index 0000000000..37dbb7d36b --- /dev/null +++ b/dimos/mapping/loop_closure/pgo_auto.py @@ -0,0 +1,854 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""PGO drift corrections as composable Stream stages. + +A lidar/odom stream comes in with poses that drift over time — the robot's +estimate of where it is in the world slowly diverges from ground truth as +small per-frame errors accumulate. PGO ("pose graph optimization") detects +revisited places via loop closure and pulls the trajectory back into +self-consistency. The result is a stream of *corrections*: a Transform per +keyframe that maps every drifted ("raw") pose to its corrected one. + +Pipeline: + + lidar: Stream[PointCloud2] # pose-stamped scans + -> pgo_keyframes(...) -> Stream[Keyframe] # one per kf + -> keyframes_to_corrections(...) -> Stream[Transform] # world_corrected <- world_raw + -> apply_corrections(any_stream, corrections) -> Stream[T] # obs.pose shuffled + +`pgo_keyframes` runs ISAM2 + ICP loop closure across the input, emitting +each kept keyframe with both its raw odom pose (`local`) and its optimized +pose (`optimized`) as Transforms. Pass `loop_closures_out=[]` to also +collect each accepted ICP loop edge as a `LoopClosure` (source+target +optimized poses + ICP fitness) — useful for visualizing the pose graph. + +`keyframes_to_corrections` is pure arithmetic: per keyframe, the drift +correction is `optimized @ local^-1` (Transform composition). The resulting +stream has one entry per keyframe — sparse in time. + +`make_interpolator` materializes the sparse correction stream into a +ts -> Transform function: SLERP for rotation, linear lerp for translation, +clipping out-of-range queries to the endpoints. + +`apply_corrections` shuffles `obs.pose` on any stream using the interpolated +correction at each obs.ts. It's read-only on `obs.data`, so the same lidar +stream can be re-applied — or a different recording from the same physical +space, if the corrections generalize. + +`gtsam` is imported lazily inside hot helpers so importing this module +stays cheap and gtsam-free for consumers that only need `Keyframe` / +`apply_corrections`. +""" + +from __future__ import annotations + +from collections.abc import Callable, Iterable, Iterator +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, TypedDict, TypeVar, Unpack + +import numpy as np +import open3d as o3d # type: ignore[import-untyped] +import open3d.core as o3c # type: ignore[import-untyped] +from scipy.spatial.transform import Rotation + +from dimos.memory2.store.memory import MemoryStore +from dimos.memory2.stream import Stream +from dimos.memory2.type.observation import Observation +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 +from dimos.protocol.service.spec import BaseConfig +from dimos.utils.logging_config import setup_logger + +if TYPE_CHECKING: + import gtsam # type: ignore[import-not-found,import-untyped] + +T = TypeVar("T") + +FRAME_WORLD_CORRECTED = "world_corrected" +FRAME_WORLD_RAW = "world_raw" +FRAME_BODY = "body" + +logger = setup_logger() + + +class PGOConfig(BaseConfig): + # Keyframe detection + key_pose_delta_trans: float = 0.5 + key_pose_delta_deg: float = 10.0 + + # Loop closure + loop_search_radius: float = 2.0 + loop_search_radius_local: float = 15.0 + loop_fallback_drift_thresh: float = 0.5 # min drift to trigger local fallback + loop_time_thresh: float = 20.0 + loop_time_thresh_local: float = 40.0 # stricter time gap for local fallback + loop_score_thresh: float = 0.3 + loop_score_thresh_tight: float = 0.05 # early-exit threshold: ICP rmse ~22cm + loop_submap_half_range: int = 40 + loop_candidates_per_iter: int = 7 + min_icp_inliers: int = 10 + min_keyframes_for_loop_search: int = 10 + loop_closure_extra_iterations: int = 2 + submap_resolution: float = 0.2 + min_loop_detect_duration: float = 3.0 + + # ICP + max_icp_iterations: int = 50 + max_icp_correspondence_dist: float = 0.5 + + +class PGOKwargs(TypedDict, total=False): + """Typed kwargs for `pgo_keyframes`; mirrors `PGOConfig` fields.""" + + key_pose_delta_trans: float + key_pose_delta_deg: float + loop_search_radius: float + loop_search_radius_local: float + loop_fallback_drift_thresh: float + loop_time_thresh: float + loop_time_thresh_local: float + loop_score_thresh: float + loop_score_thresh_tight: float + loop_submap_half_range: int + loop_candidates_per_iter: int + min_icp_inliers: int + min_keyframes_for_loop_search: int + loop_closure_extra_iterations: int + submap_resolution: float + min_loop_detect_duration: float + max_icp_iterations: int + max_icp_correspondence_dist: float + + +@dataclass(frozen=True) +class Keyframe: + """Keyframe emitted by `pgo_keyframes`. + + `local` is the body pose in the odom (`world_raw`) frame, `optimized` is + the body pose in the drift-corrected (`world_corrected`) frame. + """ + + ts: float + local: Transform + optimized: Transform + + +@dataclass(frozen=True) +class LoopClosure: + """An accepted ICP loop-closure edge in the pose graph. + + `source` and `target` are the final optimized body poses (in + `world_corrected`) of the two keyframes the loop connected. `score` is + the ICP fitness used as the loop's translation variance — lower is a + tighter match. + """ + + source: Transform + target: Transform + score: float + + +def pgo_keyframes( + stream: Iterable[Observation[PointCloud2]], + *, + on_frame: Callable[[Any], None] | None = None, + loop_closures_out: list[LoopClosure] | None = None, + **pgo_cfg: Unpack[PGOKwargs], +) -> Stream[Keyframe]: + """Run PGO across a pose-stamped point-cloud stream; emit one obs per keyframe. + + If `loop_closures_out` is provided, accepted loop edges are appended to + it (in detection order) using each keyframe's final optimized pose. + """ + cfg = PGOConfig(**pgo_cfg) + pgo = _PGO(cfg) + + for obs in stream: + if on_frame is not None: + on_frame(obs) + pose = obs.pose_tuple + if pose is None: + continue + # Skip placeholder poses (origin position OR zero quaternion). + if pose[0] == 0 and pose[1] == 0 and pose[2] == 0: + continue + if pose[3] == 0 and pose[4] == 0 and pose[5] == 0 and (pose[6] == 0 or pose[6] == 1): + continue + local_pose = _obs_to_pose3(obs) + pgo.process(local_pose, obs.ts, obs.data) + + mem = MemoryStore() + out: Stream[Keyframe] = mem.stream("keyframes", Keyframe) + for kf in pgo.finalize(): + out.append(kf, ts=kf.ts) + if loop_closures_out is not None: + loop_closures_out.extend(pgo.loop_closures()) + return out + + +def keyframes_to_corrections(keyframes: Stream[Keyframe]) -> Stream[Transform]: + """Per-keyframe drift correction as Transform(world_corrected <- world_raw).""" + mem = MemoryStore() + out: Stream[Transform] = mem.stream("corrections", Transform) + for obs in keyframes: + kf = obs.data + # world_corrected <- body <- world_raw composes to world_corrected <- world_raw. + drift = kf.optimized + kf.local.inverse() + out.append(drift, ts=kf.ts) + return out + + +def make_interpolator(corrections: Stream[Transform]) -> Callable[[float], Transform]: + """Materialize corrections once; return a fast ts -> Transform lookup.""" + ts_list: list[float] = [] + R_list: list[np.ndarray] = [] + t_list: list[np.ndarray] = [] + for obs in corrections: + R, t = _r_t_from_transform(obs.data) + # obs.ts is authoritative; obs.data.ts can be mutated by Transform's + # ts=0.0 -> time.time() fallback in its constructor. + ts_list.append(obs.ts) + R_list.append(R) + t_list.append(t) + + if not ts_list: + raise ValueError("empty corrections stream") + + # Pad len==1 with a duplicate so the general path handles it; + # clip-to-endpoints behavior is unchanged. + if len(ts_list) == 1: + ts_list.append(ts_list[0] + 1e-6) + R_list.append(R_list[0]) + t_list.append(t_list[0]) + + ts_arr = np.array(ts_list) + R_stack = np.stack(R_list) + t_stack = np.stack(t_list) + + def interp(ts: float) -> Transform: + ts_clip = float(np.clip(ts, ts_arr[0], ts_arr[-1])) + idx = int(np.searchsorted(ts_arr, ts_clip)) + if idx == 0: + t = t_stack[0] + R = R_stack[0] + elif idx >= len(ts_arr): + t = t_stack[-1] + R = R_stack[-1] + else: + t_lo, t_hi = ts_arr[idx - 1], ts_arr[idx] + # Nearest-neighbor in time: avoids blending two adjacent + # corrections that may straddle a loop-closure update. + near_lo = (ts_clip - t_lo) < (t_hi - ts_clip) + t = t_stack[idx - 1] if near_lo else t_stack[idx] + R = R_stack[idx - 1] if near_lo else R_stack[idx] + return _transform_from_r_t(R, t, ts=float(ts)) + + return interp + + +def apply_corrections( + stream: Stream[T], + corrections: Stream[Transform], +) -> Stream[T]: + """Shuffle obs.pose on `stream` by the interpolated correction at each obs.ts. + + `obs.data` is untouched. Frames with `obs.pose is None` pass through + unchanged. Out-of-range `obs.ts` get the endpoint correction (clipped). + """ + interp = make_interpolator(corrections) + + def xf(upstream: Iterator[Observation[T]]) -> Iterator[Observation[T]]: + for obs in upstream: + if obs.pose is None: + yield obs + continue + raw_tf = Transform.from_pose(FRAME_BODY, obs.pose_stamped) + # Transform.__add__ composes: (T_corr + T_raw) applies T_corr after T_raw. + # Observation normalizes Transform back to 7-tuple via __post_init__. + corrected = interp(obs.ts) + raw_tf + yield obs.derive(data=obs.data, pose=corrected) + + return stream.transform(xf) + + +class PoseGraph: + """OOP view over PGO output — optimized keyframes, loop edges, and the + drift correction they imply. + + Wraps the functional pipeline (`pgo_keyframes` → `keyframes_to_corrections` + → `make_interpolator`) behind one object for callers that want a single + handle (e.g. the `dimos map` tool). + """ + + def __init__( + self, + keyframes: tuple[Keyframe, ...], + loops: tuple[LoopClosure, ...], + correct: Callable[[float], Transform], + ) -> None: + self.keyframes = keyframes + self.loops = loops + self._correct = correct + + def correction_at(self, ts: float) -> Transform: + """Drift correction (world_corrected <- world_raw) at `ts`.""" + return self._correct(ts) + + def correct(self, pose: Transform) -> Transform: + """Lift a world_raw pose into world_corrected.""" + return self._correct(pose.ts) + pose + + +class PGO: + """Transformer that runs PGO over a pose-stamped cloud stream and emits a + single `PoseGraph` observation. Thin OOP wrapper over `pgo_keyframes`. + + Use as ``stream.transform(PGO()).last().data``. + """ + + def __init__(self, **pgo_cfg: Unpack[PGOKwargs]) -> None: + self._cfg = pgo_cfg + + def __call__( + self, upstream: Iterator[Observation[PointCloud2]] + ) -> Iterator[Observation[PoseGraph]]: + loops: list[LoopClosure] = [] + kfs = pgo_keyframes(upstream, loop_closures_out=loops, **self._cfg) + keyframes = tuple(obs.data for obs in kfs) + if keyframes: + correct = make_interpolator(keyframes_to_corrections(kfs)) + last_ts = keyframes[-1].ts + else: + # No valid keyframes (e.g. pose-less input): correction is identity. + correct = lambda ts: Transform( # noqa: E731 + frame_id=FRAME_WORLD_CORRECTED, child_frame_id=FRAME_WORLD_RAW, ts=ts + ) + last_ts = 0.0 + yield Observation(ts=last_ts, _data=PoseGraph(keyframes, tuple(loops), correct)) + + +def _r_t_from_transform(tf: Transform) -> tuple[np.ndarray, np.ndarray]: + q = tf.rotation + R = Rotation.from_quat([q.x, q.y, q.z, q.w]).as_matrix() + t = np.array([tf.translation.x, tf.translation.y, tf.translation.z]) + return R, t + + +def _transform_from_r_t(R: np.ndarray, t: np.ndarray, *, ts: float) -> Transform: + return Transform( + translation=Vector3(float(t[0]), float(t[1]), float(t[2])), + rotation=Quaternion.from_rotation_matrix(R), + frame_id=FRAME_WORLD_CORRECTED, + child_frame_id=FRAME_WORLD_RAW, + ts=ts, + ) + + +def _obs_to_pose3(obs: Observation[Any]) -> gtsam.Pose3: + """Convert an observation's stored pose tuple directly to a `gtsam.Pose3`.""" + import gtsam # type: ignore[import-not-found,import-untyped] + + if obs.pose_tuple is None: + raise LookupError("No pose set on this observation") + x, y, z, qx, qy, qz, qw = obs.pose_tuple + return gtsam.Pose3( + gtsam.Rot3.Quaternion(float(qw), float(qx), float(qy), float(qz)), + gtsam.Point3(float(x), float(y), float(z)), + ) + + +@dataclass +class _KeyPose: + local: gtsam.Pose3 # odom-frame pose at capture + optimized: gtsam.Pose3 # drift-corrected pose + timestamp: float + body_cloud: PointCloud2 # voxel-downsampled, body frame + + +@dataclass +class _LoopPair: + source: int + target: int + offset: gtsam.Pose3 # source pose in target's frame + score: float + relocalizing: bool = False # True if this loop was found via local-frame fallback + + +class _PGO: + """Incremental PGO: gtsam ISAM2 over keyframes with ICP loop closures. + + Call `process` per frame (odom pose + body-frame points). Call + `finalize` once at the end for the sorted, deduped keyframe list. + """ + + def __init__(self, config: PGOConfig) -> None: + import gtsam # type: ignore[import-not-found,import-untyped] + + self._gtsam = gtsam + self._cfg = config + self._key_poses: list[_KeyPose] = [] + self._pending_loops: list[_LoopPair] = [] + self._accepted_loops: list[_LoopPair] = [] + self._last_loop_ts: float | None = None + self._world_correction: gtsam.Pose3 = gtsam.Pose3() # identity + + params = gtsam.ISAM2Params() + params.setRelinearizeThreshold(0.01) + params.relinearizeSkip = 1 + params.setOptimizationParams(gtsam.ISAM2DoglegParams()) + self._isam2 = gtsam.ISAM2(params) + self._graph = gtsam.NonlinearFactorGraph() + self._values = gtsam.Values() + + def process( + self, + local_pose: gtsam.Pose3, + ts: float, + world_cloud: PointCloud2, + ) -> None: + if len(world_cloud) == 0: + return + if not self._is_keyframe(local_pose): + return + # Unregister: lift world-frame scan back into body frame using its + # odom pose, so PGO can re-project it via the optimized pose later. + body_cloud = world_cloud.transform( + _pose3_to_transform(local_pose.inverse(), ts=ts) + ).voxel_downsample(self._cfg.submap_resolution) + self._add_keyframe(local_pose, ts, body_cloud) + self._search_for_loops() + self._smooth_and_update() + + def finalize(self) -> list[Keyframe]: + """Return keyframes sorted by ts, with duplicate-ts entries dropped.""" + # Second-pass loop rescan over the converged optimized poses: + # keyframes that ended up in heavily drifted regions may have + # missed loops during incremental processing (their optimized + # pose at the moment was misleading). Re-search with the final + # poses gives us another chance to anchor those segments. + self._last_loop_ts = None + for i in range(len(self._key_poses)): + self._search_for_loops( + cur_idx=i, + enforce_time_gate=False, + no_fallback=True, + submap_half_range=38, + time_thresh_override=13.0, + ) + if self._pending_loops: + self._smooth_and_update() + kps = sorted(self._key_poses, key=lambda kp: kp.timestamp) + out: list[Keyframe] = [] + for i, kp in enumerate(kps): + if i > 0 and kp.timestamp <= kps[i - 1].timestamp: + continue + out.append( + Keyframe( + ts=kp.timestamp, + local=_pose3_to_transform( + kp.local, + ts=kp.timestamp, + frame_id=FRAME_WORLD_RAW, + child_frame_id=FRAME_BODY, + ), + optimized=_pose3_to_transform( + kp.optimized, + ts=kp.timestamp, + frame_id=FRAME_WORLD_CORRECTED, + child_frame_id=FRAME_BODY, + ), + ) + ) + return out + + def loop_closures(self) -> list[LoopClosure]: + """Accepted loop edges, resolved to final optimized poses.""" + out: list[LoopClosure] = [] + for pair in self._accepted_loops: + src = self._key_poses[pair.source] + tgt = self._key_poses[pair.target] + out.append( + LoopClosure( + source=_pose3_to_transform( + src.optimized, + ts=src.timestamp, + frame_id=FRAME_WORLD_CORRECTED, + child_frame_id=FRAME_BODY, + ), + target=_pose3_to_transform( + tgt.optimized, + ts=tgt.timestamp, + frame_id=FRAME_WORLD_CORRECTED, + child_frame_id=FRAME_BODY, + ), + score=pair.score, + ) + ) + return out + + def _is_keyframe(self, local_pose: gtsam.Pose3) -> bool: + if not self._key_poses: + return True + delta = self._key_poses[-1].local.inverse().compose(local_pose) + delta_trans = float(np.linalg.norm(np.asarray(delta.translation()))) + # Rotation magnitude from trace: cos(theta) = (tr(R) - 1) / 2. + R = delta.rotation().matrix() + cos_theta = float(np.clip((np.trace(R) - 1.0) / 2.0, -1.0, 1.0)) + delta_deg = float(np.degrees(np.arccos(cos_theta))) + return ( + delta_trans > self._cfg.key_pose_delta_trans or delta_deg > self._cfg.key_pose_delta_deg + ) + + def _add_keyframe( + self, + local_pose: gtsam.Pose3, + ts: float, + body_cloud: PointCloud2, + ) -> None: + gtsam = self._gtsam + idx = len(self._key_poses) + optimized = self._world_correction.compose(local_pose) + + self._values.insert(idx, optimized) + + if idx == 0: + noise = gtsam.noiseModel.Diagonal.Variances(np.full(6, 1e-12)) + self._graph.add(gtsam.PriorFactorPose3(idx, optimized, noise)) + else: + last_local = self._key_poses[-1].local + between = last_local.inverse().compose(local_pose) + # When the prior keyframe is already in a drifted region, + # loosen the chain variance for this step so loop closures + # can pull harder across the disturbance. + prev = self._key_poses[-1] + prev_drift = float( + np.linalg.norm( + np.asarray(prev.optimized.translation()) - np.asarray(prev.local.translation()) + ) + ) + trans_var = 8e-4 if prev_drift > 1.3 else 1e-4 + noise = gtsam.noiseModel.Diagonal.Variances( + np.array([1e-6, 1e-6, 1e-6, trans_var, trans_var, 1e-6]) + ) + self._graph.add(gtsam.BetweenFactorPose3(idx - 1, idx, between, noise)) + + self._key_poses.append( + _KeyPose( + local=local_pose, + optimized=optimized, + timestamp=ts, + body_cloud=body_cloud, + ) + ) + + def _get_submap(self, idx: int, half_range: int) -> PointCloud2: + lo = max(0, idx - half_range) + hi = min(len(self._key_poses) - 1, idx + half_range) + if lo > hi: + return PointCloud2() + cloud = self._key_poses[lo].body_cloud.transform( + _pose3_to_transform(self._key_poses[lo].optimized, ts=self._key_poses[lo].timestamp) + ) + for i in range(lo + 1, hi + 1): + kp = self._key_poses[i] + cloud = cloud + kp.body_cloud.transform( + _pose3_to_transform(kp.optimized, ts=kp.timestamp) + ) + return cloud.voxel_downsample(self._cfg.submap_resolution) + + def _search_for_loops( + self, + cur_idx: int | None = None, + *, + enforce_time_gate: bool = True, + opt_radius: float | None = None, + force_fallback: bool = False, + no_fallback: bool = False, + submap_half_range: int | None = None, + icp_max_dist: float | None = None, + source_half_range: int = 0, + candidates_per_iter: int | None = None, + force_multi: bool = False, + time_thresh_override: float | None = None, + ) -> None: + if len(self._key_poses) < self._cfg.min_keyframes_for_loop_search: + return + if cur_idx is None: + cur_idx = len(self._key_poses) - 1 + + cur_kp = self._key_poses[cur_idx] + cur_ts = cur_kp.timestamp + if enforce_time_gate and ( + self._last_loop_ts is not None + and cur_ts - self._last_loop_ts < self._cfg.min_loop_detect_duration + ): + return + + cur_opt_t = np.asarray(cur_kp.optimized.translation()) + cur_loc_t = np.asarray(cur_kp.local.translation()) + + from scipy.spatial import KDTree + + # Search both optimized and local (odom) frames; exclude cur_idx + # from the candidate pool (and time-adjacent neighbors via the + # time gate filter inside _collect). + other_kps = [kp for j, kp in enumerate(self._key_poses) if j != cur_idx] + other_idx_map = [j for j in range(len(self._key_poses)) if j != cur_idx] + opt_positions = np.array([np.asarray(kp.optimized.translation()) for kp in other_kps]) + loc_positions = np.array([np.asarray(kp.local.translation()) for kp in other_kps]) + + def _collect(local_idxs: set[int], time_thresh: float) -> list[tuple[float, int]]: + # local_idxs index into other_kps; map back to global keyframe index. + cands = [] + for li in local_idxs: + gi = other_idx_map[li] + if abs(cur_ts - self._key_poses[gi].timestamp) <= time_thresh: + continue + d_opt = float( + np.linalg.norm( + np.asarray(self._key_poses[gi].optimized.translation()) - cur_opt_t + ) + ) + d_loc = float( + np.linalg.norm(np.asarray(self._key_poses[gi].local.translation()) - cur_loc_t) + ) + cands.append((min(d_opt, d_loc), gi)) + cands.sort() + return cands + + # Tight optimized-frame search first; if none survive the + # time-gap filter AND accumulated drift is large enough that we + # likely need a relocalization, fall back to wider local-frame + # search with a stricter time gap. + opt_radius_val = opt_radius if opt_radius is not None else self._cfg.loop_search_radius + opt_idxs = set(KDTree(opt_positions).query_ball_point(cur_opt_t, opt_radius_val)) + time_thresh = ( + time_thresh_override if time_thresh_override is not None else self._cfg.loop_time_thresh + ) + candidates = _collect(opt_idxs, time_thresh) + drift = float(np.linalg.norm(cur_opt_t - cur_loc_t)) + relocalizing = False + if not no_fallback and ( + force_fallback or (not candidates and drift > self._cfg.loop_fallback_drift_thresh) + ): + loc_idxs = set( + KDTree(loc_positions).query_ball_point( + cur_loc_t, self._cfg.loop_search_radius_local + ) + ) + candidates = _collect(loc_idxs - opt_idxs, self._cfg.loop_time_thresh_local) + relocalizing = True + logger.info( + "loop fallback fired", + cur_idx=cur_idx, + drift=round(drift, 2), + n_candidates=len(candidates), + ) + if not candidates: + return + + # Try top-K candidates; keep all that pass the threshold (for + # relocalization fallback, multiple loops to different points in + # the past give the optimizer more constraint to anchor the + # disturbed segment). For the regular non-fallback path keep + # only the best one — the trajectory there is well-constrained + # already and extra loops add noise. Early-exit on a very tight + # match to save redundant ICP runs in the common case. + source = self._get_submap(cur_idx, source_half_range) + accepted: list[tuple[int, Transform, float]] = [] + k = ( + candidates_per_iter + if candidates_per_iter is not None + else self._cfg.loop_candidates_per_iter + ) + for _, loop_idx in candidates[:k]: + target_hr = ( + submap_half_range + if submap_half_range is not None + else self._cfg.loop_submap_half_range + ) + target = self._get_submap(loop_idx, target_hr) + max_dist = ( + icp_max_dist if icp_max_dist is not None else self._cfg.max_icp_correspondence_dist + ) + icp_tf, fitness = _icp( + source, + target, + max_iter=self._cfg.max_icp_iterations, + max_dist=max_dist, + min_inliers=self._cfg.min_icp_inliers, + ) + if fitness <= self._cfg.loop_score_thresh: + accepted.append((loop_idx, icp_tf, fitness)) + if ( + not relocalizing + and not force_multi + and fitness <= self._cfg.loop_score_thresh_tight + ): + break + if not accepted: + return + accepted.sort(key=lambda x: x[2]) + keep = accepted if (relocalizing or force_multi) else [accepted[0]] + + for loop_idx, icp_tf, fitness in keep: + # icp_tf takes cur_kp.optimized -> refined pose (correcting the drift). + # offset = loop_kp.optimized^-1 * refined = relative pose from loop to cur. + icp_pose = _transform_to_pose3(icp_tf) + refined = icp_pose.compose(cur_kp.optimized) + offset = self._key_poses[loop_idx].optimized.between(refined) + + self._pending_loops.append( + _LoopPair( + source=cur_idx, + target=loop_idx, + offset=offset, + score=fitness, + relocalizing=relocalizing, + ) + ) + logger.info( + "Loop closure detected", + source=cur_idx, + target=loop_idx, + score=round(fitness, 4), + reloc=relocalizing, + ) + self._last_loop_ts = cur_ts + + def _smooth_and_update(self) -> None: + gtsam = self._gtsam + has_loop = bool(self._pending_loops) + + for pair in self._pending_loops: + # Pose3 noise is [rx, ry, rz, x, y, z]. The two halves have + # different units (rad² vs m²), so a uniform variance silently + # makes one half pathological. Use ICP fitness as the + # *translation* variance and a generous fixed rotation variance + # — loops shouldn't be trusted to fix rotation tightly without + # normals + p2plane. + trans_var = max(0.005, float(pair.score)) # >= sigma_trans ~7 cm + rot_var = 0.003 # sigma_rot ~ 3.1 deg (large submap p2plane ICP, tight rotation) + noise = gtsam.noiseModel.Diagonal.Variances( + np.array([rot_var, rot_var, rot_var, trans_var, trans_var, trans_var]) + ) + self._graph.add(gtsam.BetweenFactorPose3(pair.target, pair.source, pair.offset, noise)) + self._accepted_loops.extend(self._pending_loops) + self._pending_loops.clear() + + self._isam2.update(self._graph, self._values) + self._isam2.update() + if has_loop: + for _ in range(self._cfg.loop_closure_extra_iterations): + self._isam2.update() + self._graph = gtsam.NonlinearFactorGraph() + self._values = gtsam.Values() + + estimates = self._isam2.calculateBestEstimate() + for i in range(len(self._key_poses)): + self._key_poses[i].optimized = estimates.atPose3(i) + + last = self._key_poses[-1] + self._world_correction = last.optimized.compose(last.local.inverse()) + + +def _pose3_to_transform( + pose: gtsam.Pose3, + *, + ts: float, + frame_id: str = "", + child_frame_id: str = "", +) -> Transform: + """PGO-internal: build a Transform from a Pose3.""" + t = np.asarray(pose.translation()) + return Transform( + translation=Vector3(float(t[0]), float(t[1]), float(t[2])), + rotation=Quaternion.from_rotation_matrix(pose.rotation().matrix()), + frame_id=frame_id, + child_frame_id=child_frame_id, + ts=ts, + ) + + +def _transform_to_pose3(tf: Transform) -> gtsam.Pose3: + """PGO-internal: build a Pose3 from a Transform.""" + import gtsam # type: ignore[import-not-found,import-untyped] + + return gtsam.Pose3(tf.to_matrix()) + + +def _icp( + source: PointCloud2, + target: PointCloud2, + max_iter: int = 50, + max_dist: float = 1.0, + tol: float = 1e-6, + min_inliers: int = 10, + init: Transform | None = None, +) -> tuple[Transform, float]: + """Point-to-plane ICP using Open3D's tensor pipeline. + + Returns ``(tf, fitness)`` where ``fitness`` is mean squared inlier + distance (m^2) — the GTSAM noise model in `_smooth_and_update` uses + this directly as sigma_trans squared. On rejection (too few points or + zero correspondences) returns the identity transform and inf fitness. + """ + + if len(source) < min_inliers or len(target) < min_inliers: + return Transform.identity(), float("inf") + + src_pcd = source.pointcloud_tensor + tgt_pcd = target.pointcloud_tensor + + # Normals on the target enable point-to-plane ICP — converges tighter + # than point-to-point on indoor scenes (walls give unambiguous normals + # that resolve the slide-along-wall ambiguity). + tgt_pcd.estimate_normals(max_nn=30, radius=0.3) + + device = src_pcd.device + init_T = o3c.Tensor( + init.to_matrix() if init is not None else np.eye(4), + dtype=o3c.float64, + device=device, + ) + + # Silence Open3D's "0 correspondence" warning — we deliberately use a + # tight max_correspondence_distance and reject loops with poor fitness. + # Treat singular-system failures as rejection rather than aborting. + try: + with o3d.utility.VerbosityContextManager(o3d.utility.VerbosityLevel.Error): + result = o3d.t.pipelines.registration.icp( + source=src_pcd, + target=tgt_pcd, + max_correspondence_distance=max_dist, + init_source_to_target=init_T, + estimation_method=o3d.t.pipelines.registration.TransformationEstimationPointToPlane(), + criteria=o3d.t.pipelines.registration.ICPConvergenceCriteria( + relative_fitness=tol, + relative_rmse=tol, + max_iteration=max_iter, + ), + ) + except RuntimeError: + return Transform.identity(), float("inf") + + if float(result.fitness) == 0.0: + return Transform.identity(), float("inf") + + T_mat = result.transformation.numpy() + rmse = float(result.inlier_rmse) + return _transform_from_r_t(T_mat[:3, :3], T_mat[:3, 3], ts=source.ts), rmse * rmse diff --git a/dimos/mapping/utils/cli/assets/fastlio_lidar.png b/dimos/mapping/utils/cli/assets/fastlio_lidar.png new file mode 100644 index 0000000000..ee6bb6d17a --- /dev/null +++ b/dimos/mapping/utils/cli/assets/fastlio_lidar.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dc654b1c5837615fc14c0097f468c2337dcd17722ac41895081daf392666e62e +size 3000691 diff --git a/dimos/mapping/utils/cli/assets/go2_lidar.png b/dimos/mapping/utils/cli/assets/go2_lidar.png new file mode 100644 index 0000000000..2ce3d83882 --- /dev/null +++ b/dimos/mapping/utils/cli/assets/go2_lidar.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:816287e864438b4eb951df649685db66cac7b278a8d95f5326459e6bdca78ab9 +size 1589502 diff --git a/dimos/mapping/utils/cli/assets/markers_fastlio.png b/dimos/mapping/utils/cli/assets/markers_fastlio.png new file mode 100644 index 0000000000..6e637a5473 --- /dev/null +++ b/dimos/mapping/utils/cli/assets/markers_fastlio.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:79045486d46148c44718bb2eaf000dfc12cf421c4db4762ffaaf12011f36e2a7 +size 3320057 diff --git a/dimos/mapping/utils/cli/assets/markers_go2.png b/dimos/mapping/utils/cli/assets/markers_go2.png new file mode 100644 index 0000000000..c3e93c8b5a --- /dev/null +++ b/dimos/mapping/utils/cli/assets/markers_go2.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:5c007ae1b46039d008620feee867e512fb81d4d39f9b3f712a380fe15e007692 +size 2260861 diff --git a/dimos/mapping/utils/cli/dataset_validation.md b/dimos/mapping/utils/cli/dataset_validation.md new file mode 100644 index 0000000000..5f32853ddf --- /dev/null +++ b/dimos/mapping/utils/cli/dataset_validation.md @@ -0,0 +1,53 @@ +Dataset Validation + +```sh +dimos map summary recording_go2_mid360_2026-05-29_4-45pm-PST.db + +Stream("color_image"): 11141 items, 2026-05-29 23:32:57 — 2026-05-29 23:45:57 (780.1s) +Stream("fastlio_lidar"): 7240 items, 2026-05-29 23:32:56 — 2026-05-29 23:45:57 (781.7s) +Stream("fastlio_odometry"): 18737 items, 2026-05-29 23:32:56 — 2026-05-29 23:45:57 (781.8s) +Stream("lidar"): 6025 items, 2026-05-29 23:32:55 — 2026-05-29 23:45:57 (782.3s) +Stream("odom"): 14630 items, 2026-05-29 23:32:55 — 2026-05-29 23:45:57 (782.3s) +``` + +Shows which streams are in the database. You can replay messages in rerun: + +```sh +dimos map replay recording_go2_mid360_2026-05-29_4-45pm-PST.db --duration 60 +``` + + +```sh +dimos map global recording_go2_mid360_2026-05-29_4-45pm-PST_corrected --voxel 0.1 --lidar fastlio_lidar +``` + +validates that livox lidar observations have correct poses associated coming from livox odometry + +![output](assets/fastlio_lidar.png) + + +```sh +dimos map global recording_go2_mid360_2026-05-29_4-45pm-PST_corrected --voxel 0.1 --lidar lidar +``` + +validates that go2 lidar observations have correct poses associated coming from go2 odometry + +![output](assets/go2_lidar.png) + + +``` +dimos map global recording_go2_mid360_2026-05-29_4-45pm-PST_corrected --voxel 0.1 --lidar lidar --markers --pgo +``` + +validates that camera image has correct pose associated, `--pgo` is not needed but you can + +![output](assets/markers_go2.png) + + +```sh +dimos map global recording_go2_mid360_2026-05-29_4-45pm-PST_corrected --voxel 0.1 --lidar fastlio_lidar --markers --image-pose fastlio_odometry --duration 120 +``` + +Validates that transform between fastlio_odom and camera image is correct (below it isn't, we detect markers at the angle - mabye fastlio_odometry pose should be flat representing base_link not lidar mounting style) + +![output](assets/markers_fastlio.png) diff --git a/dimos/utils/cli/map.py b/dimos/mapping/utils/cli/map.py similarity index 63% rename from dimos/utils/cli/map.py rename to dimos/mapping/utils/cli/map.py index c162a015ef..7fa2d39cc7 100644 --- a/dimos/utils/cli/map.py +++ b/dimos/mapping/utils/cli/map.py @@ -17,6 +17,7 @@ from collections.abc import Callable, Iterable import math from pathlib import Path +import subprocess import time from typing import TYPE_CHECKING, Any @@ -160,8 +161,136 @@ def _progress(obs: Observation[Any]) -> None: return _progress +def _log_reconstruction( + *, + voxel: float, + global_map: PointCloud2 | None, + path: list[tuple[float, float, float]], + pgo_map: PointCloud2 | None, + full_pgo_map: PointCloud2 | None, + pgo_path: list[tuple[float, float, float]], + graph: PoseGraph | None, + marker_dets: list[Observation[Any]], + marker_size: float, +) -> None: + """Log maps, paths, the PGO graph, and markers to the active rerun recording.""" + from dimos.memory2.vis.color import Color + from dimos.msgs.geometry_msgs.Transform import Transform + + rr.send_blueprint(rrb.Blueprint(rrb.Spatial3DView(origin="world"))) + if global_map is not None: + rr.log("world/raw_map/pointcloud", global_map.to_rerun(voxel_size=voxel / 2), static=True) + if path: + rr.log( + "world/raw_map/path", + rr.LineStrips3D(strips=[path], colors=[[231, 76, 60]], radii=[PATH_THICKNESS]), + static=True, + ) + if pgo_map is not None: + rr.log("world/pgo_map/pointcloud", pgo_map.to_rerun(voxel_size=voxel / 2), static=True) + if full_pgo_map is not None: + rr.log( + "world/full_pgo_map/pointcloud", + full_pgo_map.to_rerun(voxel_size=voxel / 2), + static=True, + ) + if pgo_path: + rr.log( + "world/pgo_map/path", + rr.LineStrips3D(strips=[pgo_path], colors=[[255, 255, 255]], radii=[PATH_THICKNESS]), + static=True, + ) + rr.log( + "world/pgo_map/pgo/keyframes", + rr.Points3D(positions=pgo_path, colors=[[255, 0, 0]], radii=[0.025]), + static=True, + ) + if graph is not None and graph.loops: + loop_strips = [ + [ + (lc.source.translation.x, lc.source.translation.y, lc.source.translation.z), + (lc.target.translation.x, lc.target.translation.y, lc.target.translation.z), + ] + for lc in graph.loops + ] + rr.log( + "world/pgo_map/pgo/loop_closures", + rr.LineStrips3D(strips=loop_strips, colors=[[231, 76, 60]], radii=[0.025]), + static=True, + ) + if marker_dets: + half = marker_size / 2.0 + n = len(marker_dets) + fill_half = [(half, half, 0.005)] * n + # Outline sits just outside the fill so both stay visible. + outline_bump = marker_size * 0.05 + outline_half = [(half + outline_bump, half + outline_bump, 0.006)] * n + raw_centers = [(d.data.center.x, d.data.center.y, d.data.center.z) for d in marker_dets] + raw_quats = [ + (d.data.orientation.x, d.data.orientation.y, d.data.orientation.z, d.data.orientation.w) + for d in marker_dets + ] + # One entry per tracked marker session — color stable per track_id. + colors = [ + Color.from_cmap("tab10", (d.data.track_id % 10) / 10.0).rgb_u8() for d in marker_dets + ] + labels = [f"track={d.data.track_id} id={d.data.marker_id}" for d in marker_dets] + + _log_markers( + "world/raw_map/markers", + raw_centers, + raw_quats, + fill_half=fill_half, + outline_half=outline_half, + colors=colors, + labels=labels, + ) + + if graph is not None: + # PGO-correct each raw marker pose: lift it from world_raw into + # world_corrected so it lines up with pgo_map. + pgo_centers: list[tuple[float, float, float]] = [] + pgo_quats: list[tuple[float, float, float, float]] = [] + for d in marker_dets: + raw_tf = Transform( + translation=d.data.center, + rotation=d.data.orientation, + frame_id="world", + child_frame_id=f"marker_{d.data.marker_id}", + ts=d.ts, + ) + corrected = graph.correct(raw_tf) + pgo_centers.append( + (corrected.translation.x, corrected.translation.y, corrected.translation.z) + ) + pgo_quats.append( + ( + corrected.rotation.x, + corrected.rotation.y, + corrected.rotation.z, + corrected.rotation.w, + ) + ) + _log_markers( + "world/pgo_map/markers", + pgo_centers, + pgo_quats, + fill_half=fill_half, + outline_half=outline_half, + colors=colors, + labels=labels, + ) + + def main( dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), + lidar_stream: str = typer.Option( + "lidar", "--lidar", help="Lidar point-cloud stream to reconstruct" + ), + seek: float = typer.Option(0.0, "--seek", help="Skip the first N seconds of the recording"), + duration: float | None = typer.Option( + None, "--duration", help="Use only N seconds from --seek (default: to the end)" + ), voxel: float = typer.Option(0.05, "--voxel", help="Voxel size for the rebuild"), device: str = typer.Option( "CUDA:0", "--device", help="Open3D compute device (e.g. CUDA:0, CPU:0)" @@ -174,7 +303,7 @@ def main( pgo_tol: float = typer.Option( 0.3, "--pgo-tol", - help="Spatial dedup tolerance (meters); applies to both raw and --pgo maps", + help="Spatial dedup tolerance (meters); applies to both raw and --pgo maps. 0 disables dedup (keep every posed frame)", ), block_count: int = typer.Option( 2_000_000, "--block-count", help="VoxelBlockGrid capacity (raw and PGO rebuilds)" @@ -189,7 +318,10 @@ def main( "--full-pgo", help="Also build a full-replay PGO map (every frame) for comparison (implies --pgo)", ), - no_gui: bool = typer.Option(False, "--no-gui", help="Skip rerun visualization"), + out: Path | None = typer.Option( + None, "--out", help="Output .rrd path (default: ./.rrd)" + ), + no_gui: bool = typer.Option(False, "--no-gui", help="Write the .rrd but don't launch rerun"), markers: bool = typer.Option( False, "--markers", @@ -200,6 +332,12 @@ def main( "--camera-info", help="YAML calibration file for --markers; defaults to Go2 builtin", ), + image_pose: str | None = typer.Option( + None, + "--image-pose", + help="Re-pose color_image from this stream's pose (composed with the camera " + "optical mount) before marker detection, instead of the image's stored pose", + ), marker_size: float = typer.Option( 0.1, "--marker-size", help="Physical marker edge length in meters (--markers only)" ), @@ -224,25 +362,26 @@ def main( help="Sliding-window track buffer for marker pose averaging (s); 0 disables (one box per raw detection)", ), ) -> None: - """Rebuild a voxel map from a recorded SQLite dataset and view it in rerun.""" + """Rebuild a voxel map from a recorded SQLite dataset, write a .rrd, and open it in rerun.""" from dimos.mapping.loop_closure.pgo import PGO from dimos.memory2.store.sqlite import SqliteStore from dimos.memory2.transform import QualityWindow, SpeedLimit - from dimos.memory2.vis.color import Color - from dimos.msgs.geometry_msgs.Transform import Transform from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo from dimos.msgs.sensor_msgs.Image import Image + from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 from dimos.perception.fiducial.marker_transformer import DetectMarkers - from dimos.robot.unitree.go2.connection import _camera_info_static + from dimos.robot.unitree.go2.connection import BASE_TO_OPTICAL, _camera_info_static from dimos.utils.data import resolve_named_path from dimos.visualization.rerun.init import rerun_init db_path = resolve_named_path(dataset, ".db") + if out is None: + out = Path.cwd() / f"{db_path.stem}.rrd" if export or full_pgo: pgo = True store = SqliteStore(path=db_path) - lidar = store.streams.lidar + lidar = store.stream(lidar_stream, PointCloud2).from_time(seek or None).to_time(duration) print(lidar.summary()) @@ -250,9 +389,10 @@ def main( # Spatial dedup: bucket frames by 3D cell using the raw pose, keep the # latest per cell. Shared by raw and PGO rebuilds. Doesn't touch obs.data - # so it stays cheap (no pointcloud loading). - seen: dict[tuple[int, int, int], Observation[Any]] = {} - for obs in lidar: + # so it stays cheap (no pointcloud loading). With pgo_tol<=0 the bucketing + # is disabled and every posed frame is kept (keyed by index). + seen: dict[Any, Observation[Any]] = {} + for i, obs in enumerate(lidar): pose = obs.pose if pose is None: continue @@ -260,15 +400,25 @@ def main( # Same condition as pgo_keyframes so dedup and PGO see the same frames. if pose.position.is_zero() or pose.orientation.is_zero(): continue - t = pose.position - # math.floor so negative coords bucket consistently; int() truncates - # toward zero and silently folds -0.5 and 0.5 into the same cell. - cell = (math.floor(t.x / pgo_tol), math.floor(t.y / pgo_tol), math.floor(t.z / pgo_tol)) - seen[cell] = obs + if pgo_tol > 0: + t = pose.position + # math.floor so negative coords bucket consistently; int() truncates + # toward zero and silently folds -0.5 and 0.5 into the same cell. + key: Any = ( + math.floor(t.x / pgo_tol), + math.floor(t.y / pgo_tol), + math.floor(t.z / pgo_tol), + ) + else: + key = i + seen[key] = obs n_kept = len(seen) pct = 100 * n_kept / total if total else 0 - print(f"dedup: kept [{n_kept}/{total}] frames ({pct:.1f}%) at tol={pgo_tol}m") + if pgo_tol > 0: + print(f"dedup: kept [{n_kept}/{total}] frames ({pct:.1f}%) at tol={pgo_tol}m") + else: + print(f"dedup: disabled, kept all [{n_kept}/{total}] posed frames") # Dict insertion order = lidar iteration order = chronological. # `seen` only contains entries with non-None poses (filtered above). @@ -323,9 +473,19 @@ def main( if markers: # Image observations in dimos recordings are stamped with # frame_id="camera_optical", so obs.pose is already optical-in-world - # (verified: matches lidar_base_pose + BASE_TO_OPTICAL to ~1mm). - # No mount composition needed. - color_image = store.stream("color_image", Image) + # (verified: matches lidar_base_pose + BASE_TO_OPTICAL to ~1mm). With + # --image-pose, swap that stored pose for a different source (e.g. + # fastlio_odometry), composing the base→optical mount onto it first. + color_image = store.stream("color_image", Image).from_time(seek or None).to_time(duration) + n_images = color_image.count() + if image_pose is not None: + from dimos.mapping.utils.cli.pose_fill import pose_fill + + src_pose: Stream[Any] = ( + store.stream(image_pose).from_time(seek or None).to_time(duration) + ) + print(f"re-posing color_image from {image_pose!r} + camera optical mount") + color_image = pose_fill(color_image, src_pose, tolerance=0.1, mount=BASE_TO_OPTICAL) cam_info = CameraInfo.from_yaml(str(camera_info)) if camera_info else _camera_info_static() xf = DetectMarkers( camera_info=cam_info, @@ -334,9 +494,9 @@ def main( ) # Keep the sharpest frame per --marker-quality-window window, then # drop frames where the robot was moving (linear + rotational) faster - # than the limits. Defaults match markers_rrd.py so positions agree. + # than the limits. Defaults match replay_marker.py so positions agree. pipeline: Stream[Image] = color_image.tap( - progress(color_image.count(), "detecting markers") + progress(n_images, "detecting markers") ).transform(QualityWindow(lambda img: img.sharpness, window=marker_quality_window)) if marker_max_speed > 0: pipeline = pipeline.transform( @@ -361,125 +521,24 @@ def main( f"across {len(unique_ids)} unique ids {unique_ids}" ) - if not no_gui: - rerun_init("dimos map tool", spawn=True) - rr.send_blueprint(rrb.Blueprint(rrb.Spatial3DView(origin="world"))) - if global_map is not None: - rr.log( - "world/raw_map/pointcloud", global_map.to_rerun(voxel_size=voxel / 2), static=True - ) - if path: - rr.log( - "world/raw_map/path", - rr.LineStrips3D(strips=[path], colors=[[231, 76, 60]], radii=[PATH_THICKNESS]), - static=True, - ) - if pgo_map is not None: - rr.log("world/pgo_map/pointcloud", pgo_map.to_rerun(voxel_size=voxel / 2), static=True) - if full_pgo_map is not None: - rr.log( - "world/full_pgo_map/pointcloud", - full_pgo_map.to_rerun(voxel_size=voxel / 2), - static=True, - ) - if pgo_path: - rr.log( - "world/pgo_map/path", - rr.LineStrips3D( - strips=[pgo_path], colors=[[255, 255, 255]], radii=[PATH_THICKNESS] - ), - static=True, - ) - rr.log( - "world/pgo_map/pgo/keyframes", - rr.Points3D(positions=pgo_path, colors=[[255, 0, 0]], radii=[0.025]), - static=True, - ) - if graph is not None and graph.loops: - loop_strips = [ - [ - (lc.source.translation.x, lc.source.translation.y, lc.source.translation.z), - (lc.target.translation.x, lc.target.translation.y, lc.target.translation.z), - ] - for lc in graph.loops - ] - rr.log( - "world/pgo_map/pgo/loop_closures", - rr.LineStrips3D(strips=loop_strips, colors=[[231, 76, 60]], radii=[0.025]), - static=True, - ) - if marker_dets: - half = marker_size / 2.0 - n = len(marker_dets) - fill_half = [(half, half, 0.005)] * n - # Outline sits just outside the fill so both stay visible. - outline_bump = marker_size * 0.05 - outline_half = [(half + outline_bump, half + outline_bump, 0.006)] * n - raw_centers = [(d.data.center.x, d.data.center.y, d.data.center.z) for d in marker_dets] - raw_quats = [ - ( - d.data.orientation.x, - d.data.orientation.y, - d.data.orientation.z, - d.data.orientation.w, - ) - for d in marker_dets - ] - # One entry per tracked marker session — color stable per track_id. - colors = [ - Color.from_cmap("tab10", (d.data.track_id % 10) / 10.0).rgb_u8() - for d in marker_dets - ] - labels = [f"track={d.data.track_id} id={d.data.marker_id}" for d in marker_dets] - - _log_markers( - "world/raw_map/markers", - raw_centers, - raw_quats, - fill_half=fill_half, - outline_half=outline_half, - colors=colors, - labels=labels, - ) - - if graph is not None: - # PGO-correct each raw marker pose: lift it from world_raw - # into world_corrected so it lines up with pgo_map. - pgo_centers: list[tuple[float, float, float]] = [] - pgo_quats: list[tuple[float, float, float, float]] = [] - for d in marker_dets: - raw_tf = Transform( - translation=d.data.center, - rotation=d.data.orientation, - frame_id="world", - child_frame_id=f"marker_{d.data.marker_id}", - ts=d.ts, - ) - corrected = graph.correct(raw_tf) - pgo_centers.append( - ( - corrected.translation.x, - corrected.translation.y, - corrected.translation.z, - ) - ) - pgo_quats.append( - ( - corrected.rotation.x, - corrected.rotation.y, - corrected.rotation.z, - corrected.rotation.w, - ) - ) - _log_markers( - "world/pgo_map/markers", - pgo_centers, - pgo_quats, - fill_half=fill_half, - outline_half=outline_half, - colors=colors, - labels=labels, - ) + rerun_init("dimos map tool") + rr.save(str(out)) + _log_reconstruction( + voxel=voxel, + global_map=global_map, + path=path, + pgo_map=pgo_map, + full_pgo_map=full_pgo_map, + pgo_path=pgo_path, + graph=graph, + marker_dets=marker_dets, + marker_size=marker_size, + ) + print(f"wrote {out}") + if no_gui: + print(f"open with: rerun {out}") + else: + subprocess.Popen(["rerun", str(out)]) if export and pgo_map is not None: out_path = Path.cwd() / f"{db_path.stem}.pc2.lcm" diff --git a/dimos/mapping/utils/cli/pose_fill.py b/dimos/mapping/utils/cli/pose_fill.py new file mode 100644 index 0000000000..1a775bcd74 --- /dev/null +++ b/dimos/mapping/utils/cli/pose_fill.py @@ -0,0 +1,202 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Stamp observations with poses pulled from another stream by nearest timestamp. + +Some recorders store raw sensor streams (``lidar``, images) without baking the +trajectory into each frame — the pose lives in a separate odometry stream +(``odom``, ``fastlio_odometry``). PGO and the voxel rebuild skip pose-less +frames, so such a recording yields an empty map. :func:`pose_fill` re-attaches +poses by nearest-in-time match (via :meth:`Stream.align`) so those tools work +again. Frames stay in whatever coordinate frame they were stored in — only the +pose *metadata* is added. + +:func:`pose_fill_db` runs the same fill while copying a whole SQLite dataset to +a new file, baking the pulled poses into the target stream. +""" + +from __future__ import annotations + +from collections.abc import Iterable +from pathlib import Path +from typing import TYPE_CHECKING, Any, cast + +import typer + +# Heavy dimos imports are deferred (TYPE_CHECKING / inside functions) so that +# `dimos map --help` stays fast. See test_cli_startup.py. +if TYPE_CHECKING: + from dimos.memory2.backend import Backend + from dimos.memory2.stream import Stream + from dimos.memory2.type.observation import Observation + from dimos.msgs.geometry_msgs.Transform import Transform + + +def pose_fill( + stream: Stream[Any], + pose_stream: Stream[Any], + *, + tolerance: float = 0.1, + mount: Transform | None = None, +) -> Stream[Any]: + """Re-pose each observation in *stream* from the nearest entry in *pose_stream*. + + Pairs are formed by :meth:`Stream.align` (nearest ``|Δts| <= tolerance``); + primaries with no match in tolerance are dropped. The pose is read from the + matched pose observation's *payload* (``PoseStamped`` / ``Odometry`` / any + object exposing ``.position`` + ``.orientation``) because pose-message + streams carry the pose in the value, not the indexed pose columns. The + target stream's payload stays lazy. + + *mount* composes a static child transform onto each source pose + (``world_base ∘ mount``) before attaching — e.g. ``base_link → + camera_optical`` so a base-frame odometry source yields an optical-frame + image pose. + """ + from dimos.memory2.type.observation import _to_tuple + from dimos.msgs.geometry_msgs.Quaternion import Quaternion + from dimos.msgs.geometry_msgs.Transform import Transform + from dimos.msgs.geometry_msgs.Vector3 import Vector3 + + def _fill(pair_obs: Observation[Any]) -> Observation[Any]: + primary, secondary = cast( + "tuple[Observation[Any], Observation[Any]]", pair_obs.data + ) # AlignedPair(primary, secondary) + if mount is None: + return primary.with_pose(secondary.data) + x, y, z, qx, qy, qz, qw = cast("tuple[float, ...]", _to_tuple(secondary.data)) + world_base = Transform(translation=Vector3(x, y, z), rotation=Quaternion(qx, qy, qz, qw)) + return primary.with_pose(world_base + mount) + + return stream.align(pose_stream, tolerance=tolerance).map(_fill) + + +def pose_fill_db( + src_path: str | Path, + dest_path: str | Path, + *, + target: str = "lidar", + pose_source: str = "odom", + tolerance: float = 0.1, + streams: list[str] | None = None, + seek: float = 0.0, + duration: float | None = None, +) -> dict[str, int]: + """Copy a SQLite dataset to *dest_path*, baking *pose_source* poses into *target*. + + Every stream in *streams* (default: all streams in the source) is copied + with its original payload type and codec. Blobs are copied **verbatim** — + the encoded bytes are moved as-is, never decoded and re-encoded — so lossy + codecs (e.g. jpeg images) keep their original quality. Only the *target* + stream's pose metadata is rewritten, to the nearest *pose_source* pose (the + rtree spatial index is rebuilt from it on insert). Returns a per-stream + count of observations written. + """ + from dimos.memory2.store.sqlite import SqliteStore + + dest_p = Path(dest_path) + if dest_p.exists(): + raise FileExistsError( + f"{dest_p} already exists — refusing to append into it (stale rows would " + "collide on ts). Delete it (and any -wal/-shm sidecars) first." + ) + + src = SqliteStore(path=str(src_path), must_exist=True) + dest = SqliteStore(path=str(dest_path)) + names = streams if streams is not None else src.list_streams() + if target not in names: + raise ValueError(f"target stream {target!r} not in {names}") + if pose_source not in src.list_streams(): + raise ValueError(f"pose_source stream {pose_source!r} not found in source dataset") + + written: dict[str, int] = {} + for name in names: + src_b = cast("Backend[Any]", src.stream(name)._source) + dest_b = cast("Backend[Any]", dest.stream(name, src_b.data_type, codec=src_b.codec)._source) + + # Re-posed observations for the target (ids preserved from source, so + # the source blob still resolves); plain metadata for everything else. + if name == target: + rows: Iterable[Observation[Any]] = pose_fill( + src.stream(name).from_time(seek or None).to_time(duration).order_by("ts"), + src.stream(pose_source).from_time(seek or None).to_time(duration).order_by("ts"), + tolerance=tolerance, + ) + else: + rows = src.stream(name).from_time(seek or None).to_time(duration).order_by("ts") + + scalar = src_b.data_type in (int, float) + n = 0 + for obs in rows: + blob = ( + None if scalar or src_b.blob_store is None else src_b.blob_store.get(name, obs.id) + ) + row_id = dest_b.metadata_store.insert(obs) + if blob is not None: + assert dest_b.blob_store is not None + dest_b.blob_store.put(name, row_id, blob) + n += 1 + commit = getattr(dest_b.metadata_store, "commit", None) + if commit is not None: + commit() + written[name] = n + + src.stop() + dest.stop() + return written + + +def main( + dataset: str = typer.Argument(..., help="Source .db: bare name (cwd or data/) or path"), + out: str | None = typer.Option( + None, "--out", help="Output .db path (default: _posed.db beside the source)" + ), + target: str = typer.Option("lidar", "--target", help="Stream to bake poses into"), + pose_source: str = typer.Option( + "odom", "--pose-source", help="Stream to read the nearest poses from" + ), + tolerance: float = typer.Option(0.1, "--tolerance", help="Max |Δts| for a pose match (s)"), + streams: str | None = typer.Option( + None, "--streams", help="Comma-separated subset of streams to copy (default: all)" + ), + seek: float = typer.Option(0.0, "--seek", help="Skip the first N seconds of the recording"), + duration: float | None = typer.Option( + None, "--duration", help="Use only N seconds from --seek (default: to the end)" + ), +) -> None: + """Copy a recording, baking nearest pose_source poses into the target stream.""" + from dimos.utils.data import resolve_named_path + + src = resolve_named_path(dataset, ".db") + dest = Path(out) if out else src.with_name(f"{src.stem}_posed.db") + names = [s.strip() for s in streams.split(",")] if streams else None + print(f"pose-filling {src.name}: {target!r} <- nearest {pose_source!r} (±{tolerance}s)") + written = pose_fill_db( + src, + dest, + target=target, + pose_source=pose_source, + tolerance=tolerance, + streams=names, + seek=seek, + duration=duration, + ) + for name, n in written.items(): + print(f" {name}: {n}") + print(f"wrote {dest}") + print(f"now run: dimos map global {dest.stem} --pgo") + + +if __name__ == "__main__": + typer.run(main) diff --git a/dimos/mapping/utils/cli/rename.py b/dimos/mapping/utils/cli/rename.py new file mode 100644 index 0000000000..95c3f671b1 --- /dev/null +++ b/dimos/mapping/utils/cli/rename.py @@ -0,0 +1,181 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Copy a memory2 sqlite recording into a new file, renaming streams. + +Iterates each source stream and re-appends every observation under its new +name in a fresh destination db. Slower than in-place ``ALTER TABLE`` but +forces a full re-read of every row, so any pre-existing corruption surfaces +immediately. Streams not mentioned in ``--rename`` are copied verbatim. + +Usage: + uv run python -m dimos.mapping.utils.cli.rename mid360 \\ + --out mid360_renamed.db \\ + --rename go2_lidar=lidar \\ + --rename lidar=fastlio_lidar \\ + --rename odometry=fastlio_odometry +""" + +from __future__ import annotations + +from collections.abc import Callable +import json +from pathlib import Path +import sqlite3 +import time +from typing import Any + +import typer + +from dimos.memory2.codecs.base import _resolve_payload_type +from dimos.memory2.store.sqlite import SqliteStore +from dimos.memory2.stream import Stream +from dimos.memory2.type.observation import Observation +from dimos.utils.data import resolve_named_path + + +def progress(total: int, label: str = "") -> Callable[[Observation[Any]], None]: + """Matches dimos/mapping/utils/cli/map.py:progress — kept inline to avoid pulling rerun.""" + seen = 0 + wall_start: float | None = None + last_wall: float | None = None + first_ts: float | None = None + + def _progress(obs: Observation[Any]) -> None: + nonlocal seen, wall_start, last_wall, first_ts + now = time.monotonic() + if wall_start is None: + wall_start = now + first_ts = obs.ts + assert first_ts is not None + frame_ms = (now - last_wall) * 1000 if last_wall is not None else 0.0 + last_wall = now + seen += 1 + pct = 100 * seen // total if total else 100 + wall = now - wall_start + data = obs.ts - first_ts + speed = data / wall if wall > 0 else 0.0 + end = "\n" if seen >= total else "" + prefix = f"{label} " if label else "" + print( + f"\r{prefix}{pct:>3}% [{seen}/{total}] {data:.1f}s ({speed:.1f} x rt) {frame_ms:.0f}ms/frame", + end=end, + flush=True, + ) + + return _progress + + +def _stream_payload_types(db_path: Path) -> dict[str, type]: + """Read each stream's registered payload type from the _streams table.""" + conn = sqlite3.connect(str(db_path)) + try: + rows = conn.execute("SELECT name, config FROM _streams").fetchall() + finally: + conn.close() + return {name: _resolve_payload_type(json.loads(cfg)["payload_module"]) for name, cfg in rows} + + +def _parse_renames(pairs: list[str]) -> dict[str, str]: + """Parse ``OLD=NEW`` pairs into a dict; reject malformed or duplicate OLDs.""" + out: dict[str, str] = {} + for raw in pairs: + if "=" not in raw: + raise typer.BadParameter(f"--rename must be OLD=NEW, got {raw!r}") + old, new = raw.split("=", 1) + old, new = old.strip(), new.strip() + if not old or not new: + raise typer.BadParameter(f"--rename has empty side in {raw!r}") + if old in out: + raise typer.BadParameter(f"--rename {old!r} specified more than once") + out[old] = new + return out + + +def main( + dataset: str = typer.Argument(..., help="Source .db: bare name (cwd or data/) or path"), + out: Path = typer.Option(..., "--out", help="Output renamed .db path (must not exist)"), + rename: list[str] = typer.Option( + [], + "--rename", + help="OLD=NEW rename pair (can be passed multiple times); unmapped streams pass through", + ), + drop: list[str] = typer.Option( + [], "--drop", help="Stream name to omit from output (can be passed multiple times)" + ), + seek: float = typer.Option(0.0, "--seek", help="Skip the first N seconds of the recording"), + duration: float | None = typer.Option( + None, "--duration", help="Use only N seconds from --seek (default: to the end)" + ), +) -> None: + """Copy a recording into a new .db, renaming and/or dropping streams.""" + src_path = resolve_named_path(dataset, ".db") + if out.exists(): + raise typer.BadParameter(f"Output already exists: {out}") + + print(f"analizing dataset {src_path}") + rename_map = _parse_renames(rename) + payload_types = _stream_payload_types(src_path) + + drop_set = set(drop) + missing = sorted((set(rename_map) | drop_set) - set(payload_types)) + if missing: + raise typer.BadParameter( + f"--rename / --drop refers to streams not in source: {missing}. " + f"Available: {sorted(payload_types)}" + ) + overlap = drop_set & set(rename_map) + if overlap: + raise typer.BadParameter(f"stream(s) in both --rename and --drop: {sorted(overlap)}") + + kept = {name: rename_map.get(name, name) for name in payload_types if name not in drop_set} + seen: dict[str, str] = {} + for src_name, dst_name in kept.items(): + if dst_name in seen: + raise typer.BadParameter( + f"name collision: {seen[dst_name]!r} and {src_name!r} both map to {dst_name!r}" + ) + seen[dst_name] = src_name + + print("rename plan:") + for src_name in payload_types: + if src_name in drop_set: + print(f" {src_name:>16s} ✗ (dropped)") + else: + dst_name = kept[src_name] + arrow = "→" if src_name != dst_name else " " + print(f" {src_name:>16s} {arrow} {dst_name}") + print() + + src = SqliteStore(path=str(src_path)) + with src: + dst = SqliteStore(path=str(out)) + with dst: + for src_name, dst_name in kept.items(): + ptype = payload_types[src_name] + src_s: Stream[Any] = ( + src.stream(src_name, ptype).from_time(seek or None).to_time(duration) + ) + dst_s: Stream[Any] = dst.stream(dst_name, ptype) + total = src_s.count() + cb = progress(total, f"{dst_name:>16s}") + for obs in src_s: + dst_s.append(obs.data, ts=obs.ts, pose=obs.pose, tags=obs.tags or None) + cb(obs) + + print(f"\nwrote {out}") + + +if __name__ == "__main__": + typer.run(main) diff --git a/dimos/mapping/utils/cli/replay.py b/dimos/mapping/utils/cli/replay.py new file mode 100644 index 0000000000..4228f6cc3c --- /dev/null +++ b/dimos/mapping/utils/cli/replay.py @@ -0,0 +1,378 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dump a recorded dataset to .rrd: lidar point clouds + camera frames. + +Lidar clouds are assumed to be in world frame and logged directly under +their entity path (no parent transform). Entities written: + +- ``world/lidar`` — Go2 L1 per-frame point cloud +- ``world/fastlio_lidar`` — fastlio_lidar raw cloud (if present) +- ``world/_voxels`` — growing voxel map, one per PointCloud2 stream (``--map``) +- ``world/_map`` — single static voxel map, one per PointCloud2 stream (``--map-final``) +- ``world/fastlio`` — fastlio_odometry pose axis (if present) +- ``world/fastlio_path`` — fastlio_odometry trajectory (growing LineStrips3D) +- ``world/odom`` — Go2 onboard odom pose axis (if present) +- ``world/odom_path`` — Go2 onboard odom trajectory (growing LineStrips3D) +- ``world/camera`` — color_image camera pose (static pinhole + Transform3D) +- ``world/camera/image`` — color_image frames + +Usage: + uv run python -m dimos.mapping.utils.cli.replay mid360 --out map.rrd + uv run python -m dimos.mapping.utils.cli.replay mid360 --out map.rrd --map + uv run python -m dimos.mapping.utils.cli.replay mid360 --out map.rrd --map-final + rerun map.rrd +""" + +from __future__ import annotations + +from collections.abc import Callable +from pathlib import Path +import subprocess +import time +from typing import TYPE_CHECKING, Any + +import rerun as rr +import typer + +# Heavy dimos imports (mapping/memory2 → torch, scipy, open3d) are deferred into +# main() so that `dimos map --help` stays fast. See test_cli_startup.py and the +# same pattern in dimos/mapping/utils/cli/map.py. +if TYPE_CHECKING: + from dimos.memory2.stream import Stream + from dimos.memory2.type.observation import Observation + from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 + +TIMELINE = "ts" + + +def _progress(total: int, label: str) -> Callable[[Observation[Any]], None]: + """Matches dimos/mapping/utils/cli/map.py:progress.""" + seen = 0 + wall_start: float | None = None + last_wall: float | None = None + first_ts: float | None = None + + def tick(obs: Observation[Any]) -> None: + nonlocal seen, wall_start, last_wall, first_ts + now = time.monotonic() + if wall_start is None: + wall_start = now + first_ts = obs.ts + assert first_ts is not None + frame_ms = (now - last_wall) * 1000 if last_wall is not None else 0.0 + last_wall = now + seen += 1 + pct = 100 * seen // total if total else 100 + wall = now - wall_start + data = obs.ts - first_ts + speed = data / wall if wall > 0 else 0.0 + end = "\n" if seen >= total else "" + print( + f"\r{label} {pct:>3}% [{seen}/{total}] {data:.1f}s ({speed:.1f} x rt) {frame_ms:.0f}ms/frame", + end=end, + flush=True, + ) + + return tick + + +def _log_clouds( + label: str, + stream: Stream[PointCloud2], + entity: str, + voxel: float, + point_mode: str, + *, + total: int | None = None, +) -> None: + """Iterate a PointCloud2 stream and log each obs to ``entity``. + + ``total`` overrides the progress denominator — useful for transform + pipelines where calling :py:meth:`Stream.count` would materialize the + whole pipeline. + """ + n = total if total is not None else stream.count() + cb = _progress(n, label) + for obs in stream: + cb(obs) + rr.set_time(TIMELINE, timestamp=obs.ts) + rr.log(entity, obs.data.to_rerun(voxel_size=voxel, mode=point_mode)) + + +def _log_path( + label: str, + stream: Stream[Any], + entity: str, + color: tuple[int, int, int], + *, + emit_every: int = 10, +) -> None: + """Iterate a pose-bearing stream and log a growing :class:`LineStrips3D` to + ``entity`` every ``emit_every`` poses (and once more at the end). Frames + without a pose are skipped. + """ + n = stream.count() + cb = _progress(n, label) + points: list[tuple[float, float, float]] = [] + last_ts: float | None = None + emit_count = 0 + for obs in stream: + cb(obs) + if obs.pose_tuple is None: + continue + points.append( + (float(obs.pose_tuple[0]), float(obs.pose_tuple[1]), float(obs.pose_tuple[2])) + ) + last_ts = obs.ts + emit_count += 1 + if emit_every > 0 and emit_count % emit_every == 0 and len(points) >= 2: + rr.set_time(TIMELINE, timestamp=obs.ts) + rr.log(entity, rr.LineStrips3D([points], colors=[color])) + if ( + last_ts is not None + and len(points) >= 2 + and (emit_every <= 0 or emit_count % emit_every != 0) + ): + rr.set_time(TIMELINE, timestamp=last_ts) + rr.log(entity, rr.LineStrips3D([points], colors=[color])) + + +def main( + dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), + out: Path | None = typer.Option( + None, "--out", help="Output .rrd path (default: ./.rrd)" + ), + no_gui: bool = typer.Option(False, "--no-gui", help="Don't launch rerun on the result"), + seek: float = typer.Option(0.0, "--seek", help="Skip the first N seconds of the recording"), + duration: float | None = typer.Option( + None, "--duration", help="Use only N seconds from --seek (default: to the end)" + ), + voxel: float = typer.Option( + 0.05, + "--voxel", + help="Voxel grid resolution (m) for --map/--map-final; rendering follows the same size", + ), + point_mode: str = typer.Option( + "spheres", "--point-mode", help="Render mode: 'spheres', 'boxes', or 'points'" + ), + camera_hz: float = typer.Option( + 0.0, + "--camera-hz", + help="Throttle color_image to at most this rate; 0 (default) logs all frames", + ), + map: bool = typer.Option( + False, + "--map", + help="Accumulate each lidar stream into a VoxelGrid, logging a growing map over the timeline", + ), + map_final: bool = typer.Option( + False, + "--map-final", + help="Log a single static accumulated map of the whole recording (independent of --map)", + ), + map_source: list[str] = typer.Option( + [], + "--map-source", + help="PointCloud2 stream(s) to map; repeatable. Default: all PointCloud2 streams", + ), + map_carve_columns: bool = typer.Option( + False, + "--map-carve-columns/--no-map-carve-columns", + help="Clear the full Z column under each new voxel, keeping only the latest surface " + "(good for forward-facing lidar like the Go2 L1); --map/--map-final only", + ), + map_device: str = typer.Option( + "CUDA:0", "--map-device", help="Open3D device for the VoxelGrid; --map/--map-final only" + ), + map_emit_every: int = typer.Option( + 10, + "--map-emit-every", + help="Emit accumulated map every N frames (0 = only at end); --map only", + ), +) -> None: + """Dump a recording to .rrd (lidar clouds + camera frames) and open it in rerun.""" + from dimos.mapping.utils.cli.summary import _stream_payload_types + from dimos.mapping.voxels import VoxelMapTransformer + from dimos.memory2.store.sqlite import SqliteStore + from dimos.memory2.transform import throttle + from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped + from dimos.msgs.nav_msgs.Odometry import Odometry + from dimos.msgs.sensor_msgs.Image import Image + from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2, register_colormap_annotation + from dimos.robot.unitree.go2.connection import _camera_info_static + from dimos.utils.data import resolve_named_path + + db_path = resolve_named_path(dataset, ".db") + if out is None: + out = Path.cwd() / f"{db_path.stem}.rrd" + cam_info = _camera_info_static() + + # Resolve which streams to voxelize: all PointCloud2 streams, or the + # explicit --map-source subset. Validate up front so typos fail fast. + pc_streams = [n for n, t in _stream_payload_types(db_path).items() if t is PointCloud2] + map_sources = list(map_source) or pc_streams + if (map or map_final) and (bad := [s for s in map_sources if s not in pc_streams]): + raise typer.BadParameter(f"--map-source: not PointCloud2 stream(s): {', '.join(bad)}") + + rr.init("dimos map_rrd", recording_id=db_path.stem) + rr.save(str(out)) + register_colormap_annotation("turbo") + + # Static pinhole on the camera entity; per-frame Transform3D goes on the + # same entity. Image is the child so it projects through the pinhole. + pinhole = cam_info.to_rerun() + assert not isinstance(pinhole, list) + rr.log("world/camera", pinhole, static=True) + + # Static axis triads as children of each moving Transform3D, so the + # transforms are actually visible in the 3D view. + axes = rr.Arrows3D( + vectors=[[0.3, 0, 0], [0, 0.3, 0], [0, 0, 0.3]], + colors=[[255, 0, 0], [0, 255, 0], [0, 0, 255]], + ) + rr.log("world/fastlio/axes", axes, static=True) + rr.log("world/odom/axes", axes, static=True) + + store = SqliteStore(path=str(db_path)) + with store: + print(store.summary()) + + def clipped(name: str, ptype: type[Any]) -> Stream[Any]: + return store.stream(name, ptype).from_time(seek or None).to_time(duration) + + lidar = clipped("lidar", PointCloud2) + color_image = clipped("color_image", Image) + has_livox = "fastlio_lidar" in store.streams + livox = clipped("fastlio_lidar", PointCloud2) if has_livox else None + + # Per-frame raw clouds. + _log_clouds(" lidar", lidar, "world/lidar", voxel, point_mode) + if livox is not None: + _log_clouds("fastlio_lidar", livox, "world/fastlio_lidar", voxel, point_mode) + + # Accumulated voxel maps over the selected PointCloud2 streams. + # --map logs a growing map per stream; --map-final logs one static map + # per stream. --map-carve-columns clears the Z column under each surface + # voxel (good for forward-facing lidar like the Go2 L1); off by default. + if map or map_final: + grid_kwargs = {"voxel_size": voxel, "device": map_device, "show_startup_log": False} + for name in map_sources: + src = clipped(name, PointCloud2) + if not src.exists(): + continue + if map: + _log_clouds( + f"{name}_voxels", + src.transform( + VoxelMapTransformer( + emit_every=map_emit_every, + carve_columns=map_carve_columns, + **grid_kwargs, + ) + ), + f"world/{name}_voxels", + voxel / 4, # render smaller than the grid → gaps read as transparency + point_mode, + total=max(1, src.count() // max(map_emit_every, 1)), + ) + if map_final: + # emit_every=0 → one accumulated obs at exhaustion + final = src.transform( + VoxelMapTransformer( + emit_every=0, carve_columns=map_carve_columns, **grid_kwargs + ) + ).last() + rr.log( + f"world/{name}_map", + final.data.to_rerun(voxel_size=voxel / 4, mode=point_mode), + static=True, + ) + + # fastlio pose axis + path from fastlio_odometry stream. + if "fastlio_odometry" in store.streams: + odometry = clipped("fastlio_odometry", Odometry) + cb = _progress(odometry.count(), "fastlio_odometry") + for obs in odometry: + cb(obs) + if obs.pose_tuple is None: + continue + rr.set_time(TIMELINE, timestamp=obs.ts) + x, y, z, qx, qy, qz, qw = obs.pose_tuple + rr.log( + "world/fastlio", + rr.Transform3D( + translation=[x, y, z], + quaternion=rr.Quaternion(xyzw=[qx, qy, qz, qw]), + ), + ) + _log_path( + " fastlio_path", + clipped("fastlio_odometry", Odometry), + "world/fastlio_path", + color=(255, 165, 0), # orange + ) + + # Go2 native odom pose axis + path. + if "odom" in store.streams: + odom = clipped("odom", PoseStamped) + cb = _progress(odom.count(), " odom") + for odom_obs in odom: + cb(odom_obs) + if odom_obs.pose_tuple is None: + continue + rr.set_time(TIMELINE, timestamp=odom_obs.ts) + x, y, z, qx, qy, qz, qw = odom_obs.pose_tuple + rr.log( + "world/odom", + rr.Transform3D( + translation=[x, y, z], + quaternion=rr.Quaternion(xyzw=[qx, qy, qz, qw]), + ), + ) + _log_path( + " odom_path", + clipped("odom", PoseStamped), + "world/odom_path", + color=(0, 200, 100), # green + ) + + # Pass 2: camera pose + image per color_image. + cam_pipeline = ( + color_image.transform(throttle(1.0 / camera_hz)) if camera_hz > 0 else color_image + ) + n_img = cam_pipeline.count() + cb = _progress(n_img, " color_image") + for img_obs in cam_pipeline: + cb(img_obs) + rr.set_time(TIMELINE, timestamp=img_obs.ts) + if img_obs.pose_tuple is not None: + x, y, z, qx, qy, qz, qw = img_obs.pose_tuple + rr.log( + "world/camera", + rr.Transform3D( + translation=[x, y, z], quaternion=rr.Quaternion(xyzw=[qx, qy, qz, qw]) + ), + ) + rr.log("world/camera/image", img_obs.data.to_rerun()) + + print(f"wrote {out}") + if no_gui: + print(f"open with: rerun {out}") + else: + subprocess.Popen(["rerun", str(out)]) + + +if __name__ == "__main__": + typer.run(main) diff --git a/dimos/mapping/utils/cli/replay_marker.py b/dimos/mapping/utils/cli/replay_marker.py new file mode 100644 index 0000000000..881ef4cf9b --- /dev/null +++ b/dimos/mapping/utils/cli/replay_marker.py @@ -0,0 +1,252 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Temporary: dump apriltag detector replay (.rrd) for reliability investigation. + +Walks a recorded SQLite dataset and writes an rrd containing: +- per-image: camera pose + pinhole + image data +- per-lidar: base_link pose (a moving axis) +- per detection: marker box in world frame, at the detection timestamp + +Usage: + uv run python -m dimos.mapping.utils.cli.replay_marker hk_village1 --out hk.rrd + rerun hk.rrd + +Throwaway script next to ``map.py``; remove once the apriltag reliability work +lands. +""" + +from __future__ import annotations + +from pathlib import Path +import subprocess +from typing import TYPE_CHECKING, Any + +import rerun as rr +import typer + +# Heavy dimos imports (memory2/perception → torch, scipy, cv2) are deferred into +# main() so that `dimos map --help` stays fast. See test_cli_startup.py and the +# same pattern in dimos/mapping/utils/cli/map.py. +if TYPE_CHECKING: + from dimos.memory2.stream import Stream + +TIMELINE = "ts" + + +def main( + dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), + out: Path | None = typer.Option( + None, "--out", help="Output .rrd path (default: ./.rrd)" + ), + no_gui: bool = typer.Option(False, "--no-gui", help="Don't launch rerun on the result"), + seek: float = typer.Option(0.0, "--seek", help="Skip the first N seconds of the recording"), + duration: float | None = typer.Option( + None, "--duration", help="Use only N seconds from --seek (default: to the end)" + ), + marker_size: float = typer.Option(0.1, "--marker-size", help="Marker edge length (m)"), + marker_max_speed: float = typer.Option( + 0.5, "--marker-max-speed", help="Detection speed gate (m/s); 0 disables" + ), + marker_max_rot_rate: float = typer.Option( + 50.0, "--marker-max-rot-rate", help="Detection rot gate (deg/s); 0 disables" + ), + quality_window: float = typer.Option( + 0.1, "--quality-window", help="Sharpest-frame window for detection (s)" + ), + smoothing_window: float = typer.Option( + 7.5, "--smoothing-window", help="Buffer window for averaged-track pass (s); 0 disables" + ), +) -> None: + """Dump an AprilTag detection replay to .rrd and open it in rerun.""" + from dimos.memory2.store.sqlite import SqliteStore + from dimos.memory2.transform import QualityWindow, SpeedLimit + from dimos.memory2.vis.color import Color + from dimos.msgs.sensor_msgs.Image import Image + from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 + from dimos.perception.fiducial.marker_transformer import DetectMarkers + from dimos.robot.unitree.go2.connection import _camera_info_static + from dimos.utils.data import resolve_named_path + + db_path = resolve_named_path(dataset, ".db") + if out is None: + out = Path.cwd() / f"{db_path.stem}.rrd" + cam_info = _camera_info_static() + + rr.init("dimos markers", recording_id=db_path.stem) + rr.save(str(out)) + + # Static pinhole on the camera entity; per-frame Transform3D goes on the + # same entity. Image is the child so it projects through the pinhole. + pinhole = cam_info.to_rerun() + assert not isinstance(pinhole, list) + rr.log("world/camera", pinhole, static=True) + + store = SqliteStore(path=str(db_path)) + with store: + color_image = store.stream("color_image", Image).from_time(seek or None).to_time(duration) + lidar = store.stream("lidar", PointCloud2).from_time(seek or None).to_time(duration) + + # Pass 1: robot base pose over time (from lidar.pose). + for lidar_obs in lidar: + if lidar_obs.pose_tuple is None: + continue + rr.set_time(TIMELINE, timestamp=lidar_obs.ts) + x, y, z, qx, qy, qz, qw = lidar_obs.pose_tuple + rr.log( + "world/robot", + rr.Transform3D( + translation=[x, y, z], quaternion=rr.Quaternion(xyzw=[qx, qy, qz, qw]) + ), + ) + + # Pass 2: camera pose + image per color_image frame. + n_img = color_image.count() + for i, img_obs in enumerate(color_image): + rr.set_time(TIMELINE, timestamp=img_obs.ts) + if img_obs.pose_tuple is not None: + x, y, z, qx, qy, qz, qw = img_obs.pose_tuple + rr.log( + "world/camera", + rr.Transform3D( + translation=[x, y, z], quaternion=rr.Quaternion(xyzw=[qx, qy, qz, qw]) + ), + ) + rr.log("world/camera/image", img_obs.data.to_rerun()) + # Clear any prior detection box at this ts; pass 3 will overwrite + # this with the actual detection iff one fires for this frame. + rr.log( + "world/camera/image/detections", + rr.Boxes2D(array=[], array_format=rr.Box2DFormat.XYWH), + ) + if (i + 1) % 50 == 0 or i + 1 == n_img: + print(f"images: {i + 1}/{n_img}") + + # Pass 3: marker detections (filtered same way as `dimos map`). + xf = DetectMarkers(camera_info=cam_info, marker_length_m=marker_size) + pipeline: Stream[Any] = color_image.transform( + QualityWindow(lambda img: img.sharpness, window=quality_window) + ) + if marker_max_speed > 0: + pipeline = pipeline.transform( + SpeedLimit( + max_mps=marker_max_speed, + max_dps=marker_max_rot_rate if marker_max_rot_rate > 0 else None, + ) + ) + # Each 3D detection gets its own entity path so it persists from its + # ts onward — earlier detections stay visible as new ones appear. + # Color by ts (turbo) across the recording's full image-time range. + ts_min = color_image.first().ts + ts_max = color_image.last().ts + ts_span = ts_max - ts_min if ts_max > ts_min else 1.0 + n_det = 0 + for det_obs in pipeline.transform(xf): + d = det_obs.data + rr.set_time(TIMELINE, timestamp=det_obs.ts) + color = Color.from_cmap("turbo", (det_obs.ts - ts_min) / ts_span).rgb_u8() + rr.log( + f"world/markers/det_{n_det:05d}", + rr.Boxes3D( + centers=[(d.center.x, d.center.y, d.center.z)], + half_sizes=[(marker_size / 2, marker_size / 2, 0.005)], + quaternions=[ + rr.Quaternion( + xyzw=[ + d.orientation.x, + d.orientation.y, + d.orientation.z, + d.orientation.w, + ] + ) + ], + colors=[color], + fill_mode=rr.components.FillMode.Solid, + labels=[f"id={d.marker_id}"], + ), + ) + n_det += 1 + xs = d.corners_px[:, 0] + ys = d.corners_px[:, 1] + x1, y1 = float(xs.min()), float(ys.min()) + x2, y2 = float(xs.max()), float(ys.max()) + rr.log( + "world/camera/image/detections", + rr.Boxes2D( + array=[[x1, y1, x2 - x1, y2 - y1]], + array_format=rr.Box2DFormat.XYWH, + labels=[f"id={d.marker_id}"], + ), + ) + print(f"detections: {n_det}") + + # Pass 4: averaged tracks (smoothing_window > 0 → per-track ids). + # Re-runs the same filtered pipeline through a smoothing detector; + # each track yields one entity that updates as the windowed average + # refines. Color stable per track_id for visual identity. + if smoothing_window > 0: + xf_tracked = DetectMarkers( + camera_info=cam_info, + marker_length_m=marker_size, + smoothing_window=smoothing_window, + ) + pipeline_tracked: Stream[Any] = color_image.transform( + QualityWindow(lambda img: img.sharpness, window=quality_window) + ) + if marker_max_speed > 0: + pipeline_tracked = pipeline_tracked.transform( + SpeedLimit( + max_mps=marker_max_speed, + max_dps=marker_max_rot_rate if marker_max_rot_rate > 0 else None, + ) + ) + seen_tracks: set[int] = set() + n_updates = 0 + for det_obs in pipeline_tracked.transform(xf_tracked): + d = det_obs.data + rr.set_time(TIMELINE, timestamp=det_obs.ts) + color = Color.from_cmap("tab10", (d.track_id % 10) / 10.0).rgb_u8() + rr.log( + f"world/tracks/track_{d.track_id:04d}", + rr.Boxes3D( + centers=[(d.center.x, d.center.y, d.center.z)], + half_sizes=[(marker_size / 2, marker_size / 2, 0.005)], + quaternions=[ + rr.Quaternion( + xyzw=[ + d.orientation.x, + d.orientation.y, + d.orientation.z, + d.orientation.w, + ] + ) + ], + colors=[color], + fill_mode=rr.components.FillMode.Solid, + labels=[f"track={d.track_id} id={d.marker_id}"], + ), + ) + seen_tracks.add(d.track_id) + n_updates += 1 + print(f"tracks: {len(seen_tracks)} unique, {n_updates} updates") + + print(f"wrote {out}") + if no_gui: + print(f"open with: rerun {out}") + else: + subprocess.Popen(["rerun", str(out)]) + + +if __name__ == "__main__": + typer.run(main) diff --git a/dimos/mapping/utils/cli/summary.py b/dimos/mapping/utils/cli/summary.py new file mode 100644 index 0000000000..6475f2d833 --- /dev/null +++ b/dimos/mapping/utils/cli/summary.py @@ -0,0 +1,59 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Print ``Store.summary()`` for a memory2 sqlite recording. + +Usage: + uv run python -m dimos.mapping.utils.cli.summary mid360 +""" + +from __future__ import annotations + +import json +from pathlib import Path +import sqlite3 + +import typer + +from dimos.memory2.codecs.base import _resolve_payload_type +from dimos.memory2.store.sqlite import SqliteStore +from dimos.utils.data import resolve_named_path + + +def _stream_payload_types(db_path: Path) -> dict[str, type]: + """Read each stream's registered payload type from the _streams table.""" + conn = sqlite3.connect(str(db_path)) + try: + rows = conn.execute("SELECT name, config FROM _streams").fetchall() + finally: + conn.close() + return {name: _resolve_payload_type(json.loads(cfg)["payload_module"]) for name, cfg in rows} + + +def main( + dataset: str = typer.Argument(..., help="Dataset .db: bare name (cwd or data/) or path"), +) -> None: + """Print per-stream counts and time ranges for a recorded SQLite dataset.""" + db_path = resolve_named_path(dataset, ".db") + payload_types = _stream_payload_types(db_path) + + store = SqliteStore(path=str(db_path)) + with store: + for name, ptype in payload_types.items(): + store.stream(name, ptype) + print(store.summary()) + + +if __name__ == "__main__": + typer.run(main) diff --git a/dimos/mapping/utils/cli/test_cli.py b/dimos/mapping/utils/cli/test_cli.py new file mode 100644 index 0000000000..cb9ece1f3c --- /dev/null +++ b/dimos/mapping/utils/cli/test_cli.py @@ -0,0 +1,223 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""End-to-end tests for the `dimos map` verbs, run in-process. + +Each test invokes the real CLI against the `go2_short` recording (60s, auto- +pulled via LFS) and asserts on the artifact it produces. A short `--duration` +snippet keeps every invocation to a few seconds, and `--no-gui` stops the rrd +verbs from spawning a rerun viewer. + +The CLI is invoked in-process via Typer's CliRunner (not a subprocess per case), +so the heavy dimos import is paid once for the whole module instead of per case. +""" + +from __future__ import annotations + +from pathlib import Path +import traceback +from types import SimpleNamespace + +import pytest +from typer.testing import CliRunner + +# These drive the real CLI against an LFS recording (CPU voxel accumulation, +# multi-second runs) — self-hosted runner only. +pytestmark = pytest.mark.self_hosted + +# A few seconds in, then a couple seconds long — small enough to stay fast, +# long enough that the robot moves (so dedup/PGO/markers have something to do). +SEEK = 4.0 +DURATION = 3.0 + +# go2_short has 461 lidar frames over ~60s; a 3s snippet must be far fewer. +FULL_LIDAR_COUNT = 461 + +_runner = CliRunner() + + +def _turbojpeg_available() -> bool: + """True if the native libturbojpeg can be loaded (decodes the color_image stream).""" + try: + from turbojpeg import TurboJPEG + + TurboJPEG() + except Exception: + return False + return True + + +# The color_image stream is JPEG; decoding it needs the native libturbojpeg, which +# is missing from stale CI images. Skip the verbs that touch images when it's absent. +requires_turbojpeg = pytest.mark.skipif( + not _turbojpeg_available(), + reason="native libturbojpeg unavailable; cannot decode the JPEG color_image stream", +) + + +def _run(*args: str, timeout: float = 300.0) -> SimpleNamespace: + """Invoke `dimos map ` in-process and capture its result. + + Uses Typer's CliRunner so the dimos import cost is paid once (module import) + rather than per case. `timeout` is kept for call-site compatibility but is a + no-op here — pytest-timeout covers hangs. Returns a result with the + .returncode/.stdout/.stderr fields the test bodies use. + """ + from dimos.robot.cli.dimos import main as cli_app + + res = _runner.invoke(cli_app, ["map", *args]) + err = res.output + if res.exception is not None and not isinstance(res.exception, SystemExit): + err += "\n" + "".join(traceback.format_exception(res.exception)) + return SimpleNamespace(returncode=res.exit_code, stdout=res.output, stderr=err) + + +def _stream_counts(db_path: Path) -> dict[str, int]: + from dimos.memory2.store.sqlite import SqliteStore + + store = SqliteStore(path=str(db_path), must_exist=True) + try: + return {name: store.stream(name).count() for name in store.list_streams()} + finally: + store.stop() + + +@pytest.fixture(scope="module") +def dataset() -> str: + """Ensure the go2_short recording is present locally; return its bare name.""" + from dimos.utils.data import get_data + + get_data("go2_short.db") + return "go2_short" + + +def test_summary(dataset: str) -> None: + res = _run("summary", dataset) + assert res.returncode == 0, res.stderr + assert "lidar" in res.stdout + assert "odom" in res.stdout + + +@requires_turbojpeg +def test_rename_snippet(dataset: str, tmp_path: Path) -> None: + out = tmp_path / "renamed.db" + res = _run( + "rename", + dataset, + "--out", + str(out), + "--rename", + "odom=odometry", + "--duration", + str(DURATION), + ) + assert res.returncode == 0, res.stderr + assert out.exists() + + counts = _stream_counts(out) + # Stream was renamed, not duplicated. + assert "odometry" in counts + assert "odom" not in counts + assert "lidar" in counts + # --duration actually clipped the copy. + assert 0 < counts["lidar"] < FULL_LIDAR_COUNT + + +def test_pose_fill_snippet(dataset: str, tmp_path: Path) -> None: + out = tmp_path / "posed.db" + res = _run( + "pose-fill", + dataset, + "--out", + str(out), + "--streams", + "lidar,odom", + "--duration", + str(DURATION), + ) + assert res.returncode == 0, res.stderr + assert out.exists() + + from dimos.memory2.store.sqlite import SqliteStore + from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 + + store = SqliteStore(path=str(out), must_exist=True) + try: + lidar = store.stream("lidar", PointCloud2) + assert 0 < lidar.count() < FULL_LIDAR_COUNT + # The target stream carries a baked pose after pose-fill. + assert lidar.first().pose is not None + finally: + store.stop() + + +@requires_turbojpeg +def test_replay_snippet(dataset: str, tmp_path: Path) -> None: + out = tmp_path / "replay.rrd" + res = _run("replay", dataset, "--out", str(out), "--no-gui", "--duration", str(DURATION)) + assert res.returncode == 0, res.stderr + assert out.exists() and out.stat().st_size > 0 + + +@requires_turbojpeg +def test_replay_marker_snippet(dataset: str, tmp_path: Path) -> None: + out = tmp_path / "markers.rrd" + res = _run("replay-marker", dataset, "--out", str(out), "--no-gui", "--duration", str(DURATION)) + assert res.returncode == 0, res.stderr + assert out.exists() and out.stat().st_size > 0 + + +@requires_turbojpeg +def test_replay_map_final_snippet(dataset: str, tmp_path: Path) -> None: + out = tmp_path / "replay_map.rrd" + # CPU device keeps the voxel accumulation runnable without a GPU. + res = _run( + "replay", + dataset, + "--out", + str(out), + "--no-gui", + "--duration", + str(DURATION), + "--map-final", + "--map-carve-columns", + "--map-device", + "CPU:0", + ) + assert res.returncode == 0, res.stderr + assert out.exists() and out.stat().st_size > 0 + + +def test_global_snippet(dataset: str, tmp_path: Path) -> None: + out = tmp_path / "global.rrd" + # CPU device + small block budget keeps this runnable without a GPU. + # --seek exercises the snippet offset alongside --duration. + res = _run( + "global", + dataset, + "--out", + str(out), + "--no-gui", + "--seek", + str(SEEK), + "--duration", + str(DURATION), + "--device", + "CPU:0", + "--block-count", + "100000", + timeout=600.0, + ) + assert res.returncode == 0, res.stderr + assert out.exists() and out.stat().st_size > 0 diff --git a/dimos/mapping/utils/cli/test_pose_fill.py b/dimos/mapping/utils/cli/test_pose_fill.py new file mode 100644 index 0000000000..7b4b39e7d7 --- /dev/null +++ b/dimos/mapping/utils/cli/test_pose_fill.py @@ -0,0 +1,61 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for the stream-level `pose_fill` (no LFS data, runs in normal CI). + +The end-to-end `dimos map pose-fill` path against a real recording lives in +`test_cli.py` (self-hosted). These cover the pure stream transform directly. +""" + +from __future__ import annotations + +from dimos.mapping.utils.cli.pose_fill import pose_fill +from dimos.memory2.store.memory import MemoryStore +from dimos.msgs.geometry_msgs.Quaternion import Quaternion +from dimos.msgs.geometry_msgs.Transform import Transform +from dimos.msgs.geometry_msgs.Vector3 import Vector3 + + +def test_pose_fill_attaches_nearest_pose() -> None: + """Each target obs gets the nearest pose-source pose; payload is preserved.""" + with MemoryStore() as store: + target = store.stream("image", str) + poses = store.stream("odom", Transform) + target.append("img0", ts=0.0) + poses.append(Transform(translation=Vector3(1.0, 0.0, 0.0)), ts=0.001) + + out = pose_fill(target, poses, tolerance=0.05).to_list() + + assert len(out) == 1 + assert out[0].data == "img0" + assert out[0].pose_tuple is not None + assert out[0].pose_tuple[:3] == (1.0, 0.0, 0.0) + + +def test_pose_fill_mount_composes_static_child_transform() -> None: + """`mount` composes ``world_base + mount`` onto each attached pose.""" + with MemoryStore() as store: + target = store.stream("image", str) + poses = store.stream("odom", Transform) + target.append("img0", ts=0.0) + # Base pose 1m forward in x; mount offsets 1m in y (identity rotations). + poses.append(Transform(translation=Vector3(1.0, 0.0, 0.0)), ts=0.001) + mount = Transform(translation=Vector3(0.0, 1.0, 0.0), rotation=Quaternion()) + + out = pose_fill(target, poses, tolerance=0.05, mount=mount).to_list() + + assert len(out) == 1 + assert out[0].pose_tuple is not None + # Identity rotations → composed translation is the component-wise sum. + assert out[0].pose_tuple[:3] == (1.0, 1.0, 0.0) diff --git a/dimos/memory2/backend.py b/dimos/memory2/backend.py index 3c3b89669e..7b95bd6335 100644 --- a/dimos/memory2/backend.py +++ b/dimos/memory2/backend.py @@ -89,20 +89,25 @@ def loader() -> Any: return loader def append(self, obs: Observation[T]) -> Observation[T]: + # Materialize lazy payloads (e.g. from with_pose()/tag()/derived + # streams) in place — validation and encoding below read obs._data, + # and we must encode it to store anyway. + payload = obs.data + # Validate payload type matches stream type - if self.data_type is not object and not isinstance(obs._data, self.data_type): + if self.data_type is not object and not isinstance(payload, self.data_type): raise TypeError( - f"Stream expects {self.data_type.__qualname__}, got {type(obs._data).__qualname__}" + f"Stream expects {self.data_type.__qualname__}, got {type(payload).__qualname__}" ) obs.data_type = self.data_type # Scalars are stored inline in the metadata value column — skip blob - is_scalar = isinstance(obs._data, (int, float)) + is_scalar = isinstance(payload, (int, float)) # Encode payload before any locking (avoids holding locks during IO) encoded: bytes | None = None if self.blob_store is not None and not is_scalar: - encoded = self.codec.encode(obs._data) + encoded = self.codec.encode(payload) try: # Insert metadata, get assigned id diff --git a/dimos/memory2/store/base.py b/dimos/memory2/store/base.py index fb6d76379d..7a7162a6d1 100644 --- a/dimos/memory2/store/base.py +++ b/dimos/memory2/store/base.py @@ -65,6 +65,9 @@ def __getitem__(self, name: str) -> S: raise KeyError(name) return self._container.stream(name) + def __contains__(self, name: str) -> bool: + return name in self._container.list_streams() + def __dir__(self) -> list[str]: return self._container.list_streams() @@ -200,6 +203,10 @@ def list_streams(self) -> list[str]: """Return names of all streams in this store.""" return list(self._streams.keys()) + def summary(self) -> str: + """One line per stream — name, count, ts range. See :meth:`Stream.summary`.""" + return "\n".join(stream.summary() for _, stream in self.streams.items()) + def delete_stream(self, name: str) -> None: """Delete a stream by name (from cache and underlying storage).""" stream = self._streams.pop(name, None) diff --git a/dimos/memory2/stream.py b/dimos/memory2/stream.py index d0f4f14ca0..c79d358d50 100644 --- a/dimos/memory2/stream.py +++ b/dimos/memory2/stream.py @@ -14,6 +14,8 @@ from __future__ import annotations +from collections import namedtuple +from functools import cache import sys import time from typing import TYPE_CHECKING, Any, Generic, cast, overload @@ -56,6 +58,51 @@ logger = setup_logger() +@cache +def _pair_class(field_a: str, field_b: str) -> type: + """Cached ``AlignedPair`` namedtuple with fields named after the two streams. + + ``rename=True`` so any non-identifier or duplicate name degrades to ``_0`` / + ``_1`` rather than raising — index access (``pair[0]``) always works. + """ + return namedtuple("AlignedPair", [field_a, field_b], rename=True) # type: ignore[misc] + + +def _nearest_align_iter( + primary_iter: Iterator[Observation[Any]], + secondary_iter: Iterator[Observation[Any]], + tolerance: float, + pair_class: type, +) -> Iterator[Observation[Any]]: + """Streaming two-pointer nearest-ts merge over ts-ascending iterators. + + The secondary cursor catches up to each primary; the nearest secondary is + whichever of the two bracketing it (the last one at/before ``p`` or the + first one after) is closer. O(1) state — only those two are held, never a + window — so it streams over arbitrarily long (including live) inputs. + """ + secondary = iter(secondary_iter) + prev: Observation[Any] | None = None # last secondary with ts <= p.ts + nxt: Observation[Any] | None = next(secondary, None) # first secondary after prev + + for p in primary_iter: + # Advance the cursor until `nxt` sits past `p`; `prev` trails just behind. + while nxt is not None and nxt.ts <= p.ts: + prev = nxt + nxt = next(secondary, None) + + # Nearest is the closer of the two bracketing secondaries. + if prev is None: + best = nxt + elif nxt is None: + best = prev + else: + best = prev if (p.ts - prev.ts) <= (nxt.ts - p.ts) else nxt + + if best is not None and abs(best.ts - p.ts) <= tolerance: + yield p.derive(data=pair_class(p, best)) + + class Stream(CompositeResource, Generic[T, O]): """Lazy, pull-based stream over observations. @@ -118,6 +165,14 @@ def __str__(self) -> str: return result + @property + def name(self) -> str | None: + """Backing stream's name (``None`` if unbound), inherited through transforms.""" + current: Any = self + while isinstance(current, Stream): + current = current._source + return None if current is None else cast("str", current.name) + def is_live(self) -> bool: """True if this stream (or any ancestor in the chain) is in live mode.""" if self._query.live_buffer is not None: @@ -311,6 +366,35 @@ def detect(upstream): xf = FnIterTransformer(xf) return Stream(source=self, transform=xf, query=StreamQuery()) + def align(self, other: Stream[Any, Any], *, tolerance: float) -> Stream[Any]: + """Pair each observation with the nearest-in-time one from *other*. + + The primary (``self``) drives; for each primary the nearest secondary + within ``|Δts| <= tolerance`` is paired, others are skipped. The merge + is a single forward pass that assumes **both streams iterate in + ascending ts** — true for live streams (arrival order) and for stored + streams in insertion order; prepend ``.order_by("ts")`` on a side whose + iteration order isn't chronological. Source-agnostic: stored, + transformed, and live streams all work (a live side simply never ends). + + The output ``Observation`` carries the primary's ``ts``, ``pose``, + ``tags``. ``obs.data`` is an ``AlignedPair`` namedtuple whose fields are + named after each side's stream (e.g. ``pair.lidar`` / ``pair.odom``), + with ``pair[0]`` / ``pair[1]`` index access always available. Each + field is the full :class:`Observation` from that side. Field names fall + back to ``primary`` / ``secondary`` when a stream is unnamed or both + share a name. + """ + field_a, field_b = self.name, other.name + if not field_a or not field_b or field_a == field_b: + field_a, field_b = "primary", "secondary" + pair_class = _pair_class(field_a, field_b) + + def _align(upstream: Iterator[Observation[Any]]) -> Iterator[Observation[Any]]: + return _nearest_align_iter(upstream, iter(other), tolerance, pair_class) + + return self.transform(FnIterTransformer(_align)) + def live(self, buffer: BackpressureBuffer[Observation[Any]] | None = None) -> Stream[T, O]: """Return a stream whose iteration never ends — backfill then live tail. diff --git a/dimos/memory2/test_stream.py b/dimos/memory2/test_stream.py index fe92650a12..62ddda3cd1 100644 --- a/dimos/memory2/test_stream.py +++ b/dimos/memory2/test_stream.py @@ -783,6 +783,221 @@ def consumer(): assert results == ["b", "d"] +class TestAlign: + """Pairwise nearest-ts alignment between two streams.""" + + def test_basic_pairing(self, session): + """Each primary pairs with the closest in-tolerance secondary.""" + lidar = session.stream("lidar", str) + image = session.stream("image", str) + # primary at 0.0, 0.1, 0.2; secondary slightly offset + for i, v in enumerate(["L0", "L1", "L2"]): + lidar.append(v, ts=i * 0.1) + for ts, v in [(0.02, "I0"), (0.11, "I1"), (0.19, "I2")]: + image.append(v, ts=ts) + + pairs = lidar.align(image, tolerance=0.05).to_list() + assert [p.data.lidar.data for p in pairs] == ["L0", "L1", "L2"] + assert [p.data.image.data for p in pairs] == ["I0", "I1", "I2"] + + def test_named_and_index_access(self, session): + """Pair fields are accessible by source-stream name and by index.""" + a = session.stream("alpha", str) + b = session.stream("beta", str) + a.append("a0", ts=0.0) + b.append("b0", ts=0.001) + + pair = a.align(b, tolerance=0.01).to_list()[0].data + assert pair.alpha.data == "a0" + assert pair.beta.data == "b0" + assert pair[0].data == "a0" + assert pair[1].data == "b0" + + def test_same_name_falls_back_to_primary_secondary(self, session): + """Aligning streams that share a name names fields primary/secondary.""" + a = session.stream("dup", str) + a.append("x", ts=0.0) + + pair = a.align(a, tolerance=0.01).to_list()[0].data + assert pair.primary.data == "x" + assert pair.secondary.data == "x" + assert pair[0].data == "x" + + def test_outer_obs_carries_primary_metadata(self, session): + """The yielded outer Observation keeps the primary's ts/pose/tags.""" + a = session.stream("primary", str) + b = session.stream("secondary", str) + a.append("pa", ts=10.0, pose=(1, 2, 3), tags={"src": "p"}) + b.append("sb", ts=10.005, pose=(9, 9, 9), tags={"src": "s"}) + + out = a.align(b, tolerance=0.05).to_list()[0] + assert out.ts == 10.0 + assert out.pose_tuple[:3] == (1, 2, 3) + assert out.tags == {"src": "p"} + # Secondary's pose still reachable via the pair. + assert out.data.secondary.pose_tuple[:3] == (9, 9, 9) + + def test_skip_when_no_match_in_tolerance(self, session): + """Primary frames with no secondary within tolerance are dropped.""" + a = session.stream("a", str) + b = session.stream("b", str) + a.append("a0", ts=0.0) + a.append("a1", ts=1.0) + a.append("a2", ts=2.0) + # Only a match near a1. + b.append("b1", ts=1.02) + + pairs = a.align(b, tolerance=0.05).to_list() + assert [p.data.a.data for p in pairs] == ["a1"] + assert [p.data.b.data for p in pairs] == ["b1"] + + def test_one_secondary_matches_many_primaries(self, session): + """Same secondary can pair with multiple primaries — no early eviction.""" + a = session.stream("a", str) + b = session.stream("b", str) + for ts in (1.000, 1.005, 1.010): + a.append(f"a{ts}", ts=ts) + b.append("b", ts=1.004) + + pairs = a.align(b, tolerance=0.05).to_list() + assert len(pairs) == 3 + assert all(p.data.b.data == "b" for p in pairs) + + def test_primary_before_first_secondary(self, session): + """Early primaries without any in-window secondary are skipped, later ones pair.""" + a = session.stream("a", str) + b = session.stream("b", str) + for ts in (0.0, 0.1, 0.2, 0.3): + a.append(f"a{ts}", ts=ts) + b.append("b", ts=0.3) + + pairs = a.align(b, tolerance=0.05).to_list() + assert [p.ts for p in pairs] == [0.3] + + def test_empty_secondary(self, session): + """Empty secondary stream → empty alignment, no error.""" + a = session.stream("a", str) + b = session.stream("b", str) # leave empty + a.append("a0", ts=0.0) + pairs = a.align(b, tolerance=1.0).to_list() + assert pairs == [] + + def test_filter_on_either_side_respected(self, session): + """Filters applied before .align() restrict the iterated set on that side.""" + a = session.stream("a", str) + b = session.stream("b", str) + for ts in (0.0, 1.0, 2.0): + a.append(f"a{ts}", ts=ts) + b.append(f"b{ts}", ts=ts + 0.01) + + pairs = a.after(0.5).align(b.before(1.5), tolerance=0.05).to_list() + assert [p.data.a.data for p in pairs] == ["a1.0"] + assert [p.data.b.data for p in pairs] == ["b1.0"] + + def test_transform_source_allowed(self, session): + """A transform-sourced stream on either side aligns fine (source-agnostic).""" + a = session.stream("a", int) + b = session.stream("b", int) + for ts in (0.0, 1.0): + a.append(int(ts), ts=ts) + b.append(int(ts) + 100, ts=ts + 0.01) + + # Primary transformed (+1), secondary transformed (+1) — both still ts-ordered. + primary = a.map(lambda obs: obs.derive(data=obs.data + 1)) + secondary = b.map(lambda obs: obs.derive(data=obs.data + 1)) + pairs = primary.align(secondary, tolerance=0.05).to_list() + # Field names inherit through the transform from the backing streams. + assert [p.data.a.data for p in pairs] == [1, 2] + assert [p.data.b.data for p in pairs] == [101, 102] + + def test_live_alignment(self, memory_session): + """align() streams over live inputs — a pair emits once its bracket closes.""" + p = memory_session.stream("lp") + s = memory_session.stream("ls") + # Backfilled matched pair so the first result is immediate. + p.append("P0", ts=1.0) + s.append("S0", ts=1.002) + + aligned = p.live(buffer=Unbounded()).align(s.live(buffer=Unbounded()), tolerance=0.1) + + results: list[tuple[str, str]] = [] + consumed = threading.Event() + + def consumer(): + for obs in aligned: + results.append((obs.data.lp.data, obs.data.ls.data)) + if len(results) >= 2: + consumed.set() + return + + t = threading.Thread(target=consumer, daemon=True) + t.start() + + time.sleep(0.05) + # Live primary, then a secondary just past it so P1's bracket closes. + p.append("P1", ts=2.0) + s.append("S1", ts=2.003) + + consumed.wait(timeout=2.0) + t.join(timeout=2.0) + assert results == [("P0", "S0"), ("P1", "S1")] + + +class TestAlignLargeInput: + """The two-pointer merge scales to dense streams in a single forward pass.""" + + def test_dense_secondary_pairs_correctly(self, memory_session): + """5k primaries against 50k secondaries — every primary gets its exact match.""" + primary = memory_session.stream("p") + secondary = memory_session.stream("s") + # secondary at 500 Hz, primary at 50 Hz — each primary lands on a secondary. + for i in range(5_000): + primary.append(i, ts=i * 0.02) + for i in range(50_000): + secondary.append(i, ts=i * 0.002) + + pairs = primary.align(secondary, tolerance=0.005).to_list() + assert len(pairs) == 5_000 + # Each primary's nearest secondary is the coincident one (Δts ≈ 0). + assert max(abs(p.data[0].ts - p.data[1].ts) for p in pairs) < 1e-9 + + +class TestWithPose: + """``Observation.with_pose`` attaches/clears a pose without touching the payload.""" + + def test_attaches_pose_and_keeps_payload_lazy(self): + """Pose is set, but the payload loader stays untouched until accessed.""" + loads = 0 + + def loader() -> str: + nonlocal loads + loads += 1 + return "payload" + + obs: Observation[str] = Observation(id=1, ts=5.0, data_type=str, _loader=loader) + posed = obs.with_pose((1.0, 2.0, 3.0)) + + # 3-tuple padded with the identity quaternion; payload not materialized. + assert posed.pose_tuple == (1.0, 2.0, 3.0, 0.0, 0.0, 0.0, 1.0) + assert posed.ts == 5.0 + assert loads == 0 + # The payload still resolves lazily on first access. + assert posed.data == "payload" + assert loads == 1 + + def test_pose_none_clears(self): + obs: Observation[str] = Observation(id=1, ts=0.0, data_type=str, pose=(1, 2, 3), _data="x") + cleared = obs.with_pose(None) + assert cleared.pose_tuple is None + assert cleared.data == "x" + + def test_overwrites_existing_pose(self): + obs: Observation[str] = Observation(id=1, ts=0.0, data_type=str, pose=(1, 2, 3), _data="x") + moved = obs.with_pose((7, 8, 9)) + assert moved.pose_tuple is not None + assert moved.pose_tuple[:3] == (7.0, 8.0, 9.0) + + class TestTimeWindowing: """``*_time`` is relative to the first observation, ``*_timestamp`` is absolute. diff --git a/dimos/memory2/type/observation.py b/dimos/memory2/type/observation.py index 139566287d..7ee8224582 100644 --- a/dimos/memory2/type/observation.py +++ b/dimos/memory2/type/observation.py @@ -212,6 +212,22 @@ def tag(self, **tags: Any) -> Self: ) return type(self)(**kwargs) + def with_pose(self, pose: Any) -> Self: + """Return a new observation with ``pose`` attached, payload kept lazy. + + ``pose`` accepts anything :func:`_to_tuple` handles (a 3-/7-tuple, + ``Pose``/``PoseStamped``/``Transform``, or ``None`` to clear). + """ + kwargs: dict[str, Any] = {f.name: getattr(self, f.name) for f in fields(self)} + kwargs.pop("pose_tuple", None) + kwargs.update( + pose=pose, + _data=_UNLOADED, + _loader=lambda: self.data, + _data_lock=threading.Lock(), + ) + return type(self)(**kwargs) + @dataclass(init=False) class EmbeddedObservation(Observation[T]): diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py index 209c30c3e8..c866a4546f 100644 --- a/dimos/robot/cli/dimos.py +++ b/dimos/robot/cli/dimos.py @@ -38,8 +38,13 @@ from dimos.core.daemon import daemonize, install_signal_handlers from dimos.core.global_config import GlobalConfig, global_config from dimos.core.run_registry import get_most_recent, is_pid_alive, stop_entry +from dimos.mapping.utils.cli.map import main as _map_main +from dimos.mapping.utils.cli.pose_fill import main as _map_pose_fill_main +from dimos.mapping.utils.cli.rename import main as _map_rename_main +from dimos.mapping.utils.cli.replay import main as _map_replay_main +from dimos.mapping.utils.cli.replay_marker import main as _map_replay_marker_main +from dimos.mapping.utils.cli.summary import main as _map_summary_main from dimos.robot.unitree.go2.cli.go2tool import app as go2tool_app -from dimos.utils.cli.map import main as _map_main from dimos.utils.logging_config import setup_logger from dimos.visualization.rerun.constants import RerunOpenOption @@ -673,7 +678,14 @@ def send( topic_send(topic, message_expr) -main.command(name="map")(_map_main) +map_app = typer.Typer(help="Voxel-map tools over recorded sqlite datasets") +main.add_typer(map_app, name="map") +map_app.command("global")(_map_main) +map_app.command("summary")(_map_summary_main) +map_app.command("rename")(_map_rename_main) +map_app.command("pose-fill")(_map_pose_fill_main) +map_app.command("replay")(_map_replay_main) +map_app.command("replay-marker")(_map_replay_marker_main) @main.command()