class BrowserNotificationConfig(ModuleConfig):
    # Port handed to RobotWebInterface for the notification page and websocket.
    server_port: int = 8450


# One alert as sent to browsers: a JSON-serialisable dict built by _make_alert.
AlertPayload = dict[str, Any]
# Per-websocket inbox. ``None`` is the shutdown sentinel pushed by _stop_server.
ClientQueue = Queue[AlertPayload | None]


class BrowserNotificationSkill(Module):
    """Expose a browser notification page and a skill for robot-triggered alerts."""

    config: BrowserNotificationConfig
    # Optional speech backend; when it stays None, _speak reports "skipped".
    _speaker: SpeakSkillSpec | None = None

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        # Guards _latest_alert, _next_alert_id and _client_queues, which are
        # accessed both by skill callers and by the web handlers below.
        self._lock = threading.RLock()
        self._latest_alert: AlertPayload | None = None
        self._next_alert_id = 0
        self._client_queues: set[ClientQueue] = set()
        self._web_server = RobotWebInterface(port=self.config.server_port)
        self._web_server_thread: threading.Thread | None = None
        self._setup_routes()

    @rpc
    def start(self) -> None:
        super().start()
        self._start_server()

    @rpc
    def stop(self) -> None:
        # Release websocket handlers and the server thread before the base
        # module tears itself down.
        self._stop_server()
        super().stop()

    @skill
    def notify_user(
        self,
        title: str,
        message: str,
        urgency: str = "normal",
        sound: bool = True,
        vibrate: bool = True,
        speak_text: str = "",
    ) -> str:
        """Send an alert to connected browser clients.

        Args:
            title: Short alert title.
            message: Alert body shown in the browser.
            urgency: One of "low", "normal", or "high". Unknown values become "normal".
            sound: Whether the browser should play an alert tone.
            vibrate: Whether the browser should request phone vibration.
            speak_text: Optional text for the robot to speak at the same time.
        """

        normalized_urgency = self._normalize_urgency(urgency)
        alert = self._make_alert(
            title=title,
            message=message,
            urgency=normalized_urgency,
            sound=sound,
            vibrate=vibrate,
        )
        client_count = self._publish_alert(alert)

        speech_status = ""
        if speak_text:
            speech_status = self._speak(speak_text)

        result = (
            f"Queued {normalized_urgency} browser notification '{title}' "
            f"for {client_count} connected client(s)."
        )
        if speech_status:
            result += f" {speech_status}"
        return result

    @staticmethod
    def _normalize_urgency(urgency: str) -> str:
        """Map *urgency* onto "low", "normal" or "high".

        Matching ignores case and surrounding whitespace. Anything else,
        including a non-string value such as ``None``, becomes "normal" so a
        malformed skill call still produces an alert instead of raising.
        """
        if not isinstance(urgency, str):
            return "normal"
        normalized = urgency.lower().strip()
        if normalized not in {"low", "normal", "high"}:
            return "normal"
        return normalized

    def _make_alert(
        self,
        title: str,
        message: str,
        urgency: str,
        sound: bool,
        vibrate: bool,
    ) -> AlertPayload:
        """Build the alert payload and assign it the next sequential id."""
        # Only the id counter needs the lock; the dict is built outside it.
        with self._lock:
            self._next_alert_id += 1
            alert_id = self._next_alert_id

        return {
            "id": alert_id,
            "title": title,
            "message": message,
            "urgency": urgency,
            "sound": bool(sound),
            "vibrate": bool(vibrate),
            "ts": time.time(),
        }

    def _publish_alert(self, alert: AlertPayload) -> int:
        """Record *alert* as the latest one and enqueue it for every client.

        Returns:
            The number of client queues the alert was put on.
        """
        # Snapshot the queues under the lock, then enqueue outside it.
        with self._lock:
            self._latest_alert = alert
            queues = list(self._client_queues)

        for client_queue in queues:
            client_queue.put(alert)
        return len(queues)

    def _speak(self, text: str) -> str:
        """Forward *text* to the speaker, returning a status string.

        Speech is best-effort: a missing speaker or a failing ``speak`` call is
        reported in the returned text and never raised to the caller.
        """
        if self._speaker is None:
            return "Speech skipped because SpeakSkill is not connected."
        try:
            return self._speaker.speak(text, blocking=False)
        except Exception:
            logger.exception("Failed to speak browser notification text")
            return "Speech failed."

    def _setup_routes(self) -> None:
        """Register the HTML page, the polling endpoint and the websocket."""

        @self._web_server.app.get("/notify", response_class=HTMLResponse)
        async def notify_index() -> HTMLResponse:
            return HTMLResponse(content=_NOTIFY_HTML)

        @self._web_server.app.get("/notify/latest")
        async def latest_alert() -> JSONResponse:
            with self._lock:
                alert = self._latest_alert
            # ``{"id": 0}`` is returned while no alert has been published yet.
            return JSONResponse(alert or {"id": 0})

        @self._web_server.app.websocket("/notify/ws")
        async def notify_ws(ws: WebSocket) -> None:
            await ws.accept()
            client_queue: ClientQueue = Queue()
            # Register and read the latest alert atomically, so an alert
            # published concurrently arrives either as ``latest`` or through
            # the queue.
            with self._lock:
                self._client_queues.add(client_queue)
                latest = self._latest_alert

            try:
                if latest is not None:
                    await ws.send_json(latest)
                while True:
                    # Queue.get blocks, so it runs in a worker thread to keep
                    # the event loop free.
                    # NOTE(review): each connected client therefore holds one
                    # worker thread until an alert or the None sentinel
                    # arrives; a dropped client presumably is only noticed on
                    # the next send. Confirm this is acceptable for the
                    # expected number of clients.
                    alert = await asyncio.to_thread(client_queue.get)
                    if alert is None:
                        break
                    await ws.send_json(alert)
            except WebSocketDisconnect:
                logger.info("Browser notification client disconnected")
            finally:
                with self._lock:
                    self._client_queues.discard(client_queue)

    def _start_server(self) -> None:
        """Run the web server in a daemon thread unless one is already alive."""
        if self._web_server_thread is not None and self._web_server_thread.is_alive():
            logger.warning("Browser notification web server already running")
            return

        self._web_server_thread = threading.Thread(
            target=self._web_server.run,
            kwargs={
                "ssl": True,
                "ssl_certs_dir": get_project_root() / "assets" / "teleop_certs",
            },
            daemon=True,
            name="BrowserNotificationWebServer",
        )
        self._web_server_thread.start()
        logger.info(
            "Browser notification page started",
            url=f"https://0.0.0.0:{self.config.server_port}/notify",
        )

    def _stop_server(self) -> None:
        """Wake all websocket handlers, shut the server down and join its thread."""
        with self._lock:
            queues = list(self._client_queues)
            self._client_queues.clear()

        # The None sentinel makes each notify_ws loop exit.
        for client_queue in queues:
            client_queue.put(None)

        self._web_server.shutdown()
        if self._web_server_thread is not None:
            self._web_server_thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT)
            self._web_server_thread = None
+
Disconnected
+
+
DimOS
+
Tap enable, then wait for robot alerts.
+
+ +
def test_notify_user_stores_and_broadcasts_alert() -> None:
    """A registered client queue receives the alert that becomes the latest one."""
    skill_under_test = BrowserNotificationSkill()
    inbox = Queue()
    with skill_under_test._lock:
        skill_under_test._client_queues.add(inbox)

    try:
        summary = skill_under_test.notify_user(
            title="Stop",
            message="Red traffic light detected",
            urgency="high",
            sound=True,
            vibrate=True,
        )

        delivered = inbox.get_nowait()
        expected_fields = {
            "id": 1,
            "title": "Stop",
            "message": "Red traffic light detected",
            "urgency": "high",
        }
        for key, expected in expected_fields.items():
            assert delivered[key] == expected
        # The flags must be real booleans, not merely truthy values.
        assert delivered["sound"] is True
        assert delivered["vibrate"] is True
        assert skill_under_test._latest_alert == delivered
        assert "1 connected client" in summary
    finally:
        skill_under_test.stop()


def test_notify_user_normalizes_unknown_urgency() -> None:
    """An unrecognised urgency string is stored as "normal"."""
    skill_under_test = BrowserNotificationSkill()
    try:
        skill_under_test.notify_user(
            title="Go",
            message="Green traffic light detected",
            urgency="unexpected",
        )
        latest = skill_under_test._latest_alert
        assert latest is not None
        assert latest["urgency"] == "normal"
    finally:
        skill_under_test.stop()
def _circle_bgr(color: tuple[int, int, int], radius: int = 100) -> np.ndarray:
    """Return a 320x320 black BGR canvas with one filled circle at its center."""
    canvas = np.zeros((320, 320, 3), dtype=np.uint8)
    cv2.circle(canvas, (160, 160), radius, color, thickness=-1)
    return canvas


def test_detects_red_circle() -> None:
    detection = TrafficLightColorDetector(min_area_ratio=0.01).classify(
        _circle_bgr((0, 0, 255)), color_space="BGR"
    )
    assert detection.state == "red"
    assert detection.confidence > 0.9
    assert detection.red_score > detection.green_score


def test_detects_yellow_circle() -> None:
    detection = TrafficLightColorDetector(min_area_ratio=0.01).classify(
        _circle_bgr((0, 255, 255)), color_space="BGR"
    )
    assert detection.state == "yellow"
    assert detection.confidence > 0.9
    assert detection.yellow_score > detection.red_score


def test_detects_green_circle() -> None:
    detection = TrafficLightColorDetector(min_area_ratio=0.01).classify(
        _circle_bgr((0, 255, 0)), color_space="BGR"
    )
    assert detection.state == "green"
    assert detection.confidence > 0.9
    assert detection.green_score > detection.red_score


def test_unknown_for_dark_or_invalid_frames() -> None:
    """All-black, two-dimensional and empty frames all classify as unknown."""
    detector = TrafficLightColorDetector(min_area_ratio=0.01)
    dark_frame = np.zeros((320, 320, 3), dtype=np.uint8)
    flat_frame = np.zeros((320, 320), dtype=np.uint8)
    empty_frame = np.zeros((0, 0, 3), dtype=np.uint8)
    assert detector.classify(dark_frame).state == "unknown"
    assert detector.classify(flat_frame).state == "unknown"
    assert detector.classify(empty_frame).state == "unknown"


def test_roi_ignores_colored_signal_outside_crop() -> None:
    """A red blob in the top-left corner is invisible to a lower-right ROI."""
    detector = TrafficLightColorDetector(min_area_ratio=0.01)
    scene = np.zeros((320, 320, 3), dtype=np.uint8)
    cv2.circle(scene, (40, 40), 35, (0, 0, 255), thickness=-1)

    whole = detector.classify(scene, color_space="BGR")
    inside_roi = detector.classify(scene, color_space="BGR", roi=(0.35, 0.35, 0.95, 0.95))

    assert whole.state == "red"
    assert inside_roi.state == "unknown"


def test_raises_for_unsupported_color_space() -> None:
    detector = TrafficLightColorDetector()
    frame = _circle_bgr((0, 0, 255))

    caught: ValueError | None = None
    try:
        detector.classify(frame, color_space="HSV")  # type: ignore[arg-type]
    except ValueError as exc:
        caught = exc

    if caught is None:
        raise AssertionError("Expected ValueError for unsupported color_space")
    assert "Unsupported color_space" in str(caught)


