diff --git a/dimos/agents/annotation.py b/dimos/agents/annotation.py index ed4c6235e5..847156ed43 100644 --- a/dimos/agents/annotation.py +++ b/dimos/agents/annotation.py @@ -18,7 +18,7 @@ import inspect import threading import time -from typing import Any, TypeVar, cast +from typing import Any, Literal, TypeVar, cast, overload from dimos.core.core import rpc from dimos.utils.logging_config import setup_logger @@ -29,6 +29,9 @@ _SKILL_CONTEXT = threading.local() +SkillLifecycle = Literal["instant", "background"] +_VALID_LIFECYCLES = ("instant", "background") + def current_skill_context() -> dict[str, Any] | None: """Return the per-call context for the currently executing `@skill`. @@ -66,7 +69,7 @@ def _stamp_and_log(func_name: str, result: Any, elapsed_ms: float) -> Any: return result -def skill(func: F) -> F: +def _make_skill(func: F, uses: list[str], lifecycle: SkillLifecycle) -> F: if inspect.iscoroutinefunction(func): @functools.wraps(func) @@ -108,4 +111,42 @@ def sync_context_wrapper(*args: Any, **kwargs: Any) -> Any: wrapped = rpc(context_wrapper) wrapped.__skill__ = True # type: ignore[attr-defined] + wrapped.__skill_uses__ = list(uses) # type: ignore[attr-defined] + wrapped.__skill_lifecycle__ = lifecycle # type: ignore[attr-defined] return cast("F", wrapped) + + +@overload +def skill(func: F) -> F: ... +@overload +def skill(*, uses: list[str] | None = ..., lifecycle: SkillLifecycle = ...) -> Callable[[F], F]: ... + + +def skill( + func: F | None = None, + *, + uses: list[str] | None = None, + lifecycle: SkillLifecycle = "instant", +) -> F | Callable[[F], F]: + """Mark a method as an agent-callable skill. + + Supports both bare-form `@skill` and parameterized `@skill(uses=[...], lifecycle=...)`. + + `uses` declares capabilities the skill needs (e.g. `["movement"]`). The MCP + server uses these to refuse the call when another skill is already holding + a required capability. Default: no capabilities. 
+ + `lifecycle` is `"instant"` (default) for skills that finish their work before + returning, or `"background"` for skills that kick off background work and + return early -- those must use `start_tool`/`stop_tool` so the matching + stop-tool frame can release their capabilities. + """ + if lifecycle not in _VALID_LIFECYCLES: + raise ValueError(f"lifecycle must be one of {_VALID_LIFECYCLES}, got {lifecycle!r}") + if func is not None: + return _make_skill(func, uses=[], lifecycle="instant") + + def decorator(f: F) -> F: + return _make_skill(f, uses=list(uses or []), lifecycle=lifecycle) + + return decorator diff --git a/dimos/agents/capabilities.py b/dimos/agents/capabilities.py new file mode 100644 index 0000000000..0f64d0af65 --- /dev/null +++ b/dimos/agents/capabilities.py @@ -0,0 +1,139 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Capability registry for skill-level mutual exclusion. + +A skill declares the capabilities it occupies via `@skill(uses=[...])`. The MCP +server consults a process-wide `CapabilityRegistry` before dispatching each +`tools/call`. When a required capability is held by another skill, the server +either waits briefly for it to clear (for short, self-completing holders) or +refuses with a plain text "Cannot start X: capability Y is held by Z" result, in +which case the LLM decides what to do (typically: call the appropriate stop tool, +then retry). + +Capabilities are plain strings. 
Today the only declared capability is +`CAP_MOVEMENT`. New capabilities should be added as constants here so they are +discoverable from one place. +""" + +from __future__ import annotations + +from collections.abc import Callable +import threading +import time +from typing import NamedTuple + +CAP_MOVEMENT = "movement" + + +class _Hold(NamedTuple): + """Who currently holds a capability. + + `tool_name` is the human-meaningful skill name (used for conflict messages + and the same-tool takeover decision). `token` is unique per invocation and + scopes release, so a stale invocation can't free a live hold. + """ + + tool_name: str + token: str + + +class CapabilityRegistry: + """In-memory map of capability -> holding invocation. + + All methods are thread-safe. The registry is intentionally simple: every + capability is exclusive (no shared/exclusive distinction yet). On conflict + `acquire` either refuses immediately (the default try-lock) or, when given a + `timeout`, blocks until the conflicting hold clears or the timeout expires. + + A hold is identified per-invocation by an opaque `token`, not by the tool + name. Two invocations of the *same* tool don't conflict -- the later one + takes over the hold -- but release is scoped to the token, so the earlier + invocation's teardown can't release the capability out from under the later + one. Different tools sharing a capability still conflict. + """ + + def __init__(self) -> None: + self._holders: dict[str, _Hold] = {} + self._cond = threading.Condition() + + def acquire( + self, + caps: list[str], + tool_name: str, + token: str, + *, + timeout: float = 0.0, + can_wait: Callable[[str], bool] | None = None, + ) -> tuple[str, str] | None: + """Atomic all-or-nothing acquire of `caps` for one invocation. + + Conflicts only with a *different* `tool_name`; a same-tool re-acquire is + a takeover that overwrites the holder with the new `token`. 
+ + With the default `timeout=0.0` this is a non-blocking try-lock: it returns + `None` on success or, on conflict, `(cap, current_tool)` for the first + conflicting capability (no caps are acquired in that case). + + With `timeout > 0` it blocks up to `timeout` seconds for a conflicting hold + to clear, re-attempting the all-or-nothing acquire whenever a hold is + released. `can_wait(holder_tool_name)` decides whether to wait on a given + holder; if it returns `False` the conflict is returned immediately instead + of waiting (used to refuse, rather than block on, holders that won't release + on their own). + """ + deadline = time.monotonic() + timeout + with self._cond: + while True: + conflict = self._acquire_locked(caps, tool_name, token) + if conflict is None: + return None + if can_wait is not None and not can_wait(conflict[1]): + return conflict + remaining = deadline - time.monotonic() + if remaining <= 0: + return conflict + self._cond.wait(remaining) + + def _acquire_locked( + self, caps: list[str], tool_name: str, token: str + ) -> tuple[str, str] | None: + """Atomic check-and-set of `caps`; the caller must hold `self._cond`.""" + for cap in caps: + current = self._holders.get(cap) + if current is not None and current.tool_name != tool_name: + return (cap, current.tool_name) + for cap in caps: + self._holders[cap] = _Hold(tool_name, token) + return None + + def release_by_token(self, token: str) -> list[str]: + """Release every capability whose current holder matches `token`. + + A stale token (one already taken over by a newer invocation of the same + tool) matches nothing and releases nothing. Wakes any callers blocked in + `acquire` so they can re-attempt. 
+ """ + with self._cond: + released = [cap for cap, h in self._holders.items() if h.token == token] + for cap in released: + del self._holders[cap] + if released: + self._cond.notify_all() + return released + + def snapshot(self) -> dict[str, str]: + """Map of capability -> holding tool name (for logging/diagnostics).""" + with self._cond: + return {cap: h.tool_name for cap, h in self._holders.items()} diff --git a/dimos/agents/demos/agent_demo_instructions.md b/dimos/agents/demos/agent_demo_instructions.md new file mode 100644 index 0000000000..e1efbada01 --- /dev/null +++ b/dimos/agents/demos/agent_demo_instructions.md @@ -0,0 +1,95 @@ +# Agent Capability Demo Instructions + +This demo runs a simulated warehouse inspection robot. It has no hardware +dependencies. All work is simulated with sleeps and tool-stream updates. + +## Start + +Terminal 1: + +```bash +uv run dimos run demo-capabilities +``` + +Terminal 2: + +```bash +uv run humancli +``` + +## Manual Prompts + +Type these in `humancli`. + +### Sequential vs. Parallel Instant Tools + +```text +Read the battery, read the temperature, and capture a photo at the same time. +``` + +Expected: the three timestamp windows should overlap substantially. + +### Background Tool Streams Without Conflict + +```text +Start patrolling and start an environment scan. +``` + +Expected: `start_patrol` starts and streams `visiting waypoint N` updates. +`start_environment_scan` also starts and streams air readings. These should run +together because environment scanning has no capability. + +### Capability Conflict + +```text +Without stopping patrol first, try to turn in place 90 degrees. If there is a conflict, report the exact error and do not recover. +``` + +Expected: `turn_in_place` is refused because `movement` is held by +`start_patrol`. The response should include text like: + +```text +Cannot start 'turn_in_place': capability 'movement' is held by 'start_patrol'. 
+``` + +### Release and Retry + +```text +Stop patrol, then turn in place 90 degrees. +``` + +Expected: `stop_patrol` closes the patrol tool stream, releasing `movement`. +`turn_in_place` then succeeds with a 2-second timestamp window. + +### Self-Terminating Background Tool + +```text +Do a lap and start an environment scan. +``` + +Expected: `do_a_lap` streams four checkpoint updates over about 26 seconds and +then stops by itself, releasing `movement`. The environment scan keeps running +until stopped. + +Stop the scan when done: + +```text +Stop the environment scan. +``` + +### Different Capability + +```text +Weigh the sample box and secure it at the same time. +``` + +Expected: both payload tools succeed, but serialized: the second waits until the first releases `payload`. + +### Competing Background Movement Tools + +```text +Start patrol and do a lap at the same time. +``` + +Expected: one movement tool starts. The other should be refused because +`movement` is already held by the first movement tool. diff --git a/dimos/agents/demos/demo_capabilities.py b/dimos/agents/demos/demo_capabilities.py new file mode 100644 index 0000000000..37e087084c --- /dev/null +++ b/dimos/agents/demos/demo_capabilities.py @@ -0,0 +1,319 @@ +# Copyright 2025-2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +from threading import Event, Lock, Thread +import time +from typing import Any + +from dimos.agents.annotation import skill +from dimos.agents.capabilities import CAP_MOVEMENT +from dimos.agents.mcp.mcp_client import McpClient +from dimos.agents.mcp.mcp_server import McpServer +from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT +from dimos.core.coordination.blueprints import autoconnect +from dimos.core.core import rpc +from dimos.core.module import Module + +CAP_PAYLOAD = "payload" + +DEMO_CAPABILITIES_PROMPT = """ +You are controlling a simulated warehouse inspection robot. + +Use the available tools to satisfy user requests. The tools are demos: they +sleep, stream status updates, and report timestamps so a human can inspect tool +ordering and capability conflicts. + +When a user asks for multiple independent actions "at the same time", call the +requested tools in the same agent turn. + +Some tools hold exclusive capabilities. If a tool returns "Cannot start ...", +report the exact conflict unless the user explicitly asked you to stop the +blocking tool and retry. + +Don't say things like "Let me know if there's anything else you'd like to do!" +People will prompt you when they want. You don't need to ask for a prompt. 
+""" + + +def _format_time(t: float) -> str: + return time.strftime("%H:%M:%S", time.localtime(t)) + f".{int(t % 1 * 1000):03d}" + + +def _stamp(label: str, start: float, end: float) -> str: + return f"{label} (started {_format_time(start)}, finished {_format_time(end)})" + + +class DemoSensors(Module): + @rpc + def start(self) -> None: + super().start() + + @rpc + def stop(self) -> None: + super().stop() + + @skill + def read_battery(self) -> str: + """Read the simulated robot battery percentage.""" + start = time.time() + time.sleep(1.0) + end = time.time() + return _stamp("Battery is 83%", start, end) + + @skill + def read_temperature(self) -> str: + """Read the simulated cargo bay temperature.""" + start = time.time() + time.sleep(5.0) + end = time.time() + return _stamp("Cargo bay temperature is 21.6 C", start, end) + + @skill + def capture_photo(self) -> str: + """Capture a simulated inspection photo.""" + start = time.time() + time.sleep(2.0) + end = time.time() + return _stamp("Captured cam0.jpg", start, end) + + @skill + def speak(self, text: str) -> str: + """Say a short message through the simulated robot speaker. + + Args: + text: The words the robot should say. + """ + now = time.time() + return _stamp(f'Robot said: "{text}"', now, now) + + +class DemoRobotActions(Module): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._lock = Lock() + self._patrol_stop = Event() + self._patrol_thread: Thread | None = None + self._lap_stop = Event() + self._lap_thread: Thread | None = None + + @rpc + def start(self) -> None: + super().start() + + @rpc + def stop(self) -> None: + self._stop_patrol() + self._stop_lap() + super().stop() + + @skill(uses=[CAP_MOVEMENT]) + def turn_in_place(self, degrees: float) -> str: + """Turn the simulated robot in place. + + Args: + degrees: Signed turn amount in degrees. Positive turns left, negative turns right. 
+ """ + start = time.time() + time.sleep(2.0) + end = time.time() + return _stamp(f"Turned {degrees:.1f} degrees in place", start, end) + + @skill(uses=[CAP_PAYLOAD]) + def weigh_payload(self, item: str) -> str: + """Weigh an item on the simulated payload scale. + + Args: + item: Name of the item to weigh. + """ + start = time.time() + time.sleep(2.0) + end = time.time() + return _stamp(f"{item} weighs 4.2 kg", start, end) + + @skill(uses=[CAP_PAYLOAD]) + def secure_payload(self, item: str) -> str: + """Secure an item in the simulated payload bay. + + Args: + item: Name of the item to secure. + """ + start = time.time() + time.sleep(3.0) + end = time.time() + return _stamp(f"{item} is secured in the payload bay", start, end) + + @skill(uses=[CAP_MOVEMENT], lifecycle="background") + def start_patrol(self) -> str: + """Start a simulated warehouse patrol that streams waypoint updates.""" + time.sleep(1.0) + # Open (or re-stamp, on a same-tool takeover) the tool-stream before the + # "already running" return so the movement hold is always carried by a + # live stream. + self.start_tool("start_patrol") + with self._lock: + if self._patrol_thread is not None and self._patrol_thread.is_alive(): + return "Patrol is already running. Use stop_patrol to stop it." + + self._patrol_stop.clear() + thread = Thread(target=self._patrol_loop, name="demo-patrol", daemon=True) + self._patrol_thread = thread + thread.start() + return "Patrol started." + + @skill + def stop_patrol(self) -> str: + """Stop the simulated warehouse patrol.""" + self._stop_patrol() + return "Patrol stopped." + + @skill(uses=[CAP_MOVEMENT], lifecycle="background") + def do_a_lap(self) -> str: + """Do one simulated patrol lap, stream progress, then stop automatically.""" + # Open (or re-stamp, on a same-tool takeover) the tool-stream before the + # "already running" return so the movement hold is always carried by a + # live stream. 
+ self.start_tool("do_a_lap") + with self._lock: + if self._lap_thread is not None and self._lap_thread.is_alive(): + return "Lap is already running." + + self._lap_stop.clear() + thread = Thread(target=self._lap_loop, name="demo-lap", daemon=True) + self._lap_thread = thread + thread.start() + return "Lap started. It will stop automatically." + + def _stop_patrol(self) -> None: + thread: Thread | None + with self._lock: + self._patrol_stop.set() + thread = self._patrol_thread + + if thread is not None: + thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) + + with self._lock: + if self._patrol_thread is thread: + self._patrol_thread = None + self.stop_tool("start_patrol") + + def _stop_lap(self) -> None: + thread: Thread | None + with self._lock: + self._lap_stop.set() + thread = self._lap_thread + + if thread is not None: + thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) + + with self._lock: + if self._lap_thread is thread: + self._lap_thread = None + self.stop_tool("do_a_lap") + + def _patrol_loop(self) -> None: + waypoint = 1 + try: + while not self._patrol_stop.wait(timeout=5.0): + self.tool_update("start_patrol", f"visiting waypoint {waypoint}") + waypoint += 1 + finally: + self.stop_tool("start_patrol") + + def _lap_loop(self) -> None: + try: + for step in range(1, 5): + if self._lap_stop.wait(timeout=6.5): + return + self.tool_update("do_a_lap", f"lap checkpoint {step} of 4") + finally: + with self._lock: + self._lap_thread = None + self.stop_tool("do_a_lap") + + +class DemoMonitoring(Module): + def __init__(self, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._lock = Lock() + self._scan_stop = Event() + self._scan_thread: Thread | None = None + + @rpc + def start(self) -> None: + super().start() + + @rpc + def stop(self) -> None: + self._stop_environment_scan() + super().stop() + + @skill(lifecycle="background") + def start_environment_scan(self) -> str: + """Start a simulated environment scan that streams air-quality updates.""" + with 
self._lock: + if self._scan_thread is not None and self._scan_thread.is_alive(): + return "Environment scan is already running." + + self._scan_stop.clear() + self.start_tool("start_environment_scan") + thread = Thread(target=self._scan_loop, name="demo-environment-scan", daemon=True) + self._scan_thread = thread + thread.start() + return "Environment scan started." + + @skill + def stop_environment_scan(self) -> str: + """Stop the simulated environment scan.""" + self._stop_environment_scan() + return "Environment scan stopped." + + def _stop_environment_scan(self) -> None: + thread: Thread | None + with self._lock: + self._scan_stop.set() + thread = self._scan_thread + + if thread is not None: + thread.join(timeout=DEFAULT_THREAD_JOIN_TIMEOUT) + + with self._lock: + if self._scan_thread is thread: + self._scan_thread = None + self.stop_tool("start_environment_scan") + + def _scan_loop(self) -> None: + reading = 1 + try: + while not self._scan_stop.wait(timeout=4.7): + pm25 = 5 + reading + co2 = 410 + reading * 3 + self.tool_update( + "start_environment_scan", + f"air reading {reading}: PM2.5={pm25} ug/m3, CO2={co2} ppm", + ) + reading += 1 + finally: + self.stop_tool("start_environment_scan") + + +demo_capabilities = autoconnect( + DemoSensors.blueprint(), + DemoRobotActions.blueprint(), + DemoMonitoring.blueprint(), + McpServer.blueprint(), + McpClient.blueprint(system_prompt=DEMO_CAPABILITIES_PROMPT), +) diff --git a/dimos/agents/mcp/mcp_server.py b/dimos/agents/mcp/mcp_server.py index dbd31f8d87..2b792ef59a 100644 --- a/dimos/agents/mcp/mcp_server.py +++ b/dimos/agents/mcp/mcp_server.py @@ -20,6 +20,7 @@ import os import time from typing import TYPE_CHECKING, Any +import uuid from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware @@ -29,6 +30,7 @@ import uvicorn from dimos.agents.annotation import skill +from dimos.agents.capabilities import CapabilityRegistry from dimos.agents.mcp import tool_stream from dimos.core.core import rpc from 
dimos.core.module import Module @@ -43,6 +45,12 @@ _SSE_KEEPALIVE_INTERVAL = 20.0 # seconds +# How long a `tools/call` waits for a capability held by a short, self-completing +# (instant) skill before refusing. Well under the MCP client's 120s HTTP timeout. +# Background holders run until stopped, so they are never waited on (see +# `_can_wait` in `_handle_tools_call`). +DEFAULT_CAP_ACQUIRE_TIMEOUT = 30.0 # seconds + app = FastAPI() app.add_middleware( CORSMiddleware, @@ -51,9 +59,12 @@ allow_headers=["*"], ) app.state.skills = [] +app.state.skills_by_name = {} app.state.rpc_calls = {} app.state.sse_queues = [] app.state.event_loop = None +app.state.cap_registry = CapabilityRegistry() +app.state.cap_acquire_timeout = DEFAULT_CAP_ACQUIRE_TIMEOUT def _jsonrpc_result(req_id: Any, result: Any) -> dict[str, Any]: @@ -89,6 +100,11 @@ def _handle_tools_list(req_id: Any, skills: list[SkillInfo]) -> dict[str, Any]: tool: dict[str, Any] = {"name": s.func_name, "inputSchema": schema} if description: tool["description"] = description + if s.uses or s.lifecycle != "instant": + tool["_meta"] = { + "dimos/uses": list(s.uses), + "dimos/lifecycle": s.lifecycle, + } tools.append(tool) return _jsonrpc_result(req_id, {"tools": tools}) @@ -107,22 +123,91 @@ async def _handle_tools_call( logger.warning("MCP tool not found", tool=name) return _jsonrpc_result_text(req_id, f"Tool not found: {name}") + skill_info = app.state.skills_by_name.get(name) + uses: list[str] = list(skill_info.uses) if skill_info is not None else [] + lifecycle = skill_info.lifecycle if skill_info is not None else "instant" + cap_registry: CapabilityRegistry = app.state.cap_registry + + # A per-invocation token scopes the capability hold, so a stale invocation's + # teardown can't release a hold that a newer same-tool invocation took over. + acquire_token = uuid.uuid4().hex + if uses: + + def _can_wait(holder: str) -> bool: + # Wait only on instant holders; they release when they return. 
+ # Background holders run until explicitly stopped, so refuse instead + # of blocking until the timeout. + info = app.state.skills_by_name.get(holder) + return (info.lifecycle if info is not None else "instant") != "background" + + # Run the (possibly blocking) acquire off the event loop so waiting for a + # busy capability doesn't stall the server. + conflict = await asyncio.get_event_loop().run_in_executor( + None, + lambda: cap_registry.acquire( + uses, + tool_name=name, + token=acquire_token, + timeout=app.state.cap_acquire_timeout, + can_wait=_can_wait, + ), + ) + if conflict is not None: + cap, holder = conflict + logger.info( + "MCP tool refused (capability busy)", + tool=name, + cap=cap, + holder=holder, + snapshot=cap_registry.snapshot(), + ) + # A background holder has a stop tool to call; an instant holder is + # waited on above, so reaching here means it outlasted the timeout. + holder_info = app.state.skills_by_name.get(holder) + holder_lifecycle = holder_info.lifecycle if holder_info is not None else "instant" + if holder_lifecycle == "background": + advice = "Call the appropriate stop tool first, then retry." + else: + advice = "It is taking longer than expected; wait a moment and then retry." + return _jsonrpc_result_text( + req_id, + f"Cannot start '{name}': capability '{cap}' is held by '{holder}'. {advice}", + ) + logger.info("MCP tool call", tool=name, args=args, progress_token=progress_token) t0 = time.monotonic() - # _mcp_context is a reserved kwarg consumed by the `@skill` wrapper; - # it never reaches the user-visible skill signature. + # _mcp_context is a reserved kwarg consumed by the `@skill` wrapper; it never + # reaches the user-visible skill signature. The acquire token rides along so + # a background skill's ToolStream can stamp it on its stop frame for release. 
call_kwargs = dict(args) + mcp_context: dict[str, Any] = {} if progress_token is not None: - call_kwargs["_mcp_context"] = {"progress_token": progress_token} - + mcp_context["progress_token"] = progress_token + if uses: + mcp_context["acquire_token"] = acquire_token + if mcp_context: + call_kwargs["_mcp_context"] = mcp_context + + # Track whether we still hold the caps so we can release on failure even + # for background skills. On success the background skill keeps them until + # its tool-stream closes. + caps_held = bool(uses) try: - result = await asyncio.get_event_loop().run_in_executor( - None, lambda: rpc_call(**call_kwargs) - ) - except Exception as e: - logger.exception("MCP tool error", tool=name, duration=f"{time.monotonic() - t0:.3f}s") - return _jsonrpc_result_text(req_id, f"Error running tool '{name}': {e}") + try: + result = await asyncio.get_event_loop().run_in_executor( + None, lambda: rpc_call(**call_kwargs) + ) + except Exception as e: + logger.exception("MCP tool error", tool=name, duration=f"{time.monotonic() - t0:.3f}s") + return _jsonrpc_result_text(req_id, f"Error running tool '{name}': {e}") + + if lifecycle == "background": + # Hand ownership of the caps off to the tool-stream lifecycle. + caps_held = False + finally: + if caps_held: + cap_registry.release_by_token(acquire_token) duration = f"{time.monotonic() - t0:.3f}s" response = str(result)[:200] @@ -187,7 +272,23 @@ def _sse_frame(data: dict[str, Any]) -> str: def _fan_out_to_sse_queues(msg: dict[str, Any]) -> None: - """LCM subscriber callback: forward a tool-stream frame to every active SSE client.""" + """LCM subscriber callback: forward a tool-stream frame to every active SSE client. + + Also releases capabilities held by a background skill when its tool-stream + closes (signaled by a ``dimos/tool_stopped`` frame). 
+ """ + if msg.get("method") == tool_stream.TOOL_STREAM_STOPPED_METHOD: + params = msg.get("params") or {} + token = params.get("token") + if token: + released = app.state.cap_registry.release_by_token(token) + if released: + logger.info( + "Capabilities released on tool-stream stop", + holder=params.get("tool_name"), + token=token, + released=released, + ) loop = app.state.event_loop if loop is None: return @@ -282,6 +383,7 @@ def on_system_modules(self, modules: list[RPCClient]) -> None: app.state.skills = [ skill_info for module in modules for skill_info in (module.get_skills() or []) ] + app.state.skills_by_name = {s.func_name: s for s in app.state.skills} app.state.rpc_calls = { skill_info.func_name: RpcCall( None, self.rpc, skill_info.func_name, skill_info.class_name, [] diff --git a/dimos/agents/mcp/test_mcp_server.py b/dimos/agents/mcp/test_mcp_server.py index fd514b0643..ee51902e11 100644 --- a/dimos/agents/mcp/test_mcp_server.py +++ b/dimos/agents/mcp/test_mcp_server.py @@ -16,9 +16,11 @@ import asyncio import json +import threading from unittest.mock import MagicMock -from dimos.agents.mcp.mcp_server import handle_request +from dimos.agents.capabilities import CapabilityRegistry +from dimos.agents.mcp.mcp_server import app, handle_request from dimos.core.module import SkillInfo @@ -161,3 +163,165 @@ def test_mcp_module_initialize_and_unknown() -> None: response = asyncio.run(handle_request({"method": "unknown/method", "id": 2}, [], {})) assert response["error"]["code"] == -32601 + + +def test_mcp_module_injects_acquire_token_for_capability_skill() -> None: + """A capability-using skill receives a per-invocation `acquire_token` in its + `_mcp_context`, and the registry records the hold under the tool name. 
The + token lets the skill's stop frame release exactly this invocation's hold.""" + schema = json.dumps({"type": "object", "properties": {}}) + mover = SkillInfo( + class_name="TestSkills", + func_name="mover", + args_schema=schema, + uses=("movement",), + lifecycle="background", + ) + rpc_calls = _make_rpc_calls([mover], {"mover": "moving"}) + + # `_handle_tools_call` reads skill metadata and the registry off `app.state`, + # not the `skills` arg; set/restore them so other tests aren't affected. + saved_skills = app.state.skills_by_name + saved_registry = app.state.cap_registry + app.state.skills_by_name = {"mover": mover} + app.state.cap_registry = CapabilityRegistry() + try: + asyncio.run( + handle_request( + {"method": "tools/call", "id": 9, "params": {"name": "mover", "arguments": {}}}, + [mover], + rpc_calls, + ) + ) + ctx = rpc_calls["mover"].call_args.kwargs["_mcp_context"] + assert isinstance(ctx["acquire_token"], str) and ctx["acquire_token"] + # A background skill hands its hold off to the tool-stream lifecycle, so + # the hold persists after the call, keyed by the tool name. 
+ assert app.state.cap_registry.snapshot() == {"movement": "mover"} + finally: + app.state.skills_by_name = saved_skills + app.state.cap_registry = saved_registry + + +def test_refusal_message_distinguishes_holder_lifecycle() -> None: + """The conflict message tells the LLM to call a stop tool for a background + holder, but to wait for an instant holder (which has no stop tool).""" + schema = json.dumps({"type": "object", "properties": {}}) + requester = SkillInfo( + class_name="TestSkills", + func_name="follow_person", + args_schema=schema, + uses=("movement",), + lifecycle="background", + ) + + def _refusal_text(holder: SkillInfo) -> str: + rpc_calls = _make_rpc_calls([requester], {"follow_person": "ok"}) + saved_skills = app.state.skills_by_name + saved_registry = app.state.cap_registry + saved_timeout = app.state.cap_acquire_timeout + app.state.skills_by_name = {holder.func_name: holder, "follow_person": requester} + registry = CapabilityRegistry() + registry.acquire(["movement"], tool_name=holder.func_name, token="held") + app.state.cap_registry = registry + # An instant holder is waited on; keep that wait short so the test that + # exercises the timeout path stays fast. + app.state.cap_acquire_timeout = 0.05 + try: + response = asyncio.run( + handle_request( + { + "method": "tools/call", + "id": 1, + "params": {"name": "follow_person", "arguments": {}}, + }, + [requester], + rpc_calls, + ) + ) + # The requester is refused, so its RPC never runs. 
+ rpc_calls["follow_person"].assert_not_called() + return response["result"]["content"][0]["text"] + finally: + app.state.skills_by_name = saved_skills + app.state.cap_registry = saved_registry + app.state.cap_acquire_timeout = saved_timeout + + background_text = _refusal_text( + SkillInfo( + class_name="TestSkills", + func_name="start_patrol", + args_schema=schema, + uses=("movement",), + lifecycle="background", + ) + ) + assert "held by 'start_patrol'" in background_text + assert "stop tool" in background_text + + instant_text = _refusal_text( + SkillInfo( + class_name="TestSkills", + func_name="turn_in_place", + args_schema=schema, + uses=("movement",), + lifecycle="instant", + ) + ) + assert "held by 'turn_in_place'" in instant_text + assert "wait" in instant_text.lower() + assert "stop tool" not in instant_text + + +def test_instant_holder_conflict_waits_then_runs() -> None: + """A call blocked by an *instant* holder waits for the hold to clear and then + runs, instead of being refused. 
This is what lets two same-capability instant + tools requested 'at the same time' both succeed (serialized).""" + schema = json.dumps({"type": "object", "properties": {}}) + holder = SkillInfo( + class_name="TestSkills", + func_name="weigh_payload", + args_schema=schema, + uses=("payload",), + lifecycle="instant", + ) + requester = SkillInfo( + class_name="TestSkills", + func_name="secure_payload", + args_schema=schema, + uses=("payload",), + lifecycle="instant", + ) + rpc_calls = _make_rpc_calls([requester], {"secure_payload": "secured"}) + + saved_skills = app.state.skills_by_name + saved_registry = app.state.cap_registry + saved_timeout = app.state.cap_acquire_timeout + app.state.skills_by_name = {"weigh_payload": holder, "secure_payload": requester} + registry = CapabilityRegistry() + registry.acquire(["payload"], tool_name="weigh_payload", token="held") + app.state.cap_registry = registry + app.state.cap_acquire_timeout = 2.0 + # Free the holder shortly after the requester starts waiting on it. + releaser = threading.Timer(0.1, registry.release_by_token, args=("held",)) + releaser.start() + try: + response = asyncio.run( + handle_request( + { + "method": "tools/call", + "id": 1, + "params": {"name": "secure_payload", "arguments": {}}, + }, + [requester], + rpc_calls, + ) + ) + # It waited for the hold to clear, then actually ran the tool. 
+ assert response["result"]["content"][0]["text"] == "secured" + rpc_calls["secure_payload"].assert_called_once() + finally: + releaser.join() + app.state.skills_by_name = saved_skills + app.state.cap_registry = saved_registry + app.state.cap_acquire_timeout = saved_timeout diff --git a/dimos/agents/mcp/test_tool_stream.py b/dimos/agents/mcp/test_tool_stream.py index 73e42c55ef..0979163335 100644 --- a/dimos/agents/mcp/test_tool_stream.py +++ b/dimos/agents/mcp/test_tool_stream.py @@ -22,14 +22,17 @@ from dimos.agents import annotation as annotation_module from dimos.agents.annotation import skill +from dimos.agents.capabilities import CapabilityRegistry from dimos.agents.mcp.mcp_adapter import McpAdapter from dimos.agents.mcp.mcp_server import McpServer from dimos.agents.mcp.tool_stream import ( NOTIFICATIONS_MESSAGE_METHOD, NOTIFICATIONS_PROGRESS_METHOD, + TOOL_STREAM_STOPPED_METHOD, ToolStream, make_notification, make_progress_notification, + make_stopped_notification, ) from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT from dimos.core.coordination.blueprints import autoconnect @@ -292,6 +295,7 @@ def test_send_publishes_notification(stream_with_transport_mock) -> None: def test_send_after_stop_does_not_raise(stream_with_transport_mock) -> None: stream, mock_transport = stream_with_transport_mock stream.stop() + mock_transport.reset_mock() stream.send("should be ignored") mock_transport.publish.assert_not_called() @@ -303,11 +307,33 @@ def test_stop_tears_down_transport(stream_with_transport_mock) -> None: mock_transport.stop.assert_called_once() -def test_stop_without_send_does_nothing(stream_with_transport_mock) -> None: +def test_stop_emits_stopped_frame(stream_with_transport_mock) -> None: + """`stop()` publishes a `dimos/tool_stopped` frame so subscribers (e.g. 
+ McpServer's capability registry) know the background skill released.""" stream, mock_transport = stream_with_transport_mock + stream.send("kick off transport") + mock_transport.reset_mock() stream.stop() - mock_transport.start.assert_not_called() - mock_transport.stop.assert_not_called() + publish_calls = mock_transport.publish.call_args_list + assert len(publish_calls) == 1 + frame = publish_calls[0].args[0] + assert frame["method"] == TOOL_STREAM_STOPPED_METHOD + assert frame["params"]["tool_name"] == "test_tool" + + +def test_stop_without_send_still_emits_stopped_frame(stream_with_transport_mock) -> None: + """The stopped frame must go out even when no `send()` ever happened, so + `start_tool`/`stop_tool` pairs that emit no progress updates (e.g. + `start_patrol`) still release their capability hold.""" + stream, mock_transport = stream_with_transport_mock + stream.stop() + mock_transport.start.assert_called_once() + publish_calls = mock_transport.publish.call_args_list + assert len(publish_calls) == 1 + frame = publish_calls[0].args[0] + assert frame["method"] == TOOL_STREAM_STOPPED_METHOD + assert frame["params"]["tool_name"] == "test_tool" + mock_transport.stop.assert_called_once() def test_double_stop_is_safe(stream_with_transport_mock) -> None: @@ -327,6 +353,35 @@ def test_make_notification_shape() -> None: } +def test_make_stopped_notification_shape() -> None: + assert make_stopped_notification("patrol") == { + "jsonrpc": "2.0", + "method": TOOL_STREAM_STOPPED_METHOD, + "params": {"tool_name": "patrol"}, + } + + +def test_make_stopped_notification_includes_token() -> None: + assert make_stopped_notification("patrol", "tok-1") == { + "jsonrpc": "2.0", + "method": TOOL_STREAM_STOPPED_METHOD, + "params": {"tool_name": "patrol", "token": "tok-1"}, + } + + +def test_stop_frame_carries_acquire_token(mocker, skill_context) -> None: + """A capability-using skill's stop frame carries its acquire token so the + McpServer can release the hold for that specific 
invocation.""" + skill_context({"acquire_token": "tok-1"}) + mock_transport = mocker.MagicMock() + mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport) + stream = ToolStream("follow_person") + stream.stop() + frame = mock_transport.publish.call_args.args[0] + assert frame["method"] == TOOL_STREAM_STOPPED_METHOD + assert frame["params"] == {"tool_name": "follow_person", "token": "tok-1"} + + def test_make_progress_notification_shape() -> None: frame = make_progress_notification( "pt-abc", progress=2, message="Halfway", tool_name="fan", total=5 @@ -447,9 +502,11 @@ def start(self, name: str) -> str: @skill def double_start(self, name: str) -> str: + # The second call is a same-tool re-invoke (capability takeover): it + # re-stamps the live stream rather than raising or opening a second one. self.start_tool(name) - self.start_tool(name) # should raise - return "unreachable" + self.start_tool(name) + return "ok" @pytest.fixture() @@ -471,14 +528,102 @@ def tool_helper_module(mocker): module._close_all_tools() -def test_start_tool_duplicate_raises(tool_helper_module, skill_context) -> None: +def test_start_tool_duplicate_restamps(tool_helper_module, skill_context) -> None: + """A same-tool re-invoke re-stamps the live stream's acquire token instead + of raising or opening a second stream (capability takeover). This lets + background skills call `start_tool` unconditionally before any early return. 
+ """ module, _ = tool_helper_module + skill_context({"acquire_token": "T1"}) module.start_tool("job") - with pytest.raises(RuntimeError, match="already active"): - module.start_tool("job") + assert module._tools["job"]._acquire_token == "T1" + + skill_context({"acquire_token": "T2"}) + module.start_tool("job") # re-invoke: no raise, no second stream + assert list(module._tools) == ["job"] + assert module._tools["job"]._acquire_token == "T2" + module.stop_tool("job") +def test_start_tool_restamp_updates_stop_frame_token(tool_helper_module, skill_context) -> None: + """After a takeover re-invoke, the single live stream's stop frame carries + the new token, so the registry releases *this* hold. Regression for the leak + where the stop frame carried the superseded token and released nothing. + """ + module, mock_transport = tool_helper_module + skill_context({"acquire_token": "T1"}) + module.start_tool("job") + skill_context({"acquire_token": "T2"}) + module.start_tool("job") + module.stop_tool("job") + + stop_frames = [ + c.args[0] + for c in mock_transport.publish.call_args_list + if c.args[0].get("method") == TOOL_STREAM_STOPPED_METHOD + ] + assert len(stop_frames) == 1 + assert stop_frames[0]["params"] == {"tool_name": "job", "token": "T2"} + + +def test_rebind_acquire_token_noops_when_closed_or_no_context(mocker, skill_context) -> None: + """`rebind_acquire_token` updates the live token, but is a no-op outside a + skill context or once the stream is closed.""" + mock_transport = mocker.MagicMock() + mocker.patch("dimos.agents.mcp.tool_stream.pLCMTransport", return_value=mock_transport) + + skill_context({"acquire_token": "T1"}) + stream = ToolStream("job") + assert stream._acquire_token == "T1" + + skill_context({"acquire_token": "T2"}) + stream.rebind_acquire_token() + assert stream._acquire_token == "T2" + + skill_context(None) # outside any skill -> no-op + stream.rebind_acquire_token() + assert stream._acquire_token == "T2" + + skill_context({"acquire_token": 
"T3"}) + stream.stop() # closed -> no-op + stream.rebind_acquire_token() + assert stream._acquire_token == "T2" + + +def test_takeover_then_stop_releases_capability(tool_helper_module, skill_context) -> None: + """End-to-end of the leak the review found: a same-tool re-invoke takes over + the hold (new token), `start_tool` re-stamps the one live stream, and its + single stop frame carries the new token -- so `release_by_token` frees + movement. Before the fix the stop frame carried the superseded token and the + hold leaked permanently. + """ + module, mock_transport = tool_helper_module + registry = CapabilityRegistry() + + # Invocation #1 acquires the hold and opens the stream. + assert registry.acquire(["movement"], tool_name="start_patrol", token="T1") is None + skill_context({"acquire_token": "T1"}) + module.start_tool("start_patrol") + + # Invocation #2 (loop still alive) takes over and re-invokes start_tool. + assert registry.acquire(["movement"], tool_name="start_patrol", token="T2") is None + assert registry.snapshot() == {"movement": "start_patrol"} + skill_context({"acquire_token": "T2"}) + module.start_tool("start_patrol") + + # The running loop eventually closes the single stream. + module.stop_tool("start_patrol") + stop_frames = [ + c.args[0] + for c in mock_transport.publish.call_args_list + if c.args[0].get("method") == TOOL_STREAM_STOPPED_METHOD + ] + assert len(stop_frames) == 1 + assert registry.release_by_token(stop_frames[0]["params"]["token"]) == ["movement"] + assert registry.snapshot() == {} + + def test_tool_update_without_start_is_lenient(tool_helper_module) -> None: module, mock_transport = tool_helper_module # No start_tool call. tool_update should warn and return, not raise. 
@@ -499,7 +644,13 @@ def test_tool_update_routes_to_registered_stream(tool_helper_module, skill_conte module.tool_update("job", "progress 2") module.stop_tool("job") - texts = [c.args[0]["params"]["data"] for c in mock_transport.publish.call_args_list] + # publish() also receives the final `dimos/tool_stopped` frame from stop_tool; + # filter to only the `notifications/message` (data) frames. + texts = [ + c.args[0]["params"]["data"] + for c in mock_transport.publish.call_args_list + if c.args[0].get("method") == NOTIFICATIONS_MESSAGE_METHOD + ] assert texts == ["progress 1", "progress 2"] diff --git a/dimos/agents/mcp/tool_stream.py b/dimos/agents/mcp/tool_stream.py index cb3d55d6db..403539f93a 100644 --- a/dimos/agents/mcp/tool_stream.py +++ b/dimos/agents/mcp/tool_stream.py @@ -46,6 +46,7 @@ TOOL_STREAM_TOPIC = "/tool_streams" NOTIFICATIONS_MESSAGE_METHOD = "notifications/message" NOTIFICATIONS_PROGRESS_METHOD = "notifications/progress" +TOOL_STREAM_STOPPED_METHOD = "dimos/tool_stopped" ToolStreamCallback = Callable[[dict[str, Any]], Any] @@ -81,6 +82,17 @@ def make_progress_notification( return {"jsonrpc": "2.0", "method": NOTIFICATIONS_PROGRESS_METHOD, "params": params} +def make_stopped_notification(tool_name: str, token: str | None = None) -> dict[str, Any]: + params: dict[str, Any] = {"tool_name": tool_name} + if token is not None: + params["token"] = token + return { + "jsonrpc": "2.0", + "method": TOOL_STREAM_STOPPED_METHOD, + "params": params, + } + + def subscribe(callback: ToolStreamCallback) -> Callable[[], None]: """Subscribe to the tool-stream LCM topic and return a cleanup callable.""" transport: pLCMTransport[dict[str, Any]] = pLCMTransport(TOOL_STREAM_TOPIC) @@ -137,8 +149,28 @@ def __init__(self, tool_name: str) -> None: f"detached background thread." 
) self._progress_token: str | int | None = context.get("progress_token") + # Capability hold token (set by the MCP server for capability-using + # skills); stamped on the stop frame so release is invocation-scoped. + self._acquire_token: str | None = context.get("acquire_token") self._progress: int = 0 + def rebind_acquire_token(self) -> None: + """Re-stamp this live stream with the current invocation's acquire token. + + Called when `start_tool` reopens an already-active stream -- a same-tool + capability takeover. The newer invocation's hold token must be the one + carried on the eventual stop frame, otherwise the registry would release + a stale token and leak the capability. No-op if there is no skill context + or the stream is already closed. + """ + context = current_skill_context() + if context is None: + return + with self._lock: + if self._closed.is_set(): + return + self._acquire_token = context.get("acquire_token") + def send(self, message: str) -> None: with self._lock: if self._closed.is_set(): @@ -166,11 +198,21 @@ def stop(self) -> None: self._closed.set() transport = self._transport self._transport = None - if transport is not None: - try: - transport.stop() - except Exception: - logger.exception("tool-stream transport stop failed", stream_id=self.id) + # Publish a final "stopped" frame so subscribers (e.g. McpServer's + # capability registry) know the background skill released its hold. + # If no `send()` ever happened we spin up a transport here so the + # lifecycle signal isn't lost. 
+ if transport is None: + transport = pLCMTransport(TOOL_STREAM_TOPIC) + transport.start() + try: + transport.publish(make_stopped_notification(self.tool_name, self._acquire_token)) + except Exception: + logger.exception("tool-stream stopped publish failed", stream_id=self.id) + try: + transport.stop() + except Exception: + logger.exception("tool-stream transport stop failed", stream_id=self.id) @property def is_closed(self) -> bool: diff --git a/dimos/agents/skills/navigation.py b/dimos/agents/skills/navigation.py index d88bec452e..6c58c56c9c 100644 --- a/dimos/agents/skills/navigation.py +++ b/dimos/agents/skills/navigation.py @@ -18,6 +18,7 @@ from reactivex.disposable import Disposable from dimos.agents.annotation import skill +from dimos.agents.capabilities import CAP_MOVEMENT from dimos.core.core import rpc from dimos.core.module import Module from dimos.core.stream import In @@ -110,7 +111,13 @@ def tag_location(self, location_name: str) -> str: logger.info(f"Tagged {location}") return f"Tagged '{location_name}': ({position.x},{position.y})." - @skill + # TODO(capabilities): this skill is `instant`, so the `movement` hold is + # released the moment the call returns even though the tagged-location and + # semantic-map paths only fire set_goal() and keep navigating. Make it + # `background` and close the hold when the robot actually stops -- the + # planner already emits a goal-reached signal (see PatrollingModule) -- so + # patrol/follow/explore can't start over an active navigation goal. + @skill(uses=[CAP_MOVEMENT]) def navigate_with_text(self, query: str) -> str: """Navigate to a location by querying the existing semantic map using natural language. 
diff --git a/dimos/agents/skills/person_follow.py b/dimos/agents/skills/person_follow.py index 4175898e45..f96737a279 100644 --- a/dimos/agents/skills/person_follow.py +++ b/dimos/agents/skills/person_follow.py @@ -22,6 +22,7 @@ from turbojpeg import TurboJPEG from dimos.agents.annotation import skill +from dimos.agents.capabilities import CAP_MOVEMENT from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig @@ -34,7 +35,6 @@ from dimos.msgs.sensor_msgs.CameraInfo import CameraInfo from dimos.msgs.sensor_msgs.Image import Image, ImageFormat from dimos.msgs.sensor_msgs.PointCloud2 import PointCloud2 -from dimos.navigation.patrolling.patrolling_module_spec import PatrollingModuleSpec from dimos.navigation.visual.query import get_object_bbox_from_image from dimos.navigation.visual_servoing.detection_navigation import DetectionNavigation from dimos.navigation.visual_servoing.visual_servoing_2d import VisualServoing2D @@ -66,7 +66,6 @@ class PersonFollowSkillContainer(Module): _frequency: float = 20.0 # Hz - control loop frequency _max_lost_frames: int = 15 # number of frames to wait before declaring person lost - _patrolling_module_spec: PatrollingModuleSpec def __init__(self, **kwargs: Any) -> None: super().__init__(**kwargs) @@ -116,7 +115,7 @@ def stop(self) -> None: self._vl_model.stop() super().stop() - @skill + @skill(uses=[CAP_MOVEMENT], lifecycle="background") def follow_person( self, query: str, @@ -153,33 +152,45 @@ def follow_person( self._should_stop.clear() - with self._lock: - latest_image = self._latest_image + # Open the tool-stream up front so every early-return path can release + # the `movement` capability via the matching `stop_tool` below. The + # success path (background thread launched) leaves the stream open; + # `_follow_loop`'s teardown closes it. 
+ self.start_tool("follow_person") + background_launched = False + try: + with self._lock: + latest_image = self._latest_image - if latest_image is None: - return "No image available to detect person." - - detection_image: Image | None = None - if initial_bbox is not None: - bbox: BBox = ( - initial_bbox[0], - initial_bbox[1], - initial_bbox[2], - initial_bbox[3], - ) - if initial_image is not None: - detection_image = _decode_base64_image(initial_image) - else: - detected = get_object_bbox_from_image( - self._vl_model, - latest_image, - query, - ) - if detected is None: - return f"Could not find '{query}' in the current view." - bbox = detected - - return self._follow_person(query, bbox, detection_image) + if latest_image is None: + return "No image available to detect person." + + detection_image: Image | None = None + if initial_bbox is not None: + bbox: BBox = ( + initial_bbox[0], + initial_bbox[1], + initial_bbox[2], + initial_bbox[3], + ) + if initial_image is not None: + detection_image = _decode_base64_image(initial_image) + else: + detected = get_object_bbox_from_image( + self._vl_model, + latest_image, + query, + ) + if detected is None: + return f"Could not find '{query}' in the current view." + bbox = detected + + result = self._follow_person(query, bbox, detection_image) + background_launched = self._thread is not None + return result + finally: + if not background_launched: + self.stop_tool("follow_person") @skill def stop_following(self) -> str: @@ -238,7 +249,9 @@ def _follow_person( logger.info(f"EdgeTAM initialized with {len(initial_detections)} detections") - self.start_tool("follow_person") + # The tool-stream was opened in the parent `follow_person` skill; + # leaving `self._thread` set is the signal the parent uses to keep it + # open for the background loop instead of closing on early return. 
self._thread = Thread(target=self._follow_loop, args=(tracker, query), daemon=True) self._thread.start() @@ -246,15 +259,6 @@ def _follow_person( "Found the person. Starting to follow. You can stop following by calling " "the 'stop_following' tool. You will receive streaming updates." ) - - if self._patrolling_module_spec.is_patrolling(): - message += ( - " Note: since the robot was patrolling, this has been stopped automatically " - "(the equivalent of calling the `stop_patrol` tool call) so you don't have " - "to do it. " - ) - self._patrolling_module_spec.stop_patrol() - return message def _follow_loop(self, tracker: "EdgeTAMProcessor", query: str) -> None: diff --git a/dimos/agents/system_prompt.py b/dimos/agents/system_prompt.py index 54f713f538..fd4830d0be 100644 --- a/dimos/agents/system_prompt.py +++ b/dimos/agents/system_prompt.py @@ -26,10 +26,14 @@ # SKILL COORDINATION +## Capability Conflicts +Some skills hold a shared capability (e.g. `movement`). A call that needs a busy capability waits briefly for a short one-shot action to finish, so asking for two such actions at once just runs them back to back. If a tool call still returns "Cannot start 'X': capability 'Y' is held by 'Z'": +- If Z is a background skill (one you stop with a separate tool, e.g. patrol, follow, explore), call its stop tool, then retry your original call. +- Otherwise Z is taking longer than usual; wait a moment, then retry. + ## Navigation Flow - Use `navigate_with_text` for most navigation. It searches tagged locations first, then visible objects, then the semantic map. - Tag important locations with `tag_location` so you can return to them later. -- During `start_exploration`, avoid calling other skills except `stop_movement`. - Always run `execute_sport_command("RecoveryStand")` after dynamic movements (flips, jumps, sit) before navigating. ## GPS Navigation Flow @@ -50,4 +54,6 @@ - Deliveries: announce yourself with `speak`, call `wait` for 5 seconds, then continue. 
- Pickups: ask for help with `speak`, wait for a response, then continue. +## Terseness +- Don't say things like "Let me know if there's anything else you'd like to do!" People will prompt you when they want. You don't need to ask for a prompt. """ diff --git a/dimos/agents/test_annotation.py b/dimos/agents/test_annotation.py new file mode 100644 index 0000000000..27f41af3df --- /dev/null +++ b/dimos/agents/test_annotation.py @@ -0,0 +1,44 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from dimos.agents.annotation import skill + + +def test_skill_bare_form_has_empty_uses_and_instant_lifecycle(): + @skill + def s() -> str: + return "ok" + + assert s.__skill__ is True + assert list(s.__skill_uses__) == [] + assert s.__skill_lifecycle__ == "instant" + + +def test_skill_parametrized_form_stores_uses_and_lifecycle(): + @skill(uses=["movement"], lifecycle="background") + def s() -> str: + return "ok" + + assert list(s.__skill_uses__) == ["movement"] + assert s.__skill_lifecycle__ == "background" + + +def test_skill_parametrized_with_no_uses_defaults_to_empty(): + @skill(lifecycle="background") + def s() -> str: + return "ok" + + assert list(s.__skill_uses__) == [] + assert s.__skill_lifecycle__ == "background" diff --git a/dimos/agents/test_capabilities.py b/dimos/agents/test_capabilities.py new file mode 100644 index 0000000000..9ce0daf08c --- /dev/null +++ b/dimos/agents/test_capabilities.py @@ -0,0 +1,160 @@ +# Copyright 2026 Dimensional Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +import threading +import time + +from dimos.agents.capabilities import CapabilityRegistry + + +def test_acquire_then_release(): + reg = CapabilityRegistry() + assert reg.acquire(["movement"], tool_name="start_patrol", token="t1") is None + assert reg.snapshot() == {"movement": "start_patrol"} + released = reg.release_by_token("t1") + assert released == ["movement"] + assert reg.snapshot() == {} + + +def test_acquire_conflict_reports_existing_tool(): + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="start_patrol", token="t1") + conflict = reg.acquire(["movement"], tool_name="follow_person", token="t2") + assert conflict == ("movement", "start_patrol") + # State is unchanged after a refused acquire. + assert reg.snapshot() == {"movement": "start_patrol"} + + +def test_acquire_multi_cap_is_atomic_on_conflict(): + """If any cap conflicts, no caps are acquired (all-or-nothing).""" + reg = CapabilityRegistry() + reg.acquire(["audio"], tool_name="speak", token="t1") + conflict = reg.acquire(["movement", "audio"], tool_name="multi", token="t2") + assert conflict == ("audio", "speak") + # `movement` must NOT have leaked in. + assert "movement" not in reg.snapshot() + + +def test_same_tool_reacquire_takes_over(): + """Re-acquiring for the same tool succeeds and takes over the hold. + + The McpServer doesn't know whether a `tools/call` is a fresh request or the + LLM re-issuing the same call while a previous background invocation is still + active. The new invocation's token replaces the old one's, so the old + invocation's later release can't free the live hold. + """ + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="start_patrol", token="t1") + assert reg.acquire(["movement"], tool_name="start_patrol", token="t2") is None + assert reg.snapshot() == {"movement": "start_patrol"} + # The superseded token no longer owns anything. 
+ assert reg.release_by_token("t1") == [] + assert reg.snapshot() == {"movement": "start_patrol"} + assert reg.release_by_token("t2") == ["movement"] + + +def test_takeover_release_does_not_free_new_holder(): + """Regression: a re-entrant same-tool invocation's stale stop must not + release the capability held by the invocation that took over. + + Mirrors the re-entrant `follow_person` case: follow #1 (token T1) is active, + follow #2 (token T2) takes over, then follow #1's loop tears down and emits + its stop frame (T1). That release must be a no-op so `movement` stays held. + """ + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="follow_person", token="T1") + reg.acquire(["movement"], tool_name="follow_person", token="T2") + # Old loop's stop frame arrives carrying the superseded token T1. + assert reg.release_by_token("T1") == [] + assert reg.snapshot() == {"movement": "follow_person"} + # New loop's own stop frees it. + assert reg.release_by_token("T2") == ["movement"] + assert reg.snapshot() == {} + + +def test_release_by_token_only_releases_matching(): + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="patrol", token="ta") + reg.acquire(["audio"], tool_name="speak", token="tb") + released = reg.release_by_token("ta") + assert released == ["movement"] + assert reg.snapshot() == {"audio": "speak"} + + +def test_release_by_unknown_token_is_noop(): + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="patrol", token="ta") + assert reg.release_by_token("nobody") == [] + assert reg.snapshot() == {"movement": "patrol"} + + +def test_acquire_default_is_nonblocking(): + """Without a timeout, acquire is the original fail-fast try-lock.""" + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="patrol", token="t1") + start = time.monotonic() + conflict = reg.acquire(["movement"], tool_name="follow", token="t2") + assert conflict == ("movement", "patrol") + assert time.monotonic() - start < 0.5 # returned 
immediately, did not wait + + +def test_acquire_blocks_until_holder_releases(): + """A blocking acquire waits for the holder to release, then succeeds.""" + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="weigh", token="t1") + + def _release_soon() -> None: + time.sleep(0.1) + reg.release_by_token("t1") + + start = time.monotonic() + releaser = threading.Thread(target=_release_soon) + releaser.start() + conflict = reg.acquire(["movement"], tool_name="secure", token="t2", timeout=2.0) + elapsed = time.monotonic() - start + releaser.join() + + assert conflict is None # acquired once the holder released + assert elapsed >= 0.05 # actually waited rather than failing fast + assert reg.snapshot() == {"movement": "secure"} + + +def test_acquire_times_out_returns_conflict(): + """If the holder never releases, a blocking acquire times out and reports it.""" + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="weigh", token="t1") + start = time.monotonic() + conflict = reg.acquire(["movement"], tool_name="secure", token="t2", timeout=0.1) + elapsed = time.monotonic() - start + assert conflict == ("movement", "weigh") + assert elapsed >= 0.05 # waited out the timeout + # The holder still holds it; nothing leaked. 
+ assert reg.snapshot() == {"movement": "weigh"} + + +def test_acquire_can_wait_false_returns_immediately(): + """`can_wait` returning False refuses without waiting out the timeout.""" + reg = CapabilityRegistry() + reg.acquire(["movement"], tool_name="patrol", token="t1") + start = time.monotonic() + conflict = reg.acquire( + ["movement"], + tool_name="follow", + token="t2", + timeout=5.0, + can_wait=lambda _holder: False, + ) + assert conflict == ("movement", "patrol") + assert time.monotonic() - start < 0.5 # did not block for the 5s timeout diff --git a/dimos/core/module.py b/dimos/core/module.py index f2aed9d185..26a2b6f893 100644 --- a/dimos/core/module.py +++ b/dimos/core/module.py @@ -69,6 +69,8 @@ class SkillInfo: class_name: str func_name: str args_schema: str + uses: tuple[str, ...] = () + lifecycle: str = "instant" class PeekNotFound: @@ -438,9 +440,15 @@ def get_skills(self) -> list[SkillInfo]: attr = getattr(self, name) if callable(attr) and hasattr(attr, "__skill__"): schema = json.dumps(tool(attr).args_schema.model_json_schema()) + uses = tuple(getattr(attr, "__skill_uses__", ()) or ()) + lifecycle = getattr(attr, "__skill_lifecycle__", "instant") skills.append( SkillInfo( - class_name=self.__class__.__name__, func_name=name, args_schema=schema + class_name=self.__class__.__name__, + func_name=name, + args_schema=schema, + uses=uses, + lifecycle=lifecycle, ) ) return skills @@ -488,28 +496,21 @@ def start_tool(self, name: str) -> None: routed as `notifications/progress` frames bound to the originating `tools/call`. - Raises `RuntimeError` if a tool with the same name is already active on - this module (two concurrent streams for the same logical tool is almost - always a programmer error; `stop_tool` the previous one first). 
+ If a stream named `name` is already active, this is a same-tool re-invoke + (a capability takeover): the live stream is re-stamped with this + invocation's acquire token -- so its eventual stop frame releases *this* + hold -- and no second stream is opened. Background skills can therefore + call `start_tool` unconditionally before any "already running" return. """ # Lazy import from dimos.agents.mcp.tool_stream import ToolStream - stream = ToolStream(name) with self._tools_lock: - # Defensive check to prevent duplicate tools with the same name. - if name in self._tools: - # Unwind the already-constructed stream before raising so the - # LCM transport doesn't leak. - try: - stream.stop() - except Exception: - logger.exception("failed to unwind duplicate ToolStream") - raise RuntimeError( - f"Tool {name!r} is already active on {type(self).__name__}. " - f"Call stop_tool({name!r}) first." - ) - self._tools[name] = stream + existing = self._tools.get(name) + if existing is not None: + existing.rebind_acquire_token() + return + self._tools[name] = ToolStream(name) def tool_update(self, name: str, message: str) -> None: """Publish `message` on the tool-stream channel named `name`. 
diff --git a/dimos/navigation/frontier_exploration/test_wavefront_frontier_goal_selector.py b/dimos/navigation/frontier_exploration/test_wavefront_frontier_goal_selector.py index 834897d396..c05646eb98 100644 --- a/dimos/navigation/frontier_exploration/test_wavefront_frontier_goal_selector.py +++ b/dimos/navigation/frontier_exploration/test_wavefront_frontier_goal_selector.py @@ -366,3 +366,23 @@ def test_performance_timing() -> None: assert result["goal_time"] < 1.5, f"Goal selection too slow: {result['goal_time']}s" print("\nPerformance test passed - all operations completed within time limits") + + +def test_exploration_loop_releases_movement_on_exit(explorer, mocker) -> None: + """`_exploration_loop` closes the `begin_exploration` tool-stream on every + exit path, so natural completion (no-gain / consecutive failures) releases + the `movement` capability -- not only the explicit `end_exploration` path. + """ + stop_spy = mocker.patch.object(explorer, "stop_tool") + + # Inner loop self-terminates normally. + mocker.patch.object(explorer, "_run_exploration_loop", return_value=None) + explorer._exploration_loop() + stop_spy.assert_called_once_with("begin_exploration") + + # An unexpected error still releases the hold via `finally`. 
+ stop_spy.reset_mock() + mocker.patch.object(explorer, "_run_exploration_loop", side_effect=RuntimeError("boom")) + with pytest.raises(RuntimeError): + explorer._exploration_loop() + stop_spy.assert_called_once_with("begin_exploration") diff --git a/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py b/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py index 338d10d9b0..93af3b398a 100644 --- a/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py +++ b/dimos/navigation/frontier_exploration/wavefront_frontier_goal_selector.py @@ -30,6 +30,7 @@ from reactivex.disposable import Disposable from dimos.agents.annotation import skill +from dimos.agents.capabilities import CAP_MOVEMENT from dimos.constants import DEFAULT_THREAD_JOIN_TIMEOUT from dimos.core.core import rpc from dimos.core.module import Module, ModuleConfig @@ -764,6 +765,19 @@ def is_exploration_active(self) -> bool: return self.exploration_active def _exploration_loop(self) -> None: + """Run the exploration loop and release the movement hold on exit. + + Exploration can end several ways: explicit `end_exploration`, no-gain or + consecutive-failure self-termination, or an unexpected error. Closing the + tool-stream in `finally` releases the `movement` capability in every case + (`end_exploration`'s own `stop_tool` is then a safe no-op). + """ + try: + self._run_exploration_loop() + finally: + self.stop_tool("begin_exploration") + + def _run_exploration_loop(self) -> None: """Main exploration loop running in separate thread.""" # Track number of goals published goals_published = 0 @@ -833,21 +847,26 @@ def _exploration_loop(self) -> None: ) threading.Event().wait(2.0) - @skill + @skill(uses=[CAP_MOVEMENT], lifecycle="background") def begin_exploration(self) -> str: """Command the robot to move around and explore the area. 
Cancelled with end_exploration.""" + # Open (or re-stamp, on a same-tool takeover) the tool-stream before the + # loop starts, so the movement hold is always carried by a live stream + # and gets released whether exploration ends via `end_exploration` or + # self-terminates (see `_exploration_loop`'s finally). Opening it first + # also avoids a race where a fast-failing loop could `stop_tool` before + # the stream exists. + self.start_tool("begin_exploration") started = self.explore() if not started: return "Exploration skill is already active. Use end_exploration to stop before starting again." - return ( - "Started exploration skill. The robot is now moving. Use end_exploration " - "to stop. You also need to cancel before starting a new movement tool." - ) + return "Started exploration skill. The robot is now moving. Use end_exploration to stop." @skill def end_exploration(self) -> str: """Cancel the exploration. The robot will stop moving and remain where it is.""" stopped = self.stop_exploration() + self.stop_tool("begin_exploration") if stopped: return "Stopped exploration. The robot has stopped moving." else: diff --git a/dimos/navigation/patrolling/module.py b/dimos/navigation/patrolling/module.py index 533165683f..d6ef33c633 100644 --- a/dimos/navigation/patrolling/module.py +++ b/dimos/navigation/patrolling/module.py @@ -19,6 +19,7 @@ from dimos_lcm.std_msgs import Bool from dimos.agents.annotation import skill +from dimos.agents.capabilities import CAP_MOVEMENT from dimos.core.core import rpc from dimos.core.global_config import GlobalConfig, global_config from dimos.core.module import Module @@ -69,9 +70,12 @@ async def handle_global_costmap(self, msg: OccupancyGrid) -> None: async def handle_goal_reached(self, _msg: Bool) -> None: self._goal_reached_event.set() - @skill + @skill(uses=[CAP_MOVEMENT], lifecycle="background") async def start_patrol(self) -> str: """Start patrolling the known area. 
The robot will continuously pick patrol goals from the router and navigate to them until `stop_patrol` is called.""" + # Open (or re-stamp, on a same-tool takeover) the tool-stream before any + # early return so the movement hold is always carried by a live stream. + self.start_tool("start_patrol") if self._patrol_task is not None and not self._patrol_task.done(): return "Patrol is already running. Use `stop_patrol` to stop." @@ -103,6 +107,9 @@ async def _stop_patrolling(self) -> None: self._patrol_task = None self._planner_spec.set_replanning_enabled(True) self._planner_spec.reset_safe_goal_clearance() + # Closes the tool-stream and releases the `movement` capability via + # the dimos/tool_stopped frame consumed by McpServer. + self.stop_tool("start_patrol") if self._latest_pose is not None: self.goal_request.publish(self._latest_pose) diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py index 85ecc4e155..16c617c0f6 100644 --- a/dimos/robot/all_blueprints.py +++ b/dimos/robot/all_blueprints.py @@ -46,6 +46,7 @@ "demo-agent": "dimos.agents.demo_agent:demo_agent", "demo-agent-camera": "dimos.agents.demo_agent:demo_agent_camera", "demo-camera": "dimos.hardware.sensors.camera.module:demo_camera", + "demo-capabilities": "dimos.agents.demos.demo_capabilities:demo_capabilities", "demo-error-on-name-conflicts": "dimos.robot.unitree.demo_error_on_name_conflicts:demo_error_on_name_conflicts", "demo-google-maps-skill": "dimos.agents.skills.demo_google_maps_skill:demo_google_maps_skill", "demo-gps-nav": "dimos.agents.skills.demo_gps_nav:demo_gps_nav", @@ -134,7 +135,10 @@ "control-coordinator": "dimos.control.coordinator.ControlCoordinator", "cost-mapper": "dimos.mapping.costmapper.CostMapper", "demo-calculator-skill": "dimos.agents.skills.demo_calculator_skill.DemoCalculatorSkill", + "demo-monitoring": "dimos.agents.demos.demo_capabilities.DemoMonitoring", "demo-robot": "dimos.agents.skills.demo_robot.DemoRobot", + "demo-robot-actions": 
"dimos.agents.demos.demo_capabilities.DemoRobotActions", + "demo-sensors": "dimos.agents.demos.demo_capabilities.DemoSensors", "desk-static-tf-module": "dimos.perception.fiducial.blueprints.desk_marker_tf.DeskStaticTfModule", "detection2-d-module": "dimos.perception.detection.module2D.Detection2DModule", "detection3-d-module": "dimos.perception.detection.module3D.Detection3DModule", diff --git a/dimos/utils/cli/dimos.tcss b/dimos/utils/cli/dimos.tcss index 3ccbde957d..5dff153677 100644 --- a/dimos/utils/cli/dimos.tcss +++ b/dimos/utils/cli/dimos.tcss @@ -8,7 +8,7 @@ $red: #ff0000; $green: #00eeee; $yellow: #ffcc00; $blue: #5c9ff0; -$purple: #00eeee; +$purple: #b06bff; $cyan: #00eeee; $white: #b5e4f4; @@ -18,7 +18,7 @@ $bright-red: #ff0000; $bright-green: #00eeee; $bright-yellow: #f2ea8c; $bright-blue: #8cbdf2; -$bright-purple: #00eeee; +$bright-purple: #c890ff; $bright-cyan: #00eeee; $bright-white: #ffffff; diff --git a/dimos/utils/cli/human/humancli.py b/dimos/utils/cli/human/humancli.py index 0d8cf86b1c..45faef630f 100644 --- a/dimos/utils/cli/human/humancli.py +++ b/dimos/utils/cli/human/humancli.py @@ -14,7 +14,8 @@ from __future__ import annotations -from datetime import datetime +from collections import deque +from datetime import datetime, timedelta import json import textwrap import threading @@ -22,13 +23,17 @@ from langchain_core.messages import AIMessage, HumanMessage, SystemMessage, ToolCall, ToolMessage from rich.highlighter import JSONHighlighter +from rich.panel import Panel +from rich.text import Text from rich.theme import Theme from textual.app import App, ComposeResult from textual.binding import Binding from textual.containers import Container from textual.geometry import Size -from textual.widgets import Input, RichLog +from textual.strip import Strip +from textual.widgets import Input, RichLog, Static +from dimos.agents.mcp import tool_stream from dimos.core.transport import pLCMTransport from dimos.utils.cli import theme from 
dimos.utils.generic import truncate_display_string @@ -51,6 +56,107 @@ } ) +# How many of a tool's most recent stream lines to show inside its box. +RECENT_LINES = 5 +# Prefix `McpClient` puts on tool-stream updates it re-emits to `/agent`. +TOOL_MSG_PREFIX = "[tool:" +# Markers pairing a tool call with its result in the scrollback. +TOOL_CALL_MARKER = "▶" +TOOL_RESULT_MARKER = "↳" +# Backstop: finalize a stopped tool's box this long after the stop if no agent +# activity follows to trigger an idle transition. +STOP_FINALIZE_DELAY = 1.0 + + +def _format_elapsed(delta: timedelta) -> str: + total = int(delta.total_seconds()) + return f"{total // 60:02d}:{total % 60:02d}" + + +def _split_tool_message(content: Any) -> tuple[str, str] | None: + """Parse a `[tool:NAME] ` tool-stream message into (name, text).""" + if not isinstance(content, str) or not content.startswith(TOOL_MSG_PREFIX): + return None + end = content.find("]") + if end == -1: + return None + return content[len(TOOL_MSG_PREFIX) : end], content[end + 1 :].lstrip() + + +class ToolPanel: + """Live state for one streaming tool's box. + + ``entries`` holds the most recent ``(timestamp, kind, text)`` tuples where + ``kind`` is ``"tool"`` (a stream update) or ``"agent"`` (the agent's reply + to one), so each box shows the stream and the agent's annotations together. + """ + + def __init__(self, tool_name: str, start: datetime) -> None: + self.tool_name = tool_name + self.start = start + self.entries: deque[tuple[str, str, str]] = deque(maxlen=RECENT_LINES) + self.count = 0 + self.static: Static | None = None + + +class ToolPanelRegion: + """Docked region of live per-tool boxes. + + All methods run on the Textual main thread; the tool-stream subscription + hands updates over via ``call_from_thread``. 
+ """ + + def __init__( + self, + container: Container, + flush_fn: Callable[[Any], None], + ) -> None: + self._container = container + self._flush = flush_fn + self._panels: dict[str, ToolPanel] = {} + + def update(self, tool_name: str, text: str, kind: str, timestamp: str) -> None: + panel = self._panels.get(tool_name) + if panel is None: + panel = ToolPanel(tool_name, datetime.now()) + self._panels[tool_name] = panel + panel.entries.append((timestamp, kind, text)) + panel.count = 1 + panel.static = Static(self._render(panel)) + self._container.mount(panel.static) + else: + panel.entries.append((timestamp, kind, text)) + panel.count += 1 + assert panel.static is not None + panel.static.update(self._render(panel)) + + def finalize(self, tool_name: str) -> None: + panel = self._panels.pop(tool_name, None) + if panel is None: + return + self._flush(self._render(panel, done=True)) + if panel.static is not None: + panel.static.remove() + + def _render(self, panel: ToolPanel, done: bool = False) -> Panel: + elapsed = _format_elapsed(datetime.now() - panel.start) + body = Text() + hidden = panel.count - len(panel.entries) + if hidden > 0: + body.append(f"(+{hidden} earlier)\n", style=theme.DIM) + for i, (timestamp, kind, text) in enumerate(panel.entries): + if i: + body.append("\n") + body.append(f"{timestamp} ", style=theme.TIMESTAMP) + if kind == "agent": + body.append("> ", style=theme.AGENT) + body.append(text, style=theme.AGENT) + else: + body.append(text) + status = f"{panel.count} updates" if done else "running" + title = f"{panel.tool_name} {status} {elapsed}" + return Panel(body, title=title, title_align="left", border_style=theme.PURPLE) + class ThinkingIndicator: """Manages a throbbing 'thinking...' 
chat message in a RichLog.""" @@ -59,7 +165,7 @@ def __init__( self, app: App[Any], chat_log: RichLog, - add_message_fn: Callable[[str, str, str, str], None], + add_message_fn: Callable[[str, str, str, str], object], ) -> None: self._app: App[Any] = app self._chat_log = chat_log @@ -140,6 +246,12 @@ class HumanCLIApp(App): # type: ignore[type-arg] scrollbar-size: 0 0; }} + #tool-panels {{ + height: auto; + max-height: 50%; + overflow-y: auto; + }} + Input {{ dock: bottom; }} @@ -162,6 +274,16 @@ def __init__(self, *args, **kwargs) -> None: # type: ignore[no-untyped-def] self._subscription_thread: threading.Thread | None = None self._idle_subscription_thread: threading.Thread | None = None self._thinking: ThinkingIndicator | None = None + self._tool_panels: ToolPanelRegion | None = None + self._tool_stream_cleanup: Callable[[], None] | None = None + # The tool whose box the next agent reply belongs to (None -> inline). + self._reply_target: str | None = None + # tool_call_id -> the scrollback strips of that call, so its result can + # be spliced in directly beneath it (calls/results can arrive in any order). + self._tool_call_anchors: dict[str, list[Strip]] = {} + # Tools that have stopped but whose box waits for the agent to catch up. + self._pending_stops: set[str] = set() + self._agent_is_idle = True self._running = False def compose(self) -> ComposeResult: @@ -170,6 +292,8 @@ def compose(self) -> ComposeResult: self.chat_log = RichLog(highlight=True, markup=True, wrap=False) yield self.chat_log + yield Container(id="tool-panels") + self.input_widget = Input(placeholder="Type a message...") yield self.input_widget @@ -186,6 +310,12 @@ def on_mount(self) -> None: assert self.chat_log is not None self._thinking = ThinkingIndicator(self, self.chat_log, self._add_message) + # Live boxes for streaming tools, fed straight off the tool-stream topic. 
+ self._tool_panels = ToolPanelRegion( + self.query_one("#tool-panels", Container), self._flush_tool_panel + ) + self._tool_stream_cleanup = tool_stream.subscribe(self._on_tool_stream) + # Start subscription threads self._subscription_thread = threading.Thread(target=self._subscribe_to_agent, daemon=True) self._subscription_thread.start() @@ -204,6 +334,9 @@ def on_mount(self) -> None: def on_unmount(self) -> None: """Clean up when unmounting.""" self._running = False + if self._tool_stream_cleanup is not None: + self._tool_stream_cleanup() + self._tool_stream_cleanup = None def _subscribe_to_agent(self) -> None: """Subscribe to agent messages in a separate thread.""" @@ -211,6 +344,8 @@ def _subscribe_to_agent(self) -> None: def receive_msg(msg) -> None: # type: ignore[no-untyped-def] if not self._running: return + assert self._tool_panels is not None + assert self._thinking is not None timestamp = datetime.now().strftime("%H:%M:%S") @@ -228,55 +363,178 @@ def receive_msg(msg) -> None: # type: ignore[no-untyped-def] "tool_calls", [] ) - # Display the main content first - if content: + # A reply to a tool-stream update goes inside that tool's box so + # it reads as an annotation of the stream, not the agent talking + # to itself. Replies to a typed message stay inline. + if content and self._reply_target is not None and isinstance(content, str): + self.call_from_thread( + self._tool_panels.update, self._reply_target, content, "agent", timestamp + ) + elif content: self.call_from_thread( self._add_message, timestamp, "agent", content, theme.AGENT ) - # Display tool calls separately with different formatting + # Tool calls are real actions; always show them inline, and + # remember each one so its result can be grouped under it. 
if tool_calls: for tc in tool_calls: tool_info = self._format_tool_call(tc) self.call_from_thread( - self._add_message, timestamp, "tool", tool_info, theme.TOOL + self._write_tool_call, timestamp, tool_info, tc.get("id") ) - # If neither content nor tool calls, show a placeholder - if not content and not tool_calls: + # If neither content nor tool calls, show a placeholder (but not + # for the silent step that can follow a tool-stream update). + if not content and not tool_calls and self._reply_target is None: self.call_from_thread( self._add_message, timestamp, "agent", "", theme.DIM ) elif isinstance(msg, ToolMessage): self.call_from_thread( - self._add_message, timestamp, "tool", msg.content, theme.TOOL_RESULT + self._write_tool_result, + timestamp, + msg.content, + getattr(msg, "tool_call_id", None), ) elif isinstance(msg, HumanMessage): + # Tool-stream updates arrive here as `[tool:NAME] `. Route + # the update into the tool's box and remember the tool so the + # following agent reply lands in the same box. A real typed + # message clears the target and renders inline. + parsed = _split_tool_message(msg.content) + if parsed is not None: + name, text = parsed + self._reply_target = name + if text: + self.call_from_thread( + self._tool_panels.update, name, text, "tool", timestamp + ) + return + self._reply_target = None self.call_from_thread( self._add_message, timestamp, "human", msg.content, theme.HUMAN ) + # Keep the spinner up while this turn is processed (also re-shows + # it in the rare case a stale idle signal hid it post-submit). 
+ self.call_from_thread(self._thinking.show) self._agent_transport.subscribe(receive_msg) def _subscribe_to_idle(self) -> None: def receive_idle(is_idle: bool) -> None: - assert self._thinking is not None - if not self._running: return - - self.call_from_thread(self._thinking.hide if is_idle else self._thinking.show) + self.call_from_thread(self._set_agent_idle, is_idle) self._agent_idle.subscribe(receive_idle) + def _set_agent_idle(self, is_idle: bool) -> None: + assert self._thinking is not None + self._agent_is_idle = is_idle + if is_idle: + # "thinking..." is only shown for human-initiated turns (on submit), + # so just hide here. Showing it on every busy signal would flash it + # for each tool-stream update, which the agent also runs through the + # graph. + self._thinking.hide() + # The queue is drained, so every stopped tool's trailing replies + # have now rendered -> finalize their boxes. A short delay lets the + # last reply (delivered on a separate topic) settle first. + if self._pending_stops: + self.set_timer(0.15, self._flush_pending_stops) + + def _flush_pending_stops(self) -> None: + assert self._tool_panels is not None + # Only finalize while the agent is idle. A stop can arrive while the + # tool's last update/reply is still queued (the real-time stop signal + # outruns the LLM-paced /agent stream); finalizing then would strand the + # trailing reply in a new, never-closing box. A later idle retries. + if not self._agent_is_idle or not self._pending_stops: + return + for name in self._pending_stops: + self._tool_panels.finalize(name) + self._pending_stops.clear() + + def _on_tool_stream(self, msg: dict[str, Any]) -> None: + """Watch the tool-stream topic for stop signals. + + Box content (updates and the agent's replies) is driven off the ordered + `/agent` stream so the two stay paired; this subscription only needs the + stop signal, which is the one event `/agent` does not carry (a + self-terminating background tool produces no message there). 
+ """ + if not self._running: + return + if msg.get("method") != tool_stream.TOOL_STREAM_STOPPED_METHOD: + return + tool_name = (msg.get("params") or {}).get("tool_name") or "tool" + self.call_from_thread(self._record_stop, tool_name) + + def _record_stop(self, tool_name: str) -> None: + self._pending_stops.add(tool_name) + # Don't finalize off the current (possibly stale) idle flag: the tool's + # last update may have just been queued and not yet rendered. The idle + # handler finalizes once the agent has caught up; this timer is only a + # backstop for a tool that stops with no further agent activity. + self.set_timer(STOP_FINALIZE_DELAY, self._flush_pending_stops) + + def _flush_tool_panel(self, renderable: Any) -> None: + """Write a finalized tool box into the scrollback, keeping the + thinking indicator pinned to the bottom.""" + assert self._thinking is not None + reattach = self._thinking.detach_if_needed() + self.chat_log.write(renderable) # type: ignore[union-attr] + if reattach: + self._thinking.reattach() + def _format_tool_call(self, tool_call: ToolCall) -> str: """Format a tool call for display.""" name = tool_call.get("name", "unknown") args = tool_call.get("args", {}) args_str = json.dumps(args, separators=(",", ":")) - return f"▶ {name}({args_str})" - - def _add_message(self, timestamp: str, sender: str, content: str, color: str) -> None: + return f"{TOOL_CALL_MARKER} {name}({args_str})" + + def _write_tool_call(self, timestamp: str, tool_info: str, call_id: str | None) -> None: + strips = self._add_message(timestamp, "tool", tool_info, theme.TOOL) + if call_id and strips: + self._tool_call_anchors[call_id] = strips + + def _write_tool_result(self, timestamp: str, content: str, call_id: str | None) -> None: + text = f"{TOOL_RESULT_MARKER} {content}" + anchor = self._tool_call_anchors.pop(call_id, None) if call_id else None + if anchor is None: + # No matching call on screen (e.g. a lookout continuation) -> append. 
+ self._add_message(timestamp, "tool", text, theme.TOOL_RESULT) + return + self._insert_after(anchor, timestamp, text) + + def _insert_after(self, anchor: list[Strip], timestamp: str, text: str) -> None: + """Render a tool result and splice it into the log right below the + matching tool call, even when lines were written in between.""" + log = self.chat_log + assert log is not None + new_strips = self._add_message(timestamp, "tool", text, theme.TOOL_RESULT) + if not new_strips: + return + # Pull the just-appended result back out so it can be re-homed. + new_ids = {id(s) for s in new_strips} + log.lines = [line for line in log.lines if id(line) not in new_ids] + # Locate the call's last line by identity (indices shift as the log + # grows) and splice the result in directly after it. + anchor_id = id(anchor[-1]) + insert_at = len(log.lines) + for i, line in enumerate(log.lines): + if id(line) == anchor_id: + insert_at = i + 1 + break + log.lines[insert_at:insert_at] = new_strips + log._line_cache.clear() + log.virtual_size = Size(log.virtual_size.width, len(log.lines)) + log.refresh() + log.scroll_end(animate=False) + + def _add_message(self, timestamp: str, sender: str, content: str, color: str) -> list[Strip]: assert self._thinking is not None reattach = self._thinking.detach_if_needed() @@ -308,6 +566,7 @@ def _add_message(self, timestamp: str, sender: str, content: str, color: str) -> # Split content into lines first (respecting explicit newlines) lines = content.split("\n") + before = len(self.chat_log.lines) # type: ignore[union-attr] for line_idx, line in enumerate(lines): # Wrap each line to fit the available width if line_idx == 0: @@ -334,8 +593,10 @@ def _add_message(self, timestamp: str, sender: str, content: str, color: str) -> # Empty line self.chat_log.write(indent + "│") # type: ignore[union-attr] + written = list(self.chat_log.lines[before:]) # type: ignore[union-attr] if reattach: self._thinking.reattach() + return written def 
_add_system_message(self, content: str) -> None: """Add a system message to the chat.""" @@ -375,11 +636,16 @@ def on_input_submitted(self, event: Input.Submitted) -> None: self._add_system_message(help_text) return - # Send to agent (message will be displayed when received back) + # Send to agent (message will be displayed when received back). Show + # "thinking..." now so a human turn always has a spinner, even while the + # agent first drains any queued tool-stream updates. + if self._thinking is not None: + self._thinking.show() self._human_transport.publish(message) def action_clear(self) -> None: """Clear the chat log.""" + self._tool_call_anchors.clear() self.chat_log.clear() # type: ignore[union-attr] def action_quit(self) -> None: # type: ignore[override] diff --git a/dimos/utils/cli/human/humanclianim.py b/dimos/utils/cli/human/humanclianim.py index 3bb8ea08c9..5200cd0144 100644 --- a/dimos/utils/cli/human/humanclianim.py +++ b/dimos/utils/cli/human/humanclianim.py @@ -13,17 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import os -import random import sys import threading import time from terminaltexteffects import Color # type: ignore[attr-defined] +from terminaltexteffects.effects.effect_expand import Expand from dimos.utils.cli import theme -# Global to store the imported main function _humancli_main = None _import_complete = threading.Event() @@ -43,112 +41,17 @@ def import_cli_in_background() -> None: _import_complete.set() -def get_effect_config(effect_name: str): # type: ignore[no-untyped-def] - """Get hardcoded configuration for a specific effect""" - # Hardcoded configs for each effect - global_config = { - "final_gradient_stops": [Color(theme.ACCENT)], - } - - configs = { - "randomsequence": { - "speed": 0.075, - }, - "slide": {"direction": "left", "movement_speed": 1.5}, - "sweep": {"direction": "left"}, - "print": { - "print_speed": 10, - "print_head_return_speed": 10, - "final_gradient_stops": [Color(theme.ACCENT)], - }, - "pour": {"pour_speed": 9}, - "matrix": {"rain_symbols": "01", "rain_fall_speed_range": (4, 7)}, - "decrypt": {"typing_speed": 5, "decryption_speed": 3}, - "burn": {"fire_chars": "█", "flame_color": "ffffff"}, - "expand": {"expand_direction": "center"}, - "scattered": {"movement_speed": 0.5}, - "beams": {"movement_speed": 0.5, "beam_delay": 0}, - "middleout": {"center_movement_speed": 3, "full_movement_speed": 0.5}, - "rain": { - "rain_symbols": "░▒▓█", - "rain_fall_speed_range": (5, 10), - }, - "highlight": {"highlight_brightness": 3}, - } - - return {**configs.get(effect_name, {}), **global_config} # type: ignore[dict-item] - - def run_banner_animation() -> None: - """Run the ASCII banner animation before launching Textual""" - - # Check if we should animate - random_anim = ["scattered", "print", "expand", "slide", "rain"] - animation_style = os.environ.get("DIMOS_BANNER_ANIMATION", random.choice(random_anim)).lower() - - if animation_style == "none": - return # Skip animation - from terminaltexteffects.effects.effect_beams import Beams - from 
terminaltexteffects.effects.effect_burn import Burn - from terminaltexteffects.effects.effect_decrypt import Decrypt - from terminaltexteffects.effects.effect_expand import Expand - from terminaltexteffects.effects.effect_highlight import ( - Highlight, - ) - from terminaltexteffects.effects.effect_matrix import Matrix - from terminaltexteffects.effects.effect_middleout import ( - MiddleOut, - ) - from terminaltexteffects.effects.effect_overflow import ( - Overflow, - ) - from terminaltexteffects.effects.effect_pour import Pour - from terminaltexteffects.effects.effect_print import Print - from terminaltexteffects.effects.effect_rain import Rain - from terminaltexteffects.effects.effect_random_sequence import ( - RandomSequence, - ) - from terminaltexteffects.effects.effect_scattered import ( - Scattered, - ) - from terminaltexteffects.effects.effect_slide import Slide - from terminaltexteffects.effects.effect_sweep import Sweep - - # The DIMENSIONAL ASCII art ascii_art = "\n" + theme.ascii_logo.replace("\n", "\n ") - # Choose effect based on style - effect_map = { - "slide": Slide, - "sweep": Sweep, - "print": Print, - "pour": Pour, - "burn": Burn, - "matrix": Matrix, - "rain": Rain, - "scattered": Scattered, - "expand": Expand, - "decrypt": Decrypt, - "overflow": Overflow, - "randomsequence": RandomSequence, - "beams": Beams, - "middleout": MiddleOut, - "highlight": Highlight, - } - - EffectClass = effect_map.get(animation_style, Slide) # Clear screen before starting animation print("\033[2J\033[H", end="", flush=True) - # Get effect configuration - effect_config = get_effect_config(animation_style) + effect = Expand(ascii_art) + effect.effect_config.expand_direction = "center" # type: ignore[attr-defined] + effect.effect_config.final_gradient_stops = (Color(theme.ACCENT),) # type: ignore[attr-defined] - # Create and run the effect with config - effect = EffectClass(ascii_art) - for key, value in effect_config.items(): - setattr(effect.effect_config, key, value) 
# type: ignore[attr-defined] - - # Run the animation - terminal.print() handles all screen management + # Run the animation with effect.terminal_output() as terminal: # type: ignore[attr-defined] for frame in effect: # type: ignore[attr-defined] terminal.print(frame) @@ -161,8 +64,6 @@ def run_banner_animation() -> None: def main() -> None: - """Main entry point - run animation then launch the real CLI""" - # Start importing CLI in background (this is slow) import_thread = threading.Thread(target=import_cli_in_background, daemon=True) import_thread.start() @@ -171,10 +72,8 @@ def main() -> None: if not (len(sys.argv) > 1 and sys.argv[1] == "web"): run_banner_animation() - # Wait for import to complete - _import_complete.wait(timeout=10) # Max 10 seconds wait + _import_complete.wait(timeout=10) - # Launch the real CLI if _humancli_main: _humancli_main() else: