diff --git a/api/ee/src/core/access/permissions/types.py b/api/ee/src/core/access/permissions/types.py index c3ab36b719..6cf7ee1647 100644 --- a/api/ee/src/core/access/permissions/types.py +++ b/api/ee/src/core/access/permissions/types.py @@ -190,6 +190,11 @@ class Permission(str, Enum): EDIT_TOOLS = "edit_tools" RUN_TOOLS = "run_tools" + # Triggers + VIEW_TRIGGERS = "view_triggers" + EDIT_TRIGGERS = "edit_triggers" + RUN_TRIGGERS = "run_triggers" + @classmethod def default_permissions(cls, role): VIEWER_PERMISSIONS = [ @@ -217,6 +222,7 @@ def default_permissions(cls, role): cls.VIEW_EVALUATION_METRICS, cls.VIEW_EVALUATION_QUEUES, cls.VIEW_TOOLS, + cls.VIEW_TRIGGERS, ] ANNOTATOR_PERMISSIONS = VIEWER_PERMISSIONS + [ cls.CREATE_EVALUATION, @@ -230,6 +236,7 @@ def default_permissions(cls, role): cls.EDIT_EVALUATION_QUEUES, cls.EDIT_SPANS, cls.RUN_TOOLS, + cls.RUN_TRIGGERS, ] EDITOR_PERMISSIONS = ANNOTATOR_PERMISSIONS + [ cls.EDIT_APPLICATIONS, @@ -251,6 +258,7 @@ def default_permissions(cls, role): cls.EDIT_TESTSETS, cls.EDIT_INVOCATIONS, cls.EDIT_TOOLS, + cls.EDIT_TRIGGERS, ] DEVELOPER_PERMISSIONS = EDITOR_PERMISSIONS + [ cls.VIEW_API_KEYS, diff --git a/api/ee/tests/pytest/acceptance/triggers/__init__.py b/api/ee/tests/pytest/acceptance/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/ee/tests/pytest/acceptance/triggers/test_triggers_catalog.py b/api/ee/tests/pytest/acceptance/triggers/test_triggers_catalog.py new file mode 100644 index 0000000000..005bdc367f --- /dev/null +++ b/api/ee/tests/pytest/acceptance/triggers/test_triggers_catalog.py @@ -0,0 +1,160 @@ +"""EE acceptance tests for the triggers events catalog. + +Mirrors the OSS suite (oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py) +but exercises /triggers/catalog/* as a business-plan, developer-role account. +Under EE the catalog is gated on the VIEW_TOOLS permission (the triggers domain +shares the gateway permission surface with tools); a developer role carries +VIEW_TOOLS, so this verifies the endpoint behaves once the gate is satisfied. + +Provider-catalog reads need no Composio credentials (empty catalog is valid). +Event browse / config-schema fetch make real Composio calls and are gated on +COMPOSIO_API_KEY being present in the runner's environment. + +Requires a running API. +""" + +import os +from uuid import uuid4 + +import pytest +import requests + +from utils.constants import BASE_TIMEOUT + + +_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY")) +_requires_composio = pytest.mark.skipif( + not _COMPOSIO_ENABLED, + reason="needs live Composio credentials (COMPOSIO_API_KEY)", +) + + +def _create_developer_business_account(admin_api): + uid = uuid4().hex[:12] + email = f"triggers-dev-{uid}@test.agenta.ai" + resp = admin_api( + "POST", + "/admin/simple/accounts/", + json={ + "accounts": { + "u": { + "user": {"email": email}, + "options": { + "create_api_keys": True, + "return_api_keys": True, + "seed_defaults": False, + }, + "subscription": {"plan": "cloud_v0_business"}, + "organization_memberships": [ + { + "organization_ref": {"ref": "org"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + "workspace_memberships": [ + { + "workspace_ref": {"ref": "wrk"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + "project_memberships": [ + { + "project_ref": {"ref": "prj"}, + "user_ref": {"ref": "user"}, + "role": "developer", + } + ], + } + } + }, + ) + assert resp.status_code == 200, resp.text + account = resp.json()["accounts"]["u"] + return { + "email": email, + "credentials": f"ApiKey {account['api_keys']['key']}", + } + + +def _delete_account_by_email(admin_api, *, email): + resp = admin_api( + "DELETE", + "/admin/simple/accounts/", + json={"accounts": {"u": {"user": {"email": email}}}, "confirm": "delete"}, + ) + assert resp.status_code == 204, resp.text + + +@pytest.fixture(scope="class") +def triggers_api(admin_api, ag_env): + account = _create_developer_business_account(admin_api) + + def _request(method: str, endpoint: str, **kwargs): + headers = kwargs.pop("headers", {}) + headers.setdefault("Authorization", account["credentials"]) + return requests.request( + method=method, + url=f"{ag_env['api_url']}{endpoint}", + headers=headers, + timeout=BASE_TIMEOUT, + **kwargs, + ) + + yield _request + + _delete_account_by_email(admin_api, email=account["email"]) + + +class TestTriggersCatalogProviders: + def test_list_providers_returns_200(self, triggers_api): + response = triggers_api("GET", "/triggers/catalog/providers/") + assert response.status_code == 200 + + def test_list_providers_response_shape(self, triggers_api): + body = triggers_api("GET", "/triggers/catalog/providers/").json() + assert "count" in body + assert "providers" in body + assert isinstance(body["providers"], list) + assert body["count"] == len(body["providers"]) + + @pytest.mark.skipif( + _COMPOSIO_ENABLED, + reason="catalog is non-empty when Composio is enabled", + ) + def test_list_providers_empty_when_composio_disabled(self, triggers_api): + body = triggers_api("GET", "/triggers/catalog/providers/").json() + assert body["count"] == 0 + assert body["providers"] == [] + + +@_requires_composio +class TestTriggersCatalogEvents: + def test_browse_events_returns_200(self, triggers_api): + response = triggers_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ) + assert response.status_code == 200 + body = response.json() + assert "events" in body + assert isinstance(body["events"], list) + + def test_fetch_event_config_schema(self, triggers_api): + listing = triggers_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ).json() + if not listing["events"]: + pytest.skip("no github events available from Composio") + + event_key = listing["events"][0]["key"] + response = triggers_api( + "GET", + f"/triggers/catalog/providers/composio/integrations/github/events/{event_key}", + ) + assert response.status_code == 200 + event = response.json()["event"] + assert event["key"] == event_key + assert "trigger_config" in event diff --git a/api/entrypoints/routers.py b/api/entrypoints/routers.py index c235e817fd..96c11eb9a8 100644 --- a/api/entrypoints/routers.py +++ b/api/entrypoints/routers.py @@ -142,6 +142,10 @@ from oss.src.core.tools.registry import ToolsGatewayRegistry from oss.src.core.tools.service import ToolsService from oss.src.apis.fastapi.tools.router import ToolsRouter +from oss.src.core.triggers.providers.composio import ComposioTriggersAdapter +from oss.src.core.triggers.registry import TriggersGatewayRegistry +from oss.src.core.triggers.service import TriggersService +from oss.src.apis.fastapi.triggers.router import TriggersRouter from oss.src.apis.fastapi.shared.utils import SupportHeadersMiddleware @@ -215,6 +219,9 @@ async def lifespan(*args, **kwargs): for adapter in _composio_connections_adapters.values(): await adapter.close() + for adapter in _composio_triggers_adapters.values(): + await adapter.close() + await _transactions_engine.close() await _analytics_engine.close() await _streams_engine.close() @@ -308,6 +315,11 @@ async def lifespan(*args, **kwargs): "description": "External tool connections and OAuth integrations available to applications.", }, # -- + { + "name": "Triggers", + "description": "Inbound provider event triggers and their watchable event catalog.", + }, + # -- { "name": "Folders", "description": "Organize applications and other resources into folder hierarchies.", @@ -616,6 +628,22 @@ async def lifespan(*args, **kwargs): adapter_registry=tools_adapter_registry, ) +# Triggers adapter + service +_composio_triggers_adapters = {} +if env.composio.enabled: + _composio_triggers_adapters["composio"] = ComposioTriggersAdapter( + api_key=env.composio.api_key, # type: ignore[arg-type] # guarded by .enabled + api_url=env.composio.api_url, + ) + +triggers_adapter_registry = TriggersGatewayRegistry( + adapters=_composio_triggers_adapters, +) + +triggers_service = TriggersService( + adapter_registry=triggers_adapter_registry, +) + _t_services_done = time.perf_counter() - _t_services print(f"[STARTUP] Service initialization completed (+{_t_services_done:.3f}s)") _t_routers = time.perf_counter() @@ -730,6 +758,10 @@ async def lifespan(*args, **kwargs): tools_service=tools_service, ) +triggers = TriggersRouter( + triggers_service=triggers_service, +) + simple_traces = SimpleTracesRouter( simple_traces_service=simple_traces_service, ) @@ -1097,6 +1129,19 @@ async def lifespan(*args, **kwargs): include_in_schema=False, ) +app.include_router( + router=triggers.router, + prefix="/triggers", + tags=["Triggers"], +) + +app.include_router( + router=triggers.router, + prefix="/preview/triggers", + tags=["Triggers"], + include_in_schema=False, +) + app.include_router( router=evaluations.admin_router, prefix="/admin/evaluations", diff --git a/api/oss/src/apis/fastapi/triggers/__init__.py b/api/oss/src/apis/fastapi/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/src/apis/fastapi/triggers/models.py b/api/oss/src/apis/fastapi/triggers/models.py new file mode 100644 index 0000000000..93e9d5ab4e --- /dev/null +++ b/api/oss/src/apis/fastapi/triggers/models.py @@ -0,0 +1,36 @@ +from typing import List, Optional + +from pydantic import BaseModel + +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, + TriggerCatalogProvider, +) + + +# --------------------------------------------------------------------------- +# Trigger Catalog +# --------------------------------------------------------------------------- + + +class TriggerCatalogProviderResponse(BaseModel): + count: int = 0 + provider: Optional[TriggerCatalogProvider] = None + + +class TriggerCatalogProvidersResponse(BaseModel): + count: int = 0 + providers: List[TriggerCatalogProvider] = [] + + +class TriggerCatalogEventResponse(BaseModel): + count: int = 0 + event: Optional[TriggerCatalogEventDetails] = None + + +class TriggerCatalogEventsResponse(BaseModel): + count: int = 0 + total: int = 0 + cursor: Optional[str] = None + events: List[TriggerCatalogEvent] = [] diff --git a/api/oss/src/apis/fastapi/triggers/router.py b/api/oss/src/apis/fastapi/triggers/router.py new file mode 100644 index 0000000000..5270682dc4 --- /dev/null +++ b/api/oss/src/apis/fastapi/triggers/router.py @@ -0,0 +1,319 @@ +from functools import wraps +from typing import Optional + +import httpx +from fastapi import APIRouter, HTTPException, Query, Request, status +from fastapi.responses import JSONResponse + +from oss.src.utils.exceptions import intercept_exceptions +from oss.src.utils.logging import get_module_logger +from oss.src.utils.caching import get_cache, set_cache +from oss.src.utils.common import is_ee + +from oss.src.apis.fastapi.triggers.models import ( + TriggerCatalogEventResponse, + TriggerCatalogEventsResponse, + TriggerCatalogProviderResponse, + TriggerCatalogProvidersResponse, +) +from oss.src.core.triggers.exceptions import AdapterError +from oss.src.core.triggers.service import TriggersService + + +if is_ee(): + from ee.src.core.access.permissions.types import Permission + from ee.src.core.access.permissions.service import ( + check_action_access, + FORBIDDEN_EXCEPTION, + ) + +log = get_module_logger(__name__) + + +def handle_adapter_exceptions(): + """Convert only upstream 401 AdapterError failures to 424 Failed Dependency.""" + + def decorator(func): + @wraps(func) + async def wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except AdapterError as e: + cause = e.__cause__ + if not ( + isinstance(cause, httpx.HTTPStatusError) + and cause.response is not None + and cause.response.status_code == status.HTTP_401_UNAUTHORIZED + ): + raise + + raise HTTPException( + status_code=status.HTTP_424_FAILED_DEPENDENCY, + detail=e.message, + ) from e + + return wrapper + + return decorator + + +class TriggersRouter: + def __init__( + self, + *, + triggers_service: TriggersService, + ): + self.triggers_service = triggers_service + + self.router = APIRouter() + + # --- Trigger Catalog --- + self.router.add_api_route( + "/catalog/providers/", + self.list_providers, + methods=["GET"], + operation_id="list_trigger_providers", + response_model=TriggerCatalogProvidersResponse, + response_model_exclude_none=True, + ) + self.router.add_api_route( + "/catalog/providers/{provider_key}", + self.get_provider, + methods=["GET"], + operation_id="fetch_trigger_provider", + response_model=TriggerCatalogProviderResponse, + response_model_exclude_none=True, + ) + self.router.add_api_route( + "/catalog/providers/{provider_key}/integrations/{integration_key}/events/", + self.list_events, + methods=["GET"], + operation_id="list_trigger_events", + response_model=TriggerCatalogEventsResponse, + response_model_exclude_none=True, + ) + self.router.add_api_route( + "/catalog/providers/{provider_key}/integrations/{integration_key}/events/{event_key}", + self.get_event, + methods=["GET"], + operation_id="fetch_trigger_event", + response_model=TriggerCatalogEventResponse, + response_model_exclude_none=True, + ) + + # ----------------------------------------------------------------------- + # Trigger Catalog + # ----------------------------------------------------------------------- + + @intercept_exceptions() + @handle_adapter_exceptions() + async def list_providers( + self, + request: Request, + ) -> TriggerCatalogProvidersResponse: + if is_ee(): + has_permission = await check_action_access( + project_id=request.state.project_id, + user_uid=request.state.user_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cached = await get_cache( + project_id=None, # catalog is global; not per-project + namespace="triggers:catalog:providers", + key={}, + model=TriggerCatalogProvidersResponse, + ) + if cached: + return cached + + providers = await self.triggers_service.list_providers() + items = list(providers) + + response = TriggerCatalogProvidersResponse( + count=len(items), + providers=items, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:providers", + key={}, + value=response, + ttl=5 * 60, + ) + + return response + + @intercept_exceptions() + @handle_adapter_exceptions() + async def get_provider( + self, + request: Request, + provider_key: str, + ) -> TriggerCatalogProviderResponse: + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cache_key = {"provider_key": provider_key} + cached = await get_cache( + project_id=None, + namespace="triggers:catalog:provider", + key=cache_key, + model=TriggerCatalogProviderResponse, + ) + if cached: + return cached + + provider = await self.triggers_service.get_provider( + provider_key=provider_key, + ) + if not provider: + return JSONResponse( + status_code=404, + content={"detail": "Provider not found"}, + ) + + response = TriggerCatalogProviderResponse( + count=1, + provider=provider, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:provider", + key=cache_key, + value=response, + ttl=5 * 60, + ) + + return response + + @intercept_exceptions() + @handle_adapter_exceptions() + async def list_events( + self, + request: Request, + provider_key: str, + integration_key: str, + *, + query: Optional[str] = Query(default=None), + limit: Optional[int] = Query(default=None), + cursor: Optional[str] = Query(default=None), + ) -> TriggerCatalogEventsResponse: + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cache_key = { + "provider_key": provider_key, + "integration_key": integration_key, + "query": query, + "limit": limit, + "cursor": cursor, + } + cached = await get_cache( + project_id=None, + namespace="triggers:catalog:events", + key=cache_key, + model=TriggerCatalogEventsResponse, + ) + if cached: + return cached + + events, next_cursor, total = await self.triggers_service.list_events( + provider_key=provider_key, + integration_key=integration_key, + query=query, + limit=limit, + cursor=cursor, + ) + items = list(events) + + response = TriggerCatalogEventsResponse( + count=len(items), + total=total, + cursor=next_cursor, + events=items, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:events", + key=cache_key, + value=response, + ttl=5 * 60, + ) + + return response + + @intercept_exceptions() + @handle_adapter_exceptions() + async def get_event( + self, + request: Request, + provider_key: str, + integration_key: str, + event_key: str, + ) -> TriggerCatalogEventResponse: + if is_ee(): + has_permission = await check_action_access( + user_uid=request.state.user_id, + project_id=request.state.project_id, + permission=Permission.VIEW_TRIGGERS, + ) + if not has_permission: + raise FORBIDDEN_EXCEPTION + + cache_key = { + "provider_key": provider_key, + "integration_key": integration_key, + "event_key": event_key, + } + cached = await get_cache( + project_id=None, + namespace="triggers:catalog:event", + key=cache_key, + model=TriggerCatalogEventResponse, + ) + if cached: + return cached + + event = await self.triggers_service.get_event( + provider_key=provider_key, + integration_key=integration_key, + event_key=event_key, + ) + if not event: + return JSONResponse( + status_code=404, + content={"detail": "Event not found"}, + ) + + response = TriggerCatalogEventResponse( + count=1, + event=event, + ) + + await set_cache( + project_id=None, + namespace="triggers:catalog:event", + key=cache_key, + value=response, + ttl=5 * 60, + ) + + return response diff --git a/api/oss/src/core/triggers/__init__.py b/api/oss/src/core/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/src/core/triggers/dtos.py b/api/oss/src/core/triggers/dtos.py new file mode 100644 index 0000000000..656a7ce56b --- /dev/null +++ b/api/oss/src/core/triggers/dtos.py @@ -0,0 +1,49 @@ +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import BaseModel + + +# --------------------------------------------------------------------------- +# Trigger Enums +# --------------------------------------------------------------------------- + + +class TriggerProviderKind(str, Enum): + COMPOSIO = "composio" + + +# --------------------------------------------------------------------------- +# Trigger Catalog +# +# The catalog leaf is an **event** (Composio "trigger type"), the analogue of a +# tools **action**. An event carries a ``trigger_config`` JSON Schema, the +# analogue of an action's ``input_parameters``. +# --------------------------------------------------------------------------- + + +class TriggerCatalogEvent(BaseModel): + key: str + # + name: str + description: Optional[str] = None + # + provider: Optional[str] = None + integration: Optional[str] = None + # + categories: List[str] = [] + logo: Optional[str] = None + + +class TriggerCatalogEventDetails(TriggerCatalogEvent): + # FROZEN (WS-PRE): the Event DTO carries the event's trigger_config JSON Schema + # — the inbound analogue of an action's input_parameters. + trigger_config: Optional[Dict[str, Any]] = None + payload: Optional[Dict[str, Any]] = None + + +class TriggerCatalogProvider(BaseModel): + key: TriggerProviderKind + # + name: str + description: Optional[str] = None diff --git a/api/oss/src/core/triggers/exceptions.py b/api/oss/src/core/triggers/exceptions.py new file mode 100644 index 0000000000..473b4094a4 --- /dev/null +++ b/api/oss/src/core/triggers/exceptions.py @@ -0,0 +1,36 @@ +from typing import Optional + + +class TriggersError(Exception): + """Base exception for the triggers domain.""" + + def __init__(self, message: str = "Triggers error"): + self.message = message + super().__init__(self.message) + + +class ProviderNotFoundError(TriggersError): + """Raised when the requested provider_key has no registered adapter.""" + + def __init__(self, provider_key: str): + self.provider_key = provider_key + super().__init__(f"Provider not found: {provider_key}") + + +class AdapterError(TriggersError): + """Raised when an adapter operation fails.""" + + def __init__( + self, + *, + provider_key: str, + operation: str, + detail: Optional[str] = None, + ): + self.provider_key = provider_key + self.operation = operation + self.detail = detail + msg = f"Adapter error ({provider_key}.{operation})" + if detail: + msg += f": {detail}" + super().__init__(msg) diff --git a/api/oss/src/core/triggers/interfaces.py b/api/oss/src/core/triggers/interfaces.py new file mode 100644 index 0000000000..2b07ca835f --- /dev/null +++ b/api/oss/src/core/triggers/interfaces.py @@ -0,0 +1,75 @@ +from abc import ABC, abstractmethod +from typing import Any, Dict, List, Optional, Tuple +from uuid import UUID + +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, + TriggerCatalogProvider, +) + + +class TriggersGatewayInterface(ABC): + """Port for external trigger providers (Composio, ...). + + FROZEN (WS-PRE) — consumed by WS3 (subscriptions) and WS5 (web catalog). + The catalog reads (``list_events``/``get_event``) back the events catalog; + the subscription verbs build/manage the provider-side trigger instance + (``ti_*``) that WP3 stores on a local subscription row. + """ + + @abstractmethod + async def list_providers(self) -> List[TriggerCatalogProvider]: ... + + @abstractmethod + async def list_events( + self, + *, + integration_key: str, + query: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[TriggerCatalogEvent], Optional[str], int]: + """Returns (items, next_cursor, total_items).""" + ... + + @abstractmethod + async def get_event( + self, + *, + integration_key: str, + event_key: str, + ) -> Optional[TriggerCatalogEventDetails]: + """Return one event's detail, carrying its trigger_config JSON Schema.""" + ... + + @abstractmethod + async def create_subscription( + self, + *, + project_id: UUID, + event_key: str, + connected_account_id: str, + trigger_config: Dict[str, Any], + ) -> str: + """Create the provider-side trigger instance; returns its id (``ti_*``).""" + ... + + @abstractmethod + async def set_subscription_status( + self, + *, + trigger_id: str, + enabled: bool, + ) -> None: + """Enable or disable the provider-side trigger instance.""" + ... + + @abstractmethod + async def delete_subscription( + self, + *, + trigger_id: str, + ) -> None: + """Permanently delete the provider-side trigger instance.""" + ... diff --git a/api/oss/src/core/triggers/providers/__init__.py b/api/oss/src/core/triggers/providers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/src/core/triggers/providers/composio/__init__.py b/api/oss/src/core/triggers/providers/composio/__init__.py new file mode 100644 index 0000000000..9841fc07c1 --- /dev/null +++ b/api/oss/src/core/triggers/providers/composio/__init__.py @@ -0,0 +1,18 @@ +# Avoid importing adapter here to prevent SDK dependency issues in standalone scripts. +# Import directly when needed: +# from oss.src.core.triggers.providers.composio.adapter import ComposioTriggersAdapter + +__all__ = [ + "ComposioTriggersAdapter", +] + + +def __getattr__(name): + """Lazy import to avoid SDK dependency on module import.""" + if name == "ComposioTriggersAdapter": + from oss.src.core.triggers.providers.composio.adapter import ( + ComposioTriggersAdapter, + ) + + return ComposioTriggersAdapter + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/api/oss/src/core/triggers/providers/composio/adapter.py b/api/oss/src/core/triggers/providers/composio/adapter.py new file mode 100644 index 0000000000..20fd9dd212 --- /dev/null +++ b/api/oss/src/core/triggers/providers/composio/adapter.py @@ -0,0 +1,187 @@ +from typing import Any, Dict, List, Optional +from uuid import UUID + +import httpx + +from oss.src.utils.logging import get_module_logger + +from oss.src.core.triggers.dtos import ( + TriggerCatalogProvider, + TriggerProviderKind, +) +from oss.src.core.triggers.interfaces import TriggersGatewayInterface +from oss.src.core.triggers.exceptions import AdapterError +from oss.src.core.triggers.providers.composio.catalog import ( + ComposioTriggersCatalogClient, +) + + +log = get_module_logger(__name__) + +COMPOSIO_DEFAULT_API_URL = "https://backend.composio.dev/api/v3" + + +class ComposioTriggersAdapter(ComposioTriggersCatalogClient, TriggersGatewayInterface): + """Composio V3 triggers adapter — uses httpx directly (no SDK). + + Modeled on ``ComposioToolsAdapter``: own httpx client, ``_get/_post/_delete`` + helpers, slug passthrough. Catalog operations (list/get events) come from + ``ComposioTriggersCatalogClient``; subscription (trigger-instance) management + is implemented here and consumed by WP3. + + REST paths (E5 — verified vs the live Composio API reference): + list events GET /triggers_types?toolkit_slugs={i} + get event GET /triggers_types/{slug} + create/upsert POST /trigger_instances/{slug}/upsert + enable/disable PATCH /trigger_instances/manage/{trigger_id} + delete DELETE /trigger_instances/manage/{trigger_id} + """ + + def __init__( + self, + *, + api_key: str, + api_url: str = COMPOSIO_DEFAULT_API_URL, + ): + self.api_key = api_key + self.api_url = api_url.rstrip("/") + # Shared client — one connection pool for the adapter's lifetime. + # Call close() on shutdown (wired in entrypoints/routers.py lifespan). + self._client = httpx.AsyncClient(timeout=30.0) + + async def close(self) -> None: + """Close the shared HTTP client and release connection pool resources.""" + await self._client.aclose() + + def _headers(self) -> Dict[str, str]: + return { + "x-api-key": self.api_key, + "Content-Type": "application/json", + } + + async def _post( + self, + path: str, + *, + json: Optional[Dict[str, Any]] = None, + ) -> Any: + resp = await self._client.post( + f"{self.api_url}{path}", + headers=self._headers(), + json=json or {}, + ) + if not resp.is_success: + log.error("Composio POST %s → %s: %s", path, resp.status_code, resp.text) + resp.raise_for_status() + return resp.json() + + async def _patch( + self, + path: str, + *, + json: Optional[Dict[str, Any]] = None, + ) -> Any: + resp = await self._client.patch( + f"{self.api_url}{path}", + headers=self._headers(), + json=json or {}, + ) + if not resp.is_success: + log.error("Composio PATCH %s → %s: %s", path, resp.status_code, resp.text) + resp.raise_for_status() + return resp.json() + + async def _delete(self, path: str) -> bool: + resp = await self._client.delete( + f"{self.api_url}{path}", + headers=self._headers(), + ) + resp.raise_for_status() + return True + + # ----------------------------------------------------------------------- + # Catalog — provider listing + # ----------------------------------------------------------------------- + + async def list_providers(self) -> List[TriggerCatalogProvider]: + return [ + TriggerCatalogProvider( + key=TriggerProviderKind.COMPOSIO, + name="Composio", + description="Third-party event triggers via Composio", + ) + ] + + # list_events and get_event are inherited from ComposioTriggersCatalogClient + # and satisfy the TriggersGatewayInterface catalog contract. + + # ----------------------------------------------------------------------- + # Subscriptions (provider-side trigger instances — ti_*) — consumed by WP3 + # ----------------------------------------------------------------------- + + async def create_subscription( + self, + *, + project_id: UUID, + event_key: str, + connected_account_id: str, + trigger_config: Dict[str, Any], + ) -> str: + """Create/upsert the provider-side trigger instance; return its id (ti_*).""" + payload: Dict[str, Any] = { + "connected_account_id": connected_account_id, + "trigger_config": trigger_config or {}, + } + try: + result = await self._post( + f"/trigger_instances/{event_key}/upsert", + json=payload, + ) + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="create_subscription", + detail=str(e), + ) from e + + trigger_id = result.get("trigger_id") or result.get("id") + if not trigger_id: + raise AdapterError( + provider_key="composio", + operation="create_subscription", + detail=f"No trigger_id in upsert response for event '{event_key}'", + ) + return trigger_id + + async def set_subscription_status( + self, + *, + trigger_id: str, + enabled: bool, + ) -> None: + status = "enable" if enabled else "disable" + try: + await self._patch( + f"/trigger_instances/manage/{trigger_id}", + json={"status": status}, + ) + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="set_subscription_status", + detail=str(e), + ) from e + + async def delete_subscription( + self, + *, + trigger_id: str, + ) -> None: + try: + await self._delete(f"/trigger_instances/manage/{trigger_id}") + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="delete_subscription", + detail=str(e), + ) from e diff --git a/api/oss/src/core/triggers/providers/composio/catalog.py b/api/oss/src/core/triggers/providers/composio/catalog.py new file mode 100644 index 0000000000..f773fab8ec --- /dev/null +++ b/api/oss/src/core/triggers/providers/composio/catalog.py @@ -0,0 +1,188 @@ +"""Composio triggers catalog operations — mixin for ComposioTriggersAdapter. + +Provides catalog HTTP calls (list events, get one event) backed by +``self._client``, ``self.api_key``, and ``self.api_url`` which must be supplied +by the concrete subclass (ComposioTriggersAdapter). + +Mirrors ``core/tools/providers/composio/catalog.py`` with ``action → event``: +the tools "action" leaf becomes the triggers "event" leaf (a Composio *trigger +type*), and an action's ``input_parameters`` schema becomes an event's +``trigger_config`` schema. The ``cursor`` value is Composio's native +``next_cursor`` string, passed through as-is. +""" + +from typing import Any, Dict, List, Optional, Tuple + +import httpx + +from oss.src.utils.logging import get_module_logger +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, +) +from oss.src.core.triggers.exceptions import AdapterError + + +log = get_module_logger(__name__) + +DEFAULT_PAGE_SIZE = 20 +MAX_PAGE_SIZE = 1000 + + +class ComposioTriggersCatalogClient: + """Catalog mixin for ComposioTriggersAdapter — cursor-based pagination. + + Subclass must set ``self.api_key``, ``self.api_url``, and ``self._client`` + (an ``httpx.AsyncClient``) before calling any method. + """ + + # Annotated for type-checkers; filled in by ComposioTriggersAdapter.__init__ + api_key: str + api_url: str + _client: httpx.AsyncClient + + async def list_events( + self, + *, + integration_key: str, + query: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[TriggerCatalogEvent], Optional[str], int]: + """Fetch one page of events (Composio trigger types) for an integration. + + E5 (verified vs live Composio API reference): GET /triggers_types, + filtered by ``toolkit_slugs``. + """ + page_limit = min(limit, MAX_PAGE_SIZE) if limit else DEFAULT_PAGE_SIZE + + params: Dict[str, Any] = { + "toolkit_slugs": integration_key, + "limit": page_limit, + } + if query: + params["query"] = query + if cursor: + params["cursor"] = cursor + + try: + resp = await self._client.get( + f"{self.api_url}/triggers_types", + headers={"x-api-key": self.api_key, "Content-Type": "application/json"}, + params=params, + timeout=30.0, + ) + resp.raise_for_status() + data = resp.json() + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="list_events", + detail=str(e), + ) from e + + items_raw: List[Dict[str, Any]] = ( + data.get("items", []) if isinstance(data, dict) else data + ) + next_cursor: Optional[str] = ( + data.get("next_cursor") if isinstance(data, dict) else None + ) + total_items: int = ( + data.get("total_items", len(items_raw)) + if isinstance(data, dict) + else len(items_raw) + ) + + items = [_parse_event(item, integration_key) for item in items_raw] + + log.debug( + "[composio] list_events(%s) cursor=%s items=%d total=%d next=%s", + integration_key, + cursor, + len(items), + total_items, + next_cursor, + ) + + return items, next_cursor, total_items + + async def get_event( + self, + *, + integration_key: str, + event_key: str, + ) -> Optional[TriggerCatalogEventDetails]: + """Fetch one event (trigger type) by slug, with its trigger_config schema. + + E5 (verified vs live Composio API reference): GET /triggers_types/{slug}. + Returns None when the event does not exist (404). + """ + try: + resp = await self._client.get( + f"{self.api_url}/triggers_types/{event_key}", + headers={"x-api-key": self.api_key, "Content-Type": "application/json"}, + timeout=15.0, + ) + if resp.status_code == 404: + return None + resp.raise_for_status() + except httpx.HTTPStatusError as e: + if e.response.status_code == 404: + return None + raise AdapterError( + provider_key="composio", + operation="get_event", + detail=str(e), + ) from e + except httpx.HTTPError as e: + raise AdapterError( + provider_key="composio", + operation="get_event", + detail=str(e), + ) from e + + return _parse_event_detail(resp.json(), integration_key) + + +# --------------------------------------------------------------------------- +# Parsers (module-level — no instance state needed) +# --------------------------------------------------------------------------- + + +def _toolkit_slug(item: Dict[str, Any], fallback: str) -> str: + toolkit = item.get("toolkit") + if isinstance(toolkit, dict): + return toolkit.get("slug") or toolkit.get("name") or fallback + if isinstance(toolkit, str): + return toolkit + return fallback + + +def _parse_event(item: Dict[str, Any], integration_key: str) -> TriggerCatalogEvent: + return TriggerCatalogEvent( + key=item.get("slug", ""), + name=item.get("name", ""), + description=item.get("description"), + provider="composio", + integration=_toolkit_slug(item, integration_key), + ) + + +def _parse_event_detail( + item: Dict[str, Any], + integration_key: str, +) -> TriggerCatalogEventDetails: + # The event's required config is the JSON Schema under "config" — the inbound + # analogue of an action's "input_parameters". + trigger_config = item.get("config") or item.get("trigger_config") + payload = item.get("payload") + + return TriggerCatalogEventDetails( + key=item.get("slug", ""), + name=item.get("name", ""), + description=item.get("description"), + provider="composio", + integration=_toolkit_slug(item, integration_key), + trigger_config=trigger_config, + payload=payload, + ) diff --git a/api/oss/src/core/triggers/registry.py b/api/oss/src/core/triggers/registry.py new file mode 100644 index 0000000000..4e641f6202 --- /dev/null +++ b/api/oss/src/core/triggers/registry.py @@ -0,0 +1,27 @@ +from typing import Dict, ItemsView + +from oss.src.core.triggers.interfaces import TriggersGatewayInterface +from oss.src.core.triggers.exceptions import ProviderNotFoundError + + +class TriggersGatewayRegistry: + """Dispatches to the correct adapter based on provider_key.""" + + def __init__( + self, + *, + adapters: Dict[str, TriggersGatewayInterface], + ): + self._adapters = adapters + + def get(self, provider_key: str) -> TriggersGatewayInterface: + adapter = self._adapters.get(provider_key) + if not adapter: + raise ProviderNotFoundError(provider_key) + return adapter + + def keys(self) -> list[str]: + return list(self._adapters.keys()) + + def items(self) -> ItemsView[str, TriggersGatewayInterface]: + return self._adapters.items() diff --git a/api/oss/src/core/triggers/service.py b/api/oss/src/core/triggers/service.py new file mode 100644 index 0000000000..bc08263c2f --- /dev/null +++ b/api/oss/src/core/triggers/service.py @@ -0,0 +1,86 @@ +from typing import List, Optional, Tuple + +from oss.src.utils.logging import get_module_logger + +from oss.src.core.triggers.dtos import ( + TriggerCatalogEvent, + TriggerCatalogEventDetails, + TriggerCatalogProvider, +) +from oss.src.core.triggers.registry import TriggersGatewayRegistry + + +log = get_module_logger(__name__) + + +class TriggersService: + """Triggers domain orchestration. + + WP1 scope is the read-only events catalog. Subscriptions/deliveries CRUD and + ingress/dispatch land in later WPs (WP3/WP4) and will extend this service. + """ + + def __init__( + self, + *, + adapter_registry: TriggersGatewayRegistry, + ): + self.adapter_registry = adapter_registry + + # ----------------------------------------------------------------------- + # Catalog browse + # ----------------------------------------------------------------------- + + async def list_providers(self) -> List[TriggerCatalogProvider]: + """Return all providers across registered adapters.""" + results: List[TriggerCatalogProvider] = [] + for _key, adapter in self.adapter_registry.items(): + providers = await adapter.list_providers() + results.extend(providers) + return results + + async def get_provider( + self, + *, + provider_key: str, + ) -> Optional[TriggerCatalogProvider]: + """Return a single provider by key, or None if not found.""" + adapter = self.adapter_registry.get(provider_key) + providers = await adapter.list_providers() + for p in providers: + if p.key == provider_key: + return p + return None + + async def list_events( + self, + *, + provider_key: str, + integration_key: str, + # + query: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[TriggerCatalogEvent], Optional[str], int]: + """List events for an integration with optional search and pagination.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.list_events( + integration_key=integration_key, + query=query, + limit=limit, + cursor=cursor, + ) + + async def get_event( + self, + *, + provider_key: str, + integration_key: str, + event_key: str, + ) -> Optional[TriggerCatalogEventDetails]: + """Return full event detail including its trigger_config schema, or None.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.get_event( + integration_key=integration_key, + event_key=event_key, + ) diff --git a/api/oss/src/dbs/postgres/triggers/__init__.py b/api/oss/src/dbs/postgres/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/tests/pytest/acceptance/triggers/__init__.py b/api/oss/tests/pytest/acceptance/triggers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py b/api/oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py new file mode 100644 index 0000000000..bdfa94897c --- /dev/null +++ b/api/oss/tests/pytest/acceptance/triggers/test_triggers_catalog.py @@ -0,0 +1,78 @@ +"""Acceptance tests for GET /triggers/catalog/* endpoints (events catalog). + +The provider-catalog endpoints are reachable without any external API key: an +empty catalog is a valid response (no Composio adapter is registered when +``env.composio`` is unset). The event-browse / config-schema fetch make real +Composio calls, so those tests are gated on COMPOSIO_API_KEY being present in +the runner's environment (the same env the API reads). +""" + +import os + +import pytest + + +_COMPOSIO_ENABLED = bool(os.getenv("COMPOSIO_API_KEY")) +_requires_composio = pytest.mark.skipif( + not _COMPOSIO_ENABLED, + reason="needs live Composio credentials (COMPOSIO_API_KEY)", +) + + +class TestTriggersCatalogProviders: + def test_list_providers_returns_200(self, authed_api): + response = authed_api("GET", "/triggers/catalog/providers/") + assert response.status_code == 200 + + def test_list_providers_response_shape(self, authed_api): + body = authed_api("GET", "/triggers/catalog/providers/").json() + assert "count" in body + assert "providers" in body + assert isinstance(body["providers"], list) + + def test_list_providers_count_matches_list(self, authed_api): + body = authed_api("GET", "/triggers/catalog/providers/").json() + assert body["count"] == len(body["providers"]) + + @pytest.mark.skipif( + _COMPOSIO_ENABLED, + reason="catalog is non-empty when Composio is enabled", + ) + def test_list_providers_empty_when_composio_disabled(self, authed_api): + """With env.composio unset, no adapter is registered → empty catalog.""" + body = authed_api("GET", "/triggers/catalog/providers/").json() + assert body["count"] == 0 + assert body["providers"] == [] + + +@_requires_composio +class TestTriggersCatalogEvents: + def test_browse_events_returns_200(self, authed_api): + response = authed_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ) + assert response.status_code == 200 + body = response.json() + assert "events" in body + assert isinstance(body["events"], list) + + def test_fetch_event_config_schema(self, authed_api): + """A single event carries its trigger_config JSON Schema.""" + listing = authed_api( + "GET", + "/triggers/catalog/providers/composio/integrations/github/events/", + ).json() + if not listing["events"]: + pytest.skip("no github events available from Composio") + + event_key = listing["events"][0]["key"] + response = authed_api( + "GET", + f"/triggers/catalog/providers/composio/integrations/github/events/{event_key}", + ) + assert response.status_code == 200 + event = response.json()["event"] + assert event["key"] == event_key + # trigger_config is the inbound analogue of an action's input_parameters + assert "trigger_config" in event