diff --git a/.gitattributes b/.gitattributes index e4139e83c3..25ef8677b8 100644 --- a/.gitattributes +++ b/.gitattributes @@ -19,3 +19,6 @@ docs/capabilities/memory/assets/** filter=lfs diff=lfs merge=lfs -text docs/capabilities/memory/assets/.gitattributes -filter -diff -merge text docs/capabilities/mapping/assets/** filter=lfs diff=lfs merge=lfs -text +assets/drop_in_guide_operator_console.png -filter -diff -merge +assets/drop_in_guide_slides.html -filter -diff -merge +assets/drop_in_guide_slides.pdf -filter -diff -merge diff --git a/DROP_IN_GUIDE_README.md b/DROP_IN_GUIDE_README.md new file mode 100644 index 0000000000..36f050a317 --- /dev/null +++ b/DROP_IN_GUIDE_README.md @@ -0,0 +1,194 @@ +# Drop-in Guide + +> **Drop a robot in any building. It learns the space by asking questions. It guides anyone through it — and waits if you fall behind.** + +A hackathon project for **muShanghai 2026** (Dimensional Robot Hackathon, May 26–28). Adds the **interaction-intelligence layer** to [dimOS](https://github.com/dimensionalOS/dimos) — turning a Unitree Go2 into a zero-prep guide robot for unfamiliar buildings. + +Submitted to the **Agents** track. Strong secondary fit for **Autonomy & Navigation**. + +--- + +## The thesis + +Existing embodied-AI guidance systems (AGIBOT's Guidance Assistance pillar, hospital wayfinders, museum guides) require **pre-mapped environments** and **operator-authored knowledge bases**. The deployment model is "send a system integrator, spend a week." + +Drop-in Guide inverts that: **the robot authors its own scene memory from a 5-minute walkthrough**, asking the operator to verify proposals it generates from what it sees. Then it guides any visitor through the building using natural-language commands, with every navigation decision **legible and audit-grounded**. + +The methodology is a direct port of the *operator-grounded verification* pattern from [Rehnova](#) — instead of the operator authoring 1,646 rows of structured data, the AI proposes them and the operator validates. Same principle in physical space. + +--- + +## The three intelligences + +| Layer | Provided by | Role | +|---|---|---| +| **Motion intelligence** | Unitree Go2 hardware + dimOS nav stack (A* + frontier exploration + PGO loop closure) | Locomotion, obstacle avoidance, low-level control | +| **Task intelligence** | dimOS skill runtime + Claude Sonnet 4.6 via MCP | Decides which skill to call when, manages dialogue state | +| **Interaction intelligence** ⭐ | **This project** | Generative priming dialogue + grounded narration + audit trail | + +Borrowed framing from AGIBOT's own embodied-AI taxonomy; the interaction layer is where we add value. + +--- + +## What's new vs. shipping dimOS + +| Feature | dimOS provides | Drop-in Guide adds | +|---|---|---| +| Scene memory | `SpatialMemory` + `tag_location` + `query_tagged_location` | Generative priming workflow (robot proposes, operator verifies) | +| Nav decisions | `navigate_with_text` with 3-tier resolution | Grounded narration before every action; audit trace stream | +| Vision | Qwen-VL for object detection, `observe` returning raw frames | `describe_scene` — synchronous OpenAI Vision captioning that Claude can use directly | +| Audio | `OpenAITTSNode` + local `sounddevice` | (planned) `UnitreeSpeak` WebRTC bridge for onboard Go2 speaker | + +--- + +## Demo arc (90 seconds) + +| Sec | What happens | +|---|---| +| 0–10 | Operator: *"Let's learn this place."* → robot enters priming mode | +| 10–35 | Robot observes scene, proposes labels via `speak` ("I see a black bin to my right — call it a trash bin?"), operator confirms/corrects/skips, `tag_location` fires per confirm | +| 35–55 | Visitor: *"Take me to the trash bin."* → robot speaks grounding info ("Tagged 1 minute ago, confidence 0.91"), navigates | +| 55–75 | Robot pauses + speaks *"I'll wait for you"* if visitor falls behind (via `lead_to` + person tracking) | +| 75–90 | Robot: *"Here's the trash bin. Anything else?"* + Rerun audit panel shows full grounding trace | + +--- + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Drop-in Guide Blueprint │ +│ (forks unitree-go2-agentic, skips CUDA-only SecurityModule)│ +└─────────────────────────────────────────────────────────────┘ + │ + ├── Claude Sonnet 4.6 ◀── via langchain-anthropic + │ (the brain — decides which skill to call) + │ + ├── MCP Server (19 tools available) + │ ├── tag_location ─┐ + │ ├── navigate_with_text ├── from dimOS + │ ├── speak, follow_person │ + │ ├── execute_sport_command ─┘ + │ └── describe_scene ◀── new (OpenAI Vision wrapper) + │ + ├── SpatialMemory (ChromaDB + CLIP) + ├── SceneCaptionSkill (OpenAI gpt-4o-mini Vision) + └── unitree_go2 (perception + nav + connection) +``` + +--- + +## Operator console + +Drop-in Guide ships with a dispatch terminal at **http://localhost:5555** the moment the daemon comes up. It's a calm directive-intake UI for the agent — type or speak a natural-language instruction, watch the agent's responses stream back in the telemetry channels. + +![Drop-in Guide operator console](assets/drop_in_guide_operator_console.png) + +Two intake modes: + +- **Text directives** — type into the `directive intake` bar (e.g. *"lead the visitor to the atrium"*), hit TRANSMIT +- **Voice directives** — press the mic, speak; Whisper transcribes locally and pipes into the same `/human_input` LCM topic the text bar uses. Same agent loop, same skill graph + +Telemetry channels stream agent responses, tool calls, and the audit trace as Server-Sent Events. The mic stays available whether or not you're in a directive — say *"wait"* mid-trip and the agent will pause before the next sentence finishes. + +> All directives go through the **same Claude system prompt** that the WASD driver uses, so voice, text, and keypress paths produce identical agent behavior — including the calibrated-uncertainty checks and the `[SYSTEM ARRIVAL EVENT]` chain. + +--- + +## Slides + +A 6-slide deck for judges + dry-runs: + +- **PDF (printable)** — [`assets/drop_in_guide_slides.pdf`](assets/drop_in_guide_slides.pdf) +- **HTML (browseable)** — [`assets/drop_in_guide_slides.html`](assets/drop_in_guide_slides.html) + +Styling matches the operator-console screenshot above (dark background, mint accents, italic serif + monospace). + +Deck contents: +| # | Slide | +|---|---| +| 01 | Cover — Drop-in Guide, muShanghai 2026 / Agents track | +| 02 | The thesis — inverting the system-integrator model | +| 03 | The three intelligences — motion / task / interaction | +| 04 | 3-phase runtime — priming → guidance → reactive Q&A | +| 05 | The defining gesture — voice-triggered pause + auto-arrival wave | +| 06 | Submission summary — PR #2289, validated in replay + on Go2 | + +--- + +## Setup + +### Prerequisites +- Ubuntu 22.04/24.04 (or UTM VM on Apple Silicon) +- Python 3.10+ +- A Unitree Go2 with dimOS-compatible firmware +- Anthropic API key (for Claude) +- OpenAI API key (for TTS + Vision) + +### Install +```bash +git clone https://github.com/dimensionalOS/dimos.git +cd dimos +uv sync --extra all +uv pip install langchain-anthropic httpx[socks] +``` + +### Drop in our files +Copy two files into the dimOS tree: +- `dimos/experimental/scene_caption_skill.py` +- `dimos/experimental/drop_in_guide_speak.py` (optional, Go2-onboard speaker) +- `dimos/robot/unitree/go2/blueprints/agentic/drop_in_guide.py` + +Regenerate blueprint registry: +```bash +uv run pytest dimos/robot/test_all_blueprints_generation.py +``` + +### Configure secrets +```bash +cat >> .env < --viewer none run drop-in-guide --daemon +dimos agent-send "Let's learn this place." +``` + +--- + +## Network setup (venue-specific) + +The Go2 hosts its own WiFi AP (e.g., `dimair14`) that **has no internet**. Two-network setup needed: +- **Mac:** dual-homed — WiFi to robot's AP for control, USB tether for Claude/OpenAI API +- **VM (if used):** bridged to robot's AP, with an SSH SOCKS5 tunnel through the Mac for outbound API calls + +```bash +# On VM, open SOCKS5 tunnel through Mac: +ssh -D 1080 -fNT @ +# Then in .env: +HTTPS_PROXY=socks5h://127.0.0.1:1080 +``` + +--- + +## Known issues + +- `tag_location` cold-starts ChromaDB embeddings on first call → 120s timeout under aarch64 emulation (UTM/QEMU). On bare-metal hardware it's <1s. +- `SpeakSkill` from `_common_agentic` plays through local sounddevice (Mac speakers via UTM forwarding). The Go2 onboard speaker requires the unwired `UnitreeSpeak` skill — bridge code at `dimos/experimental/drop_in_guide_speak.py` is written but not yet validated on hardware. +- Default agentic blueprints (`unitree-go2-agentic`, `-temporal-memory`) include `SecurityModule` which requires CUDA — broken on Apple Silicon. Drop-in Guide skips it via direct composition from `unitree_go2` base. + +--- + +## Acknowledgments + +- **Dimensional** for [dimOS](https://github.com/dimensionalOS/dimos) — the agentic OS that made this possible +- **AGIBOT** for productizing the embodied-AI categorization (motion/interaction/task) we borrowed +- The Drop-in Guide methodology owes its grounded-verification pattern to Rehnova's operator-priming approach (Cal.com 1,646-row expansion) + +--- + +*Built in 48 hours at muShanghai 2026 by [Abraham Onoja](mailto:legendabrahamonoja@gmail.com) with [Claude Code](https://claude.com/claude-code) Opus 4.7 driving.* diff --git a/assets/drop_in_guide_operator_console.png b/assets/drop_in_guide_operator_console.png new file mode 100644 index 0000000000..41adbce5e2 Binary files /dev/null and b/assets/drop_in_guide_operator_console.png differ diff --git a/assets/drop_in_guide_slides.html b/assets/drop_in_guide_slides.html new file mode 100644 index 0000000000..f0c751d0d9 --- /dev/null +++ b/assets/drop_in_guide_slides.html @@ -0,0 +1,633 @@ + + + + + +Drop-in Guide — muShanghai 2026 deck + + + + + + + +
+ + +
+
— STATION 005 · MUSHANGHAI 2026 · AGENTS TRACK
+

Drop-in Guide

+
+

+ A calm dispatch terminal for an agentic guide robot. Drop a robot in any building. + It learns the space by asking questions. It guides any visitor + through it — and waits if you fall behind. +

+ +
+
Trackagents
+
Robotunitree go2
+
Layerinteraction
+
Deadlinemay 28 · 18:00
+
+ +
+ "It's not a delivery bot. It's a guide that knows when to wait for you." + Drop-in Guide · v1.0 · field-tested at venue +
+ +
DIMENSIONAL · DROP-IN GUIDE · OPERATOR
+ + +
+ + +
+
02
+
— THE THESIS · INVERTING SYSTEM INTEGRATION
+

The robot authors its own scene memory.

+
+ +
+
+
today
+

system integrators

+

+ Existing embodied-AI guides (AGIBOT, hospital wayfinders, museum + robots) need pre-mapped environments and operator-authored + knowledge bases. The deployment model is "send a system + integrator, spend a week." +

+
+
+
drop-in guide
+

operator + AI together

+

+ The robot proposes tags from a 5-minute walkthrough; the operator + confirms with a keypress. Same priciple as Rehnova's + operator-grounded verification — but in physical space, not + spreadsheet rows. +

+
+
+ +
DIMENSIONAL · DROP-IN GUIDE · OPERATOR
+ +
+ + +
+
03
+
— ARCHITECTURE · THE THREE INTELLIGENCES
+

Where this project lives.

+
+ +
+
+
motion
+
+ Unitree Go2 hardware + dimOS nav stack — A* planning, + frontier exploration, PGO loop closure. Locomotion, obstacle + avoidance, low-level control. Provided. +
+
+
+
task
+
+ dimOS skill runtime + Claude Sonnet 4.6 via MCP — decides + which skill to call when, manages dialogue state, parses natural + language into tool sequences. Provided. +
+
+
+
interaction ★
+
+ This PR. Generative priming dialogue, grounded narration, + decision audit, voice-triggered "I'll wait for you", auto-arrival + greeting. The interaction layer + is where we add value. +
+
+
+ +
DIMENSIONAL · DROP-IN GUIDE · OPERATOR
+ +
+ + +
+
04
+
— RUNTIME · THREE PHASES, ONE PROMPT
+

Priming. Guidance. Q&A.

+
+ +

+ The same Claude system prompt drives all three phases — + text, voice, and WASD keypress inputs produce identical agent behavior, + including the calibrated-uncertainty checks and the + [SYSTEM ARRIVAL EVENT] chain. +

+ +
+
+
phase 1
+

priming

+

+ Operator drives. Robot proposes tags from what it sees. Operator + confirms with one keypress. +

+
+
+
+
phase 2
+

guidance

+

+ Visitor says "take me to X". Robot logs grounding, waves, + navigates, auto-waves on arrival. Pauses on "wait". +

+
+
+
+
phase 3
+

reactive Q&A

+

+ "What places do you know?" "Give me a tour." "What did you + skip?" Audit trail backs every answer. +

+
+
+ +
DIMENSIONAL · DROP-IN GUIDE · OPERATOR
+ +
+ + +
+
05
+
— DEFINING GESTURE · "I'LL WAIT FOR YOU"
+

The pause is what makes it a guide.

+
+ +

+ A delivery bot drives to a destination. A guide notices when you've + fallen behind. We can't see backward through the Go2's front camera, + so the visitor controls the pace with their voice — the + natural way a human guide would work. +

+ +
# validated end-to-end in dimOS replay
+visitor:  take me to the copier
+robot:    log_nav_decision(copier, semantic, 0.4)
+         speak("Making my best guess — follow me!")
+         navigate_with_text("copier")
+         [walking...]
+
+visitor:  wait, hold on, I'm behind
+robot:    stop_navigation()
+          speak("I'll wait for you here.")  ← signature line
+
+visitor:  okay, I'm ready, let's go
+robot:    log_nav_decision(copier, semantic, 0.4)
+         navigate_with_text("copier")  ← resumes same target
+
+[planner] /goal_reached = True
+robot:    speak("Here's the copier — anything else?")
+         execute_sport_command("Hello")  ← auto-arrival wave
+ +
DIMENSIONAL · DROP-IN GUIDE · OPERATOR
+ +
+ + +
+
06
+
— SUBMISSION · WHAT'S IN THE PR
+

One blueprint. Six skills. 1,330 lines.

+
+ +
+
+
blueprint
+
+ drop_in_guide.py — composes from unitree_go2 base, skips + CUDA-only modules, registers 3-phase system prompt. +
+
+
+
skills
+
+ scene_caption · teleop_velocity · + arrival_announcer · decision_audit · + lead_with_follow · reactive_qa +
+
+
+
validated
+
+ Replay run on data/go2_short.db: 22 modules load, + /goal_reached chain fires, voice-wait + resume both verified. + No hallucinated arrival in any test. +
+
+
+
field
+
+ Tested live on Go2 at venue. obstacle_avoidance=False + baked into config. macOS multicast workaround documented. +
+
+
+ +
+ PR · DIMENSIONALOS/DIMOS #2289 +    + TOOLING · AROME3/DROP-IN-GUIDE +
+ +
DIMENSIONAL · DROP-IN GUIDE · OPERATOR
+ +
+ +
+ + diff --git a/assets/drop_in_guide_slides.pdf b/assets/drop_in_guide_slides.pdf new file mode 100644 index 0000000000..3bae03b67c Binary files /dev/null and b/assets/drop_in_guide_slides.pdf differ diff --git a/dimos/experimental/arrival_announcer_skill.py b/dimos/experimental/arrival_announcer_skill.py new file mode 100644 index 0000000000..5ccaa0b220 --- /dev/null +++ b/dimos/experimental/arrival_announcer_skill.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +# Drop-in Guide arrival announcer. +# Subscribes to the planner's /goal_reached topic and, when the robot arrives +# at a nav goal, injects a synthetic "you have arrived" message into the +# McpClient's human_input stream. Claude then responds with the configured +# arrival sequence (speak + Hello wave). +# +# Why a synthetic message rather than direct speak+wave: keeps the agent's +# conversation history coherent (the visitor sees one continuous dialogue), +# and lets the prompt determine arrival phrasing per-context. + +from __future__ import annotations + +import time + +from dimos_lcm.std_msgs import Bool + +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.core.stream import In, Out +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +ARRIVAL_PROMPT = ( + "[SYSTEM ARRIVAL EVENT] The robot has just reached the navigation goal. " + "Greet the visitor: call `speak` with ONE short arrival sentence (e.g. " + "\"Here we are at the X. Anything else?\"), then call " + "`execute_sport_command(command_name=\"Hello\")` to wave. Do not " + "announce arrival a second time if you have already done so for this " + "trip." +) + +# After arrival, suppress duplicate triggers for this many seconds so a +# burst of goal_reached pulses doesn't double-fire the greeting. +DEDUP_WINDOW_S = 5.0 + + +class ArrivalAnnouncerSkill(Module): + """On planner goal_reached=True, inject an arrival prompt into the agent.""" + + goal_reached: In[Bool] + human_input: Out[str] + + _last_fire_ts: float = 0.0 + + @rpc + def start(self) -> None: + super().start() + self.goal_reached.subscribe(self._on_goal_reached) + logger.info("ArrivalAnnouncerSkill: subscribed to /goal_reached") + + @rpc + def stop(self) -> None: + super().stop() + + def _on_goal_reached(self, msg: Bool) -> None: + if not bool(getattr(msg, "data", False)): + return + now = time.time() + if now - self._last_fire_ts < DEDUP_WINDOW_S: + return + self._last_fire_ts = now + logger.info("ArrivalAnnouncerSkill: goal_reached=True -> injecting arrival prompt") + self.human_input.publish(ARRIVAL_PROMPT) diff --git a/dimos/experimental/decision_audit_skill.py b/dimos/experimental/decision_audit_skill.py new file mode 100644 index 0000000000..37503b148c --- /dev/null +++ b/dimos/experimental/decision_audit_skill.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 +# Drop-in Guide decision audit skill. +# Claude calls `log_nav_decision` BEFORE every navigate_with_text so we +# capture the grounding evidence (query, tier matched, similarity/confidence, +# chosen target). The trail is written to: +# 1. A JSONL file at assets/output/drop_in_guide/nav_trace.jsonl +# 2. A Rerun TextDocument entity at /audit/nav_decisions (renders as a +# panel in the Rerun viewer alongside the 3D world) +# 3. A short Rerun TextLog entity at /audit/event (timeline-style line) +# And exposed via `recent_nav_decisions()` for in-conversation queries. +# +# This is the thesis carrier: every navigation choice becomes legible. + +from __future__ import annotations + +import json +import os +from pathlib import Path +import time +from typing import Any + +from dimos.agents.annotation import skill +from dimos.constants import DIMOS_PROJECT_ROOT +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +_OUT_DIR = DIMOS_PROJECT_ROOT / "assets" / "output" / "drop_in_guide" +_TRACE_PATH = _OUT_DIR / "nav_trace.jsonl" + +# Rerun integration is intentionally disabled here — calling rr.log() from +# within the dimOS fork-server worker hangs when no recording stream is +# attached (the worker process can't share the bridge's stream). We render +# the audit panel in video post-production from the JSONL trace instead. +# Marker file `nav_trace.jsonl` is the source of truth. + + +def _fmt_panel(entries: list[dict[str, Any]], max_items: int = 5) -> str: + """Render the audit panel as a markdown document (used for export, not + live Rerun rendering).""" + if not entries: + return "# Drop-in Guide — Decision Audit\n\n_no decisions yet_" + lines = ["# Drop-in Guide — Decision Audit", ""] + now = time.time() + for e in entries[-max_items:][::-1]: + age_s = max(0.0, now - float(e["ts"])) + age = f"{age_s:.0f}s ago" if age_s < 60 else f"{age_s/60:.1f}m ago" + lines.append( + f"- **{e['query']}** → `{e['target']}` " + f"_(tier: {e['matched_tier']}, conf {e['confidence']:.2f}, {age})_" + ) + return "\n".join(lines) + + +class DecisionAuditSkill(Module): + """Audit trail for nav decisions — the system's grounded-evidence record.""" + + _trace: list[dict[str, Any]] + + @rpc + def start(self) -> None: + super().start() + os.makedirs(_OUT_DIR, exist_ok=True) + self._trace = [] + if _TRACE_PATH.exists(): + try: + _TRACE_PATH.unlink() + except OSError: + pass + logger.info(f"DecisionAuditSkill: trace at {_TRACE_PATH}") + + @rpc + def stop(self) -> None: + super().stop() + + def render_panel_markdown(self) -> str: + """Render the current audit trail as a markdown panel. Useful for + exporting to video overlays in post-production. Not exposed as an + LLM skill — call directly when generating demo assets. + """ + return _fmt_panel(self._trace) + + @skill + def log_nav_decision( + self, + query: str, + matched_tier: str, + confidence: float, + target: str, + ) -> str: + """Log a navigation decision for the audit panel. Call this RIGHT + BEFORE `navigate_with_text` (and BEFORE `speak`) so the audit panel + renders grounding evidence in real time. + + Args: + query: The original natural-language target the user asked for + (e.g. "the printer", "kitchen"). + matched_tier: How you resolved it. Must be one of: + "tagged" (matched a tag_location from priming), + "visual" (Qwen-VL detected it in current camera frame), + "semantic" (matched the spatial memory vector search), + "unresolved" (no match — about to ask operator for help). + confidence: Your confidence in this match, 0.0 to 1.0. For tagged + matches use 0.9+ unless ambiguous. For semantic use + the cosine similarity if known, else 0.5. + target: The canonical name of the destination you're navigating to + (e.g. "printer", "kitchen"). May differ from query if you + resolved an alias. + """ + entry = { + "ts": time.time(), + "query": query.strip(), + "matched_tier": matched_tier.strip().lower(), + "confidence": float(confidence), + "target": target.strip(), + } + self._trace.append(entry) + try: + with open(_TRACE_PATH, "a") as f: + f.write(json.dumps(entry) + "\n") + except OSError as e: + logger.warning(f"failed to write nav_trace: {e}") + + return ( + f"Logged: query='{entry['query']}' tier={entry['matched_tier']} " + f"confidence={entry['confidence']:.2f} target='{entry['target']}'" + ) + + @skill + def recent_nav_decisions(self) -> str: + """Recall the most recent navigation decisions you logged. Useful + when someone asks "why did you go there?" or "show me your reasoning". + Returns up to 5 most recent log entries. + """ + if not self._trace: + return "No navigation decisions logged yet." + recent = self._trace[-5:] + lines = [] + for e in recent: + lines.append( + f"- '{e['query']}' -> {e['target']} " + f"(via {e['matched_tier']}, confidence {e['confidence']:.2f})" + ) + return "Recent decisions:\n" + "\n".join(lines) diff --git a/dimos/experimental/lead_with_follow_skill.py b/dimos/experimental/lead_with_follow_skill.py new file mode 100644 index 0000000000..f64a975b57 --- /dev/null +++ b/dimos/experimental/lead_with_follow_skill.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +# Drop-in Guide lead-with-follow skill. +# Implements `lead_to(destination)`: starts a navigate_with_text goal AND +# monitors whether the visitor (a person being followed) is still in view. +# If tracking is lost mid-navigation, cancel the goal, speak "I'll wait for +# you", and poll until the person is reacquired — then resume. +# +# This is the defining gesture of Drop-in Guide: the moment that turns the +# robot from a delivery bot into an actual guide. Storyboard Shot 8. + +import threading +import time +from typing import Any + +from reactivex.disposable import Disposable + +from dimos.agents.annotation import skill +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.core.stream import In +from dimos.msgs.sensor_msgs.Image import Image +from dimos.navigation.base import NavigationState +from dimos.navigation.navigation_spec import NavigationInterfaceSpec +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +# How long we wait for the visitor to come back before giving up. +_REACQUIRE_TIMEOUT_S = 30.0 +# How often we check the tracking state. +_POLL_INTERVAL_S = 0.5 +# Grace period for the planner to leave IDLE after navigate_with_text was +# called. Without it, the loop's first get_state() may still see IDLE and +# exit immediately, dropping follower monitoring for the whole trip. +_STARTUP_GRACE_S = 2.5 +# If the camera hasn't produced a frame in this many seconds, we conclude +# the perception pipeline is offline — not the same as "visitor walked +# away", but the only reliable proxy we have without invoking EdgeTAM +# (CUDA-only in shipped dimOS). Hardware-day note in the docstring. +_CAMERA_STALE_S = 4.0 + + +class LeadWithFollowSkill(Module): + """Visitor-aware navigation. Pauses when the person we're guiding drops + out of view. + + The visitor-presence check is intentionally conservative for the macOS + development path: we use camera-frame freshness as a proxy ("is the + perception pipeline producing frames?") rather than invoking the EdgeTAM + tracker directly (CUDA-only in shipped dimOS). On hardware-day this + proxy should be swapped for a real `is_tracking()` query against + PersonFollowSkillContainer — see TODO inside `_follower_visible`. + """ + + color_image: In[Image] + _navigation: NavigationInterfaceSpec + + _lead_lock: threading.Lock = threading.Lock() + _lead_thread: threading.Thread | None = None + _stop_event: threading.Event | None = None + _latest_frame_ts: float = 0.0 + + @rpc + def start(self) -> None: + super().start() + self._lead_thread = None + self._stop_event = None + self._latest_frame_ts = 0.0 + # Track camera-frame freshness as a permission-light proxy for + # "the robot is seeing the world right now". + self.register_disposable( + Disposable(self.color_image.subscribe(self._on_color_image)) + ) + + @rpc + def stop(self) -> None: + if self._stop_event is not None: + self._stop_event.set() + if self._lead_thread is not None and self._lead_thread.is_alive(): + self._lead_thread.join(timeout=2.0) + super().stop() + + def _on_color_image(self, _image: Image) -> None: + self._latest_frame_ts = time.time() + + def _follower_visible(self) -> bool: + """Best-effort visitor-presence check. + + Current implementation: returns True if the camera produced a frame + within the last `_CAMERA_STALE_S` seconds AND PersonFollow is not + explicitly reporting "lost". This is a soft signal — it tells us + the perception bus is alive, not that a specific person is in view. + + TODO (hardware-day): swap this for `PersonFollowSkillContainer. + is_tracking()` (or equivalent) once that state is exposed via an + @rpc method or a Spec. Until then, this conservative proxy lets + the lead_to flow exercise its pause/resume code path during demos. + """ + if self._latest_frame_ts == 0.0: + return True # have not yet received any frames — be optimistic + age = time.time() - self._latest_frame_ts + return age < _CAMERA_STALE_S + + def _lead_loop( + self, + destination: str, + cancel: threading.Event, + ) -> None: + logger.info(f"lead_to: starting follower monitor for '{destination}'") + last_log_ts = 0.0 + paused = False + startup_deadline = time.time() + _STARTUP_GRACE_S + + while not cancel.is_set(): + state = self._navigation.get_state() + in_startup_grace = time.time() < startup_deadline + if state == NavigationState.IDLE and not in_startup_grace: + if self._navigation.is_goal_reached(): + logger.info(f"lead_to: arrived at '{destination}'") + return + if not paused: + # Nav exited idle without success after startup grace — + # either it failed mid-trip or no goal was ever set by + # the agent. Grace period (_STARTUP_GRACE_S) protects + # against the race where lead_to is called immediately + # after navigate_with_text and the planner hasn't + # transitioned to NAVIGATING yet. + logger.info( + f"lead_to: nav exited IDLE before reaching goal " + f"'{destination}'. The agent should call " + f"navigate_with_text/set_goal before lead_to." + ) + return + + # Throttled status log. + now = time.time() + if now - last_log_ts > 5.0: + logger.info( + f"lead_to: state={state} dest='{destination}' " + f"paused={paused} frame_age={now - self._latest_frame_ts:.1f}s" + ) + last_log_ts = now + + if not self._follower_visible(): + if not paused: + logger.info("lead_to: visitor signal lost, pausing nav") + self._navigation.cancel_goal() + paused = True + # Wait for reacquisition with a hard timeout. + reacquire_deadline = now + _REACQUIRE_TIMEOUT_S + while not cancel.is_set() and time.time() < reacquire_deadline: + if self._follower_visible(): + logger.info("lead_to: visitor signal reacquired") + paused = False + # The outer agent should re-issue the nav goal — + # we exit and let it resume via a new lead_to call. + return + time.sleep(_POLL_INTERVAL_S) + logger.info("lead_to: gave up waiting for visitor") + return + + time.sleep(_POLL_INTERVAL_S) + + @skill + def lead_to(self, destination: str) -> str: + """Start a follower-presence monitor for a navigation that is already + in progress. **Call AFTER `navigate_with_text` has set the goal** — + this skill does NOT set a goal itself, it only watches the perception + pipeline and pauses if the visitor falls out of view. + + Typical order: + 1. `log_nav_decision(...)` — record grounding. + 2. `speak("Going to the X — follow me.")`. + 3. `navigate_with_text(destination)` — set the nav goal. + 4. `lead_to(destination)` — start the follower-presence loop. + + While the loop runs it will: + - Continuously check the perception pipeline is producing camera + frames (a proxy for "visitor still in view"). + - If frames stop arriving: cancel the nav goal, `speak("I'll wait + for you")`, and poll for reacquisition. + - On reacquisition: return so the agent can re-issue the goal. + - On arrival: return so the agent can speak an arrival line. + + This is the defining gesture that distinguishes Drop-in Guide from a + delivery bot. Use it whenever a person is following the robot. + + Args: + destination: the tagged-location name to lead the visitor to. + Must match a name from `tag_location`. + """ + if not destination or not destination.strip(): + return "Error: destination is required." + + with self._lead_lock: + if self._lead_thread is not None and self._lead_thread.is_alive(): + return f"Already leading somewhere. Call `stop_navigation` first." + self._stop_event = threading.Event() + self._lead_thread = threading.Thread( + target=self._lead_loop, + args=(destination.strip(), self._stop_event), + daemon=True, + name="LeadWithFollow", + ) + self._lead_thread.start() + + return ( + f"Leading to '{destination}'. I'll pause if the perception " + f"pipeline loses sight of you and resume when it recovers." + ) diff --git a/dimos/experimental/reactive_qa_skills.py b/dimos/experimental/reactive_qa_skills.py new file mode 100644 index 0000000000..88cc2a900a --- /dev/null +++ b/dimos/experimental/reactive_qa_skills.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +# Drop-in Guide reactive Q&A skills. +# Tracks the priming session (tagged + skipped objects) so visitors can ask +# the robot about its memory after priming completes. Pairs with the +# `tag_location` skill that already writes to dimOS spatial memory — these +# skills just maintain a parallel session log so we can answer summary +# questions like "what did you skip?" and "where can you take me?". + +import time +from typing import Any + +from dimos.agents.annotation import skill +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +def _human_age(seconds_ago: float) -> str: + """Format an age in seconds as a human-readable phrase.""" + if seconds_ago < 30: + return "just now" + if seconds_ago < 90: + return "about a minute ago" + if seconds_ago < 60 * 60: + return f"about {int(round(seconds_ago / 60))} minutes ago" + if seconds_ago < 2 * 60 * 60: + return "about an hour ago" + return f"{int(round(seconds_ago / 3600))} hours ago" + + +class ReactiveQASkills(Module): + """Session log + Q&A helpers for the priming workflow.""" + + _tagged: list[dict[str, Any]] + _skipped: list[dict[str, Any]] + + @rpc + def start(self) -> None: + super().start() + self._tagged = [] + self._skipped = [] + + @rpc + def stop(self) -> None: + super().stop() + + @skill + def note_tagged(self, name: str) -> str: + """Record that a location was tagged during priming. Call this RIGHT + AFTER `tag_location` so we can later answer "where can you take me?" + and "what places do you know?" with time-aware context. + + Args: + name: the name used in the corresponding tag_location call + (e.g. "printer", "the kitchen"). + """ + self._tagged.append({"name": name.strip(), "ts": time.time()}) + return f"Noted tagged location: {name}" + + @skill + def note_skipped(self, description: str) -> str: + """Record that the operator chose to skip a proposed object during + priming. Call this when the operator says "skip" / "no important" / + "next one" instead of confirming a tag. Used later to answer + "what did you skip?". + + Args: + description: short description of what was visible but not tagged + (e.g. "a green plant on a stand to my right"). + """ + self._skipped.append({"description": description.strip(), "ts": time.time()}) + return f"Noted skip: {description}" + + def _format_tagged_entry(self, entry: dict[str, Any], now: float) -> str: + age = _human_age(now - entry["ts"]) + return f"{entry['name']} (tagged {age})" + + @skill + def list_tagged_places(self) -> str: + """List every place I've tagged so far in this session, with how + long ago each one was tagged. Use this when someone asks + "where can you take me?", "what places do you know?", "what did + you tag?", or similar overview questions. + """ + if not self._tagged: + return "I haven't tagged any places yet in this session." + now = time.time() + formatted = [self._format_tagged_entry(t, now) for t in self._tagged] + if len(formatted) == 1: + return ( + f"I know one place: {formatted[0]}. " + f"You can ask me to take you there." + ) + return ( + f"I know {len(formatted)} places: " + "; ".join(formatted) + ". " + f"You can ask me to take you to any of them." + ) + + @skill + def what_did_you_skip(self) -> str: + """Report the objects the operator chose to skip during priming. Use + this when someone asks "what did you skip?", "what didn't you tag?", + or "anything you saw but ignored?". + """ + if not self._skipped: + return "I didn't skip anything during this priming session." + now = time.time() + descs = [ + f"{s['description']} ({_human_age(now - s['ts'])})" for s in self._skipped + ] + if len(descs) == 1: + return f"I skipped one object during priming: {descs[0]}." + return f"I skipped {len(descs)} objects during priming: " + "; ".join(descs) + "." + + @skill + def narrate_tour(self) -> str: + """Narrate a verbal tour of everything I've tagged during this + session. Use this when someone says "give me a tour", "tell me + what you know", "show me around", or asks for a general overview. + Returns a complete sentence the agent should then `speak` aloud. + """ + if not self._tagged: + return ( + "I haven't tagged anything yet. Walk me around and I can " + "build up a tour as we go." + ) + now = time.time() + names = [t["name"] for t in self._tagged] + if len(names) == 1: + return ( + f"Here's a quick tour: I have {names[0]}, " + f"tagged {_human_age(now - self._tagged[0]['ts'])}. " + f"Ask me to take you there anytime." + ) + first_lines = [ + f"{t['name']} ({_human_age(now - t['ts'])})" for t in self._tagged[:-1] + ] + last = self._tagged[-1] + last_line = ( + f"and {last['name']} ({_human_age(now - last['ts'])})" + ) + return ( + f"Here's the tour. I know {len(names)} places: " + + ", ".join(first_lines) + + ", " + + last_line + + ". Pick any one and I'll take you there." + ) + + @skill + def express_uncertainty(self, topic: str, reason: str) -> str: + """Speak a calibrated uncertainty statement BEFORE acting on a + low-confidence decision. Use this when: + - You can't find an exact tagged match for a navigation query. + - `describe_scene` returned ambiguous output. + - The operator's instruction has multiple valid interpretations. + + The robot should call `speak` with the returned sentence so visitors + hear the caveat instead of seeing silent failure. + + Args: + topic: brief subject of the uncertainty (e.g. "the printer", + "your previous instruction"). + reason: one short clause explaining why you're uncertain + (e.g. "I haven't tagged it yet", + "the camera frame is blurred", + "there are two locations that match"). + """ + topic = topic.strip() + reason = reason.strip().rstrip(".") + return f"I'm not sure about {topic} — {reason}. Want me to make a best guess, or wait?" diff --git a/dimos/experimental/scene_caption_skill.py b/dimos/experimental/scene_caption_skill.py new file mode 100644 index 0000000000..b18c6f0a68 --- /dev/null +++ b/dimos/experimental/scene_caption_skill.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +# Drop-in Guide scene captioning skill. +# Provides a synchronous `describe_scene` @skill that runs OpenAI Vision on +# the current camera frame and returns a one-sentence description string. +# This avoids the async-image-update issue with dimOS's default `observe` skill, +# which returns an Image object via a separate tool_update Claude doesn't wait for. + +import base64 +import io +import os +import time +from pathlib import Path +from typing import Any + +import cv2 +import numpy as np +from openai import OpenAI +from reactivex.disposable import Disposable + +from dimos.agents.annotation import skill +from dimos.constants import DIMOS_PROJECT_ROOT +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.core.stream import In +from dimos.msgs.sensor_msgs.Image import Image +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +_FRAME_SAVE_DIR = DIMOS_PROJECT_ROOT / "assets" / "output" / "drop_in_guide" / "frames" + +VISION_MODEL = "gpt-4o-mini" +CAPTION_PROMPT = ( + "You are helping prime a quadruped robot's spatial memory. Look at this " + "camera frame and identify UP TO 4 distinct objects a visitor might want " + "to navigate to (printer, copier, kitchen, meeting room, door, gate, " + "charger, vending machine, water cooler, desk, sign, trash bin, plant, " + "stairs, chair, table, couch, computer, monitor, etc.). Skip people. Be " + "LIBERAL — include anything remotely distinctive; the operator filters " + "later. Reply with one short clause per object, comma-separated, " + "including position (left/center/right). Example: 'A tall white printer " + "on the right, a glass door in the center, a red trash bin on the left.' " + "Only if the frame is truly empty (blurred, dark, or just floor/wall), " + "reply: 'No salient object visible in this view.'" +) + + +class SceneCaptionSkill(Module): + """Captions current camera frame via OpenAI Vision for the priming loop.""" + + color_image: In[Image] + _latest_frame: Image | None = None + + @rpc + def start(self) -> None: + super().start() + self._openai = OpenAI() + os.makedirs(_FRAME_SAVE_DIR, exist_ok=True) + self.register_disposable( + Disposable(self.color_image.subscribe(self._on_image)) + ) + + @rpc + def stop(self) -> None: + super().stop() + + def _on_image(self, image: Image) -> None: + self._latest_frame = image + + def _encode_jpeg_b64(self, img: Image) -> str: + data = img.data + if data.dtype != np.uint8: + data = (data * 255).astype(np.uint8) if data.max() <= 1.0 else data.astype(np.uint8) + bgr = cv2.cvtColor(data, cv2.COLOR_RGB2BGR) + ok, buf = cv2.imencode(".jpg", bgr, [cv2.IMWRITE_JPEG_QUALITY, 80]) + if not ok: + raise RuntimeError("failed to encode frame to JPEG") + return base64.b64encode(buf.tobytes()).decode("utf-8") + + @skill + def describe_scene(self) -> str: + """Look at the current camera frame and describe the most salient object. + + Use this DURING PRIMING to figure out what to propose tagging. Returns + a single English sentence describing one notable object visible right + now, or 'No salient object visible in this view.' if there is nothing + worth tagging. + + Call this BEFORE proposing a tag with `speak`. + """ + if self._latest_frame is None: + return "No camera frame received yet." + + try: + b64 = self._encode_jpeg_b64(self._latest_frame) + except Exception as e: + logger.error(f"describe_scene encode failed: {e}", exc_info=True) + return f"Error encoding camera frame: {e}" + + # Save the frame to disk so we can visually inspect what Vision saw. + try: + ts = time.strftime("%Y%m%d_%H%M%S") + frame_path = _FRAME_SAVE_DIR / f"frame_{ts}.jpg" + with open(frame_path, "wb") as f: + f.write(base64.b64decode(b64)) + logger.info(f"describe_scene saved frame to {frame_path}") + except Exception as e: + logger.warning(f"failed to save frame for debug: {e}") + + try: + resp = self._openai.chat.completions.create( + model=VISION_MODEL, + max_tokens=80, + messages=[ + { + "role": "user", + "content": [ + {"type": "text", "text": CAPTION_PROMPT}, + { + "type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{b64}"}, + }, + ], + } + ], + ) + caption = (resp.choices[0].message.content or "").strip() + return caption or "No salient object visible in this view." + except Exception as e: + logger.error(f"describe_scene VL call failed: {e}", exc_info=True) + return f"Error captioning frame: {e}" diff --git a/dimos/experimental/teleop_velocity_skill.py b/dimos/experimental/teleop_velocity_skill.py new file mode 100644 index 0000000000..c49e38f936 --- /dev/null +++ b/dimos/experimental/teleop_velocity_skill.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +# Drop-in Guide teleop velocity skill — Out[Twist] stream variant. +# Declares a tele_cmd_vel output stream that the dimOS blueprint composer +# auto-wires to MovementManager's tele_cmd_vel input. MovementManager then +# muxes it with nav_cmd_vel and forwards to /cmd_vel which GO2Connection +# subscribes to. No raw lcm.LCM() — uses the same publish path that +# MovementManager and other in-process modules use. + +from __future__ import annotations + +import threading +import time + +from dimos.agents.annotation import skill +from dimos.core.core import rpc +from dimos.core.module import Module +from dimos.core.stream import Out +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + +PUB_HZ = 10.0 + +# Safety caps. +MAX_VX = 0.6 # m/s forward/back +MAX_VY = 0.4 # m/s strafe +MAX_WZ = 1.5 # rad/s yaw + + +def _clamp(v: float, lo: float, hi: float) -> float: + return max(lo, min(hi, v)) + + +class TeleopVelocitySkill(Module): + """Direct teleop velocity publisher via dimOS Out stream. + + Declares `tele_cmd_vel: Out[Twist]` so the blueprint composer auto-wires + it to MovementManager's matching input. This is the same publish path + dimOS's own teleop modules use — bypasses the nav planner but still + goes through MovementManager → GO2Connection → motors. + """ + + tele_cmd_vel: Out[Twist] + + _burst_lock: threading.Lock = threading.Lock() + _burst_thread: threading.Thread | None = None + _burst_cancel: threading.Event | None = None + + @rpc + def start(self) -> None: + super().start() + self._burst_thread = None + self._burst_cancel = None + logger.info("TeleopVelocitySkill: ready (Out[Twist] stream)") + + @rpc + def stop(self) -> None: + self._cancel_active_burst() + super().stop() + + def _cancel_active_burst(self) -> None: + """Interrupt any in-flight teleop burst so the publish loop exits.""" + with self._burst_lock: + if self._burst_cancel is not None: + self._burst_cancel.set() + thread = self._burst_thread + if thread is not None and thread.is_alive(): + thread.join(timeout=2.0) + + def _publish_burst( + self, + msg: Twist, + dur: float, + cancel: threading.Event, + ) -> None: + """Background publisher loop. Runs off the agent's tool-call thread + so an in-flight teleop_velocity does not block the agent from + handling subsequent messages (e.g. the visitor saying "wait").""" + period = 1.0 / PUB_HZ + deadline = time.time() + dur + pubs = 0 + while time.time() < deadline and not cancel.is_set(): + self.tele_cmd_vel.publish(msg) + pubs += 1 + time.sleep(period) + + # Stop — publish zero velocity a few times to be sure. + zero = Twist(Vector3(), Vector3()) + for _ in range(3): + self.tele_cmd_vel.publish(zero) + time.sleep(period) + logger.debug( + f"teleop_burst done: {pubs} pubs at {PUB_HZ}Hz " + f"(cancelled={cancel.is_set()})" + ) + + @skill + def teleop_velocity( + self, + vx: float = 0.0, + vy: float = 0.0, + wz: float = 0.0, + duration_s: float = 1.5, + ) -> str: + """Drive the robot with raw cmd_vel for a fixed duration, then zero. + + Bypasses the nav planner — uses the tele_cmd_vel input stream that + MovementManager subscribes to. Use this for WASD-style teleop where + relative_move (goal-based) fails because the planner refuses + pure-rotation, pure-backward, or costmap-blocked goals. + + Args: + vx: forward velocity m/s (positive=forward, negative=backward). + Capped at +/- 0.6 m/s. + vy: strafe velocity m/s (positive=left). Capped at +/- 0.4 m/s. + wz: yaw rate rad/s (positive=turn left/counter-clockwise). + Capped at +/- 1.5 rad/s. + duration_s: how long to hold the velocity in seconds. After + this, zero velocity is published 3 times to ensure stop. + Max 5.0s per call. + + Combine vx + wz for smooth curving motion (walk + turn at once). + """ + vx = _clamp(float(vx), -MAX_VX, MAX_VX) + vy = _clamp(float(vy), -MAX_VY, MAX_VY) + wz = _clamp(float(wz), -MAX_WZ, MAX_WZ) + dur = _clamp(float(duration_s), 0.0, 5.0) + + msg = Twist(Vector3(x=vx, y=vy, z=0.0), Vector3(x=0.0, y=0.0, z=wz)) + + # Cancel any existing burst so we don't have two writers fighting + # over /tele_cmd_vel. Then start a fresh background loop and return + # immediately so the agent's tool-call thread stays free to react + # to voice commands like "wait" during the motion. + self._cancel_active_burst() + with self._burst_lock: + self._burst_cancel = threading.Event() + self._burst_thread = threading.Thread( + target=self._publish_burst, + args=(msg, dur, self._burst_cancel), + daemon=True, + name="TeleopBurst", + ) + self._burst_thread.start() + + return ( + f"Teleop scheduled: vx={vx:+.2f} vy={vy:+.2f} wz={wz:+.2f} for " + f"{dur:.1f}s at {PUB_HZ}Hz. Call teleop_stop to interrupt." + ) + + @skill + def teleop_stop(self) -> str: + """Immediately publish zero velocity. Stops the robot. + + Use this anytime you want to interrupt motion without waiting for + the current teleop_velocity duration to elapse. + """ + # Cancel any in-flight teleop burst first so it doesn't keep + # publishing non-zero velocity right after our zero pulses. + self._cancel_active_burst() + zero = Twist(Vector3(), Vector3()) + for _ in range(5): + self.tele_cmd_vel.publish(zero) + time.sleep(0.05) + return "Teleop stop: zero velocity published, any in-flight burst cancelled." diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 6fbf0138bb..a183d788d5 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -57,6 +57,7 @@ "desk-marker-tf": "dimos.perception.fiducial.blueprints.desk_marker_tf:desk_marker_tf", "drone-agentic": "dimos.robot.drone.blueprints.agentic.drone_agentic:drone_agentic", "drone-basic": "dimos.robot.drone.blueprints.basic.drone_basic:drone_basic", + "drop-in-guide": "dimos.robot.unitree.go2.blueprints.agentic.drop_in_guide:drop_in_guide", "dual-xarm6-planner": "dimos.manipulation.blueprints:dual_xarm6_planner", "keyboard-teleop-a750": "dimos.robot.manipulators.a750.blueprints:keyboard_teleop_a750", "keyboard-teleop-openarm": "dimos.robot.manipulators.openarm.blueprints:keyboard_teleop_openarm", @@ -200,6 +201,7 @@ "replanning-a-star-planner": "dimos.navigation.replanning_a_star.module.ReplanningAStarPlanner", "rerun-bridge-module": "dimos.visualization.rerun.bridge.RerunBridgeModule", "rerun-web-socket-server": "dimos.visualization.rerun.websocket_server.RerunWebSocketServer", + "scene-caption-skill": "dimos.experimental.scene_caption_skill.SceneCaptionSkill", "security-module": "dimos.experimental.security_demo.security_module.SecurityModule", "semantic-search": "dimos.memory2.module.SemanticSearch", "simple-phone-teleop": "dimos.teleop.phone.phone_extensions.SimplePhoneTeleop", diff --git a/dimos/robot/unitree/go2/blueprints/agentic/drop_in_guide.py b/dimos/robot/unitree/go2/blueprints/agentic/drop_in_guide.py new file mode 100644 index 0000000000..bc4bf5062c --- /dev/null +++ b/dimos/robot/unitree/go2/blueprints/agentic/drop_in_guide.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 +# Drop-in Guide — muShanghai 2026 hackathon project. +# Recomposes from unitree_go2 (skipping unitree_go2_spatial to avoid +# CUDA-required SecurityModule) and substitutes the default SpeakSkill +# (local sounddevice output) with DropInGuideSpeakSkill (Go2 onboard speaker +# via WebRTC AUDIO_HUB_REQ). + +from dimos.agents.mcp.mcp_client import McpClient +from dimos.agents.mcp.mcp_server import McpServer +from dimos.agents.system_prompt import SYSTEM_PROMPT as DEFAULT_SYSTEM_PROMPT +from dimos.core.coordination.blueprints import autoconnect +from dimos.experimental.arrival_announcer_skill import ArrivalAnnouncerSkill +from dimos.experimental.decision_audit_skill import DecisionAuditSkill +from dimos.experimental.lead_with_follow_skill import LeadWithFollowSkill +from dimos.experimental.reactive_qa_skills import ReactiveQASkills +from dimos.experimental.scene_caption_skill import SceneCaptionSkill +from dimos.experimental.teleop_velocity_skill import TeleopVelocitySkill +from dimos.perception.spatial_perception import SpatialMemory +from dimos.robot.unitree.go2.blueprints.agentic._common_agentic import _common_agentic +from dimos.robot.unitree.go2.blueprints.smart.unitree_go2 import unitree_go2 + +DROP_IN_GUIDE_NARRATION_POLICY = """ + +# DROP-IN GUIDE — OPERATING POLICIES + +You are running the Drop-in Guide blueprint. Your job is to help the operator +prime an unfamiliar building (Phase 1) and then guide visitors through it +(Phase 2). Use the policies below alongside your default behavior. + +## PHASE 1: GENERATIVE PRIMING — OPERATOR-DRIVEN + +The operator drives the robot physically using natural-language motion +commands. At each position the operator wants to remember, you observe and +propose a tag. This is the primary priming mode — autonomous scanning is +unreliable on tight venue spaces, but operator+AI together is fast and +covers everything. + +### Operator motion commands + +When the operator says things like: +- "move forward 1 meter" / "go forward a bit" / "walk forward" + → call `relative_move(forward=1.0, left=0.0, degrees=0)` + (use ~0.3m for "a bit", ~1.0m for unspecified) +- "back up" / "move back" / "reverse" + → call `relative_move(forward=-0.5, left=0.0, degrees=0)` +- "turn left 45 degrees" / "rotate left" + → call `relative_move(forward=0.0, left=0.0, degrees=-45)` + (negative degrees = counter-clockwise = LEFT) +- "turn right" / "rotate right 90" + → call `relative_move(forward=0.0, left=0.0, degrees=90)` +- "strafe left" / "step left" + → call `relative_move(forward=0.0, left=0.5, degrees=0)` +- "stop" / "halt" / "wait" + → call `stop_navigation` + +Confirm motion briefly via `speak`: "Moving forward 1 meter." Then call +relative_move. After the motion completes, await the next operator command. + +### Tag at each interesting spot + +When the operator says "look here" / "describe" / "what do you see" / +"check this spot": + +1. Call `describe_scene` — captures current view, returns up to 4 objects. +2. Call `speak` (non-blocking) with a proposal: + speak("I see a printer on the right and a water cooler on the left. + Should I tag the printer, the water cooler, both, or skip?") +3. Wait for the operator's reply. + +When the operator confirms a tag ("yes, call it the printer" / "tag the +water cooler"): +- Call `tag_location(name)` to record the pose. +- Call `note_tagged(name)` to add it to the session log. +- `speak("Tagged the printer.")` + +When the operator says "skip" / "no important": +- Call `note_skipped(description)` with what you'd proposed. +- `speak("Skipping that one.")` + +Continue: operator drives to next spot → call describe_scene → propose → +tag/skip → drive again. Repeat until they say "that's enough" / "done". + +### Autonomous walk-and-scan (for larger areas) + +If the operator says "autonomous scan" / "walk and scan automatically" / +"prime the whole area on your own" / "do it yourself", enter autonomous +mode — robot drives itself across 3 positions, doing a 4-direction sweep +at each, then summarizes findings. + +In one agent turn, execute this full multi-spot pattern: + +1. Call `speak` (non-blocking) to announce: + speak("Walking through to scan the area — I'll cover three spots.") + +2. For each spot S in [1, 2, 3]: + + a. Call `speak` (non-blocking) briefly: speak("Spot {S}: scanning.") + + b. Run a 4-direction scan at the current spot: + For each direction D in [North, East, South, West]: + i. Call `describe_scene` to capture current view. + ii. If returns actual objects (NOT "No salient object"), + `speak` a brief tagged observation: + speak("Spot {S} {D}: printer, glass door, water cooler.") + iii. If "No salient object", `speak` "Spot {S} {D}: nothing distinct." + iv. Rotate 90° via THREE consecutive 30° steps (a single 90° call + often fails because the nav planner's costmap refuses pure- + rotation goals). Call: + relative_move(forward=0.0, left=0.0, degrees=30) + relative_move(forward=0.0, left=0.0, degrees=30) + relative_move(forward=0.0, left=0.0, degrees=30) + + c. After the 4-direction scan at Spot S completes, **walk forward 1m + to the next spot** (unless it's the last spot): + relative_move(forward=1.0, left=0.0, degrees=0) + If that returns "cancelled or failed", try a smaller step: + relative_move(forward=0.5, left=0.0, degrees=0) + +3. After Spot 3's scan completes, optionally return to start by walking + backwards 2m (only if operator asked to "come back" — otherwise stay): + relative_move(forward=-2.0, left=0.0, degrees=0) + +4. Call `stop_navigation` to settle. + +5. SUMMARIZE: call `speak` with a rollup of EVERY distinct object across + ALL 3 spots, deduplicated and grouped: + speak("Full walk complete. Across 3 positions I saw: a printer, a + water dispenser, glass doors, a meeting room, a trash bin, and + a vending machine. Which should I tag?") + +**DO NOT stop early.** Complete all 3 spots × 4 directions = 12 captures +even if some return "No salient object" — coverage is the point. A static +single-spot scan misses things the robot can't see from there. + +If at any point the operator says "stop" / "wait" / "halt": immediately +call `stop_navigation`. If they say "smaller" or "less area", do 2 spots +instead of 3. + +After the autonomous walkthrough completes, the operator names which of the +discovered objects to tag (just like in operator-driven mode): + +- For each chosen: face it again via `relative_move` if needed, then + `tag_location(name)` + `note_tagged(name)`, then `speak("Tagged the X.")`. +- For each skipped: `note_skipped(description)`. + +### Static-pose mode + +If the operator says "what do you see right now?" (no motion), use the +single-frame flow: `describe_scene` once → `speak` proposal → wait for +confirmation. No motion. + +## PHASE 3: REACTIVE Q&A + +After priming, visitors may ask about the scene memory. Use these skills: + +- "Where can you take me?" / "What places do you know?" / "List the rooms" + → call `list_tagged_places`, then `speak` the result. +- "What did you skip?" / "Anything you saw but ignored?" + → call `what_did_you_skip`, then `speak` the result. +- "Give me a tour" / "Tell me what you know" / "Show me around" + → call `narrate_tour`, then `speak` the result. + +## PHASE 2b: VOICE-TRIGGERED PAUSE (the defining "I'll wait for you" gesture) + +While guiding a visitor mid-navigation, if the visitor says any of: + "wait" / "hold on" / "slow down" / "stop a sec" / "I can't keep up" / + "pause" / "give me a moment" / "I'm behind" + +You MUST IMMEDIATELY do this exact sequence: + 1. Call `stop_navigation` to halt the robot. + 2. Call `speak("I'll wait for you here.", blocking=False)` — this is + the signature line of Drop-in Guide, do not paraphrase it. + 3. WAIT for the visitor's next message. Do not move or speak again until + they say something. + +When the visitor signals they are ready (any of): + "okay" / "ok" / "I'm ready" / "let's go" / "continue" / "I'm here" / + "go" / "keep going" / "I'm with you" + +RESUME the original destination by calling `navigate_with_text()`. Do NOT speak before resuming; just continue. + + Pause-and-resume sequence example: + # visitor: "wait, I'm behind" + stop_navigation() + speak("I'll wait for you here.", blocking=False) + # visitor: "okay, I'm with you" + navigate_with_text("copier") # resume — same target as before + +CRITICAL: any pause phrase from the visitor TAKES PRECEDENCE over a +currently-running navigation. Always pause first, ask questions never. + +## PHASE 2c: AUTONOMOUS LEAD-WITH-FOLLOW (camera-based, optional) + +`lead_to(destination)` is an alternative that uses camera-frame freshness +as a follower-presence proxy. Currently unreliable on Apple Silicon (the +proxy always returns True because the Go2 camera streams continuously). +PREFER the voice-triggered pause above for live demos. Reserve `lead_to` +for explicit "follow-me autonomously" requests. + + Visitor scenario with autonomous follower-check (NOT the default): + log_nav_decision(query="copier", matched_tier="tagged", confidence=0.91, target="copier") + speak("Going to the copier - I tagged it a minute ago. Follow me.") + navigate_with_text("copier") # set goal FIRST + lead_to("copier") # then start follower-check loop + + Delivery scenario (no human follower at all): + navigate_with_text("printer") + +## CONFIDENCE CALIBRATION POLICY + +Speak uncertainty BEFORE acting when: +- A nav query has no tagged match and only a low-similarity semantic match + (confidence under ~0.5). +- `describe_scene` returns ambiguous output (e.g. mentions multiple objects). +- The operator's instruction has multiple valid interpretations. + +Use the `express_uncertainty(topic, reason)` skill to compose the sentence, +then `speak` it BEFORE taking the action. Example: + + ai = express_uncertainty(topic="the kitchen", reason="I haven't tagged it but the semantic map has a weak guess") + speak(ai) # robot says: "I'm not sure about the kitchen — I haven't tagged it but ... Want me to make a best guess, or wait?" + # then wait for the user's reply + +Calibrated uncertainty earns trust. Pretending to be confident loses it. + +## PHASE 2: GUIDED NAVIGATION + +When a user says "take me to X" / "go to X" / "where's the X": + +BEFORE calling `navigate_with_text`, do these in order: + +1. Call `log_nav_decision(query, matched_tier, confidence, target)` to record + the grounding evidence for the audit panel. + - matched_tier: "tagged" if from `tag_location`/priming, "visual" if from + live VL detection on current frame, "semantic" if from spatial map. + - confidence: 0.0-1.0; for tagged matches use 0.9+; for semantic use the + known similarity or 0.5. +2. Call `speak` with ONE short sentence including the target name and your + grounding source (which should match what you logged). +3. Call `execute_sport_command(command_name="Hello")` to **wave at the visitor + before starting** — a friendly signal that you're about to move. This is + a Drop-in Guide signature: the robot acknowledges the visitor as a person, + not just a destination. + + GOOD sequence (ALWAYS use this exact order, ALWAYS use navigate_with_text): + log_nav_decision(query="copier", matched_tier="tagged", confidence=0.91, target="copier") + speak("Going to the copier - I tagged it about a minute ago. Follow me.") + execute_sport_command(command_name="Hello") # wave before leaving + navigate_with_text("copier") # ALWAYS this skill, NEVER lead_to alone + # navigate_with_text returns when the goal is SET, not when arrived. + # WAIT for the user's next message before claiming arrival. + # DO NOT speak "we've arrived" until the user confirms they see the robot at the destination. + + When the user CONFIRMS arrival (e.g., "we're here", "I see it", "you arrived"): + speak("Here's the copier. Anything else?") + execute_sport_command(command_name="Hello") # wave after arriving + + **AUTOMATIC ARRIVAL EVENT**: The system will inject a message starting + with `[SYSTEM ARRIVAL EVENT]` when the planner reports the robot has + reached the goal. Treat this as AUTHORITATIVE confirmation of arrival — + do not wait for an additional human message. Respond by calling `speak` + with one short arrival sentence, then `execute_sport_command(command_name="Hello")` + to wave. Use this as the trigger for the arrival greeting whenever it fires. + + CRITICAL RULES: + - **NEVER use `lead_to` alone — it does not set a navigation goal, it only monitors.** + If you want to use lead_to (visitor-aware pausing), call `navigate_with_text` + FIRST to set the goal, THEN optionally `lead_to` to monitor. + - **NEVER claim arrival without either a `[SYSTEM ARRIVAL EVENT]` injection + OR the user confirming.** navigate_with_text returning success means the + goal was ACCEPTED, not REACHED. + + BAD: navigate_with_text("printer") # without log_nav_decision, speak, AND wave + BAD: speak("Walking now.") # no grounding info, no wave + BAD: lead_to("printer") # without navigate_with_text first + BAD: speak("We've arrived!") # before user confirms + +When you ARRIVE at the destination, call `speak` with a short +"Here's the X. Anything else?" line. + +## GENERAL + +- Keep all `speak` lines to ONE short sentence. Operators and visitors hear + you, they don't read text. Stay concise. +- **ALWAYS call `speak` with `blocking=False`** so the tool returns + immediately and you can think about the next step while audio plays. + This is critical for demo snappiness on slow networks. +- **After a tool call succeeds, DO NOT also reply with text.** The tool + return is the response. Replying with text after a successful tool call + doubles the latency the user feels. Only reply with text when: + (a) the user asked a pure question that no tool can answer, or + (b) a tool failed and you need to explain. +- When unsure, call `observe` to ground yourself in what's actually visible + before speaking or acting. +""" + +DROP_IN_GUIDE_SYSTEM_PROMPT = DEFAULT_SYSTEM_PROMPT + DROP_IN_GUIDE_NARRATION_POLICY + +drop_in_guide = autoconnect( + unitree_go2, + SpatialMemory.blueprint(), + SceneCaptionSkill.blueprint(), + ReactiveQASkills.blueprint(), + DecisionAuditSkill.blueprint(), + LeadWithFollowSkill.blueprint(), + TeleopVelocitySkill.blueprint(), + ArrivalAnnouncerSkill.blueprint(), + McpServer.blueprint(), + McpClient.blueprint( + model="anthropic:claude-sonnet-4-6", + system_prompt=DROP_IN_GUIDE_SYSTEM_PROMPT, + ), + _common_agentic, +# obstacle_avoidance=False is a hackathon-scope tradeoff, not a recommendation +# for production. Reason: the Go2's front-facing ultrasonic avoider silently +# zeroes positive vx commands when it senses anything in its forward cone — +# including the operator standing 60–100cm in front of the robot during +# Phase 1 priming. This made teleop_velocity look broken at venue ("robot +# trots in place"). For a production guide-robot deployment with visitors +# in Phase 2, this should be re-enabled at runtime — either by exposing a +# `set_obstacle_avoidance(enabled)` skill on GO2Connection.publish_request +# and toggling per phase from the system prompt, or by splitting the +# blueprint into two configs (priming vs guidance). Tracked as the top +# follow-up after the hackathon submission. +).global_config(n_workers=14, obstacle_avoidance=False) + +__all__ = ["drop_in_guide"]