diff --git a/dimos/msgs/sensor_msgs/Image.py b/dimos/msgs/sensor_msgs/Image.py index 2a68f14eb3..5eaca03886 100644 --- a/dimos/msgs/sensor_msgs/Image.py +++ b/dimos/msgs/sensor_msgs/Image.py @@ -529,6 +529,22 @@ def lcm_decode(cls, data: bytes, **kwargs: Any) -> Image: ), ) + def to_jpeg_bytes(self, quality: int = 75) -> bytes: + """Encode this image as JPEG bytes using TurboJPEG. + + Args: + quality: JPEG compression quality (0-100, default 75). + + Returns: + Raw JPEG bytes. + """ + from turbojpeg import TJPF_RGB, TurboJPEG + + jpeg = TurboJPEG() + # Canonicalize to RGB so JPEG bytes are deterministic regardless of input format. + rgb_array = self.to_rgb().data + return jpeg.encode(rgb_array, quality=quality, pixel_format=TJPF_RGB) # type: ignore[no-any-return] + def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> bytes: """Convert to LCM Image message with JPEG-compressed data. @@ -539,9 +555,6 @@ def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> byt Returns: LCM-encoded bytes with JPEG-compressed image data """ - from turbojpeg import TJPF_RGB, TurboJPEG - - jpeg = TurboJPEG() msg = LCMImage() # Header @@ -558,9 +571,7 @@ def lcm_jpeg_encode(self, quality: int = 75, frame_id: str | None = None) -> byt msg.header.stamp.sec = int(now) msg.header.stamp.nsec = int((now - int(now)) * 1e9) - # Canonicalize to RGB so JPEG bytes are deterministic regardless of input format. - rgb_array = self.to_rgb().data - jpeg_data = jpeg.encode(rgb_array, quality=quality, pixel_format=TJPF_RGB) + jpeg_data = self.to_jpeg_bytes(quality=quality) # Store JPEG data and metadata msg.height = self.height diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index c3e0f9fff0..66824b95e0 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -75,10 +75,12 @@ "teleop-phone-go2": "dimos.teleop.phone.blueprints:teleop_phone_go2", "teleop-phone-go2-fleet": "dimos.teleop.phone.blueprints:teleop_phone_go2_fleet", "teleop-quest-dual": "dimos.teleop.quest.blueprints:teleop_quest_dual", + "teleop-quest-go2": "dimos.teleop.quest.blueprints:teleop_quest_go2", "teleop-quest-piper": "dimos.teleop.quest.blueprints:teleop_quest_piper", "teleop-quest-rerun": "dimos.teleop.quest.blueprints:teleop_quest_rerun", "teleop-quest-xarm6": "dimos.teleop.quest.blueprints:teleop_quest_xarm6", "teleop-quest-xarm7": "dimos.teleop.quest.blueprints:teleop_quest_xarm7", + "teleop-quest-xarm7-video": "dimos.teleop.quest.blueprints:teleop_quest_xarm7_video", "uintree-g1-primitive-no-nav": "dimos.robot.unitree.g1.blueprints.primitive.uintree_g1_primitive_no_nav:uintree_g1_primitive_no_nav", "unitree-g1": "dimos.robot.unitree.g1.blueprints.perceptive.unitree_g1:unitree_g1", "unitree-g1-agentic": "dimos.robot.unitree.g1.blueprints.agentic.unitree_g1_agentic:unitree_g1_agentic", @@ -153,6 +155,7 @@ "go2-connection": "dimos.robot.unitree.go2.connection.GO2Connection", "go2-fleet-connection": "dimos.robot.unitree.go2.fleet_connection.Go2FleetConnection", "go2-memory": "dimos.robot.unitree.go2.blueprints.smart.unitree_go2.Go2Memory", + "go2-teleop-module": "dimos.teleop.quest.quest_extensions.Go2TeleopModule", "google-maps-skill-container": "dimos.agents.skills.google_maps_skill_container.GoogleMapsSkillContainer", "gps-nav-skill-container": "dimos.agents.skills.gps_nav_skill.GpsNavSkillContainer", "grasp-gen-module": "dimos.manipulation.grasping.graspgen_module.GraspGenModule", @@ -215,6 +218,7 @@ "unitree-g1-skill-container": "dimos.robot.unitree.g1.skill_container.UnitreeG1SkillContainer", "unitree-skill-container": "dimos.robot.unitree.unitree_skill_container.UnitreeSkillContainer", "unity-bridge-module": "dimos.simulation.unity.module.UnityBridgeModule", + "video-arm-teleop-module": "dimos.teleop.quest.quest_extensions.VideoArmTeleopModule", "vlm-agent": "dimos.agents.vlm_agent.VLMAgent", "vlm-stream-tester": "dimos.agents.vlm_stream_tester.VlmStreamTester", "voxel-grid-mapper": "dimos.mapping.voxels.VoxelGridMapper", diff --git a/dimos/robot/test_all_blueprints.py b/dimos/robot/test_all_blueprints.py index 876fe70748..0bb677ebc5 100644 --- a/dimos/robot/test_all_blueprints.py +++ b/dimos/robot/test_all_blueprints.py @@ -51,10 +51,12 @@ "coordinator-xarm7", "dual-xarm6-planner", "teleop-quest-dual", + "teleop-quest-go2", "teleop-quest-piper", "teleop-quest-rerun", "teleop-quest-xarm6", "teleop-quest-xarm7", + "teleop-quest-xarm7-video", "unitree-g1-nav-sim", "xarm-perception", "xarm-perception-agent", diff --git a/dimos/teleop/quest/blueprints.py b/dimos/teleop/quest/blueprints.py index 6a30638832..db8bb284e2 100644 --- a/dimos/teleop/quest/blueprints.py +++ b/dimos/teleop/quest/blueprints.py @@ -19,6 +19,7 @@ hardware. The underlying coordinator blueprints branch on `global_config.simulation`. """ +from dimos.constants import DEFAULT_CAPACITY_COLOR_IMAGE from dimos.control.blueprints.teleop import ( coordinator_teleop_dual, coordinator_teleop_piper, @@ -26,9 +27,16 @@ coordinator_teleop_xarm7, ) from dimos.core.coordination.blueprints import autoconnect -from dimos.core.transport import LCMTransport +from dimos.core.transport import LCMTransport, pSHMTransport from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped -from dimos.teleop.quest.quest_extensions import ArmTeleopModule +from dimos.msgs.geometry_msgs.Twist import Twist +from dimos.msgs.sensor_msgs.Image import Image +from dimos.robot.unitree.go2.connection import GO2Connection +from dimos.teleop.quest.quest_extensions import ( + ArmTeleopModule, + Go2TeleopModule, + VideoArmTeleopModule, +) from dimos.teleop.quest.quest_types import Buttons from dimos.visualization.vis_module import vis_module @@ -59,6 +67,21 @@ ) +# XArm7 teleop + camera streaming into the Quest scene as a panel. +teleop_quest_xarm7_video = autoconnect( + VideoArmTeleopModule.blueprint(task_names={"right": "teleop_xarm"}), + coordinator_teleop_xarm7, +).transports( + { + ("right_controller_output", PoseStamped): LCMTransport( + "/coordinator/cartesian_command", PoseStamped + ), + ("buttons", Buttons): LCMTransport("/teleop/buttons", Buttons), + ("color_image", Image): LCMTransport("/teleop/color_image", Image), + } +) + + # Piper teleop (sim with --simulation, real otherwise): left controller -> piper arm teleop_quest_piper = autoconnect( ArmTeleopModule.blueprint(task_names={"left": "teleop_piper"}), @@ -104,10 +127,30 @@ ) +# Go2 quadruped: thumbstick velocity teleop + camera streamed to the headset. +teleop_quest_go2 = ( + autoconnect( + Go2TeleopModule.blueprint(), + GO2Connection.blueprint(), + ) + .transports( + { + ("cmd_vel", Twist): LCMTransport("/cmd_vel", Twist), + ("color_image", Image): pSHMTransport( + "color_image", default_capacity=DEFAULT_CAPACITY_COLOR_IMAGE + ), + } + ) + .global_config(robot_model="unitree_go2") +) + + __all__ = [ "teleop_quest_dual", + "teleop_quest_go2", "teleop_quest_piper", "teleop_quest_rerun", "teleop_quest_xarm6", "teleop_quest_xarm7", + "teleop_quest_xarm7_video", ] diff --git a/dimos/teleop/quest/quest_extensions.py b/dimos/teleop/quest/quest_extensions.py index 4860ccfa4c..7b1865c4b2 100644 --- a/dimos/teleop/quest/quest_extensions.py +++ b/dimos/teleop/quest/quest_extensions.py @@ -17,18 +17,63 @@ Available subclasses: - ArmTeleopModule: Per-hand press-and-hold engage (X/A hold to track), task name routing - TwistTeleopModule: Outputs Twist instead of PoseStamped + - VideoArmTeleopModule: ArmTeleopModule + JPEG frames pushed to the Quest over /ws + - Go2TeleopModule: Thumbstick → Twist velocity for the Go2 + camera over /ws """ +import asyncio from typing import Any +from fastapi import WebSocket from pydantic import Field from dimos.core.core import rpc -from dimos.core.stream import Out +from dimos.core.stream import In, Out from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped +from dimos.msgs.geometry_msgs.Twist import Twist from dimos.msgs.geometry_msgs.TwistStamped import TwistStamped +from dimos.msgs.geometry_msgs.Vector3 import Vector3 +from dimos.msgs.sensor_msgs.Image import Image from dimos.teleop.quest.quest_teleop_module import Hand, QuestTeleopConfig, QuestTeleopModule from dimos.teleop.quest.quest_types import Buttons, QuestControllerState +from dimos.utils.logging_config import setup_logger + +logger = setup_logger() + + +async def _ws_send_jpeg(ws: WebSocket, data: bytes) -> None: + try: + await ws.send_bytes(data) + except Exception: + # Client closed or write failed — drop the frame; the base /ws + # disconnect handler evicts the dead client. + pass + + +def _push_jpeg(module: QuestTeleopModule, msg: Image, quality: int) -> None: + """JPEG-encode an Image and push it to all of module's connected /ws clients. + + Runs on the RX thread; sends are scheduled on the asyncio loop captured by + QuestTeleopModule when the first client connected. + """ + # Snapshot clients under the lock to avoid concurrent set mutation from + # the uvicorn thread. Skip the encode entirely if nobody is listening. + loop = module._ws_loop + if loop is None: + return + with module._clients_lock: + clients = tuple(module._connected_clients) + if not clients: + return + + try: + jpeg = msg.to_jpeg_bytes(quality=quality) + except Exception: + logger.exception("Failed to encode camera frame") + return + + for ws in clients: + asyncio.run_coroutine_threadsafe(_ws_send_jpeg(ws, jpeg), loop) class TwistTeleopConfig(QuestTeleopConfig): @@ -148,3 +193,93 @@ def _publish_button_state( right=right.trigger if right is not None else 0.0, ) self.buttons.publish(buttons) + + +class VideoArmTeleopConfig(ArmTeleopConfig): + """Configuration for VideoArmTeleopModule.""" + + video_jpeg_quality: int = 70 + + +class VideoArmTeleopModule(ArmTeleopModule): + """ArmTeleopModule + camera frames pushed to the Quest as JPEG over /ws. + + Subscribes to color_image, JPEG-encodes each frame, and broadcasts raw + JPEG bytes to every connected /ws client as a binary message. The client + decodes via createObjectURL and uploads to a WebGL texture. + + Inputs: + - color_image: In[Image] (required — wire to a camera output) + + Outputs: + - left_controller_output: PoseStamped (inherited) + - right_controller_output: PoseStamped (inherited) + - buttons: Buttons (inherited) + """ + + config: VideoArmTeleopConfig + + color_image: In[Image] + + async def handle_color_image(self, msg: Image) -> None: + _push_jpeg(self, msg, self.config.video_jpeg_quality) + + +class Go2TeleopConfig(QuestTeleopConfig): + """Configuration for Go2TeleopModule.""" + + linear_speed: float = 0.5 # m/s at full stick deflection + angular_speed: float = 0.8 # rad/s at full stick deflection + deadzone: float = 0.1 + video_jpeg_quality: int = 70 + + +class Go2TeleopModule(QuestTeleopModule): + """Quest teleop for the Unitree Go2: thumbstick driving + camera in the headset. + + Velocity is derived from the controller thumbsticks as each Joy message + arrives (left stick → forward/strafe, right stick → yaw) and published on + cmd_vel for GO2Connection.move. The Go2 camera (color_image) is JPEG-encoded + and pushed to the headset over /ws. A deadzone suppresses stick drift. + + Inputs: + - color_image: In[Image] (wire to the Go2 camera output) + + Outputs: + - cmd_vel: Twist (base velocity command) + """ + + config: Go2TeleopConfig + + color_image: In[Image] + cmd_vel: Out[Twist] + + def _deadzone(self, v: float) -> float: + return 0.0 if abs(v) < self.config.deadzone else v + + def _on_joy_bytes(self, data: bytes) -> None: + super()._on_joy_bytes(data) + with self._lock: + left = self._controllers.get(Hand.LEFT) + right = self._controllers.get(Hand.RIGHT) + twist = Twist() + twist.linear = Vector3(0.0, 0.0, 0.0) + twist.angular = Vector3(0.0, 0.0, 0.0) + if left is not None: + twist.linear.x = -self._deadzone(left.thumbstick.y) * self.config.linear_speed + twist.linear.y = -self._deadzone(left.thumbstick.x) * self.config.linear_speed + if right is not None: + twist.angular.z = -self._deadzone(right.thumbstick.x) * self.config.angular_speed + self.cmd_vel.publish(twist) + + async def handle_color_image(self, msg: Image) -> None: + _push_jpeg(self, msg, self.config.video_jpeg_quality) + + @rpc + def stop(self) -> None: + # Send one zero Twist so the base halts if teleop dies mid-motion. + try: + self.cmd_vel.publish(Twist.zero()) + except Exception: + logger.exception("Failed to publish stop Twist") + super().stop() diff --git a/dimos/teleop/quest/quest_teleop_module.py b/dimos/teleop/quest/quest_teleop_module.py index cd1e5aea42..f1eab00df9 100644 --- a/dimos/teleop/quest/quest_teleop_module.py +++ b/dimos/teleop/quest/quest_teleop_module.py @@ -21,6 +21,7 @@ deltas, and publishes PoseStamped commands. """ +import asyncio from dataclasses import dataclass from enum import IntEnum from pathlib import Path @@ -125,6 +126,13 @@ def __init__(self, **kwargs: Any) -> None: LCMJoy._get_packed_fingerprint(): self._on_joy_bytes, } + # Tracked here so subclasses can push from non-asyncio threads. + # _clients_lock guards add/discard/snapshot of the set across the + # uvicorn thread and the RX subscriber thread. + self._connected_clients: set[WebSocket] = set() + self._clients_lock = threading.Lock() + self._ws_loop: asyncio.AbstractEventLoop | None = None + self._setup_routes() def _setup_routes(self) -> None: @@ -143,6 +151,9 @@ async def teleop_index() -> HTMLResponse: @self._web_server.app.websocket("/ws") async def websocket_endpoint(ws: WebSocket) -> None: await ws.accept() + self._ws_loop = asyncio.get_running_loop() + with self._clients_lock: + self._connected_clients.add(ws) logger.info("Quest client connected") try: while True: @@ -157,6 +168,9 @@ async def websocket_endpoint(ws: WebSocket) -> None: logger.info("Quest client disconnected") except Exception: logger.exception("WebSocket error") + finally: + with self._clients_lock: + self._connected_clients.discard(ws) @rpc def start(self) -> None: diff --git a/dimos/teleop/quest/web/static/index.html b/dimos/teleop/quest/web/static/index.html index 8d31289f88..b23a8362b1 100644 --- a/dimos/teleop/quest/web/static/index.html +++ b/dimos/teleop/quest/web/static/index.html @@ -4,101 +4,7 @@