From 6f5259b6eb9a4a631eba406da7d90e80c43545c4 Mon Sep 17 00:00:00 2001 From: EverNorif <1320346985@qq.com> Date: Wed, 20 May 2026 18:41:42 +0800 Subject: [PATCH 1/5] add action_adapter reset. --- source/autosim/autosim/core/action_adapter.py | 4 ++++ source/autosim/autosim/core/pipeline.py | 1 + 2 files changed, 5 insertions(+) diff --git a/source/autosim/autosim/core/action_adapter.py b/source/autosim/autosim/core/action_adapter.py index 141d2f0..7dec41a 100644 --- a/source/autosim/autosim/core/action_adapter.py +++ b/source/autosim/autosim/core/action_adapter.py @@ -102,3 +102,7 @@ def _default_apply(self, skill_output: SkillOutput, env: ManagerBasedEnv) -> tor self._logger.warning("Action adapter for skill not implemented. Using default apply.") return skill_output.action + + def reset(self) -> None: + """Reset the action adapter.""" + pass diff --git a/source/autosim/autosim/core/pipeline.py b/source/autosim/autosim/core/pipeline.py index 3ca75d3..b562bd3 100644 --- a/source/autosim/autosim/core/pipeline.py +++ b/source/autosim/autosim/core/pipeline.py @@ -134,6 +134,7 @@ def reset_env(self) -> None: self._env.reset() self._env_extra_info.reset() + self._action_adapter.reset() self._generated_actions = [] self._last_action = torch.zeros(self._env.action_space.shape, device=self._env.device) From 682f08caf23339c91403cf03c1584fe041930591 Mon Sep 17 00:00:00 2001 From: EverNorif <1320346985@qq.com> Date: Wed, 20 May 2026 18:42:25 +0800 Subject: [PATCH 2/5] add mulit move axis support in relative reach skill. --- source/autosim/autosim/skills/navigate.py | 4 +- .../autosim/autosim/skills/relative_reach.py | 47 +++++++++++++++---- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/source/autosim/autosim/skills/navigate.py b/source/autosim/autosim/skills/navigate.py index 1d45003..00ecdab 100644 --- a/source/autosim/autosim/skills/navigate.py +++ b/source/autosim/autosim/skills/navigate.py @@ -32,9 +32,9 @@ class NavigateSkillExtraCfg(SkillExtraCfg): waypoint_tolerance: float = 0.5 """The tolerance of the waypoint.""" goal_tolerance: float = 0.25 - """The tolerance of the goal.""" + """The tolerance of the distance to the goal.""" yaw_tolerance: float = 0.01 - """The tolerance of the yaw (radians).""" + """The tolerance of the yaw to the goal(radians).""" sampling_radius: float = 0.8 """The sampling radius for the target position, in meters.""" per_object_sampling_radius: dict[str, float] | None = None diff --git a/source/autosim/autosim/skills/relative_reach.py b/source/autosim/autosim/skills/relative_reach.py index 4d0179e..d843884 100644 --- a/source/autosim/autosim/skills/relative_reach.py +++ b/source/autosim/autosim/skills/relative_reach.py @@ -25,22 +25,51 @@ class RelativeReachSkillExtraCfg(CuroboSkillExtraCfg): move_offset: float = 0.1 """The offset to move the end-effector.""" move_axis: str = "+z" - """The axis to move the end-effector, which is in the eef frame. e.g. "+x", "+y", "+z", "-x", "-y", "-z".""" + """The axis to move the end-effector, which is in the eef frame. + Supports single axis (e.g. "+x", "-y") or multi-axis combinations (e.g. "+x+y", "+x-z").""" def __post_init__(self): """Post-initialize the relative reach skill extra configuration.""" super().__post_init__() - assert self.move_axis in ["+x", "+y", "+z", "-x", "-y", "-z"], f"Invalid move axis: {self.move_axis}." self._axis_map = { - "+x": torch.tensor([1.0, 0.0, 0.0]), - "+y": torch.tensor([0.0, 1.0, 0.0]), - "+z": torch.tensor([0.0, 0.0, 1.0]), - "-x": torch.tensor([-1.0, 0.0, 0.0]), - "-y": torch.tensor([0.0, -1.0, 0.0]), - "-z": torch.tensor([0.0, 0.0, -1.0]), + "x": torch.tensor([1.0, 0.0, 0.0]), + "y": torch.tensor([0.0, 1.0, 0.0]), + "z": torch.tensor([0.0, 0.0, 1.0]), } + def get_direction_vector(self) -> torch.Tensor: + """Parse move_axis and compute the normalized direction vector. + + This is computed on-demand to support dynamic modification of move_axis. + + Returns: + Normalized direction vector in EE frame. + """ + import re + + pattern = r"([+-][xyz])" + matches = re.findall(pattern, self.move_axis) + + if not matches: + raise ValueError( + f"Invalid move_axis format: '{self.move_axis}'. Expected format: '+x', '-y', '+x+y', '+x-z', etc." + ) + + direction_vector = torch.zeros(3) + for match in matches: + sign = 1.0 if match[0] == "+" else -1.0 + axis = match[1] + if axis not in self._axis_map: + raise ValueError(f"Invalid axis '{axis}' in move_axis: '{self.move_axis}'") + direction_vector += sign * self._axis_map[axis] + + norm = torch.linalg.norm(direction_vector) + if norm < 1e-6: + raise ValueError(f"move_axis '{self.move_axis}' results in zero direction vector") + + return direction_vector / norm + @configclass class RelativeReachSkillCfg(SkillCfg): @@ -83,7 +112,7 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: # move the eef along the move axis by the move offset based on eef frame, and convert to robot root frame to get target pose isaaclab_device = state.device - move_offset_vector = self.cfg.extra_cfg._axis_map[self.cfg.extra_cfg.move_axis] * self.cfg.extra_cfg.move_offset + move_offset_vector = self.cfg.extra_cfg.get_direction_vector() * self.cfg.extra_cfg.move_offset offset_pos_in_ee = move_offset_vector.to(isaaclab_device).unsqueeze(0) offset_quat_in_ee = torch.tensor([1.0, 0.0, 0.0, 0.0], device=isaaclab_device).unsqueeze(0) ee_pos_in_robot_root, ee_quat_in_robot_root = target_pos.to(isaaclab_device), target_quat.to(isaaclab_device) From d34a0d9415b4103c06fe5f79e88f7fee00fc323d Mon Sep 17 00:00:00 2001 From: EverNorif <1320346985@qq.com> Date: Thu, 21 May 2026 11:20:04 +0800 Subject: [PATCH 3/5] support robot_radius when navigation & map visualization. --- .../capabilities/navigation/occupancy_map.py | 26 ++++++++ source/autosim/autosim/core/types.py | 9 ++- source/autosim/autosim/utils/debug_util.py | 62 +++++++++++++++---- 3 files changed, 84 insertions(+), 13 deletions(-) diff --git a/source/autosim/autosim/capabilities/navigation/occupancy_map.py b/source/autosim/autosim/capabilities/navigation/occupancy_map.py index 3d24ad3..6e4713d 100644 --- a/source/autosim/autosim/capabilities/navigation/occupancy_map.py +++ b/source/autosim/autosim/capabilities/navigation/occupancy_map.py @@ -7,6 +7,7 @@ from isaaclab.envs import ManagerBasedEnv from isaaclab.utils import configclass from pxr import Usd, UsdGeom +from scipy.ndimage import binary_dilation from autosim.core.logger import AutoSimLogger from autosim.core.types import MapBounds, OccupancyMap @@ -39,6 +40,9 @@ class OccupancyMapCfg: """The tolerance for the height sampling.""" mesh_max_points: int = 5000 """Max number of mesh vertices used for footprint estimation (downsample if larger).""" + robot_radius: float = 0.25 + """Robot footprint radius in meters used to inflate obstacles. Set to 0.0 to disable inflation + (point-robot assumption). For non-circular bases, use the bounding-circle radius (conservative).""" # ----------------------------------------------------------------------------- @@ -534,10 +538,32 @@ def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMa _rasterize_convex_poly(occupancy_map, poly, map_min_x, map_min_y, cfg.cell_size, map_height, map_width) + # Inflate obstacles by the robot footprint radius (+ optional extra margin). + # Cells produced by inflation are tracked separately so the visualization can distinguish + # "original geometry" vs. "robot-radius safety buffer", but for planning purposes the combined + # map treats both as occupied. + inflation_radius = float(cfg.robot_radius) + inflation_mask_np: np.ndarray | None = None + if inflation_radius > 0.0: + inflation_cells = int(np.ceil(inflation_radius / cfg.cell_size)) + if inflation_cells > 0: + original = occupancy_map.astype(bool) + inflated = binary_dilation(original, iterations=inflation_cells) + inflation_mask_np = np.logical_and(inflated, np.logical_not(original)) + occupancy_map = inflated.astype(np.int8) + _logger.info( + f"Inflated obstacles by radius={inflation_radius:.3f}m ({inflation_cells} cells); " + f"original={int(original.sum())} cells, inflation buffer={int(inflation_mask_np.sum())} cells" + ) + + inflation_mask_t = torch.from_numpy(inflation_mask_np).to(env.device) if inflation_mask_np is not None else None + return OccupancyMap( occupancy_map=torch.from_numpy(occupancy_map).to(env.device), origin=(map_min_x, map_min_y), resolution=cfg.cell_size, map_bounds=MapBounds(min_x=map_min_x, max_x=map_max_x, min_y=map_min_y, max_y=map_max_y), floor_bounds=MapBounds(min_x=min_bound[0], max_x=max_bound[0], min_y=min_bound[1], max_y=max_bound[1]), + inflation_mask=inflation_mask_t, + inflation_radius=inflation_radius if inflation_mask_np is not None else 0.0, ) diff --git a/source/autosim/autosim/core/types.py b/source/autosim/autosim/core/types.py index 3871c2b..5095aed 100644 --- a/source/autosim/autosim/core/types.py +++ b/source/autosim/autosim/core/types.py @@ -259,7 +259,9 @@ class OccupancyMap: """Occupancy map of the environment.""" occupancy_map: torch.Tensor - """The occupancy map of the environment. 2D array of shape[height, width] 0: free, 1: occupied, -1: unknown.""" + """The combined occupancy map of the environment, used for planning. 2D array of shape + [height, width] with values 0: free, 1: occupied (covers both original obstacles and the + inflation buffer), -1: unknown.""" resolution: float """The resolution of the occupancy map, cell size in meters.""" origin: tuple[float, float] @@ -268,3 +270,8 @@ class OccupancyMap: """The bounds of the occupancy map.""" floor_bounds: MapBounds """The bounds of the floor.""" + inflation_mask: torch.Tensor | None = None + """Cells added purely by robot-radius inflation (True = inflation buffer, False = either free + or original obstacle). Same shape as ``occupancy_map``. ``None`` if inflation was disabled.""" + inflation_radius: float = 0.0 + """The total inflation radius applied (meters): ``robot_radius + extra_inflation_margin``.""" diff --git a/source/autosim/autosim/utils/debug_util.py b/source/autosim/autosim/utils/debug_util.py index 6c161d7..a0a456d 100644 --- a/source/autosim/autosim/utils/debug_util.py +++ b/source/autosim/autosim/utils/debug_util.py @@ -95,11 +95,20 @@ def debug_visualize_goal_sampling( return import matplotlib.pyplot as plt # type: ignore[import] + from matplotlib.colors import ListedColormap # type: ignore[import] + from matplotlib.patches import Patch # type: ignore[import] occ = occupancy_map.occupancy_map.cpu().numpy() origin_x, origin_y = occupancy_map.origin resolution = float(occupancy_map.resolution) + # Build a 3-class display map: 0 = free, 1 = original obstacle, 2 = inflation buffer. + display_map = occ.astype(np.int16).copy() + inflation_mask_np: np.ndarray | None = None + if occupancy_map.inflation_mask is not None: + inflation_mask_np = occupancy_map.inflation_mask.cpu().numpy().astype(bool) + display_map[inflation_mask_np] = 2 + # object position in world frame -> grid coordinates ox = float(obj_pos_w[0]) oy = float(obj_pos_w[1]) @@ -117,8 +126,11 @@ def debug_visualize_goal_sampling( # sampling angles angles = np.linspace(sample_range[0], sample_range[1], num_samples, endpoint=False) + # Sampling-point classification by display map. free_x, free_y = [], [] - occ_x, occ_y = [], [] + original_x, original_y = [], [] + inflated_x, inflated_y = [], [] + oob_x, oob_y = [], [] for angle in angles: cx = ox + sampling_radius * np.cos(angle) @@ -127,23 +139,35 @@ def debug_visualize_goal_sampling( gx = int((cx - origin_x) / resolution) gy = int((cy - origin_y) / resolution) - # also visualize out of bounds, marked as "occupied" if 0 <= gy < occ.shape[0] and 0 <= gx < occ.shape[1]: - if occ[gy, gx] == 0: + cell_value = display_map[gy, gx] + if cell_value == 0: free_x.append(gx) free_y.append(gy) + elif cell_value == 2: + inflated_x.append(gx) + inflated_y.append(gy) else: - occ_x.append(gx) - occ_y.append(gy) + original_x.append(gx) + original_y.append(gy) else: - occ_x.append(gx) - occ_y.append(gy) + oob_x.append(gx) + oob_y.append(gy) # Reuse the same window across calls and clear it to avoid drawing overlaps. fig = plt.figure("NavigateSkill Goal Sampling", figsize=(6, 6)) fig.clear() ax = fig.add_subplot(1, 1, 1) - ax.imshow(occ, origin="lower", cmap="gray_r", interpolation="nearest") + # 0=free (white), 1=original obstacle (black), 2=inflation buffer (orange) + cmap = ListedColormap(["#ffffff", "#202020", "#ff8c1a"]) + ax.imshow(display_map, origin="lower", cmap=cmap, vmin=0, vmax=2, interpolation="nearest") + + legend_handles = [ + Patch(facecolor="#202020", label="original obstacle"), + ] + if inflation_mask_np is not None: + radius_str = f" (r={occupancy_map.inflation_radius:.2f}m)" + legend_handles.append(Patch(facecolor="#ff8c1a", label=f"robot inflation{radius_str}")) # object if 0 <= ogy < occ.shape[0] and 0 <= ogx < occ.shape[1]: @@ -154,11 +178,23 @@ def debug_visualize_goal_sampling( if 0 <= rgy < occ.shape[0] and 0 <= rgx < occ.shape[1]: ax.scatter(rgx, rgy, c="blue", marker="o", s=60, label="robot") - # sampling points (free = green points, occupied/out = red crosses) + # sampling points if free_x: ax.scatter(free_x, free_y, c="green", s=30, label="free samples") - if occ_x: - ax.scatter(occ_x, occ_y, c="red", marker="x", s=30, label="occupied / oob samples") + if inflated_x: + ax.scatter( + inflated_x, + inflated_y, + c="orange", + marker="x", + s=40, + linewidths=2, + label="blocked by inflation", + ) + if original_x: + ax.scatter(original_x, original_y, c="red", marker="x", s=40, linewidths=2, label="blocked by obstacle") + if oob_x: + ax.scatter(oob_x, oob_y, c="gray", marker="x", s=30, label="out-of-bounds samples") # final chosen candidate (if any) if target_pos_candidate is not None: @@ -172,7 +208,9 @@ def debug_visualize_goal_sampling( ax.set_title("Goal Sampling around Object") ax.set_xlabel("x (grid index)") ax.set_ylabel("y (grid index)") - ax.legend(loc="upper right") + # Combine map-class legend (patches) with scatter legend. + scatter_handles, scatter_labels = ax.get_legend_handles_labels() + ax.legend(handles=legend_handles + list(scatter_handles), loc="upper right") fig.tight_layout() fig.show() # Let the GUI event loop process draw/update events even if the main thread From 91ca1f7fb1a7516775e91e658cab2c9109ea03fc Mon Sep 17 00:00:00 2001 From: EverNorif <1320346985@qq.com> Date: Thu, 21 May 2026 14:07:55 +0800 Subject: [PATCH 4/5] aabb get enhance. --- .../capabilities/navigation/occupancy_map.py | 142 ++++++++++-------- 1 file changed, 81 insertions(+), 61 deletions(-) diff --git a/source/autosim/autosim/capabilities/navigation/occupancy_map.py b/source/autosim/autosim/capabilities/navigation/occupancy_map.py index 6e4713d..2dfd65e 100644 --- a/source/autosim/autosim/capabilities/navigation/occupancy_map.py +++ b/source/autosim/autosim/capabilities/navigation/occupancy_map.py @@ -43,6 +43,8 @@ class OccupancyMapCfg: robot_radius: float = 0.25 """Robot footprint radius in meters used to inflate obstacles. Set to 0.0 to disable inflation (point-robot assumption). For non-circular bases, use the bounding-circle radius (conservative).""" + skip_path_substrings: tuple[str, ...] = ("light", "camera", "looks", "material", "sites") + """Lowercased substrings; any prim whose path contains one of these is excluded from the occupancy map..""" # ----------------------------------------------------------------------------- @@ -59,19 +61,16 @@ def _get_prim_bounds(stage, prim_path: str, verbose: bool = True) -> tuple[np.nd prim = stage.GetPrimAtPath(prim_path) - # Get bounding box - bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_]) - bbox = bbox_cache.ComputeWorldBound(prim) - - # Aligned bounding box range - aligned_box = bbox.ComputeAlignedBox() - min_point = aligned_box.GetMin() - max_point = aligned_box.GetMax() + bbox_cache = _make_bbox_cache() + aabb = _world_aabb_or_none(bbox_cache, prim) + if aabb is None: + raise ValueError(f"Prim '{prim_path}' has empty or invalid world bounds") + min_arr, max_arr = aabb if verbose: - _logger.info(f"Prim '{prim_path}' bounds: min={list(min_point)}, max={list(max_point)}") + _logger.info(f"Prim '{prim_path}' bounds: min={min_arr.tolist()}, max={max_arr.tolist()}") - return np.array([min_point[0], min_point[1], min_point[2]]), np.array([max_point[0], max_point[1], max_point[2]]) + return min_arr, max_arr def _is_geometry_prim(prim: Usd.Prim) -> bool: @@ -85,6 +84,48 @@ def _is_geometry_prim(prim: Usd.Prim) -> bool: ) +def _make_bbox_cache(time_code: Usd.TimeCode | None = None) -> UsdGeom.BBoxCache: + """Create a BBoxCache that covers all USD purposes. + + Collision-only meshes (e.g. ``*/Collisions/*`` under Isaac Lab assets) are typically authored + with ``purpose=guide`` or ``proxy`` rather than ``default``. A BBoxCache that includes only + ``default`` returns an empty range for these prims, which surfaces as a sentinel AABB of + ``[+FLT_MAX, -FLT_MAX]`` and breaks downstream height / extent filters. Including all four + purposes keeps the cache valid for both visual and collision geometry. + """ + if time_code is None: + time_code = Usd.TimeCode.Default() + return UsdGeom.BBoxCache( + time_code, + includedPurposes=[ + UsdGeom.Tokens.default_, + UsdGeom.Tokens.render, + UsdGeom.Tokens.proxy, + UsdGeom.Tokens.guide, + ], + ) + + +def _world_aabb_or_none(bbox_cache: UsdGeom.BBoxCache, prim: Usd.Prim) -> tuple[np.ndarray, np.ndarray] | None: + """Return ``(min_xyz, max_xyz)`` of a prim's world-space AABB, or None if it is empty/invalid. + + USD returns an empty ``Gf.Range3d`` (``min > max``, components saturated to ``±FLT_MAX``) when + a prim has no extent, is hidden, or its purpose is outside the cache's whitelist. We collapse + those cases into ``None`` so callers can simply skip the prim. + """ + bbox = bbox_cache.ComputeWorldBound(prim) + aligned = bbox.ComputeAlignedBox() + if aligned.IsEmpty(): + return None + pmin = aligned.GetMin() + pmax = aligned.GetMax() + pmin_arr = np.array([pmin[0], pmin[1], pmin[2]], dtype=np.float64) + pmax_arr = np.array([pmax[0], pmax[1], pmax[2]], dtype=np.float64) + if not (np.all(np.isfinite(pmin_arr)) and np.all(np.isfinite(pmax_arr))): + return None + return pmin_arr, pmax_arr + + # ----------------------------------------------------------------------------- # 2D geometry / rasterization utilities # ----------------------------------------------------------------------------- @@ -181,40 +222,36 @@ def _collect_candidate_prim_paths( sample_height_min: float, sample_height_max: float, min_xy_extent: float = 0.01, + skip_path_substrings: tuple[str, ...] = (), ) -> list[str]: - """Collect candidate obstacle prim paths from the scene (coarse filtering only).""" - - def xform_has_geometry_child(xform_prim: Usd.Prim) -> bool: - """Match previous behavior: only consider direct children. + """Collect candidate obstacle prim paths from the scene (coarse filtering only). - This avoids pulling in very top-level container Xforms (e.g., env roots) whose - geometry only exists deeper in the hierarchy. - """ - return any(_is_geometry_prim(child) for child in xform_prim.GetChildren()) + Only direct geometry prims (Mesh / Cube / Cylinder / Sphere / Capsule) are returned. + Xform containers are skipped here — ``stage.Traverse()`` already visits the leaf geometry + prims they hold, so going through containers would just produce duplicates and force a + second expansion pass downstream. + """ candidate_paths: list[str] = [] - bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_]) + bbox_cache = _make_bbox_cache() + skip_lc = tuple(s.lower() for s in skip_path_substrings) for prim in stage.Traverse(): path_str = str(prim.GetPath()) if floor_prim_path in path_str or "Robot" in path_str or "robot" in path_str.lower(): continue - if any(skip in path_str.lower() for skip in ["light", "camera", "looks", "material"]): + if skip_lc and any(skip in path_str.lower() for skip in skip_lc): continue - if _is_geometry_prim(prim): - pass - elif prim.IsA(UsdGeom.Xform): - if not xform_has_geometry_child(prim): - continue - else: + if not _is_geometry_prim(prim): continue - bbox = bbox_cache.ComputeWorldBound(prim) - aligned = bbox.ComputeAlignedBox() - prim_min = aligned.GetMin() - prim_max = aligned.GetMax() + aabb = _world_aabb_or_none(bbox_cache, prim) + if aabb is None: + _logger.debug(f"Skipping prim with empty/invalid world bounds: {path_str}") + continue + prim_min, prim_max = aabb if prim_min[2] > sample_height_max or prim_max[2] < sample_height_min: continue @@ -229,24 +266,6 @@ def xform_has_geometry_child(xform_prim: Usd.Prim) -> bool: return candidate_paths -def _expand_to_geometry_prims(prim: Usd.Prim) -> list[Usd.Prim]: - """Expand a prim to leaf geometry prims.""" - - if _is_geometry_prim(prim): - return [prim] - if prim.IsA(UsdGeom.Xform): - out: list[Usd.Prim] = [] - stack = list(prim.GetChildren()) - while stack: - p = stack.pop() - if _is_geometry_prim(p): - out.append(p) - elif p.IsA(UsdGeom.Xform): - stack.extend(p.GetChildren()) - return out - return [] - - # ----------------------------------------------------------------------------- # Footprint generation (world XY convex polygons) # ----------------------------------------------------------------------------- @@ -391,6 +410,8 @@ def _fallback_bbox_footprint_poly_xy( local_bbox = bbox_cache.ComputeLocalBound(prim) box = local_bbox.GetRange() + if box.IsEmpty(): + return None mat = xform_cache.GetLocalToWorldTransform(prim) bmin = box.GetMin() bmax = box.GetMax() @@ -481,31 +502,30 @@ def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMa _logger.info(f"Sampling height range: [{sample_height_min:.2f}, {sample_height_max:.2f}]") candidate_paths = _collect_candidate_prim_paths( - stage, floor_prim_path, sample_height_min, sample_height_max, cfg.min_xy_extent + stage, + floor_prim_path, + sample_height_min, + sample_height_max, + cfg.min_xy_extent, + tuple(cfg.skip_path_substrings), ) _logger.info(f"Found {len(candidate_paths)} candidate prims") time_code = Usd.TimeCode.Default() - bbox_cache = UsdGeom.BBoxCache(time_code, includedPurposes=[UsdGeom.Tokens.default_]) + bbox_cache = _make_bbox_cache(time_code) xform_cache = UsdGeom.XformCache(time_code) - geom_prims: list[Usd.Prim] = [] for path_str in candidate_paths: prim = stage.GetPrimAtPath(path_str) if not prim.IsValid(): continue - geom_prims.extend(_expand_to_geometry_prims(prim)) - _logger.info(f"Expanded to {len(geom_prims)} geometry prims") - for prim in geom_prims: - if not prim.IsValid(): + # Coarse height filter (redundant with the candidate-collection pass for the common case, + # kept for safety in case bbox cache state diverges between the two traversals). + aabb = _world_aabb_or_none(bbox_cache, prim) + if aabb is None: continue - - # Coarse height filter - bbox = bbox_cache.ComputeWorldBound(prim) - aligned = bbox.ComputeAlignedBox() - pmin = aligned.GetMin() - pmax = aligned.GetMax() + pmin, pmax = aabb if pmin[2] > sample_height_max or pmax[2] < sample_height_min: continue From 772468d7b0e09a352bb1eb9b9d27fb1d07953255 Mon Sep 17 00:00:00 2001 From: EverNorif <1320346985@qq.com> Date: Thu, 21 May 2026 17:38:25 +0800 Subject: [PATCH 5/5] add scene primitive to occupancy map. --- .../capabilities/navigation/occupancy_map.py | 523 ++++++++++++------ source/autosim/autosim/skills/navigate.py | 4 + 2 files changed, 373 insertions(+), 154 deletions(-) diff --git a/source/autosim/autosim/capabilities/navigation/occupancy_map.py b/source/autosim/autosim/capabilities/navigation/occupancy_map.py index 2dfd65e..28fed16 100644 --- a/source/autosim/autosim/capabilities/navigation/occupancy_map.py +++ b/source/autosim/autosim/capabilities/navigation/occupancy_map.py @@ -1,12 +1,27 @@ +"""Build a 2D occupancy grid from an Isaac Lab scene for base navigation. + +Pipeline overview: + 1. Floor prim defines map XY bounds and cell grid. + 2. ``stage.Traverse()`` collects obstacle geometry prims (coarse filter incl. ``sample_height``). + 3. Candidates that also belong to ``env.scene`` use simulation link/root poses; others use USD xforms. + 4. Each prim is projected to a 2D convex footprint and rasterized; obstacles are inflated by ``robot_radius``. + +Call after ``env.reset()`` so articulation poses match the current episode. The map is static for the +lifetime of the returned ``OccupancyMap`` (no runtime updates). +""" + from __future__ import annotations -from dataclasses import MISSING +from collections import defaultdict +from dataclasses import MISSING, dataclass +import isaaclab.sim as sim_utils import numpy as np import torch +from isaaclab.assets import Articulation, RigidObject from isaaclab.envs import ManagerBasedEnv from isaaclab.utils import configclass -from pxr import Usd, UsdGeom +from pxr import Gf, Usd, UsdGeom from scipy.ndimage import binary_dilation from autosim.core.logger import AutoSimLogger @@ -14,6 +29,10 @@ _logger = AutoSimLogger("OccupancyMap") +# Isaac Lab single-env layout; map build currently assumes env index 0 (see ``OccupancyMapCfg.env_id`` for poses). +ENV_PRIM_PATH = "/World/envs/env_0" +_NUM_CIRCLE_POINTS = 32 # samples for cylinder/sphere/capsule XY footprints + # ----------------------------------------------------------------------------- # Public config / API @@ -35,9 +54,9 @@ class OccupancyMapCfg: cell_size: float = 0.05 """The size of the cell in meters.""" sample_height: float = 0.5 - """The height to sample the occupancy map at, in meters.""" + """Height slice center (meters above floor z). Used only in candidate-prim coarse filtering.""" height_tolerance: float = 0.2 - """The tolerance for the height sampling.""" + """Half-width of the height window for candidate-prim coarse filtering (meters).""" mesh_max_points: int = 5000 """Max number of mesh vertices used for footprint estimation (downsample if larger).""" robot_radius: float = 0.25 @@ -45,6 +64,47 @@ class OccupancyMapCfg: (point-robot assumption). For non-circular bases, use the bounding-circle radius (conservative).""" skip_path_substrings: tuple[str, ...] = ("light", "camera", "looks", "material", "sites") """Lowercased substrings; any prim whose path contains one of these is excluded from the occupancy map..""" + env_id: int = 0 + """Environment index used when reading poses from ``env.scene`` articulations / rigid objects.""" + + +# ----------------------------------------------------------------------------- +# Internal context +# ----------------------------------------------------------------------------- + + +@dataclass +class _RasterizeContext: + """Shared state for footprint generation and grid rasterization. + + ``world_mat`` passed per prim (when set) overrides ``xform_cache`` for that prim only — used for + scene-registered assets whose USD xforms may not reflect ``env.reset()`` joint states. + """ + + stage: Usd.Stage + """The USD stage.""" + occupancy_map: np.ndarray + """The occupancy map.""" + xform_cache: UsdGeom.XformCache + """The USD xform cache.""" + bbox_cache: UsdGeom.BBoxCache + """The USD bbox cache.""" + time_code: Usd.TimeCode + """The USD time code.""" + mesh_max_points: int + """The maximum number of mesh points used for footprint generation.""" + map_min_x: float + """The minimum x coordinate of the map.""" + map_min_y: float + """The minimum y coordinate of the map.""" + cell_size: float + """The size of the cell in meters.""" + cell_size: float + """The size of the cell in meters.""" + map_height: int + """The height of the map.""" + map_width: int + """The width of the map.""" # ----------------------------------------------------------------------------- @@ -185,8 +245,9 @@ def _rasterize_convex_poly( map_height: int, map_width: int, ) -> None: - """Rasterize a convex polygon into an occupancy map.""" + """Fill grid cells whose centers fall inside a world-frame XY convex polygon.""" + # Tight integer bounds around the polygon AABB (+1 cell padding) before per-cell inside test. poly_min_x = float(poly_xy[:, 0].min()) poly_max_x = float(poly_xy[:, 0].max()) poly_min_y = float(poly_xy[:, 1].min()) @@ -212,7 +273,7 @@ def _rasterize_convex_poly( # ----------------------------------------------------------------------------- -# Candidate discovery & expansion +# Candidate discovery # ----------------------------------------------------------------------------- @@ -256,9 +317,7 @@ def _collect_candidate_prim_paths( if prim_min[2] > sample_height_max or prim_max[2] < sample_height_min: continue - xy_extent_x = prim_max[0] - prim_min[0] - xy_extent_y = prim_max[1] - prim_min[1] - if xy_extent_x <= min_xy_extent or xy_extent_y <= min_xy_extent: + if (prim_max[0] - prim_min[0]) <= min_xy_extent or (prim_max[1] - prim_min[1]) <= min_xy_extent: continue candidate_paths.append(path_str) @@ -266,61 +325,201 @@ def _collect_candidate_prim_paths( return candidate_paths +# ----------------------------------------------------------------------------- +# Scene-registered assets (candidate ∩ env.scene): Isaac pose + USD geometry +# +# USD ``XformCache`` often reflects authored / bind poses, not post-reset articulation joints. +# For prims that are both (a) in the candidate list and (b) under an ``env.scene`` asset prefix, +# we keep collision **shape** from USD but apply **body pose** from Isaac Lab at build time. +# ----------------------------------------------------------------------------- + + +def _matrix4d_from_pos_quat(pos: np.ndarray, quat_wxyz: np.ndarray) -> Gf.Matrix4d: + rot = Gf.Rotation( + Gf.Quatd(float(quat_wxyz[0]), Gf.Vec3d(float(quat_wxyz[1]), float(quat_wxyz[2]), float(quat_wxyz[3]))) + ) + return Gf.Matrix4d().SetTransform(rot, Gf.Vec3d(float(pos[0]), float(pos[1]), float(pos[2]))) + + +def _build_scene_prim_prefixes(env: ManagerBasedEnv) -> dict[str, str]: + """Resolve ``env.scene`` asset names to concrete USD prim prefixes.""" + + prefixes: dict[str, str] = {} + for name in env.scene.keys(): + asset = env.scene[name] + if not hasattr(asset, "cfg") or not hasattr(asset.cfg, "prim_path"): + continue + prim = sim_utils.find_first_matching_prim(asset.cfg.prim_path) + if prim is None or not prim.IsValid(): + _logger.warning(f"Could not resolve prim_path for scene asset '{name}': {asset.cfg.prim_path}") + continue + prefixes[name] = prim.GetPath().pathString + return prefixes + + +def _match_scene_object(path_str: str, scene_prefixes: dict[str, str]) -> str | None: + """Map a prim path to a scene asset name via longest matching ``prim_path`` prefix.""" + + matched_name: str | None = None + matched_len = -1 + for name, prefix in scene_prefixes.items(): + prefix_norm = prefix.rstrip("/") + if path_str == prefix_norm or path_str.startswith(prefix_norm + "/"): + if len(prefix_norm) > matched_len: + matched_name = name + matched_len = len(prefix_norm) + return matched_name + + +def _partition_candidates_by_scene( + candidate_paths: list[str], scene_prefixes: dict[str, str] +) -> tuple[dict[str, list[str]], list[str]]: + """Split candidates into scene-corrected vs USD-xform groups (no prim appears in both).""" + + scene_paths: dict[str, list[str]] = defaultdict(list) + usd_only_paths: list[str] = [] + for path_str in candidate_paths: + obj_name = _match_scene_object(path_str, scene_prefixes) + if obj_name is None: + usd_only_paths.append(path_str) + else: + scene_paths[obj_name].append(path_str) + return dict(scene_paths), usd_only_paths + + +def _prim_world_matrix_from_body_frame( + prim: Usd.Prim, + body_prim: Usd.Prim, + body_pos_w: np.ndarray, + body_quat_wxyz: np.ndarray, + xform_cache: UsdGeom.XformCache, +) -> Gf.Matrix4d: + """World transform: T_world_prim = T_world_body(scene) @ T_body_prim(usd_fixed).""" + + prim_usd = xform_cache.GetLocalToWorldTransform(prim) + body_usd = xform_cache.GetLocalToWorldTransform(body_prim) + # Rigid offset of collision prim in link/body frame; invariant to joint angle. + prim_in_body = prim_usd * body_usd.GetInverse() + + return _matrix4d_from_pos_quat(body_pos_w, body_quat_wxyz) * prim_in_body + + +def _link_name_under_prefix(path_str: str, prefix_norm: str) -> str: + """First path segment below the asset root — must match ``Articulation.body_names`` entry.""" + + rel = path_str[len(prefix_norm) + 1 :] + return rel.split("/")[0] if "/" in rel else "" + + +def _to_pose_numpy(pos_w: torch.Tensor, quat_w: torch.Tensor) -> tuple[np.ndarray, np.ndarray]: + return pos_w.detach().cpu().numpy(), quat_w.detach().cpu().numpy() + + +def _get_prim_world_matrix_for_scene_asset( + env: ManagerBasedEnv, + stage, + obj_name: str, + path_str: str, + prim: Usd.Prim, + asset_prefix: str, + env_id: int, + xform_cache: UsdGeom.XformCache, +) -> Gf.Matrix4d | None: + """Build per-prim world matrix from ``env.scene``; returns ``None`` if mapping fails.""" + + asset = env.scene[obj_name] + prefix_norm = asset_prefix.rstrip("/") + + if isinstance(asset, Articulation): + link_name = _link_name_under_prefix(path_str, prefix_norm) + if not link_name or link_name not in asset.body_names: + _logger.warning(f"Link '{link_name}' invalid for articulation '{obj_name}'; skipping '{path_str}'") + return None + body_prim = stage.GetPrimAtPath(f"{prefix_norm}/{link_name}") + if not body_prim.IsValid(): + _logger.warning(f"USD link prim missing at '{prefix_norm}/{link_name}'") + return None + link_idx = asset.body_names.index(link_name) + body_pos, body_quat = _to_pose_numpy( + asset.data.body_pos_w[env_id, link_idx], asset.data.body_quat_w[env_id, link_idx] + ) + return _prim_world_matrix_from_body_frame(prim, body_prim, body_pos, body_quat, xform_cache) + + if isinstance(asset, RigidObject): + body_prim = stage.GetPrimAtPath(prefix_norm) + if not body_prim.IsValid(): + _logger.warning(f"USD root prim missing at '{prefix_norm}' for rigid object '{obj_name}'") + return None + body_pos, body_quat = _to_pose_numpy(asset.data.root_pos_w[env_id], asset.data.root_quat_w[env_id]) + return _prim_world_matrix_from_body_frame(prim, body_prim, body_pos, body_quat, xform_cache) + + _logger.debug(f"Scene asset '{obj_name}' is not Articulation/RigidObject; skipping '{path_str}'") + return None + + # ----------------------------------------------------------------------------- # Footprint generation (world XY convex polygons) # ----------------------------------------------------------------------------- -def _transform_points_local_to_world(points_local: np.ndarray, mat) -> np.ndarray: - """Transform points from local to world coordinates.""" +def _transform_points_local_to_world(points_local: np.ndarray, mat: Gf.Matrix4d) -> np.ndarray: + """Transform Nx3 local points to world frame (USD row-vector convention).""" - out = np.empty_like(points_local, dtype=np.float64) - for i in range(points_local.shape[0]): - p = points_local[i] - pw = mat.Transform((float(p[0]), float(p[1]), float(p[2]))) - out[i, 0] = pw[0] - out[i, 1] = pw[1] - out[i, 2] = pw[2] - return out + m = np.array(mat, dtype=np.float64) + hom = np.concatenate([points_local, np.ones((points_local.shape[0], 1), dtype=np.float64)], axis=1) + return hom @ m -def _downsample_points(points: np.ndarray, max_points: int) -> np.ndarray: - """Downsample points.""" +def _prim_local_to_world_matrix( + prim: Usd.Prim, xform_cache: UsdGeom.XformCache, world_mat: Gf.Matrix4d | None +) -> Gf.Matrix4d: + # ``None`` → USD static xform; non-``None`` → scene-corrected matrix from link/root pose. + return world_mat if world_mat is not None else xform_cache.GetLocalToWorldTransform(prim) + + +def _footprint_poly_xy_from_world_points(points_w: np.ndarray) -> np.ndarray | None: + """XY convex hull of world-frame points; requires at least 3 distinct vertices.""" + + poly = _convex_hull_2d(points_w[:, :2]) + return poly if poly.shape[0] >= 3 else None + +def _downsample_points(points: np.ndarray, max_points: int) -> np.ndarray: if points.shape[0] <= max_points: return points idx = np.linspace(0, points.shape[0] - 1, max_points, dtype=np.int64) return points[idx] +def _local_bbox_corners(box_min, box_max) -> np.ndarray: + return np.array( + [ + [box_min[0], box_min[1], box_min[2]], + [box_min[0], box_min[1], box_max[2]], + [box_min[0], box_max[1], box_min[2]], + [box_min[0], box_max[1], box_max[2]], + [box_max[0], box_min[1], box_min[2]], + [box_max[0], box_min[1], box_max[2]], + [box_max[0], box_max[1], box_min[2]], + [box_max[0], box_max[1], box_max[2]], + ], + dtype=np.float64, + ) + + def _mesh_footprint_poly_xy( mesh_prim: Usd.Prim, xform_cache: UsdGeom.XformCache, - sample_height_min: float, - sample_height_max: float, mesh_max_points: int, + world_mat: Gf.Matrix4d | None = None, ) -> np.ndarray | None: - """Generate a footprint polygon for a mesh.""" - mesh = UsdGeom.Mesh(mesh_prim) pts = mesh.GetPointsAttr().Get(Usd.TimeCode.Default()) if pts is None or len(pts) == 0: return None - points_local = np.asarray(pts, dtype=np.float64) - points_local = _downsample_points(points_local, mesh_max_points) - - mat = xform_cache.GetLocalToWorldTransform(mesh_prim) - points_w = _transform_points_local_to_world(points_local, mat) - - z = points_w[:, 2] - mask = (z >= sample_height_min) & (z <= sample_height_max) - if not np.any(mask): - return None - xy = points_w[mask][:, :2] - poly = _convex_hull_2d(xy) - if poly.shape[0] < 3: - return None - return poly + points_local = _downsample_points(np.asarray(pts, dtype=np.float64), mesh_max_points) + mat = _prim_local_to_world_matrix(mesh_prim, xform_cache, world_mat) + return _footprint_poly_xy_from_world_points(_transform_points_local_to_world(points_local, mat)) def _sample_circle_points(radius: float, num: int) -> np.ndarray: @@ -333,44 +532,45 @@ def _sample_circle_points(radius: float, num: int) -> np.ndarray: return np.stack([x, y, z], axis=1) -def _cube_footprint_poly_xy(cube_prim: Usd.Prim, xform_cache: UsdGeom.XformCache) -> np.ndarray | None: - """Generate a footprint polygon for a cube.""" +def _footprint_from_local_points( + prim: Usd.Prim, + points_local: np.ndarray, + xform_cache: UsdGeom.XformCache, + world_mat: Gf.Matrix4d | None, +) -> np.ndarray | None: + mat = _prim_local_to_world_matrix(prim, xform_cache, world_mat) + return _footprint_poly_xy_from_world_points(_transform_points_local_to_world(points_local, mat)) - cube = UsdGeom.Cube(cube_prim) - size = float(cube.GetSizeAttr().Get(Usd.TimeCode.Default()) or 0.0) + +def _cube_footprint_poly_xy( + cube_prim: Usd.Prim, xform_cache: UsdGeom.XformCache, world_mat: Gf.Matrix4d | None = None +) -> np.ndarray | None: + size = float(UsdGeom.Cube(cube_prim).GetSizeAttr().Get(Usd.TimeCode.Default()) or 0.0) if size <= 0.0: return None s = 0.5 * size corners_local = np.array([[-s, -s, 0.0], [-s, s, 0.0], [s, s, 0.0], [s, -s, 0.0]], dtype=np.float64) - mat = xform_cache.GetLocalToWorldTransform(cube_prim) - corners_w = _transform_points_local_to_world(corners_local, mat) - poly = _convex_hull_2d(corners_w[:, :2]) - if poly.shape[0] < 3: - return None - return poly + return _footprint_from_local_points(cube_prim, corners_local, xform_cache, world_mat) def _cylinder_like_footprint_poly_xy( - prim: Usd.Prim, radius: float, xform_cache: UsdGeom.XformCache, num_circle_points: int = 32 + prim: Usd.Prim, + radius: float, + xform_cache: UsdGeom.XformCache, + world_mat: Gf.Matrix4d | None = None, + num_circle_points: int = _NUM_CIRCLE_POINTS, ) -> np.ndarray | None: - """Generate a footprint polygon for a cylinder-like prim.""" - if radius <= 0.0: return None - points_local = _sample_circle_points(radius, num_circle_points) - mat = xform_cache.GetLocalToWorldTransform(prim) - points_w = _transform_points_local_to_world(points_local, mat) - poly = _convex_hull_2d(points_w[:, :2]) - if poly.shape[0] < 3: - return None - return poly + return _footprint_from_local_points(prim, _sample_circle_points(radius, num_circle_points), xform_cache, world_mat) def _capsule_footprint_poly_xy( - capsule_prim: Usd.Prim, xform_cache: UsdGeom.XformCache, num_circle_points: int = 32 + capsule_prim: Usd.Prim, + xform_cache: UsdGeom.XformCache, + world_mat: Gf.Matrix4d | None = None, + num_circle_points: int = _NUM_CIRCLE_POINTS, ) -> np.ndarray | None: - """Generate a footprint polygon for a capsule.""" - cap = UsdGeom.Capsule(capsule_prim) radius = float(cap.GetRadiusAttr().Get(Usd.TimeCode.Default()) or 0.0) height = float(cap.GetHeightAttr().Get(Usd.TimeCode.Default()) or 0.0) @@ -379,60 +579,83 @@ def _capsule_footprint_poly_xy( axis = str(cap.GetAxisAttr().Get(Usd.TimeCode.Default()) or "Z").upper() if axis == "Z": - return _cylinder_like_footprint_poly_xy(capsule_prim, radius, xform_cache, num_circle_points=num_circle_points) + return _cylinder_like_footprint_poly_xy(capsule_prim, radius, xform_cache, world_mat, num_circle_points) half_len = 0.5 * max(0.0, height) angles = np.linspace(0.0, 2.0 * np.pi, num_circle_points, endpoint=False) circle = np.stack([radius * np.cos(angles), radius * np.sin(angles)], axis=1) - if axis == "X": - c1 = np.array([-half_len, 0.0], dtype=np.float64) - c2 = np.array([half_len, 0.0], dtype=np.float64) - else: # "Y" - c1 = np.array([0.0, -half_len], dtype=np.float64) - c2 = np.array([0.0, half_len], dtype=np.float64) - + c1, c2 = np.array([-half_len, 0.0]), np.array([half_len, 0.0]) + else: + c1, c2 = np.array([0.0, -half_len]), np.array([0.0, half_len]) pts2 = np.concatenate([circle + c1, circle + c2], axis=0) points_local = np.concatenate([pts2, np.zeros((pts2.shape[0], 1), dtype=np.float64)], axis=1) - - mat = xform_cache.GetLocalToWorldTransform(capsule_prim) - points_w = _transform_points_local_to_world(points_local, mat) - poly = _convex_hull_2d(points_w[:, :2]) - if poly.shape[0] < 3: - return None - return poly + return _footprint_from_local_points(capsule_prim, points_local, xform_cache, world_mat) def _fallback_bbox_footprint_poly_xy( - prim: Usd.Prim, bbox_cache: UsdGeom.BBoxCache, xform_cache: UsdGeom.XformCache + prim: Usd.Prim, + bbox_cache: UsdGeom.BBoxCache, + xform_cache: UsdGeom.XformCache, + world_mat: Gf.Matrix4d | None = None, ) -> np.ndarray | None: - """Fallback footprint: projected local bbox corners convex hull in world XY.""" - - local_bbox = bbox_cache.ComputeLocalBound(prim) - box = local_bbox.GetRange() + box = bbox_cache.ComputeLocalBound(prim).GetRange() if box.IsEmpty(): return None - mat = xform_cache.GetLocalToWorldTransform(prim) - bmin = box.GetMin() - bmax = box.GetMax() - corners_local = np.array( - [ - [bmin[0], bmin[1], bmin[2]], - [bmin[0], bmin[1], bmax[2]], - [bmin[0], bmax[1], bmin[2]], - [bmin[0], bmax[1], bmax[2]], - [bmax[0], bmin[1], bmin[2]], - [bmax[0], bmin[1], bmax[2]], - [bmax[0], bmax[1], bmin[2]], - [bmax[0], bmax[1], bmax[2]], - ], - dtype=np.float64, + return _footprint_from_local_points(prim, _local_bbox_corners(box.GetMin(), box.GetMax()), xform_cache, world_mat) + + +def _footprint_poly_for_prim( + prim: Usd.Prim, + ctx: _RasterizeContext, + world_mat: Gf.Matrix4d | None = None, +) -> np.ndarray | None: + if prim.IsA(UsdGeom.Mesh): + return _mesh_footprint_poly_xy(prim, ctx.xform_cache, ctx.mesh_max_points, world_mat) + if prim.IsA(UsdGeom.Cube): + return _cube_footprint_poly_xy(prim, ctx.xform_cache, world_mat) + if prim.IsA(UsdGeom.Cylinder): + radius = float(UsdGeom.Cylinder(prim).GetRadiusAttr().Get(ctx.time_code) or 0.0) + return _cylinder_like_footprint_poly_xy(prim, radius, ctx.xform_cache, world_mat) + if prim.IsA(UsdGeom.Sphere): + radius = float(UsdGeom.Sphere(prim).GetRadiusAttr().Get(ctx.time_code) or 0.0) + return _cylinder_like_footprint_poly_xy(prim, radius, ctx.xform_cache, world_mat) + if prim.IsA(UsdGeom.Capsule): + return _capsule_footprint_poly_xy(prim, ctx.xform_cache, world_mat) + return _fallback_bbox_footprint_poly_xy(prim, ctx.bbox_cache, ctx.xform_cache, world_mat) + + +def _rasterize_prim( + ctx: _RasterizeContext, + prim: Usd.Prim, + world_mat: Gf.Matrix4d | None = None, +) -> bool: + poly = _footprint_poly_for_prim(prim, ctx, world_mat) + if poly is None: + return False + _rasterize_convex_poly( + ctx.occupancy_map, poly, ctx.map_min_x, ctx.map_min_y, ctx.cell_size, ctx.map_height, ctx.map_width ) - corners_w = _transform_points_local_to_world(corners_local, mat) - poly = _convex_hull_2d(corners_w[:, :2]) - if poly.shape[0] < 3: - return None - return poly + return True + + +def _rasterize_prim_paths( + ctx: _RasterizeContext, + path_strs: list[str], + world_mat_for_path: dict[str, Gf.Matrix4d] | None = None, +) -> int: + """Rasterize prims; return count successfully marked occupied. + + ``world_mat_for_path`` maps prim path → override world matrix (scene group). Missing entries use USD. + """ + + mats = world_mat_for_path or {} + rasterized = 0 + for path_str in path_strs: + prim = ctx.stage.GetPrimAtPath(path_str) + if prim.IsValid() and _rasterize_prim(ctx, prim, mats.get(path_str)): + rasterized += 1 + return rasterized # ----------------------------------------------------------------------------- @@ -452,25 +675,20 @@ def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMa """ stage = env.scene.stage - - floor_prim_path = f"/World/envs/env_0/{cfg.floor_prim_suffix}" + floor_prim_path = f"{ENV_PRIM_PATH}/{cfg.floor_prim_suffix}" min_bound, max_bound = _get_prim_bounds(stage, floor_prim_path) - # Validate bounds - check for unreasonable values (inf, nan, or too large) world_extent_x = max_bound[0] - min_bound[0] world_extent_y = max_bound[1] - min_bound[1] - - bounds_invalid = ( + if ( not np.isfinite(world_extent_x) or not np.isfinite(world_extent_y) or world_extent_x > cfg.max_world_extent or world_extent_y > cfg.max_world_extent or world_extent_x <= 0 or world_extent_y <= 0 - ) - - if bounds_invalid: + ): raise ValueError(f"Floor bounds invalid or too large: extent_x={world_extent_x}, extent_y={world_extent_y}") # Calculate map bounds (use floor bounds) @@ -480,7 +698,7 @@ def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMa map_width = int((map_max_x - map_min_x) / cfg.cell_size) + 1 map_height = int((map_max_y - map_min_y) / cfg.cell_size) + 1 - # Clamp map size to prevent memory issues + # Grow cell size in-place if the grid would exceed ``max_map_size`` (mutates ``cfg.cell_size``). if map_width > cfg.max_map_size or map_height > cfg.max_map_size: _logger.warning(f"Map size {map_width}x{map_height} exceeds max {cfg.max_map_size}") new_cell_size = max((map_max_x - map_min_x) / cfg.max_map_size, (map_max_y - map_min_y) / cfg.max_map_size) @@ -511,52 +729,49 @@ def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMa ) _logger.info(f"Found {len(candidate_paths)} candidate prims") - time_code = Usd.TimeCode.Default() - bbox_cache = _make_bbox_cache(time_code) - xform_cache = UsdGeom.XformCache(time_code) - - for path_str in candidate_paths: - prim = stage.GetPrimAtPath(path_str) - if not prim.IsValid(): - continue + scene_prefixes = _build_scene_prim_prefixes(env) + scene_paths, usd_only_paths = _partition_candidates_by_scene(candidate_paths, scene_prefixes) + scene_prim_count = sum(len(paths) for paths in scene_paths.values()) + _logger.info( + f"Partitioned candidates: {len(usd_only_paths)} USD-only, {scene_prim_count} scene-registered " + f"across {len(scene_paths)} asset(s) {list(scene_paths.keys()) if scene_paths else []}" + ) - # Coarse height filter (redundant with the candidate-collection pass for the common case, - # kept for safety in case bbox cache state diverges between the two traversals). - aabb = _world_aabb_or_none(bbox_cache, prim) - if aabb is None: - continue - pmin, pmax = aabb - if pmin[2] > sample_height_max or pmax[2] < sample_height_min: - continue + time_code = Usd.TimeCode.Default() + ctx = _RasterizeContext( + stage=stage, + occupancy_map=occupancy_map, + xform_cache=UsdGeom.XformCache(time_code), + bbox_cache=_make_bbox_cache(time_code), + time_code=time_code, + mesh_max_points=cfg.mesh_max_points, + map_min_x=map_min_x, + map_min_y=map_min_y, + cell_size=cfg.cell_size, + map_height=map_height, + map_width=map_width, + ) - poly: np.ndarray | None = None - if prim.IsA(UsdGeom.Mesh): - poly = _mesh_footprint_poly_xy( - prim, - xform_cache, - sample_height_min, - sample_height_max, - mesh_max_points=cfg.mesh_max_points, + # Static / non-scene obstacles: footprint pose from USD ``XformCache``. + _rasterize_prim_paths(ctx, usd_only_paths) + + # Scene obstacles: same USD local geometry, world pose from simulation (doors, articulated fixtures). + scene_world_mats: dict[str, Gf.Matrix4d] = {} + for obj_name, paths in scene_paths.items(): + prefix_norm = scene_prefixes[obj_name].rstrip("/") + for path_str in paths: + prim = ctx.stage.GetPrimAtPath(path_str) + if not prim.IsValid(): + continue + world_mat = _get_prim_world_matrix_for_scene_asset( + env, stage, obj_name, path_str, prim, prefix_norm, cfg.env_id, ctx.xform_cache ) - elif prim.IsA(UsdGeom.Cube): - poly = _cube_footprint_poly_xy(prim, xform_cache) - elif prim.IsA(UsdGeom.Cylinder): - cyl = UsdGeom.Cylinder(prim) - radius = float(cyl.GetRadiusAttr().Get(time_code) or 0.0) - poly = _cylinder_like_footprint_poly_xy(prim, radius, xform_cache) - elif prim.IsA(UsdGeom.Sphere): - sph = UsdGeom.Sphere(prim) - radius = float(sph.GetRadiusAttr().Get(time_code) or 0.0) - poly = _cylinder_like_footprint_poly_xy(prim, radius, xform_cache) - elif prim.IsA(UsdGeom.Capsule): - poly = _capsule_footprint_poly_xy(prim, xform_cache) - else: - poly = _fallback_bbox_footprint_poly_xy(prim, bbox_cache, xform_cache) - - if poly is None or poly.shape[0] < 3: - continue + if world_mat is not None: + scene_world_mats[path_str] = world_mat - _rasterize_convex_poly(occupancy_map, poly, map_min_x, map_min_y, cfg.cell_size, map_height, map_width) + scene_rasterized = _rasterize_prim_paths(ctx, list(scene_world_mats.keys()), scene_world_mats) + if scene_paths: + _logger.info(f"Scene pose rasterization: {scene_rasterized}/{scene_prim_count} prims marked occupied") # Inflate obstacles by the robot footprint radius (+ optional extra margin). # Cells produced by inflation are tracked separately so the visualization can distinguish diff --git a/source/autosim/autosim/skills/navigate.py b/source/autosim/autosim/skills/navigate.py index 00ecdab..69d0f52 100644 --- a/source/autosim/autosim/skills/navigate.py +++ b/source/autosim/autosim/skills/navigate.py @@ -185,6 +185,10 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: robot_pos_w = state.robot_base_pose.detach().cpu().numpy() target_pos_candidate = goal_pos.detach().cpu().numpy() + self._logger.debug(f"object position in world frame: {obj_pos_w}") + self._logger.debug(f"robot position in world frame: {robot_pos_w}") + self._logger.debug(f"target position candidate: {target_pos_candidate}") + debug_visualize_goal_sampling( occupancy_map=self._occupancy_map, obj_pos_w=obj_pos_w,