def test_debouncer_emits_after_stable_frames() -> None:
    """Each colour fires once, on its third consecutive frame."""
    debouncer = StableStateDebouncer(stable_frames=3, cooldown_s=0.0)
    timeline = [
        ("red", 1.0, None),
        ("red", 2.0, None),
        ("red", 3.0, "red"),
        ("red", 4.0, None),
        ("green", 5.0, None),
        ("green", 6.0, None),
        ("green", 7.0, "green"),
    ]
    for state, now, expected in timeline:
        emitted = debouncer.update(state, now=now)
        if expected is None:
            assert emitted is None
        else:
            assert emitted == expected


def test_debouncer_resets_on_unknown_and_honors_cooldown() -> None:
    """An unknown frame restarts the count; the cooldown delays the emission."""
    debouncer = StableStateDebouncer(stable_frames=2, cooldown_s=10.0)
    quiet_steps = [("red", 1.0), ("unknown", 2.0), ("red", 3.0), ("red", 4.0)]
    for state, now in quiet_steps:
        assert debouncer.update(state, now=now) is None
    assert debouncer.update("red", now=14.1) == "red"
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from dataclasses import dataclass +import time +from typing import Literal + +import cv2 +import numpy as np + +TrafficLightState = Literal["red", "yellow", "green", "unknown"] + + +@dataclass(frozen=True) +class TrafficLightDetection: + """Result from a lightweight color-state visual event detector.""" + + state: TrafficLightState + confidence: float + red_score: float + yellow_score: float + green_score: float + area_ratio: float + ts: float + + +class TrafficLightColorDetector: + """Classify large red, yellow, or green visual signals with HSV masks. + + This detector is intended for prototype visual-event triggers such as demo + traffic signals, facility status lights, warning signs, and companion robot + alerts. It is not a production traffic-safety perception system. + """ + + def __init__( + self, + min_area_ratio: float = 0.015, + min_saturation: int = 80, + min_value: int = 80, + blur_kernel: int = 5, + ) -> None: + self.min_area_ratio = float(min_area_ratio) + self.min_saturation = int(min_saturation) + self.min_value = int(min_value) + self.blur_kernel = int(blur_kernel) + + def classify( + self, + frame: np.ndarray, + *, + color_space: Literal["RGB", "BGR"] = "RGB", + roi: tuple[float, float, float, float] | None = None, + ) -> TrafficLightDetection: + """Classify an image as red, yellow, green, or unknown. + + Args: + frame: HxWx3 uint8 image. + color_space: Whether the input frame is RGB or BGR. + roi: Optional normalized crop as (x1, y1, x2, y2), each in [0, 1]. 
+ """ + + if frame is None or frame.size == 0: + return self._unknown() + if frame.ndim != 3 or frame.shape[2] < 3: + return self._unknown() + + image = frame[:, :, :3] + if color_space == "RGB": + bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) + elif color_space == "BGR": + bgr = image + else: + raise ValueError(f"Unsupported color_space: {color_space}") + + bgr = self._crop_normalized(bgr, roi) + if bgr.size == 0: + return self._unknown() + + if self.blur_kernel > 1: + kernel_size = self.blur_kernel if self.blur_kernel % 2 == 1 else self.blur_kernel + 1 + bgr = cv2.GaussianBlur(bgr, (kernel_size, kernel_size), 0) + + hsv = cv2.cvtColor(bgr, cv2.COLOR_BGR2HSV) + + red_mask_1 = cv2.inRange( + hsv, + np.array([0, self.min_saturation, self.min_value]), + np.array([10, 255, 255]), + ) + red_mask_2 = cv2.inRange( + hsv, + np.array([170, self.min_saturation, self.min_value]), + np.array([179, 255, 255]), + ) + red_score = self._mask_area_ratio(cv2.bitwise_or(red_mask_1, red_mask_2)) + + yellow_score = self._mask_area_ratio( + cv2.inRange( + hsv, + np.array([18, self.min_saturation, self.min_value]), + np.array([34, 255, 255]), + ) + ) + green_score = self._mask_area_ratio( + cv2.inRange( + hsv, + np.array([35, self.min_saturation, self.min_value]), + np.array([90, 255, 255]), + ) + ) + + scores: dict[TrafficLightState, float] = { + "red": red_score, + "yellow": yellow_score, + "green": green_score, + } + best_state = max(scores, key=scores.__getitem__) + best_score = scores[best_state] + + if best_score < self.min_area_ratio: + return TrafficLightDetection( + state="unknown", + confidence=0.0, + red_score=red_score, + yellow_score=yellow_score, + green_score=green_score, + area_ratio=best_score, + ts=time.time(), + ) + + total_score = red_score + yellow_score + green_score + confidence = best_score / max(total_score, 1e-9) + return TrafficLightDetection( + state=best_state, + confidence=float(min(max(confidence, 0.0), 1.0)), + red_score=red_score, + 
yellow_score=yellow_score, + green_score=green_score, + area_ratio=best_score, + ts=time.time(), + ) + + def draw_debug_overlay( + self, + frame: np.ndarray, + detection: TrafficLightDetection, + *, + color_space: Literal["RGB", "BGR"] = "RGB", + ) -> np.ndarray: + """Return a BGR debug frame with state and score overlays.""" + + if color_space == "RGB": + out = cv2.cvtColor(frame[:, :, :3], cv2.COLOR_RGB2BGR) + else: + out = frame[:, :, :3].copy() + + cv2.putText( + out, + f"{detection.state.upper()} conf={detection.confidence:.2f} area={detection.area_ratio:.3f}", + (20, 40), + cv2.FONT_HERSHEY_SIMPLEX, + 0.9, + (255, 255, 255), + 2, + cv2.LINE_AA, + ) + cv2.putText( + out, + f"R={detection.red_score:.3f} Y={detection.yellow_score:.3f} G={detection.green_score:.3f}", + (20, 75), + cv2.FONT_HERSHEY_SIMPLEX, + 0.7, + (255, 255, 255), + 2, + cv2.LINE_AA, + ) + return out + + @staticmethod + def _crop_normalized( + frame: np.ndarray, + roi: tuple[float, float, float, float] | None, + ) -> np.ndarray: + if roi is None: + return frame + + h, w = frame.shape[:2] + x1, y1, x2, y2 = roi + x1_i = max(0, min(w - 1, int(x1 * w))) + y1_i = max(0, min(h - 1, int(y1 * h))) + x2_i = max(x1_i + 1, min(w, int(x2 * w))) + y2_i = max(y1_i + 1, min(h, int(y2 * h))) + return frame[y1_i:y2_i, x1_i:x2_i] + + @staticmethod + def _mask_area_ratio(mask: np.ndarray) -> float: + kernel = np.ones((5, 5), np.uint8) + clean = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel) + clean = cv2.morphologyEx(clean, cv2.MORPH_CLOSE, kernel) + return float(cv2.countNonZero(clean)) / float(mask.shape[0] * mask.shape[1]) + + @staticmethod + def _unknown() -> TrafficLightDetection: + return TrafficLightDetection( + state="unknown", + confidence=0.0, + red_score=0.0, + yellow_score=0.0, + green_score=0.0, + area_ratio=0.0, + ts=time.time(), + ) + + +class StableStateDebouncer: + """Emit a visual event only after repeated stable frames.""" + + def __init__(self, stable_frames: int = 3, cooldown_s: float = 2.0) 
-> None: + self.stable_frames = int(stable_frames) + self.cooldown_s = float(cooldown_s) + self._candidate: TrafficLightState = "unknown" + self._candidate_count = 0 + self._last_emitted: TrafficLightState = "unknown" + self._last_emit_ts = 0.0 + + def update(self, state: TrafficLightState, now: float | None = None) -> TrafficLightState | None: + """Return a newly stable state, or None if no event should fire.""" + + timestamp = time.time() if now is None else now + + if state == "unknown": + self._candidate = "unknown" + self._candidate_count = 0 + return None + + if state != self._candidate: + self._candidate = state + self._candidate_count = 1 + return None + + self._candidate_count += 1 + if self._candidate_count < self.stable_frames: + return None + if state == self._last_emitted: + return None + if timestamp - self._last_emit_ts < self.cooldown_s: + return None + + self._last_emitted = state + self._last_emit_ts = timestamp + return state + diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 6fbf0138bb..05ae00ff42 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -98,6 +98,7 @@ "unitree-go2-agentic-huggingface": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_agentic_huggingface:unitree_go2_agentic_huggingface", "unitree-go2-agentic-ollama": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_agentic_ollama:unitree_go2_agentic_ollama", "unitree-go2-basic": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_basic:unitree_go2_basic", + "unitree-go2-companion-alert": "dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_companion_alert:unitree_go2_companion_alert", "unitree-go2-coordinator": "dimos.robot.unitree.go2.blueprints.basic.unitree_go2_coordinator:unitree_go2_coordinator", "unitree-go2-detection": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2_detection:unitree_go2_detection", "unitree-go2-fleet": 
"dimos.robot.unitree.go2.blueprints.basic.unitree_go2_fleet:unitree_go2_fleet", @@ -128,6 +129,7 @@ "arm-teleop-module": "dimos.teleop.quest.quest_extensions.ArmTeleopModule", "b-box-navigation-module": "dimos.navigation.bbox_navigation.BBoxNavigationModule", "b1-connection-module": "dimos.robot.unitree.b1.connection.B1ConnectionModule", + "browser-notification-skill": "dimos.agents.skills.browser_notification_skill.BrowserNotificationSkill", "camera-module": "dimos.hardware.sensors.camera.module.CameraModule", "cartesian-motion-controller": "dimos.manipulation.control.servo_control.cartesian_motion_controller.CartesianMotionController", "control-coordinator": "dimos.control.coordinator.ControlCoordinator", diff --git a/dimos/robot/unitree/go2/blueprints/agentic/unitree_go2_companion_alert.py b/dimos/robot/unitree/go2/blueprints/agentic/unitree_go2_companion_alert.py new file mode 100644 index 0000000000..376ffac52d --- /dev/null +++ b/dimos/robot/unitree/go2/blueprints/agentic/unitree_go2_companion_alert.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from dimos.agents.skills.browser_notification_skill import BrowserNotificationSkill +from dimos.agents.skills.speak_skill import SpeakSkill +from dimos.agents.web_human_input import WebInput +from dimos.core.coordination.blueprints import autoconnect +from dimos.robot.unitree.go2.blueprints.agentic.unitree_go2_agentic import unitree_go2_agentic + +unitree_go2_companion_alert = autoconnect( + unitree_go2_agentic, + BrowserNotificationSkill.blueprint(), +) +# The companion alert demo is driven by MCP and the phone alert page; disabling +# voice/web input avoids optional local audio dependencies for this focused stack. +unitree_go2_companion_alert = unitree_go2_companion_alert.disabled_modules(WebInput, SpeakSkill) + +__all__ = ["unitree_go2_companion_alert"] diff --git a/docs/examples/go2_visual_event_notifier.md b/docs/examples/go2_visual_event_notifier.md new file mode 100644 index 0000000000..75ab673773 --- /dev/null +++ b/docs/examples/go2_visual_event_notifier.md @@ -0,0 +1,152 @@ +# Go2 Visual Event Notifier + +This example turns robot perception into phone alerts. It watches a Go2 RGB camera +stream, detects stable red, yellow, or green visual states, and publishes alerts +through the local DimOS browser notification page and optionally through ntfy. + +The demo uses a traffic-light-style signal, but the reusable pattern is: + +```text +robot perception -> stable visual event -> external app action +``` + +This is a prototype visual-event trigger, not a production traffic-safety system. + +## Phone Setup + +Use the local browser alert page as the primary path. It does not require an API +key or internet connection. 
+ +```bash +export LAPTOP_IP=$(ipconfig getifaddr en1) +[ -z "$LAPTOP_IP" ] && export LAPTOP_IP=$(ipconfig getifaddr en0) +echo "$LAPTOP_IP" +``` + +Open this on the phone after the demo script starts: + +```text +https://$LAPTOP_IP:8450/notify +``` + +Accept the certificate warning, tap **Enable alerts**, turn volume up, and keep +the page in the foreground. + +For an Android-native notification fallback, install the ntfy app and subscribe +to a hard-to-guess topic: + +```bash +export NTFY_TOPIC=go2-demo-$(uuidgen | tr '[:upper:]' '[:lower:]' | cut -c1-8) +echo "$NTFY_TOPIC" +curl \ + -H "Title: Go2 Test" \ + -H "Priority: urgent" \ + -H "Tags: rotating_light" \ + -d "Go2 notification test" \ + "https://ntfy.sh/$NTFY_TOPIC" +``` + +## Real Go2 Run + +Connect the laptop to the Go2 network, discover the robot, and verify the IP: + +```bash +uv run dimos go2tool discover +export ROBOT_IP=REPLACE_WITH_GO2_IP +ping -c 3 "$ROBOT_IP" +``` + +Start the demo: + +```bash +uv run python examples/go2_visual_event_notifier/go2_traffic_light_companion.py \ + --source go2 \ + --robot-ip "$ROBOT_IP" \ + --browser-alerts \ + --listen-host 0.0.0.0 \ + --browser-port 8450 \ + --ntfy-topic "$NTFY_TOPIC" \ + --stable-frames 3 \ + --cooldown-s 2 \ + --fps 5 \ + --log-file go2_visual_event_log.jsonl +``` + +Open the signal page and put it in front of the Go2 camera: + +```bash +open examples/go2_visual_event_notifier/demo_signals.html +``` + +Expected behavior: + +```text +GREEN -> normal "Go" phone alert +YELLOW -> high-priority "Caution" phone alert +RED -> urgent "Stop" phone alert +``` + +## Webcam Rehearsal + +Use a local webcam to test the detector and alert delivery before connecting to +real hardware: + +```bash +uv run python examples/go2_visual_event_notifier/go2_traffic_light_companion.py \ + --source webcam \ + --camera-index 0 \ + --browser-alerts \ + --listen-host 0.0.0.0 \ + --browser-port 8450 \ + --ntfy-topic "$NTFY_TOPIC" \ + --display \ + --run-seconds 60 +``` + +If the detector misses the signal,
crop the image and lower the area threshold: + +```bash +uv run python examples/go2_visual_event_notifier/go2_traffic_light_companion.py \ + --source go2 \ + --robot-ip "$ROBOT_IP" \ + --browser-alerts \ + --listen-host 0.0.0.0 \ + --roi 0.25,0.15,0.75,0.85 \ + --min-area-ratio 0.005 \ + --stable-frames 2 +``` + +## Recording Script + +Record a 90-second video with the Go2, terminal, and phone visible. + +```text +0-10s: +"We built a DimOS visual-event notifier. Go2 watches the world and turns stable +visual events into phone actions." + +10-25s: +Show the detector, demo script, browser notification skill, and tests. + +25-45s: +Show GREEN. Terminal prints GREEN event. Phone receives "Go". + +45-65s: +Show YELLOW. Terminal prints YELLOW event. Phone receives "Caution". + +65-82s: +Show RED. Terminal prints RED event. Phone receives urgent "Stop". + +82-90s: +"The demo is a traffic-light companion prototype, but the same DimOS pattern +generalizes to factory status lights, package alerts, warning signs, and patrol +events." +``` + +## Notes + +- No API key, LLM agent, robot speech, navigation, or watch SDK is required. +- Keep the robot stationary; this demo only uses camera frames. +- Use a large, bright signal and avoid glare. +- Use ntfy only when the laptop has internet while connected to the robot setup. + diff --git a/examples/go2_visual_event_notifier/demo_signals.html b/examples/go2_visual_event_notifier/demo_signals.html new file mode 100644 index 0000000000..33f98490cd --- /dev/null +++ b/examples/go2_visual_event_notifier/demo_signals.html @@ -0,0 +1,178 @@ + + + + + + Go2 Visual Event Signals + + + +
+
+ + + +
+
+
+ + + + +
+ + + + diff --git a/examples/go2_visual_event_notifier/go2_traffic_light_companion.py b/examples/go2_visual_event_notifier/go2_traffic_light_companion.py new file mode 100755 index 0000000000..717c3af710 --- /dev/null +++ b/examples/go2_visual_event_notifier/go2_traffic_light_companion.py @@ -0,0 +1,390 @@ +#!/usr/bin/env python3 +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Go2 visual-event notifier demo. + +This example turns stable red, yellow, or green visual states into phone alerts. +It can read from a local webcam for rehearsal or from a real Unitree Go2 camera +through DimOS's direct WebRTC connection. 
# Names of the supported frame sources (mirrors the ``--source`` CLI choices).
SourceName = Literal["go2", "webcam"]


# Notification content per stable detector state. Each entry carries the
# browser alert title/message/urgency and the ntfy priority/tags that
# ``handle_event`` forwards to the respective publishers.
EVENT_MESSAGES: dict[TrafficLightState, dict[str, str]] = {
    "red": {
        "title": "Stop",
        "message": "Red visual signal detected.",
        "browser_urgency": "high",
        "ntfy_priority": "urgent",
        "ntfy_tags": "rotating_light,warning",
    },
    "yellow": {
        "title": "Caution",
        "message": "Yellow visual signal detected.",
        "browser_urgency": "high",
        "ntfy_priority": "high",
        "ntfy_tags": "warning",
    },
    "green": {
        "title": "Go",
        "message": "Green visual signal detected.",
        "browser_urgency": "normal",
        "ntfy_priority": "default",
        "ntfy_tags": "green_circle,heavy_check_mark",
    },
    "unknown": {
        "title": "Unknown",
        "message": "No stable visual signal detected.",
        "browser_urgency": "normal",
        "ntfy_priority": "default",
        "ntfy_tags": "question",
    },
}


def parse_roi(value: str | None) -> tuple[float, float, float, float] | None:
    """Parse a normalized ROI string in x1,y1,x2,y2 form.

    Args:
        value: Comma-separated corner coordinates, or ``None``/empty for "no ROI".

    Returns:
        ``(x1, y1, x2, y2)`` as floats, or ``None`` when ``value`` is empty.

    Raises:
        argparse.ArgumentTypeError: If ``value`` does not contain exactly four
            numbers, or they violate ``0 <= x1 < x2 <= 1`` / ``0 <= y1 < y2 <= 1``.
    """

    if not value:
        return None

    # Check the field count first so "0.1,0.2" reports the format hint rather
    # than whatever float() happens to complain about.
    fields = value.split(",")
    if len(fields) != 4:
        raise argparse.ArgumentTypeError("--roi must be x1,y1,x2,y2")

    try:
        # float() tolerates surrounding whitespace, so "0.1, 0.2" is accepted.
        x1, y1, x2, y2 = (float(field) for field in fields)
    except ValueError as exc:
        raise argparse.ArgumentTypeError(f"--roi must contain four numbers, got {value!r}") from exc

    # NaN fails every comparison, so it is rejected here as well.
    if not (0 <= x1 < x2 <= 1 and 0 <= y1 < y2 <= 1):
        raise argparse.ArgumentTypeError("--roi values must satisfy 0 <= x1 < x2 <= 1 and 0 <= y1 < y2 <= 1")
    return x1, y1, x2, y2


