Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions dimos/agents/annotation.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import inspect
import threading
import time
from typing import Any, TypeVar, cast
from typing import Any, TypeVar, cast, overload

from dimos.core.core import rpc
from dimos.utils.logging_config import setup_logger
Expand Down Expand Up @@ -66,7 +66,10 @@ def _stamp_and_log(func_name: str, result: Any, elapsed_ms: float) -> Any:
return result


def skill(func: F) -> F:
def _decorate_skill(func: F, *, lane: str | None = None) -> F:
if lane == "":
raise ValueError("skill lane must be a non-empty string or None")

if inspect.iscoroutinefunction(func):

@functools.wraps(func)
Expand Down Expand Up @@ -108,4 +111,25 @@ def sync_context_wrapper(*args: Any, **kwargs: Any) -> Any:

wrapped = rpc(context_wrapper)
wrapped.__skill__ = True # type: ignore[attr-defined]
wrapped.__skill_lane__ = lane # type: ignore[attr-defined]
return cast("F", wrapped)


@overload
def skill(func: F) -> F: ...


@overload
def skill(*, lane: str | None = None) -> Callable[[F], F]: ...


def skill(func: F | None = None, *, lane: str | None = None) -> F | Callable[[F], F]:
"""Mark a method as an agent skill.

`lane` optionally names a sequential execution lane. MCP callers serialize
calls within the same lane while leaving unlaned or differently-laned skills
free to run concurrently.
"""
if func is None:
return lambda wrapped_func: _decorate_skill(wrapped_func, lane=lane)
return _decorate_skill(func, lane=lane)
31 changes: 26 additions & 5 deletions dimos/agents/mcp/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
app.state.rpc_calls = {}
app.state.sse_queues = []
app.state.event_loop = None
app.state.lane_locks = {}


def _jsonrpc_result(req_id: Any, result: Any) -> dict[str, Any]:
Expand Down Expand Up @@ -89,13 +90,19 @@ def _handle_tools_list(req_id: Any, skills: list[SkillInfo]) -> dict[str, Any]:
tool: dict[str, Any] = {"name": s.func_name, "inputSchema": schema}
if description:
tool["description"] = description
if s.lane is not None:
tool["_meta"] = {"lane": s.lane}
tools.append(tool)

return _jsonrpc_result(req_id, {"tools": tools})


