Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions dimos/agents/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import inspect
import threading
import time
from typing import Any, TypeVar, cast
from typing import Any, Literal, TypeVar, cast, overload

from dimos.core.core import rpc
from dimos.utils.logging_config import setup_logger
Expand All @@ -29,6 +29,9 @@

_SKILL_CONTEXT = threading.local()

SkillLifecycle = Literal["instant", "background"]
_VALID_LIFECYCLES = ("instant", "background")


def current_skill_context() -> dict[str, Any] | None:
"""Return the per-call context for the currently executing `@skill`.
Expand Down Expand Up @@ -66,7 +69,7 @@ def _stamp_and_log(func_name: str, result: Any, elapsed_ms: float) -> Any:
return result


def skill(func: F) -> F:
def _make_skill(func: F, uses: list[str], lifecycle: SkillLifecycle) -> F:
if inspect.iscoroutinefunction(func):

@functools.wraps(func)
Expand Down Expand Up @@ -108,4 +111,42 @@ def sync_context_wrapper(*args: Any, **kwargs: Any) -> Any:

wrapped = rpc(context_wrapper)
wrapped.__skill__ = True # type: ignore[attr-defined]
wrapped.__skill_uses__ = list(uses) # type: ignore[attr-defined]
wrapped.__skill_lifecycle__ = lifecycle # type: ignore[attr-defined]
return cast("F", wrapped)


@overload
def skill(func: F) -> F: ...
@overload
def skill(*, uses: list[str] | None = ..., lifecycle: SkillLifecycle = ...) -> Callable[[F], F]: ...


def skill(
func: F | None = None,
*,
uses: list[str] | None = None,
lifecycle: SkillLifecycle = "instant",
) -> F | Callable[[F], F]:
"""Mark a method as an agent-callable skill.

Supports both bare-form `@skill` and parameterized `@skill(uses=[...], lifecycle=...)`.

`uses` declares capabilities the skill needs (e.g. `["movement"]`). The MCP
server uses these to refuse the call when another skill is already holding
a required capability. Default: no capabilities.

`lifecycle` is `"instant"` (default) for skills that finish their work before
returning, or `"background"` for skills that kick off background work and
return early -- those must use `start_tool`/`stop_tool` so the matching
stop-tool frame can release their capabilities.
"""
if lifecycle not in _VALID_LIFECYCLES:
raise ValueError(f"lifecycle must be one of {_VALID_LIFECYCLES}, got {lifecycle!r}")
if func is not None:
return _make_skill(func, uses=[], lifecycle="instant")

def decorator(f: F) -> F:
return _make_skill(f, uses=list(uses or []), lifecycle=lifecycle)

return decorator
139 changes: 139 additions & 0 deletions dimos/agents/capabilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Copyright 2026 Dimensional Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Capability registry for skill-level mutual exclusion.

A skill declares the capabilities it occupies via `@skill(uses=[...])`. The MCP
server consults a process-wide `CapabilityRegistry` before dispatching each
`tools/call`. When a required capability is held by another skill, the server
either waits briefly for it to clear (for short, self-completing holders) or
refuses with a plain text "Cannot start X: capability Y is held by Z" result, in
which case the LLM decides what to do (typically: call the appropriate stop tool,
then retry).

Capabilities are plain strings. Today the only declared capability is
`CAP_MOVEMENT`. New capabilities should be added as constants here so they are
discoverable from one place.
"""

from __future__ import annotations

from collections.abc import Callable
import threading
import time
from typing import NamedTuple

CAP_MOVEMENT = "movement"


class _Hold(NamedTuple):
"""Who currently holds a capability.

`tool_name` is the human-meaningful skill name (used for conflict messages
and the same-tool takeover decision). `token` is unique per invocation and
scopes release, so a stale invocation can't free a live hold.
"""

tool_name: str
token: str


class CapabilityRegistry:
"""In-memory map of capability -> holding invocation.

All methods are thread-safe. The registry is intentionally simple: every
capability is exclusive (no shared/exclusive distinction yet). On conflict
`acquire` either refuses immediately (the default try-lock) or, when given a
`timeout`, blocks until the conflicting hold clears or the timeout expires.

A hold is identified per-invocation by an opaque `token`, not by the tool
name. Two invocations of the *same* tool don't conflict -- the later one
takes over the hold -- but release is scoped to the token, so the earlier
invocation's teardown can't release the capability out from under the later
one. Different tools sharing a capability still conflict.
"""