class BrowserAlertPublisher:
    """Manage the local DimOS browser notification skill for this example."""

    def __init__(self, enabled: bool, listen_host: str, port: int) -> None:
        """Store the configuration; the skill itself is only created by ``start``."""
        self.enabled = enabled
        self.listen_host = listen_host
        self.port = port
        self._skill: BrowserNotificationSkill | None = None

    @property
    def url(self) -> str:
        """Return the notification page URL built from the host and port."""
        return f"https://{self.listen_host}:{self.port}/notify"

    def start(self) -> None:
        """Create and start the notification skill; no-op when alerts are disabled."""
        if not self.enabled:
            return

        # Set before the skill is constructed; presumably read by the web
        # server when it binds — TODO confirm against RobotWebInterface.
        global_config.listen_host = self.listen_host
        self._skill = BrowserNotificationSkill(server_port=self.port)
        self._skill.start()
        print(f"[BROWSER] Open {self.url} on your phone and tap Enable alerts.", flush=True)

    def stop(self) -> None:
        """Stop the skill if one is running; safe to call when never started."""
        if self._skill is not None:
            self._skill.stop()
            self._skill = None

    def publish(self, title: str, message: str, urgency: str) -> str:
        """Send one alert (with sound and vibration) and return the skill's status text.

        Returns the fixed string ``"browser alerts disabled"`` when no skill is running.
        """
        if self._skill is None:
            return "browser alerts disabled"
        return self._skill.notify_user(
            title=title,
            message=message,
            urgency=urgency,
            sound=True,
            vibrate=True,
        )