async def _handle_tools_call(
req_id: Any, params: dict[str, Any], rpc_calls: dict[str, Any]
req_id: Any,
params: dict[str, Any],
skills: list[SkillInfo],
rpc_calls: dict[str, Any],
lane_locks: dict[str, asyncio.Lock],
) -> dict[str, Any]:
name = params.get("name", "")
args: dict[str, Any] = params.get("arguments") or {}
Expand All @@ -107,6 +114,8 @@ async def _handle_tools_call(
logger.warning("MCP tool not found", tool=name)
return _jsonrpc_result_text(req_id, f"Tool not found: {name}")

lane = next((s.lane for s in skills if s.func_name == name), None)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 The O(n) linear scan for the lane duplicates work already done when the rpc_calls dict was built. A {s.func_name: s.lane for s in skills}.get(name) dict comprehension or a pre-built lane map alongside rpc_calls would make this O(1).

Suggested change
lane = next((s.lane for s in skills if s.func_name == name), None)
lane = {s.func_name: s.lane for s in skills}.get(name)

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!


logger.info("MCP tool call", tool=name, args=args, progress_token=progress_token)
t0 = time.monotonic()

Expand All @@ -116,10 +125,16 @@ async def _handle_tools_call(
if progress_token is not None:
call_kwargs["_mcp_context"] = {"progress_token": progress_token}

async def run_rpc_call() -> Any:
return await asyncio.get_event_loop().run_in_executor(None, lambda: rpc_call(**call_kwargs))

try:
result = await asyncio.get_event_loop().run_in_executor(
None, lambda: rpc_call(**call_kwargs)
)
if lane is None:
result = await run_rpc_call()
else:
lock = lane_locks.setdefault(lane, asyncio.Lock())
async with lock:
result = await run_rpc_call()
Comment on lines +135 to +137
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Loop-bound asyncio.Lock in shared global dict

app.state.lane_locks is a module-level dict that persists across the process lifetime. In Python 3.10+, asyncio.Lock inherits from _LoopBoundMixin, which binds the lock to the first event loop that calls acquire() on it — subsequent calls from a different event loop raise RuntimeError: bound to a different event loop. Each asyncio.run() invocation (as used in tests) creates its own event loop, so after test_mcp_module_serializes_same_lane_calls completes, the "motion" lock remains in the dict bound to the now-closed loop. Any future test that also exercises the "motion" lane in a fresh asyncio.run() will immediately fail. The root cause is that lane_locks is not threaded through handle_request the way skills and rpc_calls are. Adding it as an optional parameter (defaulting to app.state.lane_locks) would let tests pass their own empty dict.

except Exception as e:
logger.exception("MCP tool error", tool=name, duration=f"{time.monotonic() - t0:.3f}s")
return _jsonrpc_result_text(req_id, f"Error running tool '{name}': {e}")
Expand Down Expand Up @@ -158,7 +173,13 @@ async def handle_request(
if method == "tools/list":
return _handle_tools_list(req_id, skills)
if method == "tools/call":
return await _handle_tools_call(req_id, params, rpc_calls)
return await _handle_tools_call(
req_id,
params,
skills,
rpc_calls,
app.state.lane_locks,
)
return _jsonrpc_error(req_id, -32601, f"Unknown: {method}")


Expand Down
56 changes: 56 additions & 0 deletions dimos/agents/mcp/test_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import asyncio
import json
import threading
import time
from unittest.mock import MagicMock

from dimos.agents.mcp.mcp_server import handle_request
Expand Down Expand Up @@ -68,6 +70,60 @@ def test_mcp_module_request_flow() -> None:
rpc_calls["add"].assert_called_once_with(x=2, y=3)


def test_mcp_module_lists_skill_lane_metadata() -> None:
schema = json.dumps({"type": "object", "properties": {}})
skills = [
SkillInfo(class_name="TestSkills", func_name="drive", args_schema=schema, lane="motion")
]
rpc_calls = _make_rpc_calls(skills, {"drive": "ok"})

response = asyncio.run(handle_request({"method": "tools/list", "id": 1}, skills, rpc_calls))

assert response["result"]["tools"][0]["_meta"] == {"lane": "motion"}


def test_mcp_module_serializes_same_lane_calls() -> None:
schema = json.dumps({"type": "object", "properties": {}})
skills = [
SkillInfo(class_name="TestSkills", func_name="drive", args_schema=schema, lane="motion")
]
rpc_calls = _make_rpc_calls(skills, {})
active = 0
max_active = 0
lock = threading.Lock()

def slow_drive() -> str:
nonlocal active, max_active
with lock:
active += 1
max_active = max(max_active, active)
time.sleep(0.05)
with lock:
active -= 1
return "ok"

rpc_calls["drive"].side_effect = slow_drive

async def run_two_calls() -> None:
await asyncio.gather(
handle_request(
{"method": "tools/call", "id": 1, "params": {"name": "drive", "arguments": {}}},
skills,
rpc_calls,
),
handle_request(
{"method": "tools/call", "id": 2, "params": {"name": "drive", "arguments": {}}},
skills,
rpc_calls,
),
)

asyncio.run(run_two_calls())

assert rpc_calls["drive"].call_count == 2
assert max_active == 1


def test_mcp_module_injects_progress_token_as_mcp_context() -> None:
"""When the client sends `_meta.progressToken`, the RPC call receives it as
an `_mcp_context` kwarg so the `@skill` wrapper can stash it in the
Expand Down
17 changes: 17 additions & 0 deletions dimos/agents/test_skill_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,20 @@ def my_skill() -> SkillResult:
assert sentinel.duration_ms == 999.0 # untouched
# Decorator overwrites with actual measured elapsed (very small).
assert result.duration_ms != 999.0

def test_parameterized_decorator_records_lane(self):
@skill(lane="motion")
def go_home() -> SkillResult:
return SkillResult.ok("done")

assert go_home.__skill__ is True
assert go_home.__skill_lane__ == "motion"
assert go_home().is_success()

def test_bare_decorator_records_no_lane(self):
@skill
def inspect_scene() -> SkillResult:
return SkillResult.ok("done")

assert inspect_scene.__skill__ is True
assert inspect_scene.__skill_lane__ is None
6 changes: 5 additions & 1 deletion dimos/core/module.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class SkillInfo:
class_name: str
func_name: str
args_schema: str
lane: str | None = None


class PeekNotFound:
Expand Down Expand Up @@ -440,7 +441,10 @@ def get_skills(self) -> list[SkillInfo]:
schema = json.dumps(tool(attr).args_schema.model_json_schema())
skills.append(
SkillInfo(
class_name=self.__class__.__name__, func_name=name, args_schema=schema
class_name=self.__class__.__name__,
func_name=name,
args_schema=schema,
lane=getattr(attr, "__skill_lane__", None),
)
)
return skills
Expand Down