Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions dimos/msgs/sensor_msgs/Image.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,22 @@ def lcm_decode(cls, data: bytes, **kwargs: Any) -> Image:
),
)

def to_jpeg_bytes(self, quality: int = 75) -> bytes:
"""Encode this image as JPEG bytes using TurboJPEG.

Args:
quality: JPEG compression quality (0-100, default 75).

Returns:
Raw JPEG bytes.
"""
from turbojpeg import TJPF_RGB, TurboJPEG

jpeg = TurboJPEG()
# Canonicalize to RGB so JPEG bytes are deterministic regardless of input format.
rgb_array = self.to_rgb().data
return jpeg.encode(rgb_array, quality=quality, pixel_format=TJPF_RGB) # type: ignore[no-any-return]

def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> bytes:
"""Convert to LCM Image message with JPEG-compressed data.

Expand All @@ -539,9 +555,6 @@ def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> byt
Returns:
LCM-encoded bytes with JPEG-compressed image data
"""
from turbojpeg import TJPF_RGB, TurboJPEG

jpeg = TurboJPEG()
msg = LCMImage()

# Header
Expand All @@ -558,9 +571,7 @@ def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> byt
msg.header.stamp.sec = int(now)
msg.header.stamp.nsec = int((now - int(now)) * 1e9)

# Canonicalize to RGB so JPEG bytes are deterministic regardless of input format.
rgb_array = self.to_rgb().data
jpeg_data = jpeg.encode(rgb_array, quality=quality, pixel_format=TJPF_RGB)
jpeg_data = self.to_jpeg_bytes(quality=quality)

# Store JPEG data and metadata
msg.height = self.height
Expand Down
4 changes: 4 additions & 0 deletions dimos/robot/all_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@
"teleop-phone-go2": "dimos.teleop.phone.blueprints:teleop_phone_go2",
"teleop-phone-go2-fleet": "dimos.teleop.phone.blueprints:teleop_phone_go2_fleet",
"teleop-quest-dual": "dimos.teleop.quest.blueprints:teleop_quest_dual",
"teleop-quest-go2": "dimos.teleop.quest.blueprints:teleop_quest_go2",
"teleop-quest-piper": "dimos.teleop.quest.blueprints:teleop_quest_piper",
"teleop-quest-rerun": "dimos.teleop.quest.blueprints:teleop_quest_rerun",
"teleop-quest-xarm6": "dimos.teleop.quest.blueprints:teleop_quest_xarm6",
"teleop-quest-xarm7": "dimos.teleop.quest.blueprints:teleop_quest_xarm7",
"teleop-quest-xarm7-video": "dimos.teleop.quest.blueprints:teleop_quest_xarm7_video",
"uintree-g1-primitive-no-nav": "dimos.robot.unitree.g1.blueprints.primitive.uintree_g1_primitive_no_nav:uintree_g1_primitive_no_nav",
"unitree-g1": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1:unitree_g1",
"unitree-g1-agentic": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_agentic:unitree_g1_agentic",
Expand Down Expand Up @@ -153,6 +155,7 @@
"go2-connection": "dimos.robot.unitree.go2.connection.GO2Connection",
"go2-fleet-connection": "dimos.robot.unitree.go2.fleet_connection.Go2FleetConnection",
"go2-memory": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2.Go2Memory",
"go2-teleop-module": "dimos.teleop.quest.quest_extensions.Go2TeleopModule",
"google-maps-skill-container": "dimos.agents.skills.google_maps_skill_container.GoogleMapsSkillContainer",
"gps-nav-skill-container": "dimos.agents.skills.gps_nav_skill.GpsNavSkillContainer",
"grasp-gen-module": "dimos.manipulation.grasping.graspgen_module.GraspGenModule",
Expand Down Expand Up @@ -215,6 +218,7 @@
"unitree-g1-skill-container": "dimos.robot.unitree.g1.skill_container.UnitreeG1SkillContainer",
"unitree-skill-container": "dimos.robot.unitree.unitree_skill_container.UnitreeSkillContainer",
"unity-bridge-module": "dimos.simulation.unity.module.UnityBridgeModule",
"video-arm-teleop-module": "dimos.teleop.quest.quest_extensions.VideoArmTeleopModule",
"vlm-agent": "dimos.agents.vlm_agent.VLMAgent",
"vlm-stream-tester": "dimos.agents.vlm_stream_tester.VlmStreamTester",
"voxel-grid-mapper": "dimos.mapping.voxels.VoxelGridMapper",
Expand Down
2 changes: 2 additions & 0 deletions dimos/robot/test_all_blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@
"coordinator-xarm7",
"dual-xarm6-planner",
"teleop-quest-dual",
"teleop-quest-go2",
"teleop-quest-piper",
"teleop-quest-rerun",
"teleop-quest-xarm6",
"teleop-quest-xarm7",
"teleop-quest-xarm7-video",
"unitree-g1-nav-sim",
"xarm-perception",
"xarm-perception-agent",
Expand Down
47 changes: 45 additions & 2 deletions dimos/teleop/quest/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,24 @@
hardware. The underlying coordinator blueprints branch on `global_config.simulation`.
"""

from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE
from dimos.control.blueprints.teleop import (
coordinator_teleop_dual,
coordinator_teleop_piper,
coordinator_teleop_xarm6,
coordinator_teleop_xarm7,
)
from dimos.core.coordination.blueprints import autoconnect
from dimos.core.transport import LCMTransport
from dimos.core.transport import LCMTransport, pSHMTransport
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.teleop.quest.quest_extensions import ArmTeleopModule
from dimos.msgs.geometry_msgs.Twist import Twist
from dimos.msgs.sensor_msgs.Image import Image
from dimos.robot.unitree.go2.connection import GO2Connection
from dimos.teleop.quest.quest_extensions import (
ArmTeleopModule,
Go2TeleopModule,
VideoArmTeleopModule,
)
from dimos.teleop.quest.quest_types import Buttons
from dimos.visualization.vis_module import vis_module

Expand Down Expand Up @@ -59,6 +67,21 @@
)


# XArm7 teleop + camera streaming into the Quest scene as a panel.
teleop_quest_xarm7_video = autoconnect(
VideoArmTeleopModule.blueprint(task_names={"right": "teleop_xarm"}),
coordinator_teleop_xarm7,
).transports(
{
("right_controller_output", PoseStamped): LCMTransport(
"/coordinator/cartesian_command", PoseStamped
),
("buttons", Buttons): LCMTransport("/teleop/buttons", Buttons),
("color_image", Image): LCMTransport("/teleop/color_image", Image),
}
)


# Piper teleop (sim with --simulation, real otherwise): left controller -> piper arm
teleop_quest_piper = autoconnect(
ArmTeleopModule.blueprint(task_names={"left": "teleop_piper"}),
Expand Down Expand Up @@ -104,10 +127,30 @@
)


# Go2 quadruped: thumbstick velocity teleop + camera streamed to the headset.
teleop_quest_go2 = (
autoconnect(
Go2TeleopModule.blueprint(),
GO2Connection.blueprint(),
)
.transports(
{
("cmd_vel", Twist): LCMTransport("/cmd_vel", Twist),
("color_image", Image): pSHMTransport(
"color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE
),
}
)
.global_config(robot_model="unitree_go2")
)


__all__ = [
"teleop_quest_dual",
"teleop_quest_go2",
"teleop_quest_piper",
"teleop_quest_rerun",
"teleop_quest_xarm6",
"teleop_quest_xarm7",
"teleop_quest_xarm7_video",
]
137 changes: 136 additions & 1 deletion dimos/teleop/quest/quest_extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,63 @@
Available subclasses:
- ArmTeleopModule: Per-hand press-and-hold engage (X/A hold to track), task name routing
- TwistTeleopModule: Outputs Twist instead of PoseStamped
- VideoArmTeleopModule: ArmTeleopModule + JPEG frames pushed to the Quest over /ws
- Go2TeleopModule: Thumbstick → Twist velocity for the Go2 + camera over /ws
"""

import asyncio
from typing import Any

from fastapi import WebSocket
from pydantic import Field

from dimos.core.core import rpc
from dimos.core.stream import Out
from dimos.core.stream import In, Out
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.msgs.geometry_msgs.Twist import Twist
from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped
from dimos.msgs.geometry_msgs.Vector3 import Vector3
from dimos.msgs.sensor_msgs.Image import Image
from dimos.teleop.quest.quest_teleop_module import Hand, QuestTeleopConfig, QuestTeleopModule
from dimos.teleop.quest.quest_types import Buttons, QuestControllerState
from dimos.utils.logging_config import setup_logger

logger = setup_logger()


async def _ws_send_jpeg(ws: WebSocket, data: bytes) -> None:
try:
await ws.send_bytes(data)
except Exception:
# Client closed or write failed — drop the frame; the base /ws
# disconnect handler evicts the dead client.
pass


def _push_jpeg(module: QuestTeleopModule, msg: Image, quality: int) -> None:
"""JPEG-encode an Image and push it to all of module's connected /ws clients.

Runs on the RX thread; sends are scheduled on the asyncio loop captured by
QuestTeleopModule when the first client connected.
"""
# Snapshot clients under the lock to avoid concurrent set mutation from
# the uvicorn thread. Skip the encode entirely if nobody is listening.
loop = module._ws_loop
if loop is None:
return
with module._clients_lock:
clients = tuple(module._connected_clients)
if not clients:
return

try:
jpeg = msg.to_jpeg_bytes(quality=quality)
except Exception:
logger.exception("Failed to encode camera frame")
return

for ws in clients:
asyncio.run_coroutine_threadsafe(_ws_send_jpeg(ws, jpeg), loop)