def publish_ntfy(
    *,
    server: str,
    topic: str,
    title: str,
    message: str,
    priority: str,
    tags: str,
    timeout_s: float,
) -> str:
    """Publish a notification through ntfy.

    Args:
        server: Base URL of the ntfy server; a trailing slash is ignored.
        topic: Topic to post to. An empty topic skips the request entirely.
        title: Value for the ``Title`` header.
        message: Notification body, sent as UTF-8 text.
        priority: Value for the ``Priority`` header.
        tags: Value for the ``Tags`` header.
        timeout_s: Timeout passed to ``requests.post``.

    Returns:
        A short status string: either the skip notice or ``ntfy status=<code>``.

    Raises:
        requests.RequestException: On transport errors or a non-2xx response
            (via ``raise_for_status``).
    """

    if not topic:
        return "ntfy skipped: no topic configured"

    response = requests.post(
        f"{server.rstrip('/')}/{topic}",
        data=message.encode("utf-8"),
        headers={
            "Title": title,
            "Priority": priority,
            "Tags": tags,
            "Content-Type": "text/plain; charset=utf-8",
        },
        timeout=timeout_s,
    )
    response.raise_for_status()
    return f"ntfy status={response.status_code}"


def append_jsonl(path: str | None, record: dict[str, Any]) -> None:
    """Append one JSON record to a log file if logging is enabled.

    Args:
        path: Log file path; ``None`` or empty disables logging.
        record: JSON-serializable mapping written as a single line.
    """

    if not path:
        return

    log_path = Path(path)
    # Create missing parent directories so a fresh log location just works.
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with log_path.open("a", encoding="utf-8") as log_file:
        log_file.write(json.dumps(record, ensure_ascii=True) + "\n")


