diff --git a/api/oss/src/core/tools/interfaces.py b/api/oss/src/core/tools/interfaces.py index 0e459619e6..0a61d59ee9 100644 --- a/api/oss/src/core/tools/interfaces.py +++ b/api/oss/src/core/tools/interfaces.py @@ -20,53 +20,53 @@ class ToolsGatewayInterface(ABC): @abstractmethod async def list_providers(self) -> List[ToolCatalogProvider]: ... - - @abstractmethod - async def list_integrations( - self, - *, - search: Optional[str] = None, - sort_by: Optional[str] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: - """Returns (items, next_cursor, total_items).""" - ... - - @abstractmethod - async def get_integration( - self, - *, - integration_key: str, - ) -> Optional[ToolCatalogIntegration]: ... - - @abstractmethod - async def list_actions( - self, - *, - integration_key: str, - query: Optional[str] = None, - categories: Optional[List[str]] = None, - important: Optional[bool] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: - """Returns (items, next_cursor, total_items).""" - ... - - @abstractmethod - async def get_action( - self, - *, - integration_key: str, + + @abstractmethod + async def list_integrations( + self, + *, + search: Optional[str] = None, + sort_by: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: + """Returns (items, next_cursor, total_items).""" + ... + + @abstractmethod + async def get_integration( + self, + *, + integration_key: str, + ) -> Optional[ToolCatalogIntegration]: ... + + @abstractmethod + async def list_actions( + self, + *, + integration_key: str, + query: Optional[str] = None, + categories: Optional[List[str]] = None, + important: Optional[bool] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: + """Returns (items, next_cursor, total_items).""" + ... + + @abstractmethod + async def get_action( + self, + *, + integration_key: str, action_key: str, ) -> Optional[ToolCatalogActionDetails]: ... @abstractmethod async def execute( self, - *, - request: ToolExecutionRequest, - ) -> ToolExecutionResponse: - """Execute a tool action.""" - ... + *, + request: ToolExecutionRequest, + ) -> ToolExecutionResponse: + """Execute a tool action.""" + ... diff --git a/api/oss/src/core/tools/service.py b/api/oss/src/core/tools/service.py index 4b30ca6121..bf8eed2b9d 100644 --- a/api/oss/src/core/tools/service.py +++ b/api/oss/src/core/tools/service.py @@ -1,4 +1,4 @@ -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple from uuid import UUID from oss.src.utils.logging import get_module_logger @@ -18,9 +18,9 @@ log = get_module_logger(__name__) - - -class ToolsService: + + +class ToolsService: def __init__( self, *, @@ -31,95 +31,95 @@ def __init__( self.adapter_registry = adapter_registry # ----------------------------------------------------------------------- - # Catalog browse - # ----------------------------------------------------------------------- - - async def list_providers(self) -> List[ToolCatalogProvider]: - """Return all providers across registered adapters.""" - results: List[ToolCatalogProvider] = [] - for _key, adapter in self.adapter_registry.items(): - providers = await adapter.list_providers() - results.extend(providers) - return results - - async def get_provider( - self, - *, - provider_key: str, - ) -> Optional[ToolCatalogProvider]: - """Return a single provider by key, or None if not found.""" - adapter = self.adapter_registry.get(provider_key) - providers = await adapter.list_providers() - for p in providers: - if p.key == provider_key: - return p - return None - - async def list_integrations( - self, - *, - provider_key: str, - # - search: Optional[str] = None, - sort_by: Optional[str] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: - """List integrations for a provider with optional filtering and pagination.""" - adapter = self.adapter_registry.get(provider_key) - integrations, next_cursor, total = await adapter.list_integrations( - search=search, - sort_by=sort_by, - limit=limit, - cursor=cursor, - ) - return integrations, next_cursor, total - - async def get_integration( - self, - *, - provider_key: str, - integration_key: str, - ) -> Optional[ToolCatalogIntegration]: - """Return a single integration by key, or None if not found.""" - adapter = self.adapter_registry.get(provider_key) - return await adapter.get_integration(integration_key=integration_key) - - async def list_actions( - self, - *, - provider_key: str, - integration_key: str, - # - query: Optional[str] = None, - categories: Optional[List[str]] = None, - important: Optional[bool] = None, - limit: Optional[int] = None, - cursor: Optional[str] = None, - ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: - """List actions for an integration with optional search and pagination.""" - adapter = self.adapter_registry.get(provider_key) - return await adapter.list_actions( - integration_key=integration_key, - query=query, - categories=categories, - important=important, - limit=limit, - cursor=cursor, - ) - - async def get_action( - self, - *, - provider_key: str, - integration_key: str, - action_key: str, - ) -> Optional[ToolCatalogActionDetails]: - """Return full action detail including input/output schema, or None if not found.""" - adapter = self.adapter_registry.get(provider_key) - return await adapter.get_action( - integration_key=integration_key, - action_key=action_key, + # Catalog browse + # ----------------------------------------------------------------------- + + async def list_providers(self) -> List[ToolCatalogProvider]: + """Return all providers across registered adapters.""" + results: List[ToolCatalogProvider] = [] + for _key, adapter in self.adapter_registry.items(): + providers = await adapter.list_providers() + results.extend(providers) + return results + + async def get_provider( + self, + *, + provider_key: str, + ) -> Optional[ToolCatalogProvider]: + """Return a single provider by key, or None if not found.""" + adapter = self.adapter_registry.get(provider_key) + providers = await adapter.list_providers() + for p in providers: + if p.key == provider_key: + return p + return None + + async def list_integrations( + self, + *, + provider_key: str, + # + search: Optional[str] = None, + sort_by: Optional[str] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogIntegration], Optional[str], int]: + """List integrations for a provider with optional filtering and pagination.""" + adapter = self.adapter_registry.get(provider_key) + integrations, next_cursor, total = await adapter.list_integrations( + search=search, + sort_by=sort_by, + limit=limit, + cursor=cursor, + ) + return integrations, next_cursor, total + + async def get_integration( + self, + *, + provider_key: str, + integration_key: str, + ) -> Optional[ToolCatalogIntegration]: + """Return a single integration by key, or None if not found.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.get_integration(integration_key=integration_key) + + async def list_actions( + self, + *, + provider_key: str, + integration_key: str, + # + query: Optional[str] = None, + categories: Optional[List[str]] = None, + important: Optional[bool] = None, + limit: Optional[int] = None, + cursor: Optional[str] = None, + ) -> Tuple[List[ToolCatalogAction], Optional[str], int]: + """List actions for an integration with optional search and pagination.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.list_actions( + integration_key=integration_key, + query=query, + categories=categories, + important=important, + limit=limit, + cursor=cursor, + ) + + async def get_action( + self, + *, + provider_key: str, + integration_key: str, + action_key: str, + ) -> Optional[ToolCatalogActionDetails]: + """Return full action detail including input/output schema, or None if not found.""" + adapter = self.adapter_registry.get(provider_key) + return await adapter.get_action( + integration_key=integration_key, + action_key=action_key, ) # ----------------------------------------------------------------------- @@ -127,10 +127,10 @@ async def get_action( # ----------------------------------------------------------------------- async def query_connections( - self, - *, - project_id: UUID, - # + self, + *, + project_id: UUID, + # provider_key: Optional[str] = None, integration_key: Optional[str] = None, is_active: Optional[bool] = True, @@ -153,10 +153,10 @@ async def list_connections( project_id=project_id, provider_key=provider_key, integration_key=integration_key, - ) - - async def get_connection( - self, + ) + + async def get_connection( + self, *, project_id: UUID, connection_id: UUID, @@ -198,12 +198,12 @@ async def create_connection( project_id=project_id, user_id=user_id, # - connection_create=connection_create, - ) - - async def delete_connection( - self, - *, + connection_create=connection_create, + ) + + async def delete_connection( + self, + *, project_id: UUID, connection_id: UUID, ) -> bool: @@ -211,9 +211,9 @@ async def delete_connection( project_id=project_id, connection_id=connection_id, ) - - async def revoke_connection( - self, + + async def revoke_connection( + self, *, project_id: UUID, connection_id: UUID, @@ -226,7 +226,7 @@ async def revoke_connection( async def refresh_connection( self, *, - project_id: UUID, + project_id: UUID, connection_id: UUID, # force: bool = False, @@ -239,27 +239,27 @@ async def refresh_connection( # ----------------------------------------------------------------------- # Tool execution - # ----------------------------------------------------------------------- - - async def execute_tool( - self, - *, - provider_key: str, - integration_key: str, - action_key: str, - provider_connection_id: str, - user_id: Optional[str] = None, - arguments: Dict[str, Any], - ) -> ToolExecutionResponse: - """Execute a tool action using the provider adapter.""" - adapter = self.adapter_registry.get(provider_key) - - return await adapter.execute( - request=ToolExecutionRequest( - integration_key=integration_key, - action_key=action_key, - provider_connection_id=provider_connection_id, - user_id=user_id, - arguments=arguments, - ), - ) + # ----------------------------------------------------------------------- + + async def execute_tool( + self, + *, + provider_key: str, + integration_key: str, + action_key: str, + provider_connection_id: str, + user_id: Optional[str] = None, + arguments: Dict[str, Any], + ) -> ToolExecutionResponse: + """Execute a tool action using the provider adapter.""" + adapter = self.adapter_registry.get(provider_key) + + return await adapter.execute( + request=ToolExecutionRequest( + integration_key=integration_key, + action_key=action_key, + provider_connection_id=provider_connection_id, + user_id=user_id, + arguments=arguments, + ), + ) diff --git a/api/oss/src/core/webhooks/delivery.py b/api/oss/src/core/webhooks/delivery.py index 280c3e1a8b..9ca44f3e87 100644 --- a/api/oss/src/core/webhooks/delivery.py +++ b/api/oss/src/core/webhooks/delivery.py @@ -8,7 +8,7 @@ import httpx -from agenta.sdk.utils.resolvers import resolve_json_selector +from agenta.sdk.utils.resolvers import resolve_target_fields from oss.src.core.webhooks.types import ( EVENT_CONTEXT_FIELDS, @@ -23,8 +23,6 @@ log = get_module_logger(__name__) -MAX_RESOLVE_DEPTH = 10 - NON_OVERRIDABLE_HEADERS = { "content-type", "content-length", @@ -92,29 +90,6 @@ def _merge_headers( return merged -def resolve_payload_fields( - fields: Any, - context: Dict[str, Any], - *, - _depth: int = 0, -) -> Any: - if _depth > MAX_RESOLVE_DEPTH: - return None - if isinstance(fields, dict): - return { - k: resolve_payload_fields(v, context, _depth=_depth + 1) - for k, v in fields.items() - } - if isinstance(fields, list): - return [ - resolve_payload_fields(item, context, _depth=_depth + 1) for item in fields - ] - try: - return resolve_json_selector(fields, context) - except Exception: - return None - - def prepare_webhook_request( *, project_id: UUID, @@ -147,7 +122,7 @@ def prepare_webhook_request( } resolved_fields = payload_fields if payload_fields is not None else "$" - payload = resolve_payload_fields(resolved_fields, context) + payload = resolve_target_fields(resolved_fields, context) base_data = WebhookDeliveryData( event_type=typed_event_type, diff --git a/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py b/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py index 1ca605df49..0f35b272a3 100644 --- a/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py +++ b/api/oss/tests/pytest/unit/webhooks/test_webhooks_tasks.py @@ -5,11 +5,14 @@ from unittest.mock import patch -from oss.src.core.webhooks.delivery import ( +from agenta.sdk.utils.resolvers import ( MAX_RESOLVE_DEPTH, + resolve_target_fields, +) + +from oss.src.core.webhooks.delivery import ( NON_OVERRIDABLE_HEADERS, _merge_headers, - resolve_payload_fields, ) from oss.src.core.webhooks.types import ( EVENT_CONTEXT_FIELDS, @@ -35,18 +38,18 @@ "scope": {"project_id": "proj-1"}, } -_RESOLVE_PATH = "oss.src.core.webhooks.delivery.resolve_json_selector" +_RESOLVE_PATH = "agenta.sdk.utils.resolvers.resolve_json_selector" # --------------------------------------------------------------------------- -# resolve_payload_fields +# resolve_target_fields # --------------------------------------------------------------------------- class TestResolvePayloadFields: def test_dict_recurses_into_values(self): with patch(_RESOLVE_PATH, side_effect=lambda expr, ctx: f"resolved:{expr}"): - result = resolve_payload_fields( + result = resolve_target_fields( {"key": "$.event.event_id"}, _MOCK_CONTEXT, ) @@ -54,7 +57,7 @@ def test_dict_recurses_into_values(self): def test_list_recurses_into_items(self): with patch(_RESOLVE_PATH, side_effect=lambda expr, ctx: f"resolved:{expr}"): - result = resolve_payload_fields( + result = resolve_target_fields( ["$.event.event_id", "$.scope.project_id"], _MOCK_CONTEXT, ) @@ -65,12 +68,12 @@ def test_list_recurses_into_items(self): def test_primitive_delegates_to_resolve_json_selector(self): with patch(_RESOLVE_PATH, return_value="abc123") as mock_resolve: - result = resolve_payload_fields("$.event.event_id", _MOCK_CONTEXT) + result = resolve_target_fields("$.event.event_id", _MOCK_CONTEXT) assert result == "abc123" mock_resolve.assert_called_once_with("$.event.event_id", _MOCK_CONTEXT) def test_depth_exceeds_limit_returns_none(self): - result = resolve_payload_fields( + result = resolve_target_fields( "$.event.event_id", _MOCK_CONTEXT, _depth=MAX_RESOLVE_DEPTH + 1, @@ -79,7 +82,7 @@ def test_depth_exceeds_limit_returns_none(self): def test_depth_at_limit_still_resolves(self): with patch(_RESOLVE_PATH, return_value="ok"): - result = resolve_payload_fields( + result = resolve_target_fields( "$.event.event_id", _MOCK_CONTEXT, _depth=MAX_RESOLVE_DEPTH, @@ -88,7 +91,7 @@ def test_depth_at_limit_still_resolves(self): def test_resolve_error_returns_none(self): with patch(_RESOLVE_PATH, side_effect=ValueError("bad selector")): - result = resolve_payload_fields("$.bad[", _MOCK_CONTEXT) + result = resolve_target_fields("$.bad[", _MOCK_CONTEXT) assert result is None def test_error_leaf_in_dict_does_not_affect_other_keys(self): @@ -98,7 +101,7 @@ def side_effect(expr, ctx): return "good" with patch(_RESOLVE_PATH, side_effect=side_effect): - result = resolve_payload_fields( + result = resolve_target_fields( {"ok": "$.event.event_id", "bad": "$.bad["}, _MOCK_CONTEXT, ) @@ -106,14 +109,14 @@ def side_effect(expr, ctx): def test_dollar_selector_resolves_full_context(self): with patch(_RESOLVE_PATH, return_value=_MOCK_CONTEXT) as mock_resolve: - result = resolve_payload_fields("$", _MOCK_CONTEXT) + result = resolve_target_fields("$", _MOCK_CONTEXT) assert result == _MOCK_CONTEXT mock_resolve.assert_called_once_with("$", _MOCK_CONTEXT) def test_nested_dict_depth_tracking(self): # Three levels deep should still work (depth starts at 0) with patch(_RESOLVE_PATH, return_value="leaf"): - result = resolve_payload_fields( + result = resolve_target_fields( {"a": {"b": {"c": "$.event.event_id"}}}, _MOCK_CONTEXT, ) diff --git a/sdks/python/agenta/sdk/utils/resolvers.py b/sdks/python/agenta/sdk/utils/resolvers.py index b7b51ed5c4..a512a27489 100644 --- a/sdks/python/agenta/sdk/utils/resolvers.py +++ b/sdks/python/agenta/sdk/utils/resolvers.py @@ -12,6 +12,8 @@ log = get_module_logger(__name__) +MAX_RESOLVE_DEPTH = 10 + # ========= Scheme detection ========= @@ -132,3 +134,33 @@ def resolve_json_selector(value: Any, data: Dict[str, Any]) -> Any: log.debug("Failed to resolve JSON selector %r: %s", value, exc) return None return value + + +def resolve_target_fields( + template: Any, + context: Dict[str, Any], + *, + _depth: int = 0, +) -> Any: + """Resolve a template into a target by resolving its selector leaves. + + Walks ``template`` (arbitrary JSON); each leaf is passed through + ``resolve_json_selector`` against *context* (``$``/``/`` selectors resolved, + everything else returned literally). Null-on-miss, depth-capped at + ``MAX_RESOLVE_DEPTH``. + """ + if _depth > MAX_RESOLVE_DEPTH: + return None + if isinstance(template, dict): + return { + k: resolve_target_fields(v, context, _depth=_depth + 1) + for k, v in template.items() + } + if isinstance(template, list): + return [ + resolve_target_fields(item, context, _depth=_depth + 1) for item in template + ] + try: + return resolve_json_selector(template, context) + except Exception: + return None