def __init__(self) -> None:
self._holders: dict[str, _Hold] = {}
self._cond = threading.Condition()

def acquire(
self,
caps: list[str],
tool_name: str,
token: str,
*,
timeout: float = 0.0,
can_wait: Callable[[str], bool] | None = None,
) -> tuple[str, str] | None:
"""Atomic all-or-nothing acquire of `caps` for one invocation.

Conflicts only with a *different* `tool_name`; a same-tool re-acquire is
a takeover that overwrites the holder with the new `token`.

With the default `timeout=0.0` this is a non-blocking try-lock: it returns
`None` on success or, on conflict, `(cap, current_tool)` for the first
conflicting capability (no caps are acquired in that case).

With `timeout > 0` it blocks up to `timeout` seconds for a conflicting hold
to clear, re-attempting the all-or-nothing acquire whenever a hold is
released. `can_wait(holder_tool_name)` decides whether to wait on a given
holder; if it returns `False` the conflict is returned immediately instead
of waiting (used to refuse, rather than block on, holders that won't release
on their own).
"""
deadline = time.monotonic() + timeout
with self._cond:
while True:
conflict = self._acquire_locked(caps, tool_name, token)
if conflict is None:
return None
if can_wait is not None and not can_wait(conflict[1]):
return conflict
remaining = deadline - time.monotonic()
if remaining <= 0:
return conflict
self._cond.wait(remaining)

def _acquire_locked(
self, caps: list[str], tool_name: str, token: str
) -> tuple[str, str] | None:
"""Atomic check-and-set of `caps`; the caller must hold `self._cond`."""
for cap in caps:
current = self._holders.get(cap)
if current is not None and current.tool_name != tool_name:
return (cap, current.tool_name)
for cap in caps:
self._holders[cap] = _Hold(tool_name, token)
return None

def release_by_token(self, token: str) -> list[str]:
"""Release every capability whose current holder matches `token`.

A stale token (one already taken over by a newer invocation of the same
tool) matches nothing and releases nothing. Wakes any callers blocked in
`acquire` so they can re-attempt.
"""
with self._cond:
released = [cap for cap, h in self._holders.items() if h.token == token]
for cap in released:
del self._holders[cap]
if released:
self._cond.notify_all()
return released

def snapshot(self) -> dict[str, str]:
"""Map of capability -> holding tool name (for logging/diagnostics)."""
with self._cond:
return {cap: h.tool_name for cap, h in self._holders.items()}
95 changes: 95 additions & 0 deletions dimos/agents/demos/agent_demo_instructions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# Agent Capability Demo Instructions

This demo runs a simulated warehouse inspection robot. It has no hardware
dependencies. All work is simulated with sleeps and tool-stream updates.

## Start

Terminal 1:

```bash
uv run dimos run demo-capabilities
```

Terminal 2:

```bash
uv run humancli
```

## Manual Prompts

Type these in `humancli`.

### Sequential vs. Parallel Instant Tools

```text
Read the battery, read the temperature, and capture a photo at the same time.
```

Expected: the three timestamp windows should overlap substantially.

### Background Tool Streams Without Conflict

```text
Start patrolling and start an environment scan.
```

Expected: `start_patrol` starts and streams `visiting waypoint N` updates.
`start_environment_scan` also starts and streams air readings. These should run
together because environment scanning has no capability.

### Capability Conflict

```text
Without stopping patrol first, try to turn in place 90 degrees. If there is a conflict, report the exact error and do not recover.
```

Expected: `turn_in_place` is refused because `movement` is held by
`start_patrol`. The response should include text like:

```text
Cannot start 'turn_in_place': capability 'movement' is held by 'start_patrol'.
```

### Release and Retry

```text
Stop patrol, then turn in place 90 degrees.
```

Expected: `stop_patrol` closes the patrol tool stream, releasing `movement`.
`turn_in_place` then succeeds with a 2-second timestamp window.

### Self-Terminating Background Tool

```text
Do a lap and start an environment scan.
```

Expected: `do_a_lap` streams four checkpoint updates over about 8 seconds and
then stops by itself, releasing `movement`. The environment scan keeps running
until stopped.

Stop the scan when done:

```text
Stop the environment scan.
```

### Different Capability

```text
Weigh the sample box and secure it at the same time.
```

Expected: one of the two payload tools may be refused while the other holds `payload`.

### Competing Background Movement Tools

```text
Start patrol and do a lap at the same time.
```

Expected: one movement tool starts. The other should be refused because
`movement` is already held by the first movement tool.
Loading
Loading