def handle_event(
    args: argparse.Namespace,
    detection: TrafficLightDetection,
    browser_alerts: BrowserAlertPublisher,
) -> None:
    """Send all configured outputs for one stable visual event.

    Publishes a browser alert, optionally an ntfy notification (when
    ``args.ntfy_topic`` is set), prints the results, and appends a JSONL record
    to ``args.log_file``. ntfy request failures are recorded, not raised.
    """

    event = EVENT_MESSAGES[detection.state]
    record: dict[str, Any] = {
        "ts": time.time(),
        "event": detection.state,
        "confidence": detection.confidence,
        "area_ratio": detection.area_ratio,
        "red_score": detection.red_score,
        "yellow_score": detection.yellow_score,
        "green_score": detection.green_score,
        "notification": event,
    }

    # Leading newline moves past the "\r"-terminated per-frame status line.
    print(
        "\n[EVENT] "
        f"{detection.state.upper()} confidence={detection.confidence:.2f} "
        f"area={detection.area_ratio:.3f}",
        flush=True,
    )

    browser_result = browser_alerts.publish(
        title=event["title"],
        message=event["message"],
        urgency=event["browser_urgency"],
    )
    record["browser_result"] = browser_result
    print(f"[BROWSER] {browser_result}", flush=True)

    if args.ntfy_topic:
        try:
            ntfy_result = publish_ntfy(
                server=args.ntfy_server,
                topic=args.ntfy_topic,
                title=f"Go2 {event['title']}",
                message=event["message"],
                priority=event["ntfy_priority"],
                tags=event["ntfy_tags"],
                timeout_s=args.ntfy_timeout_s,
            )
            record["ntfy_result"] = ntfy_result
            print(f"[NTFY] {ntfy_result}", flush=True)
        except requests.RequestException as exc:
            # A failed push must not stop detection; keep the error in the log record.
            record["ntfy_error"] = repr(exc)
            print(f"[NTFY_ERROR] {exc!r}", flush=True)

    append_jsonl(args.log_file, record)


def process_frame(
    args: argparse.Namespace,
    detector: TrafficLightColorDetector,
    debouncer: StableStateDebouncer,
    browser_alerts: BrowserAlertPublisher,
    frame_rgb: np.ndarray,
) -> bool:
    """Process one RGB frame. Returns False when the display window requests stop.

    Classifies the frame, prints a one-line status, forwards the state to the
    debouncer, and dispatches ``handle_event`` whenever the debouncer reports
    an event. With ``args.display`` set, a debug overlay is shown and pressing
    ``q`` in that window requests a stop.
    """

    detection = detector.classify(frame_rgb, color_space="RGB", roi=args.roi)
    # "\r" keeps the per-frame status on a single, continuously rewritten line.
    print(
        f"[FRAME] state={detection.state:<7} conf={detection.confidence:.2f} "
        f"R={detection.red_score:.3f} Y={detection.yellow_score:.3f} G={detection.green_score:.3f}",
        end="\r",
        flush=True,
    )

    event = debouncer.update(detection.state)
    if event is not None:
        handle_event(args, detection, browser_alerts)

    if args.display:
        debug = detector.draw_debug_overlay(frame_rgb, detection, color_space="RGB")
        cv2.imshow("Go2 Visual Event Notifier", debug)
        if cv2.waitKey(1) & 0xFF == ord("q"):
            return False

    return True


def _run_time_exceeded(started: float, run_seconds: float) -> bool:
    """Return True once a non-zero ``run_seconds`` budget has elapsed since ``started``.

    ``started`` must come from ``time.monotonic()``; a zero budget means "run forever".
    """
    return bool(run_seconds) and time.monotonic() - started > run_seconds


def run_webcam(args: argparse.Namespace, browser_alerts: BrowserAlertPublisher) -> None:
    """Run the detector against a local webcam.

    Raises:
        RuntimeError: If the webcam at ``args.camera_index`` cannot be opened.
    """

    cap = cv2.VideoCapture(args.camera_index)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open webcam index {args.camera_index}")

    try:
        detector = TrafficLightColorDetector(min_area_ratio=args.min_area_ratio)
        debouncer = StableStateDebouncer(stable_frames=args.stable_frames, cooldown_s=args.cooldown_s)
        started = time.monotonic()

        print("[INFO] Running webcam source. Press Ctrl-C or q in the display window to stop.", flush=True)
        while True:
            ok, frame_bgr = cap.read()
            if not ok:
                # Honor --run-seconds even while the camera delivers no frames,
                # otherwise a stalled device would hang a bounded run forever.
                if _run_time_exceeded(started, args.run_seconds):
                    break
                time.sleep(0.05)
                continue

            frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
            if not process_frame(args, detector, debouncer, browser_alerts, frame_rgb):
                break

            if _run_time_exceeded(started, args.run_seconds):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()


