Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions dimensional-hackathon/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Go2 Minimal Demo

This is a stripped-down demo app for the Unitree Go2 Air.

What it does:

- connects to the Go2 over DimensionalOS WebRTC
- serves the live FPV stream on a local dashboard
- runs a simple sequential waypoint patrol
- accepts `/patrol`, `/stop`, and `/status` via Telegram
- runs YOLO-only tool detection, detects persons, chairs, boxes etc
- sends Telegram alerts with a JPG snapshot
- plays a WAV locally and, if available, through the Go2 speaker


## Layout

- `config.yaml`: runtime config
- `.env.example`: required environment variables
- `run.sh`: start script
- `demo_app/`: source-only application package

## Setup

From the workspace root:

```bash
cd dimos/demo
pip install -r requirements.txt
cp .env.example .env
```

Set values in `.env`:

```bash
TELEGRAM_BOT_TOKEN=...
TELEGRAM_OWNER_CHAT_ID=...
ROBOT_IP=192.168.12.1
```

If you prefer shell exports, that also works.

## Run

```bash
cd dimos/demo
./run.sh
```

Then open:

- dashboard: [http://localhost:8080](http://localhost:8080)

Telegram commands:

- `/start`
- `/patrol`
- `/stop`
- `/status`

## Notes

- The app is intended to run on your laptop or another host on the same Wi-Fi as the robot.
- Detection is YOLO-only. Any matching detection above the configured threshold can trigger an alert.
56 changes: 56 additions & 0 deletions dimensional-hackathon/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
robot:
ip: "192.168.12.1"
obstacle_avoidance: true
camera_resize: [640, 480]
connect_timeout_sec: 15

waypoints:
- {id: W1, pos: [0.5, 0.0, 0.0], yaw: 0}
- {id: W2, pos: [0.5, 0.5, 0.0], yaw: 90}

patrol:
loop_forever: true
scan_turns: 1
scan_pause_sec: 0.15
motion_settle_sec: 0.05
forward_steps_per_cycle: 5
forward_speed_mps: 0.60
forward_step_duration_sec: 1.25
sweep_yaw_radps: 0.24
sweep_turn_duration_sec: 0.28

detection:
enabled: true
model_name: yolo11n.pt
interval_sec: 0.5
conf_threshold: 0.25
cooldown_sec: 0
detection_classes: [chair, office chair, box, container]

aisle:
enabled: true
x_min_m: 0.4
x_max_m: 2.0
half_width_m: 0.42
min_points_in_zone: 6
min_z_m: -0.35
max_z_m: 1.20
cell_size_m: 0.12
min_occupied_cells: 2
alert_repeat_sec: 3.0
closeup_stop_distance_m: 0.8
approach_step_m: 0.25
max_approach_steps: 3
turn_step_deg: 12
reclear_consecutive_frames: 2

alert:
audio_file: ../deploy_agentics_real/assets/alert.wav
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The audio_file path is a relative reference to ../deploy_agentics_real/assets/alert.wav, which is outside the repo and almost certainly won't exist for anyone cloning this project. The app will fail silently (the _play_local_blocking catches exceptions) but no alert sound will ever play. The README doesn't mention this file or how to obtain it. Consider shipping a placeholder path (e.g., assets/alert.wav) and documenting how to provide the file.

Suggested change
audio_file: ../deploy_agentics_real/assets/alert.wav
audio_file: assets/alert.wav # Provide a WAV file at this path

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clip_duration_sec: 5
buffer_seconds: 10
capture_fps: 15

web:
host: 0.0.0.0
port: 8080
stream_fps: 10
1 change: 1 addition & 0 deletions dimensional-hackathon/demo_app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Minimal Go2 patrol demo."""
114 changes: 114 additions & 0 deletions dimensional-hackathon/demo_app/aisle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
from __future__ import annotations

import numpy as np

from demo_app.types import AisleObservation


class AisleCorridorDetector:
def __init__(
self,
x_min_m: float,
x_max_m: float,
half_width_m: float,
min_points_in_zone: int,
min_z_m: float,
max_z_m: float,
cell_size_m: float,
min_occupied_cells: int,
) -> None:
self._x_min = x_min_m
self._x_max = x_max_m
self._half_width = half_width_m
self._min_points = min_points_in_zone
self._min_z = min_z_m
self._max_z = max_z_m
self._cell_size = max(cell_size_m, 0.01)
self._min_cells = max(min_occupied_cells, 1)

def analyze(self, points: np.ndarray) -> AisleObservation:
if points.size == 0:
return AisleObservation(
corridor_clear=True,
obstruction_distance_m=None,
obstruction_direction="center",
obstruction_point_count=0,
occupied_cell_count=0,
obstruction_center_xy=None,
)

pts = points.astype(np.float32, copy=False)
finite = np.isfinite(pts).all(axis=1)
pts = pts[finite]
if pts.size == 0:
return AisleObservation(
corridor_clear=True,
obstruction_distance_m=None,
obstruction_direction="center",
obstruction_point_count=0,
occupied_cell_count=0,
obstruction_center_xy=None,
)

if pts.shape[1] >= 3:
height_mask = (pts[:, 2] >= self._min_z) & (pts[:, 2] <= self._max_z)
pts = pts[height_mask]
if pts.size == 0:
return AisleObservation(
corridor_clear=True,
obstruction_distance_m=None,
obstruction_direction="center",
obstruction_point_count=0,
occupied_cell_count=0,
obstruction_center_xy=None,
)

corridor_mask = (
(pts[:, 0] >= self._x_min)
& (pts[:, 0] <= self._x_max)
& (np.abs(pts[:, 1]) <= self._half_width)
)
zone = pts[corridor_mask]
point_count = int(zone.shape[0])
if point_count < self._min_points:
return AisleObservation(
corridor_clear=True,
obstruction_distance_m=None,
obstruction_direction="center",
obstruction_point_count=point_count,
occupied_cell_count=0,
obstruction_center_xy=None,
)

cell_xy = np.floor(zone[:, :2] / self._cell_size).astype(np.int32)
occupied_cells = np.unique(cell_xy, axis=0)
occupied_count = int(occupied_cells.shape[0])
if occupied_count < self._min_cells:
return AisleObservation(
corridor_clear=True,
obstruction_distance_m=None,
obstruction_direction="center",
obstruction_point_count=point_count,
occupied_cell_count=occupied_count,
obstruction_center_xy=None,
)

nearest_idx = int(np.argmin(zone[:, 0]))
nearest_distance = float(zone[nearest_idx, 0])
center_y = float(np.mean(zone[:, 1]))
if center_y > 0.12:
direction = "left"
elif center_y < -0.12:
direction = "right"
else:
direction = "center"

center_x = float(np.mean(zone[:, 0]))
return AisleObservation(
corridor_clear=False,
obstruction_distance_m=nearest_distance,
obstruction_direction=direction,
obstruction_point_count=point_count,
occupied_cell_count=occupied_count,
obstruction_center_xy=(center_x, center_y),
)
30 changes: 30 additions & 0 deletions dimensional-hackathon/demo_app/audio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from __future__ import annotations

import asyncio
import logging
from pathlib import Path
from typing import Any

from demo_app.types import AlertEvent

logger = logging.getLogger(__name__)


class AudioAlert:
def __init__(self, audio_file: str, runner: Any = None):
self._audio_file = str(Path(audio_file))
self._runner = runner

async def play(self, event: AlertEvent) -> None:
tasks = [asyncio.to_thread(self._play_local_blocking)]
if self._runner is not None and hasattr(self._runner, "play_alert_on_robot"):
tasks.append(asyncio.create_task(self._runner.play_alert_on_robot(self._audio_file)))
await asyncio.gather(*tasks, return_exceptions=True)

def _play_local_blocking(self) -> None:
try:
import playsound3

playsound3.playsound(self._audio_file, block=True)
except Exception as e:
logger.warning("Local audio playback failed: %s", e)
98 changes: 98 additions & 0 deletions dimensional-hackathon/demo_app/capture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from __future__ import annotations

import asyncio
from collections import deque
from pathlib import Path

import cv2
import numpy as np

from demo_app.types import AlertEvent


class CaptureBuffer:
def __init__(self, buffer_seconds: int, fps: int, output_dir: Path):
self._buffer: deque[tuple[float, np.ndarray]] = deque(maxlen=buffer_seconds * fps)
self._fps = fps
self._output_dir = output_dir
self._output_dir.mkdir(parents=True, exist_ok=True)

def push(self, frame: np.ndarray, timestamp: float) -> None:
self._buffer.append((timestamp, frame.copy()))

async def snapshot(self, event: AlertEvent) -> Path:
return await asyncio.to_thread(self._encode_snapshot, event)

async def snapshot_and_clip(self, event: AlertEvent, duration_sec: float) -> tuple[Path, Path]:
return await asyncio.to_thread(self._encode, event, duration_sec)

def _encode_snapshot(self, event: AlertEvent) -> Path:
ts_label = int(event.timestamp * 1000)
jpg_path = self._output_dir / f"obstruction_{ts_label}.jpg"
annotated = event.frame.copy()
for idx, line in enumerate([
"AISLE OBSTRUCTION DETECTED",
f"dir={event.obstruction_direction or 'center'} "
f"dist={event.obstruction_distance_m:.2f}m" if event.obstruction_distance_m is not None else "dist=unknown",
f"points={event.obstruction_point_count}",
]):
y = 28 + idx * 28
cv2.putText(
annotated,
line,
(12, y),
cv2.FONT_HERSHEY_SIMPLEX,
0.75,
(0, 0, 255),
2,
)

detections = event.evidence_detections or []
if detections:
for det in detections:
x1, y1, x2, y2 = det.bbox
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 255), 2)
label = f"{det.class_name} {det.confidence:.2f}"
cv2.putText(
annotated,
label,
(x1, max(90, y1 - 8)),
cv2.FONT_HERSHEY_SIMPLEX,
0.6,
(0, 255, 255),
2,
)
elif event.bbox is not None:
x1, y1, x2, y2 = event.bbox
cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 0), 2)

cv2.imwrite(str(jpg_path), annotated)
return jpg_path

def _encode(self, event: AlertEvent, duration_sec: float) -> tuple[Path, Path]:
jpg_path = self._encode_snapshot(event)
ts_label = int(event.timestamp * 1000)
mp4_path = self._output_dir / f"anomaly_{ts_label}.mp4"
annotated = event.frame.copy()

half = duration_sec / 2.0
window = [
(ts, frame)
for ts, frame in list(self._buffer)
if event.timestamp - half <= ts <= event.timestamp + half
]
if not window:
fallback = list(self._buffer)[-int(duration_sec * self._fps):]
window = fallback or [(event.timestamp, annotated)]

h, w = window[0][1].shape[:2]
writer = cv2.VideoWriter(
str(mp4_path),
cv2.VideoWriter_fourcc(*"mp4v"),
self._fps,
(w, h),
)
for _, frame in window:
writer.write(frame)
writer.release()
return jpg_path, mp4_path
Loading