class TwistTeleopConfig(QuestTeleopConfig):
Expand Down Expand Up @@ -148,3 +193,93 @@ def _publish_button_state(
right=right.trigger if right is not None else 0.0,
)
self.buttons.publish(buttons)


class VideoArmTeleopConfig(ArmTeleopConfig):
"""Configuration for VideoArmTeleopModule."""

video_jpeg_quality: int = 70


class VideoArmTeleopModule(ArmTeleopModule):
"""ArmTeleopModule + camera frames pushed to the Quest as JPEG over /ws.

Subscribes to color_image, JPEG-encodes each frame, and broadcasts raw
JPEG bytes to every connected /ws client as a binary message. The client
decodes via createObjectURL and uploads to a WebGL texture.

Inputs:
- color_image: In[Image] (required — wire to a camera output)

Outputs:
- left_controller_output: PoseStamped (inherited)
- right_controller_output: PoseStamped (inherited)
- buttons: Buttons (inherited)
"""

config: VideoArmTeleopConfig

color_image: In[Image]

async def handle_color_image(self, msg: Image) -> None:
_push_jpeg(self, msg, self.config.video_jpeg_quality)


class Go2TeleopConfig(QuestTeleopConfig):
"""Configuration for Go2TeleopModule."""

linear_speed: float = 0.5 # m/s at full stick deflection
angular_speed: float = 0.8 # rad/s at full stick deflection
deadzone: float = 0.1
video_jpeg_quality: int = 70


class Go2TeleopModule(QuestTeleopModule):
"""Quest teleop for the Unitree Go2: thumbstick driving + camera in the headset.

Velocity is derived from the controller thumbsticks as each Joy message
arrives (left stick → forward/strafe, right stick → yaw) and published on
cmd_vel for GO2Connection.move. The Go2 camera (color_image) is JPEG-encoded
and pushed to the headset over /ws. A deadzone suppresses stick drift.

Inputs:
- color_image: In[Image] (wire to the Go2 camera output)

Outputs:
- cmd_vel: Twist (base velocity command)
"""

config: Go2TeleopConfig

color_image: In[Image]
cmd_vel: Out[Twist]

def _deadzone(self, v: float) -> float:
return 0.0 if abs(v) < self.config.deadzone else v

def _on_joy_bytes(self, data: bytes) -> None:
super()._on_joy_bytes(data)
with self._lock:
left = self._controllers.get(Hand.LEFT)
right = self._controllers.get(Hand.RIGHT)
twist = Twist()
twist.linear = Vector3(0.0, 0.0, 0.0)
twist.angular = Vector3(0.0, 0.0, 0.0)
if left is not None:
twist.linear.x = -self._deadzone(left.thumbstick.y) * self.config.linear_speed
twist.linear.y = -self._deadzone(left.thumbstick.x) * self.config.linear_speed
if right is not None:
twist.angular.z = -self._deadzone(right.thumbstick.x) * self.config.angular_speed
self.cmd_vel.publish(twist)

async def handle_color_image(self, msg: Image) -> None:
_push_jpeg(self, msg, self.config.video_jpeg_quality)

@rpc
def stop(self) -> None:
# Send one zero Twist so the base halts if teleop dies mid-motion.
try:
self.cmd_vel.publish(Twist.zero())
except Exception:
logger.exception("Failed to publish stop Twist")
super().stop()
14 changes: 14 additions & 0 deletions dimos/teleop/quest/quest_teleop_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
deltas, and publishes PoseStamped commands.
"""

import asyncio
from dataclasses import dataclass
from enum import IntEnum
from pathlib import Path
Expand Down Expand Up @@ -125,6 +126,13 @@ def __init__(self, **kwargs: Any) -> None:
LCMJoy._get_packed_fingerprint(): self._on_joy_bytes,
}

# Tracked here so subclasses can push from non-asyncio threads.
# _clients_lock guards add/discard/snapshot of the set across the
# uvicorn thread and the RX subscriber thread.
self._connected_clients: set[WebSocket] = set()
self._clients_lock = threading.Lock()
self._ws_loop: asyncio.AbstractEventLoop | None = None

self._setup_routes()

def _setup_routes(self) -> None:
Expand All @@ -143,6 +151,9 @@ async def teleop_index() -> HTMLResponse:
@self._web_server.app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket) -> None:
await ws.accept()
self._ws_loop = asyncio.get_running_loop()
Comment thread
ruthwikdasyam marked this conversation as resolved.
with self._clients_lock:
self._connected_clients.add(ws)
logger.info("Quest client connected")
try:
while True:
Expand All @@ -157,6 +168,9 @@ async def websocket_endpoint(ws: WebSocket) -> None:
logger.info("Quest client disconnected")
except Exception:
logger.exception("WebSocket error")
finally:
with self._clients_lock:
self._connected_clients.discard(ws)

@rpc
def start(self) -> None:
Expand Down
Loading
Loading