def run_go2(args: argparse.Namespace, browser_alerts: BrowserAlertPublisher) -> None:
    """Run the detector against a real Go2 WebRTC camera stream.

    The stream callback stores the most recent frame; the main loop samples it
    at roughly ``args.fps`` (clamped to at least 1 Hz) and processes a copy.
    """

    # Imported lazily so the webcam source works without the Unitree stack.
    from dimos.robot.unitree.connection import UnitreeWebRTCConnection

    latest_frame: dict[str, np.ndarray | None] = {"frame": None}
    lock = threading.Lock()

    print(f"[INFO] Connecting to Go2 at {args.robot_ip}", flush=True)
    connection = UnitreeWebRTCConnection(args.robot_ip)

    def on_image(image: Any) -> None:
        # Best effort: prefer an explicit RGB conversion and fall back to the
        # raw array when the image type does not support it.
        try:
            frame_rgb = image.to_rgb().as_numpy()
        except Exception:
            frame_rgb = image.as_numpy()

        with lock:
            latest_frame["frame"] = np.array(frame_rgb, copy=True)

    subscription = None
    try:
        # Subscribing inside the try guarantees the connection is stopped even
        # if setting up the stream fails.
        subscription = connection.video_stream().subscribe(on_image)
        detector = TrafficLightColorDetector(min_area_ratio=args.min_area_ratio)
        debouncer = StableStateDebouncer(stable_frames=args.stable_frames, cooldown_s=args.cooldown_s)
        started = time.monotonic()

        print("[INFO] Go2 video source running. Press Ctrl-C to stop.", flush=True)
        while True:
            with lock:
                frame = None if latest_frame["frame"] is None else np.array(latest_frame["frame"], copy=True)

            if frame is None:
                # Honor --run-seconds while no frame has arrived yet, so a
                # bounded run cannot wait on a silent stream forever.
                if _run_time_exceeded(started, args.run_seconds):
                    break
                print("[INFO] Waiting for Go2 camera frame...", end="\r", flush=True)
                time.sleep(0.1)
                continue

            if not process_frame(args, detector, debouncer, browser_alerts, frame):
                break

            if _run_time_exceeded(started, args.run_seconds):
                break

            time.sleep(1.0 / max(args.fps, 1.0))
    finally:
        print("\n[INFO] Shutting down Go2 connection.", flush=True)
        try:
            if subscription is not None:
                subscription.dispose()
        finally:
            # Always release the connection and windows, even if dispose() fails.
            connection.stop()
            cv2.destroyAllWindows()


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the notifier demo.

    Defaults for ``--robot-ip``, ``--ntfy-topic`` and ``--ntfy-server`` are
    read from the ``ROBOT_IP``, ``NTFY_TOPIC`` and ``NTFY_SERVER`` environment
    variables when present.
    """
    parser = argparse.ArgumentParser(description="Go2 visual event notifier demo")

    # Frame source selection.
    parser.add_argument("--source", choices=["go2", "webcam"], default="go2")
    parser.add_argument("--robot-ip", default=os.environ.get("ROBOT_IP", "192.168.12.1"))
    parser.add_argument("--camera-index", type=int, default=0)

    # Browser notification page.
    parser.add_argument("--browser-alerts", action="store_true")
    parser.add_argument("--listen-host", default="127.0.0.1")
    parser.add_argument("--browser-port", type=int, default=8450)

    # ntfy push notifications (disabled while the topic is empty).
    parser.add_argument("--ntfy-topic", default=os.environ.get("NTFY_TOPIC", ""))
    parser.add_argument("--ntfy-server", default=os.environ.get("NTFY_SERVER", "https://ntfy.sh"))
    parser.add_argument("--ntfy-timeout-s", type=float, default=5.0)

    # Detection, debouncing, and run control.
    parser.add_argument("--min-area-ratio", type=float, default=0.015)
    parser.add_argument("--stable-frames", type=int, default=3)
    parser.add_argument("--cooldown-s", type=float, default=2.0)
    parser.add_argument("--fps", type=float, default=5.0)
    parser.add_argument("--run-seconds", type=float, default=0.0)
    parser.add_argument("--roi", type=parse_roi, default=None, help="Optional normalized crop x1,y1,x2,y2")
    parser.add_argument("--display", action="store_true")
    parser.add_argument("--log-file", default="go2_visual_event_log.jsonl")
    return parser
def main() -> None:
    """Parse CLI arguments, start optional browser alerts, and run the selected source.

    The browser alert publisher is always stopped on the way out. Ctrl-C is the
    advertised way to stop the run loops, so ``KeyboardInterrupt`` ends the
    program cleanly instead of surfacing as a traceback.
    """
    args = build_parser().parse_args()
    browser_alerts = BrowserAlertPublisher(
        enabled=args.browser_alerts,
        listen_host=args.listen_host,
        port=args.browser_port,
    )

    print("[INFO] Config:")
    # default=str keeps the dump working for non-JSON values such as the ROI tuple.
    print(json.dumps(vars(args), indent=2, default=str), flush=True)

    try:
        # Started inside the try so a failure or Ctrl-C during startup still
        # reaches stop(), which is a no-op when nothing was started.
        browser_alerts.start()
        if args.source == "webcam":
            run_webcam(args, browser_alerts)
        else:
            run_go2(args, browser_alerts)
    except KeyboardInterrupt:
        print("\n[INFO] Interrupted by user.", flush=True)
    finally:
        browser_alerts.stop()
+exclude-newer = "2026-05-21T13:22:29.307859Z" exclude-newer-span = "P7D" [options.exclude-newer-package] @@ -2028,6 +2028,7 @@ base = [ { name = "omegaconf" }, { name = "openai" }, { name = "pillow" }, + { name = "python-multipart" }, { name = "rerun-sdk" }, { name = "sounddevice" }, { name = "soundfile" }, @@ -2159,6 +2160,7 @@ unitree = [ { name = "omegaconf" }, { name = "openai" }, { name = "pillow" }, + { name = "python-multipart" }, { name = "rerun-sdk" }, { name = "sounddevice" }, { name = "soundfile" }, @@ -2192,6 +2194,7 @@ unitree-dds = [ { name = "omegaconf" }, { name = "openai" }, { name = "pillow" }, + { name = "python-multipart" }, { name = "rerun-sdk" }, { name = "sounddevice" }, { name = "soundfile" }, @@ -2210,6 +2213,7 @@ web = [ { name = "fastapi" }, { name = "ffmpeg-python" }, { name = "jinja2" }, + { name = "python-multipart" }, { name = "soundfile" }, { name = "sse-starlette" }, { name = "uvicorn" }, @@ -2457,6 +2461,7 @@ requires-dist = [ { name = "pyrealsense2", marker = "sys_platform != 'darwin' and extra == 'manipulation'" }, { name = "python-dotenv" }, { name = "python-multipart", marker = "extra == 'misc'", specifier = ">=0.0.27" }, + { name = "python-multipart", marker = "extra == 'web'", specifier = ">=0.0.27" }, { name = "pyturbojpeg", specifier = "==1.8.2" }, { name = "pyturbojpeg", marker = "extra == 'docker'" }, { name = "pyyaml", marker = "extra == 'manipulation'", specifier = ">=6.0" },