Skip to content
67 changes: 51 additions & 16 deletions sdks/python/agenta/sdk/decorators/running.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,27 @@
retrieve_handler,
retrieve_interface,
retrieve_configuration,
parse_uri,
)
from agenta.sdk.engines.running.handlers import remote_forward_v0

import agenta as ag


log = get_module_logger(__name__)


def _is_custom_hook(uri: Optional[str]) -> bool:
"""True for a custom hook URI (any provider/version), e.g. agenta:custom:hook:v0."""
if not uri:
return False
try:
_provider, kind, key, _version = parse_uri(uri)
except Exception:
return False
return kind == "custom" and key == "hook"


class InvokeFn(Protocol):
async def __call__(
self,
Expand Down Expand Up @@ -133,6 +146,8 @@ def __init__(
#
revision: Optional[dict] = None,
# -------------------------------------------------------------------- #
remote: bool = False,
# -------------------------------------------------------------------- #
**kwargs,
):
# -------------------------------------------------------------------- #
Expand Down Expand Up @@ -194,6 +209,8 @@ def __init__(

self.handler = None

self.remote = remote

self.middlewares = [
VaultMiddleware(),
ResolverMiddleware(),
Expand All @@ -205,22 +222,25 @@ def __init__(
self.uri = _data.uri

if self.uri is not None:
self._retrieve_handler(self.uri)

if self.handler:
registered = retrieve_interface(self.uri)
if registered:
# merge registered interface into revision data, keeping caller overrides
merged = registered.model_dump(exclude_none=True)
merged.update(self.revision.data.model_dump(exclude_none=True))
self.revision.data = WorkflowRevisionData(**merged)
self.uri = self.revision.data.uri

registered_config = retrieve_configuration(self.uri)
if registered_config and not self.revision.data.parameters:
self.revision.data.parameters = registered_config.parameters

self.parameters = self.revision.data.parameters
# A user custom hook must run its own installed handler (local) or
# forward to its url (remote); the URI must not resolve to a managed
# handler that would shadow the function attached by the decorator.
if not _is_custom_hook(self.uri):
self._retrieve_handler(self.uri)

registered = retrieve_interface(self.uri)
if registered:
# merge registered interface into revision data, keeping caller overrides
merged = registered.model_dump(exclude_none=True)
merged.update(self.revision.data.model_dump(exclude_none=True))
self.revision.data = WorkflowRevisionData(**merged)
self.uri = self.revision.data.uri

registered_config = retrieve_configuration(self.uri)
if registered_config and not self.revision.data.parameters:
self.revision.data.parameters = registered_config.parameters

self.parameters = self.revision.data.parameters

def __call__(self, handler: Optional[Callable[..., Any]] = None) -> Workflow:
if self.handler is None and handler is not None:
Expand Down Expand Up @@ -373,6 +393,21 @@ async def invoke(
)
running_ctx.parameters = self.parameters

# remote=True forwards to the workflow url; otherwise run the
# installed handler. Seeding it here lets the resolver keep the
# decorator's handler instead of re-resolving by URI.
running_ctx.handler = remote_forward_v0 if self.remote else self.handler
log.info(
"workflow handler bound",
uri=self.uri,
remote=self.remote,
handler=getattr(
running_ctx.handler,
"__name__",
type(running_ctx.handler).__name__,
),
)

async def terminal(req: WorkflowInvokeRequest):
return None

Expand Down
15 changes: 15 additions & 0 deletions sdks/python/agenta/sdk/engines/running/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,21 @@ def __init__(self, message: str, stacktrace: Optional[str] = None):
)


class CustomHookHandlerNotDefinedV0Error(ErrorStatus):
code: int = 500
type: str = f"{ERRORS_BASE_URL}#v0:workflows:custom-hook-handler-not-defined"

def __init__(self) -> None:
super().__init__(
code=self.code,
type=self.type,
message=(
"Custom hook has no handler. Define a local handler on the workflow, "
"or set remote=True to forward to its configured url."
),
)


class CustomCodeServerV0Error(ErrorStatus):
code: int = 500
type: str = f"{ERRORS_BASE_URL}#v0:workflows:custom-code-server-error"
Expand Down
84 changes: 55 additions & 29 deletions sdks/python/agenta/sdk/engines/running/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from agenta.sdk.engines.running.templates import EVALUATOR_TEMPLATES
from agenta.sdk.engines.running.errors import (
CustomCodeServerV0Error,
CustomHookHandlerNotDefinedV0Error,
ErrorStatus,
InvalidConfigurationParametersV0Error,
InvalidConfigurationParameterV0Error,
Expand Down Expand Up @@ -2223,8 +2224,15 @@ async def chat_v0(
return message.model_dump(exclude_none=True) # type: ignore


@instrument(ignore_inputs=["parameters"])
async def hook_v0(
def _extract_revision_field(value: Optional[Data], field: str) -> Optional[Any]:
if isinstance(value, dict):
data = value.get("data") if "data" in value else value
if isinstance(data, dict):
return data.get(field)
return None


async def remote_forward_v0(
request: Optional[Data] = None,
revision: Optional[Data] = None,
#
Expand All @@ -2235,37 +2243,21 @@ async def hook_v0(
trace: Optional[Data] = None,
testcase: Optional[Data] = None,
) -> Any:
"""
Webhook-based application handler for CUSTOM app types.

Forwards the request to an external webhook URL and returns the response.
The webhook URL is read from the workflow interface (``url`` field in
revision data), not from ``parameters``.

Args:
request: Optional canonical request envelope.
revision: Optional revision data containing the webhook URL.
parameters: Configuration parameters forwarded to the webhook.
inputs: Inputs to forward to the webhook.
outputs: Optional outputs to forward to the webhook.
trace: Optional trace data to forward to the webhook.
testcase: Optional testcase data to forward to the webhook.
"""Run a workflow remotely by forwarding the request to its ``url``.

Returns:
The response from the webhook.
Selected when a workflow declares ``remote=True``. Reads ``url`` (and optional
``headers``) from the revision data, POSTs to ``{url}/invoke``, and returns the
response. This is execution-location logic, not a per-URI handler.
"""
from agenta.sdk.contexts.running import RunningContext

def _extract_webhook_url(value: Optional[Data]) -> Optional[str]:
if isinstance(value, dict):
data = value.get("data") if "data" in value else value
if isinstance(data, dict):
url = data.get("url")
return str(url) if url else None
return None

ctx = RunningContext.get()
webhook_url = _extract_webhook_url(revision) or _extract_webhook_url(ctx.revision)
webhook_url = _extract_revision_field(revision, "url") or _extract_revision_field(
ctx.revision, "url"
)
headers = _extract_revision_field(revision, "headers") or _extract_revision_field(
ctx.revision, "headers"
)

if not webhook_url:
raise MissingConfigurationParameterV0Error(path="url")
Expand All @@ -2280,6 +2272,12 @@ def _extract_webhook_url(value: Optional[Data]) -> Optional[str]:
got=webhook_url,
) from exc

# The stored url is the service base (pre-/invoke); the invoke surface lives
# at /invoke and is always appended.
target_url = f"{webhook_url.rstrip('/')}/invoke"

log.info("remote_forward_v0 POST", url=target_url)
Comment thread
jp-agenta marked this conversation as resolved.

json_payload = {
"inputs": inputs or {},
"parameters": parameters or {},
Expand All @@ -2291,11 +2289,19 @@ def _extract_webhook_url(value: Optional[Data]) -> Optional[str]:
if testcase is not None:
json_payload["testcase"] = testcase

# httpx requires str->str headers; coerce values from revision data.
request_headers = (
{str(k): str(v) for k, v in headers.items()}
if isinstance(headers, dict)
else None
)

async with httpx.AsyncClient() as client:
try:
response = await client.post(
url=webhook_url,
url=target_url,
json=json_payload,
headers=request_headers,
timeout=httpx.Timeout(30.0, connect=5.0),
Comment thread
jp-agenta marked this conversation as resolved.
)
except Exception as e:
Expand Down Expand Up @@ -2327,6 +2333,26 @@ def _extract_webhook_url(value: Optional[Data]) -> Optional[str]:
return response_bytes.decode("utf-8")


async def hook_v0(
request: Optional[Data] = None,
revision: Optional[Data] = None,
#
parameters: Optional[Data] = None,
inputs: Optional[Data] = None,
outputs: Optional[Union[Data, str]] = None,
#
trace: Optional[Data] = None,
testcase: Optional[Data] = None,
) -> Any:
"""Placeholder for the custom-hook URI. Reaching it is a misconfiguration.

A custom hook must run its own installed handler (local) or forward to its url
(``remote=True``). The URI never resolves to this function in either path, so
being here means a custom hook was invoked without a defined handler.
"""
raise CustomHookHandlerNotDefinedV0Error()
Comment thread
jp-agenta marked this conversation as resolved.


def _resolve_reference_value(reference: Any, request: Dict[str, Any]) -> Any:
"""Resolve a reference that may be a JSONPath/Pointer selector or a literal value.

Expand Down
6 changes: 5 additions & 1 deletion sdks/python/agenta/sdk/middlewares/running/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,11 @@ async def __call__(
except Exception:
raise

handler = await resolve_handler(uri=(revision.uri if revision else None))
# Keep a handler the decorator already installed (local handler or remote
# forwarder); only resolve from the URI registry for pure URI dispatch.
handler = ctx.handler or await resolve_handler(
uri=(revision.uri if revision else None)
)

ctx.revision = (
{"data": revision.model_dump(mode="json", exclude_none=True)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@

from agenta.sdk.contexts.running import RunningContext, running_context_manager
from agenta.sdk.models.workflows import WorkflowRevisionData
from agenta.sdk.workflows.errors import FeedbackV0Error
from agenta.sdk.workflows.errors import (
CustomHookHandlerNotDefinedV0Error,
FeedbackV0Error,
)
from agenta.sdk.workflows.handlers import (
code_v0,
hook_v0,
remote_forward_v0,
llm_v0,
match_v0,
feedback_v0,
Expand Down Expand Up @@ -146,7 +150,7 @@ def test_hook_calls_local_server_and_returns_json(self):
url = f"http://127.0.0.1:{port}/eval"

try:
_hook_v0 = hook_v0.__wrapped__
_hook_v0 = remote_forward_v0
revision = WorkflowRevisionData(uri="agenta:custom:hook:v0", url=url)
ctx = RunningContext(revision=revision.model_dump(mode="json"))

Expand Down Expand Up @@ -188,7 +192,7 @@ def log_message(self, *args):
url = f"http://127.0.0.1:{port}/eval"

try:
_hook_v0 = hook_v0.__wrapped__
_hook_v0 = remote_forward_v0
revision = WorkflowRevisionData(uri="agenta:custom:hook:v0", url=url)
ctx = RunningContext(revision=revision.model_dump(mode="json"))

Expand Down Expand Up @@ -229,7 +233,7 @@ def log_message(self, *args):
url = f"http://127.0.0.1:{port}/eval"

try:
_hook_v0 = hook_v0.__wrapped__
_hook_v0 = remote_forward_v0
revision = WorkflowRevisionData(uri="agenta:custom:hook:v0", url=url)
ctx = RunningContext(revision=revision.model_dump(mode="json"))

Expand All @@ -247,6 +251,11 @@ def log_message(self, *args):
assert "testcase" in received_payload
assert received_payload["testcase"] == {"correct_answer": "4"}

def test_hook_v0_stub_raises(self):
"""hook_v0 is a placeholder; reaching it is a misconfiguration."""
with pytest.raises(CustomHookHandlerNotDefinedV0Error):
run(hook_v0(inputs={"q": "x"}))


# ---------------------------------------------------------------------------
# TestCodeV0Acceptance
Expand Down
7 changes: 4 additions & 3 deletions sdks/python/oss/tests/pytest/utils/test_hook_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
5. Error handling — non-200 status codes, client-side network errors, oversized response.

async handlers are called via asyncio.run() so no pytest-asyncio marker is needed.
The @instrument() decorator is bypassed via __wrapped__.
Forwarding now lives in remote_forward_v0 (undecorated), called directly.
"""

import asyncio
Expand All @@ -27,9 +27,10 @@
WebhookClientV0Error,
WebhookServerV0Error,
)
from agenta.sdk.workflows.handlers import hook_v0
from agenta.sdk.workflows.handlers import remote_forward_v0

_hook_v0 = hook_v0.__wrapped__
# Forwarding moved from hook_v0 to remote_forward_v0 (undecorated plumbing).
_hook_v0 = remote_forward_v0


# ---------------------------------------------------------------------------
Expand Down
7 changes: 7 additions & 0 deletions web/AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,13 @@ const items = useMemo(() => [
<AccordionTreePanel items={items} />
```

### Keep in-code comments terse

**Hard rule.** At most ONE short line per comment. No multi-line blocks narrating *why*
in prose, no restating what the code shows. Before writing any comment, ask "can this be
one line?" — if not, cut it. Longer comments only for a genuinely surprising constraint
(documented bug, race, ordering requirement), and even then a sentence or two max.

## Packages, entities, and code placement

The `@agenta/*` workspace packages share UI, state, and utilities across OSS and EE. The
Expand Down
Loading
Loading