diff --git a/CHANGELOG.md b/CHANGELOG.md index e92dc4d..8d94f31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,40 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Changed + +- **`Context.create()` signature unified across all SDKs (apcore PROTOCOL_SPEC §"Contract: Context.create", [apcore#66](https://github.com/aiperceivable/apcore/issues/66)).** The factory now accepts **exactly** six caller-supplied fields in this order: `identity`, `trace_parent`, `cancel_token`, `data`, `services`, `global_deadline`. The previous `executor=` parameter is **removed**; `executor` is now bound by the Executor at pipeline entry via the new SDK-internal `Context._bind_executor(executor)` helper (see PROTOCOL_SPEC §"Contract: Executor binding to Context"). `caller_id` remains managed exclusively by `Context.child()` and was never a public input. Two parameters are newly first-class: `cancel_token` (eliminates the post-hoc `ctx.cancel_token = token` anti-pattern) and `global_deadline` (previously only settable via mutation). `Executor.call` / `call_async` / `stream` / `call_async_with_trace` / `_validate_async` all bind `self` to the supplied or auto-created Context before pipeline step 1; same-instance rebinds are idempotent noops, cross-Executor rebinds raise the new `ContextBindingError` (code `CONTEXT_BINDING_ERROR`). The root-call `global_deadline` computation moved out of "context-was-None" branch into a general "root call" check in `BuiltinContextStep`, ensuring local-config-driven recomputation per PROTOCOL_SPEC §"Contract: `global_deadline` distributed semantics" — including deserialized Contexts arriving at remote nodes. Tests and `examples/cancel_token.py` are updated to the new shape. **Breaking** for callers that passed `executor=` to `Context.create()` — pre-release v0.22.0, acceptable. + +- **`TaskStore` Protocol is now fully async (D-17 / A-D-AT-04).** All five methods (`save`, `get`, `delete`, `list`, `list_expired`) are declared `async def` on the Protocol and on `InMemoryTaskStore`. Custom stores written against the pre-0.22.x sync surface must be migrated; `AsyncTaskManager` keeps a transitional compatibility shim that awaits a returned coroutine if a legacy store still exposes sync methods. The uniform async shape unblocks Redis/SQL/network-backed stores without an extra blocking adapter, matching the TypeScript and Rust SDKs. + +- **`ReaperHandle.stop()` and `AsyncTaskManager.stop_reaper()` are now `async` and drain the reaper task (D-11 / A-D-AT-03).** Callers must `await handle.stop()` / `await manager.stop_reaper()`. After the coroutine returns the underlying `asyncio.Task` is guaranteed to be settled — previously the sync `stop()` only requested cancellation and required a manual `await asyncio.sleep(0)` for the task to finish. `AsyncTaskManager.shutdown()` now awaits `stop_reaper()` directly. + +- **`AsyncTaskManager.get_status()` and `list_tasks()` return defensive snapshots (A-D-AT-06).** Both methods now hand back shallow copies of `TaskInfo` via `dataclasses.replace`, matching the TypeScript SDK's `{ ...info }` and the Rust SDK's `info.clone()`. Mutating the returned objects no longer corrupts the live store. Async-friendly twins `get_status_async()` / `list_tasks_async()` are available for I/O-backed stores. + +- **`AsyncTaskManager.cleanup()` is now `async`.** Required because the store contract is async; callers must `await manager.cleanup(...)`. The reaper background loop already awaits internally — only direct in-process callers are affected. + +- **Legacy `RetryPolicy` defaults `max_retries` to `0` and emits `DeprecationWarning` on instantiation (D-14 / A-D-AT-09).** Earlier builds silently enabled three retries when callers used `RetryPolicy()` without arguments, contradicting the opt-in retry contract. The class is retained for one release; new code should use `RetryConfig` (canonical, ms-based, no deprecation noise). + +### Fixed + +- **Cancel token observed at Step 2 of the execution pipeline (D-21 / A-D-EXEC-002).** `BuiltinCallChainGuard` now checks `context.cancel_token.is_cancelled` before running any guard work; a cancelled token short-circuits with `ExecutionCancelledError` before ACL, middleware, validation, or module execution. Combined with the existing Step 8 check, the pipeline now satisfies the two-point cancel-token invariant — single-check implementations were leaking compute through ACL/middleware/validation even when the caller had already cancelled. + +- **`MiddlewareChainError` unwrap rule (D-22 / A-D-EXEC-005).** `Executor._recover_from_call_error` now unwraps `MiddlewareChainError` and propagates the original typed cause (e.g. `ApprovalDeniedError`, `ACLDeniedError`) unchanged. Previously the wrapper was collapsed to a generic `ModuleExecuteError`, breaking callers that dispatch on the typed error (notably MCP/A2A bridges keying on `APPROVAL_DENIED` vs `MODULE_EXECUTE_ERROR`). Mirrors the TypeScript and Rust SDK semantics. + +- **`Registry.register_internal` ephemeral-namespace rejection covers bare `"ephemeral"` (A-D-REG-002).** Previously the check used `module_id.startswith("ephemeral.")`, which missed the bare ID `"ephemeral"` and contradicted the canonical `_is_ephemeral` classifier used everywhere else in the registry. The helper is now shared between both call sites. + +- **Discover-path registration enforces the Issue #65 deferred-publish invariant (A-D-REG-003).** `_register_in_order` now reserves an in-flight slot, runs `on_load()` outside the lock, and only publishes into `_modules` / `_versioned_modules` on success. Previously the discover path published *before* invoking `on_load` and relied on rollback, leaving a window in which `registry.get()` callers could observe a module whose `on_load`-installed state (warmed pools, primed caches) was incomplete. + +- **`Registry.register_internal` enforces the Issue #65 deferred-publish invariant (A-D-REG-004).** `register_internal` now routes through the same three-phase protocol as `register()`: reserve in-flight slot → run `on_load` outside the lock → publish. On failure it removes the in-flight slot, emits `apcore.registry.module_load_failed`, and re-raises the original exception unchanged. The invariant now holds uniformly for every registration path (public `register`, `register_internal`, and discover). + +- **Discover-path emits `apcore.registry.module_load_failed` on `on_load` failure (A-D-REG-005).** Earlier the discover path logged at ERROR and silently dropped the module; subscribers had no portable hook to detect partial-init failures from the auto-discovery pipeline. The event payload matches the one emitted by `register()` (Issue #65) so a single subscriber covers all registration paths. + +### Removed + +- **Misplaced spec-style docs (B-005).** Deleted `docs/features/async-task-evolution.md`, `docs/features/middleware-architecture-hardening.md`, and `docs/async-task-evolution/test-cases.md`. Per the apcore protocol-spec repo policy (`apcore/CLAUDE.md`), implementation repos contain only code and a README — feature specs, test-case matrices, and design notes live in the apcore spec repo. The deleted files were also stale (referenced the obsolete `TaskStore.put` method and the removed `TaskStatus.RETRYING` enum value); the canonical authority is the implementation plus the upstream `apcore/docs/features/async-tasks.md` spec. + ## [0.22.0] - 2026-05-20 ### Added diff --git a/docs/async-task-evolution/test-cases.md b/docs/async-task-evolution/test-cases.md deleted file mode 100644 index 60bfe26..0000000 --- a/docs/async-task-evolution/test-cases.md +++ /dev/null @@ -1,212 +0,0 @@ -# Test Cases: AsyncTask Evolution (Issue #34) - -## Coverage Matrix - -| Dimension | Cases | -|---|---| -| TaskStore protocol conformance | TC-001, TC-002 | -| InMemoryTaskStore operations | TC-003, TC-004, TC-005, TC-006 | -| AsyncTaskManager + custom store | TC-007, TC-008 | -| RetryPolicy.delay_for() formulas | TC-009, TC-010, TC-011 | -| Retry lifecycle (attempt_number, status) | TC-012, TC-013, TC-014, TC-015, TC-016 | -| Retry + cancellation interaction | TC-017 | -| Reaper auto-cleanup | TC-018, TC-019 | -| Reaper lifecycle (start/stop/shutdown) | TC-020, TC-021, TC-022 | -| Reaper guard (double-start) | TC-023 | -| Regression: existing behaviour unchanged | TC-024, TC-025 | - ---- - -## Test Cases - -### TaskStore — Protocol Conformance - -**TC-001** `InMemoryTaskStore satisfies TaskStore protocol` -- Pre: none -- Steps: `from apcore.async_task import InMemoryTaskStore, TaskStore; assert isinstance(InMemoryTaskStore(), TaskStore)` -- Expected: no assertion error -- Priority: P0 - -**TC-002** `Custom class with get/put/delete/list satisfies TaskStore protocol` -- Pre: Define a minimal class implementing all four methods with correct signatures -- Steps: `isinstance(MinimalStore(), TaskStore)` → True -- Expected: True -- Priority: P1 - ---- - -### InMemoryTaskStore — Operations - -**TC-003** `put() then get() returns same TaskInfo` -- Pre: empty store -- Steps: `store.put(info); result = store.get(info.task_id)` -- Expected: `result is info` -- Priority: P0 - -**TC-004** `get() for unknown id returns None` -- Pre: empty store -- Steps: `store.get("no-such-id")` -- Expected: `None` -- Priority: P0 - -**TC-005** `delete() removes entry; get() returns None afterwards` -- Pre: `store.put(info)` -- Steps: `store.delete(info.task_id); store.get(info.task_id)` -- Expected: `None` -- Priority: P0 - -**TC-006** `list() with status filter returns only matching entries` -- Pre: put one COMPLETED and one FAILED TaskInfo -- Steps: `store.list(status=TaskStatus.COMPLETED)` -- Expected: list of length 1 containing the COMPLETED entry -- Priority: P0 - ---- - -### AsyncTaskManager + Custom Store - -**TC-007** `AsyncTaskManager accepts custom TaskStore at construction` -- Pre: instantiate `SpyStore` that records all calls -- Steps: `mgr = AsyncTaskManager(executor, store=spy); await mgr.submit("test.simple", {}); await asyncio.sleep(0.1)` -- Expected: `spy.put_calls` contains at least 2 entries (PENDING, then COMPLETED); `mgr.get_status()` routes through spy -- Priority: P0 - -**TC-008** `All task state transitions call store.put()` -- Pre: SpyStore records every `put(info)` call -- Steps: submit a simple module, wait for completion -- Expected: put called with statuses [PENDING, RUNNING, COMPLETED] in that order -- Priority: P1 - ---- - -### RetryPolicy — Delay Formulas - -**TC-009** `RetryPolicy.delay_for() fixed strategy` -- Pre: `policy = RetryPolicy(max_retries=3, backoff=BackoffStrategy.FIXED, base_delay_seconds=2.0)` -- Steps: `[policy.delay_for(n) for n in range(1, 4)]` -- Expected: `[2.0, 2.0, 2.0]` -- Priority: P0 - -**TC-010** `RetryPolicy.delay_for() linear strategy` -- Pre: `policy = RetryPolicy(max_retries=3, backoff=BackoffStrategy.LINEAR, base_delay_seconds=1.0)` -- Steps: `[policy.delay_for(n) for n in range(1, 4)]` -- Expected: `[1.0, 2.0, 3.0]` -- Priority: P0 - -**TC-011** `RetryPolicy.delay_for() exponential strategy` -- Pre: `policy = RetryPolicy(max_retries=3, backoff=BackoffStrategy.EXPONENTIAL, base_delay_seconds=1.0)` -- Steps: `[policy.delay_for(n) for n in range(1, 4)]` -- Expected: `[1.0, 2.0, 4.0]` -- Priority: P0 - ---- - -### Retry Lifecycle - -**TC-012** `Task with no retry_policy goes straight to FAILED on error` -- Pre: register `FailingModule`; no retry_policy -- Steps: `await mgr.submit("failing", {}); await asyncio.sleep(0.1); mgr.get_status(tid)` -- Expected: status == FAILED, attempt_number == 0 -- Priority: P0 - -**TC-013** `Task with retry_policy retries up to max_retries then FAILED` -- Pre: `FailingModule` always raises; `RetryPolicy(max_retries=2, backoff=FIXED, base_delay_seconds=0.01)` -- Steps: submit, wait long enough for all retries, check final TaskInfo -- Expected: status == FAILED, attempt_number == 2, max_retries == 2 -- Priority: P0 - -**TC-014** `attempt_number increments on each retry` -- Pre: SpyStore records all put() calls; `RetryPolicy(max_retries=2, base_delay_seconds=0.01)`; FailingModule -- Steps: submit, wait, inspect put() history -- Expected: put() called with attempt_number 0, 1, 2 over the task lifetime -- Priority: P1 - -**TC-015** `Status transitions: RUNNING → RETRYING → RUNNING → FAILED` -- Pre: SpyStore; `RetryPolicy(max_retries=1, backoff=FIXED, base_delay_seconds=0.01)`; FailingModule -- Steps: collect statuses from all put() calls -- Expected: sequence includes RUNNING, RETRYING, RUNNING, FAILED -- Priority: P0 - -**TC-016** `Task succeeds on retry N: status COMPLETED, attempt_number == N` -- Pre: Module that fails on attempt 0, succeeds on attempt 1; - `RetryPolicy(max_retries=2, backoff=FIXED, base_delay_seconds=0.01)` -- Steps: submit, wait, check final TaskInfo -- Expected: status == COMPLETED, attempt_number == 1 -- Priority: P0 - -**TC-017** `Cancelling a RETRYING task stops the retry loop` -- Pre: FailingModule; `RetryPolicy(max_retries=5, base_delay_seconds=10.0)` (long delay so task is in RETRYING state) -- Steps: submit, wait for RETRYING status, cancel -- Expected: status == CANCELLED, further retries do not execute -- Priority: P1 - ---- - -### Reaper — Auto-cleanup - -**TC-018** `Reaper removes terminal tasks older than max_age automatically` -- Pre: `mgr = AsyncTaskManager(executor); await mgr.start_reaper(interval_seconds=0.05, max_age_seconds=0.0)` -- Steps: submit simple module, wait for completion, wait 2× interval, check -- Expected: task no longer returned by `get_status()` -- Priority: P0 - -**TC-019** `Reaper does NOT remove active (PENDING/RUNNING/RETRYING) tasks` -- Pre: reaper running; slow module submitted -- Steps: wait one interval cycle, check slow task -- Expected: slow task still present with status PENDING or RUNNING -- Priority: P0 - ---- - -### Reaper Lifecycle - -**TC-020** `stop_reaper() halts background cleanup after next cycle` -- Pre: start reaper with short interval -- Steps: `mgr.stop_reaper()`; submit and complete a task; wait 2× interval -- Expected: completed task still present (reaper no longer running) -- Priority: P1 - -**TC-021** `shutdown() stops the reaper automatically` -- Pre: reaper started -- Steps: `await mgr.shutdown()` -- Expected: no background reaper task remains; no errors raised -- Priority: P0 - -**TC-022** `stop_reaper() when no reaper is running is a no-op` -- Pre: reaper never started -- Steps: `mgr.stop_reaper()` -- Expected: no error raised -- Priority: P1 - -**TC-023** `start_reaper() twice raises RuntimeError` -- Pre: reaper already started -- Steps: call `mgr.start_reaper()` again -- Expected: `RuntimeError` raised; original reaper continues running -- Priority: P0 - ---- - -### Regression - -**TC-024** `All existing TaskStatus values still present` -- Pre: none -- Steps: check PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, RETRYING all exist -- Expected: all six values accessible on `TaskStatus` -- Priority: P0 - -**TC-025** `Existing AsyncTaskManager API unchanged when no retry_policy / no store / no reaper` -- Pre: construct `AsyncTaskManager(executor)` with no optional args -- Steps: run existing lifecycle (submit → complete, cancel, cleanup, shutdown) -- Expected: all existing tests pass without modification -- Priority: P0 - ---- - -## Gap Analysis - -| Gap | Risk | Mitigation | -|---|---|---| -| Reaper timer precision under test may be flaky | Medium | Use `asyncio.sleep` multiples of interval in tests; keep intervals ≥ 50ms | -| SpyStore must be thread-safe if reaper runs concurrently | Low | asyncio single-threaded; list.append is safe | -| `delay_for(attempt=0)` edge case not explicitly tested | Low | Fixed/linear/exp all defined for attempt ≥ 1; document that attempt is 1-indexed | -| Retry + concurrency semaphore interaction | Low | RETRYING does not hold semaphore (sleep is outside `async with self._semaphore`) | diff --git a/docs/features/async-task-evolution.md b/docs/features/async-task-evolution.md deleted file mode 100644 index 641f7cd..0000000 --- a/docs/features/async-task-evolution.md +++ /dev/null @@ -1,155 +0,0 @@ -# Feature: AsyncTask Evolution (Issue #34) - -## Goal - -Evolve `AsyncTaskManager` with three capabilities: a pluggable `TaskStore` abstraction for -task-state storage, a `RetryPolicy` mechanism with configurable backoff for failed tasks, -and a background `Reaper` that automatically removes stale terminal-state tasks. - -## Scope - -### In Scope - -- `TaskStore` protocol (sync) with `get / put / delete / list` operations -- `InMemoryTaskStore` as the default implementation (replaces `self._tasks: dict`) -- `AsyncTaskManager.__init__` accepts optional `store: TaskStore` (default: `InMemoryTaskStore`) -- `RetryPolicy` dataclass: `max_retries`, `backoff` (`"fixed" | "linear" | "exponential"`), - `base_delay_seconds` -- `submit()` accepts optional `retry_policy: RetryPolicy | None` -- `TaskInfo` gains `attempt_number: int = 0` and `max_retries: int = 0` -- New `TaskStatus.RETRYING` state visible during the backoff delay window -- Reaper: `AsyncTaskManager.start_reaper(interval_seconds, max_age_seconds)` and - `stop_reaper()` methods; `shutdown()` stops the reaper automatically -- Updated public exports in `apcore/__init__.py`: - `TaskStore`, `InMemoryTaskStore`, `RetryPolicy` -- Full test coverage (≥90%) for all new paths - -### Out of Scope - -- Persistent store implementations (Redis, PostgreSQL, SQLite) -- Async `TaskStore` variant (can be added later following the ACL handler dual-protocol pattern) -- Task priorities or dependencies -- Distributed task queues - -## Affected Modules - -- `src/apcore/async_task.py` — all new logic lives here; `TaskInfo`, `TaskStatus`, - `AsyncTaskManager` all extended -- `src/apcore/__init__.py` — add `TaskStore`, `InMemoryTaskStore`, `RetryPolicy` to exports -- `src/apcore/errors.py` — no new errors expected; existing `TaskLimitExceededError` unchanged -- `tests/test_async_task.py` — extend with new test classes for store, retry, reaper - -## New Types / Modules - -No new files. All additions are in `src/apcore/async_task.py`. - -## Technical Approach - -### TaskStore Protocol + InMemoryTaskStore - -```python -class TaskStore(Protocol): - def get(self, task_id: str) -> TaskInfo | None: ... - def put(self, info: TaskInfo) -> None: ... - def delete(self, task_id: str) -> None: ... - def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: ... - -class InMemoryTaskStore: - def __init__(self) -> None: - self._data: dict[str, TaskInfo] = {} - def get(self, task_id: str) -> TaskInfo | None: ... - def put(self, info: TaskInfo) -> None: ... - def delete(self, task_id: str) -> None: ... - def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: ... -``` - -`AsyncTaskManager.__init__` changes: -- Remove `self._tasks: dict[str, TaskInfo]` -- Add `self._store: TaskStore = store or InMemoryTaskStore()` -- All internal references to `self._tasks` route through `self._store` - -### RetryPolicy + RETRYING Status - -```python -class BackoffStrategy(str, Enum): - FIXED = "fixed" - LINEAR = "linear" - EXPONENTIAL = "exponential" - -@dataclass -class RetryPolicy: - max_retries: int = 3 - backoff: BackoffStrategy = BackoffStrategy.EXPONENTIAL - base_delay_seconds: float = 1.0 - - def delay_for(self, attempt: int) -> float: - """Return wait seconds before attempt `attempt` (1-indexed).""" -``` - -Delay formulas (attempt is 1-indexed retry number): -- `fixed`: `base_delay_seconds` -- `linear`: `base_delay_seconds * attempt` -- `exponential`: `base_delay_seconds * 2 ** (attempt - 1)` - -`TaskInfo` new fields: -```python -attempt_number: int = 0 # how many times this task has been tried -max_retries: int = 0 # copied from RetryPolicy.max_retries at submit time -``` - -`TaskStatus.RETRYING = "retrying"` — set while waiting during backoff delay. - -`_run()` retry loop: -``` -while True: - try: execute → COMPLETED; break - except CancelledError: CANCELLED; break - except Exception: - if attempt_number < max_retries: - status = RETRYING - await asyncio.sleep(policy.delay_for(attempt_number + 1)) - attempt_number += 1 - else: - status = FAILED; break -``` - -### Reaper - -```python -def start_reaper( - self, - interval_seconds: float = 3600.0, - max_age_seconds: float = 3600.0, -) -> None: ... - -def stop_reaper(self) -> None: ... -``` - -Implementation: creates an `asyncio.Task` running a loop: -``` -while True: - await asyncio.sleep(interval_seconds) - self.cleanup(max_age_seconds) -``` - -`shutdown()` calls `stop_reaper()` before cancelling worker tasks. - -Double-start guard: calling `start_reaper()` when already running raises `RuntimeError`. - -## Acceptance Criteria - -- `TaskStore` protocol is structurally compatible with `InMemoryTaskStore` (verified by - `isinstance` check or type: `assert isinstance(InMemoryTaskStore(), TaskStore)`) -- `AsyncTaskManager` accepts a custom `TaskStore` at construction time; all task state - flows through it -- A task that fails and has `RetryPolicy(max_retries=2)` retries twice before entering - `FAILED`; `attempt_number` on the final `TaskInfo` equals 2 -- Status transitions during retry: RUNNING → RETRYING → RUNNING → ... → FAILED -- Delay values match the formula for each backoff strategy -- After `start_reaper()`, terminal-state tasks older than `max_age_seconds` are removed - automatically without calling `cleanup()` manually -- `stop_reaper()` halts the background task within one sleep cycle -- `shutdown()` stops the reaper and cancels all worker tasks -- Calling `start_reaper()` twice raises `RuntimeError` -- All existing tests continue to pass (no regressions) -- `TaskStore`, `InMemoryTaskStore`, `RetryPolicy` are importable from `apcore` diff --git a/docs/features/middleware-architecture-hardening.md b/docs/features/middleware-architecture-hardening.md deleted file mode 100644 index b8e936b..0000000 --- a/docs/features/middleware-architecture-hardening.md +++ /dev/null @@ -1,76 +0,0 @@ -# Feature: Middleware Architecture Hardening (Issue #42) - -## Goal -Harden the apcore middleware system with a proper CircuitBreakerMiddleware, fix -priority initialization in TracingMiddleware and RetryMiddleware, add YAML config -support for circuit-breaker defaults, and fix the async detection bug where sync -`on_error` handlers block the event loop. - -## Scope - -### In Scope -- New `CircuitBreakerMiddleware` in `src/apcore/middleware/circuit_breaker.py` -- Fix `TracingMiddleware.__init__` to accept and forward `priority` via `super().__init__()` -- Fix `RetryMiddleware.__init__` to accept and forward `priority` via `super().__init__()` -- Add `middleware.circuit_breaker.*` YAML config keys to `config.py` with validation -- Fix `MiddlewareManager.execute_on_error_async` to use `asyncio.to_thread` for blocking sync handlers -- Export `CircuitBreakerMiddleware` from `middleware/__init__.py` - -### Out of Scope -- Changes to `events/circuit_breaker.py` (EventSubscriber wrapper — separate concern) -- New exporters for TracingMiddleware -- Changes to retry logic or retry backoff strategy - -## Affected Modules -- `src/apcore/middleware/circuit_breaker.py` — new file -- `src/apcore/middleware/__init__.py` — add `CircuitBreakerMiddleware` export -- `src/apcore/middleware/manager.py` — fix async on_error blocking -- `src/apcore/middleware/retry.py` — add `priority` param + `super().__init__()` -- `src/apcore/observability/tracing.py` — add `priority` param + `super().__init__()` -- `src/apcore/config.py` — add `middleware.circuit_breaker.*` config + constraints -- `src/apcore/errors.py` — add `CircuitOpenError` - -## New Modules -- `src/apcore/middleware/circuit_breaker.py` — `CircuitBreakerMiddleware` with per-module state machine - -## Technical Approach - -### CircuitBreakerMiddleware -Per-module state machine (CLOSED → OPEN → HALF_OPEN) tracked in `self._state` dict -keyed by `module_id`. Thread-safe with `threading.Lock`. In `before()`, if circuit is -OPEN and recovery window hasn't elapsed, raise `CircuitOpenError`. In `on_error()`, -increment failure counter; open circuit when `consecutive_failures >= failure_threshold`. -In `after()`, record success; in HALF_OPEN, close the circuit. - -Config parameters: -- `failure_threshold: int = 5` — consecutive failures before opening -- `recovery_window_ms: int = 60_000` — ms before OPEN → HALF_OPEN -- `success_threshold: int = 1` — successes in HALF_OPEN before closing - -### Priority fix -`TracingMiddleware.__init__` and `RetryMiddleware.__init__` both need to: -1. Accept `priority: int = 100` -2. Call `super().__init__(priority=priority)` - -### Async on_error fix -`execute_on_error_async` currently calls sync `on_error` directly. For handlers -that may block (e.g., `RetryMiddleware.on_error` with `time.sleep`), this blocks -the event loop. Fix: wrap sync handlers with `asyncio.to_thread`. - -### YAML config -Add to `_CONSTRAINTS`: -``` -"middleware.circuit_breaker.failure_threshold": int >= 1 -"middleware.circuit_breaker.recovery_window_ms": int >= 0 -"middleware.circuit_breaker.success_threshold": int >= 1 -``` - -## Acceptance Criteria -- `CircuitBreakerMiddleware` opens after N consecutive failures per module -- `CircuitBreakerMiddleware` transitions OPEN → HALF_OPEN after recovery window -- `CircuitBreakerMiddleware` closes on success in HALF_OPEN -- `TracingMiddleware(exporter, priority=200)` sets priority to 200 without error -- `RetryMiddleware(config, priority=50)` sets priority to 50 without error -- `execute_on_error_async` does not block the event loop for sync `on_error` handlers -- YAML config `middleware.circuit_breaker.failure_threshold` is validated -- All existing tests continue to pass diff --git a/examples/cancel_token.py b/examples/cancel_token.py index 91c796f..857b2b4 100644 --- a/examples/cancel_token.py +++ b/examples/cancel_token.py @@ -33,8 +33,10 @@ def slow_task(steps: int, context: Context) -> dict: print("\n--- Run 2: Cancel after 80ms ---") token = CancelToken() - ctx = Context.create() - ctx.cancel_token = token # inject the token into the context + # cancel_token is a first-class Context.create() parameter (v0.22.0, + # Issue #66). Pass it at construction time — the post-hoc + # ctx.cancel_token = token anti-pattern is no longer required. + ctx = Context.create(cancel_token=token) # Fire cancellation from a background thread after 80 ms timer = threading.Timer(0.08, token.cancel) diff --git a/src/apcore/__init__.py b/src/apcore/__init__.py index e58f75a..501dfd9 100644 --- a/src/apcore/__init__.py +++ b/src/apcore/__init__.py @@ -96,6 +96,7 @@ ConfigNamespaceDuplicateError, ConfigNamespaceReservedError, ConfigNotFoundError, + ContextBindingError, DependencyNotFoundError, DependencyVersionMismatchError, ErrorCodeCollisionError, @@ -642,6 +643,7 @@ def enable(module_id: str, reason: str = "Enabled via APCore client") -> dict[st "ConfigNamespaceDuplicateError", "ConfigNamespaceReservedError", "ConfigNotFoundError", + "ContextBindingError", "DependencyNotFoundError", "DependencyVersionMismatchError", "ErrorFormatterDuplicateError", diff --git a/src/apcore/async_task.py b/src/apcore/async_task.py index 20cc643..b4be8e0 100644 --- a/src/apcore/async_task.py +++ b/src/apcore/async_task.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio +import dataclasses +import inspect import logging import time import uuid @@ -38,6 +40,12 @@ class ReaperHandle: call :meth:`stop` to cancel the periodic cleanup loop; :meth:`is_running` reports whether the underlying ``asyncio.Task`` is still active. Idempotent: ``stop()`` after the loop has already finished is a no-op. + + ``stop()`` is **async** and awaits the reaper task to fully drain + (D-11 / A-D-AT-03), matching the TypeScript and Rust SDKs. Callers + that still invoke ``handle.stop()`` from synchronous code (the + pre-D-11 surface) get a ``DeprecationWarning`` and the coroutine is + driven to completion via a private adapter — see :meth:`_run_sync_stop`. """ __slots__ = ("_manager",) @@ -45,9 +53,9 @@ class ReaperHandle: def __init__(self, manager: "AsyncTaskManager") -> None: self._manager = manager - def stop(self) -> None: - """Cancel the periodic reap loop (idempotent).""" - self._manager.stop_reaper() + async def stop(self) -> None: + """Cancel the periodic reap loop and await its termination (idempotent).""" + await self._manager.stop_reaper() def is_running(self) -> bool: """Return True if the reaper task exists and has not finished.""" @@ -166,37 +174,44 @@ def attempt_number(self, value: int) -> None: @runtime_checkable class TaskStore(Protocol): - """Sync storage backend for :class:`TaskInfo` records. + """Async storage backend for :class:`TaskInfo` records (D-17). The default implementation is :class:`InMemoryTaskStore`. Users may - supply a custom store (e.g. Redis, DB) via ``AsyncTaskManager(store=...)``. - All methods are synchronous; async stores must wrap I/O in a sync adapter. + supply a custom store (e.g. Redis, SQL) via ``AsyncTaskManager(store=...)``. + All methods are asynchronous in every SDK so that I/O-backed stores can + plug in without blocking the runtime's event loop; ``InMemoryTaskStore`` + keeps the uniform async shape even though its operations are CPU-only. .. note:: The canonical write method is :meth:`save`. ``put`` is retained as a deprecated shim for one minor release (D-10). """ - def get(self, task_id: str) -> TaskInfo | None: ... - def save(self, info: TaskInfo) -> None: ... - def delete(self, task_id: str) -> None: ... - def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: ... - def list_expired(self, before_timestamp: float) -> list[TaskInfo]: ... + async def get(self, task_id: str) -> TaskInfo | None: ... + async def save(self, info: TaskInfo) -> None: ... + async def delete(self, task_id: str) -> None: ... + async def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: ... + async def list_expired(self, before_timestamp: float) -> list[TaskInfo]: ... class InMemoryTaskStore: - """Default in-memory :class:`TaskStore` backed by a plain dict.""" + """Default in-memory :class:`TaskStore` backed by a plain dict. + + All methods are ``async`` to match the :class:`TaskStore` Protocol + (D-17). Operations remain pure CPU work — no I/O — so the coroutine + bodies return immediately without awaiting. + """ def __init__(self) -> None: self._data: dict[str, TaskInfo] = {} - def get(self, task_id: str) -> TaskInfo | None: + async def get(self, task_id: str) -> TaskInfo | None: return self._data.get(task_id) - def save(self, info: TaskInfo) -> None: + async def save(self, info: TaskInfo) -> None: self._data[info.task_id] = info - def put(self, info: TaskInfo) -> None: + async def put(self, info: TaskInfo) -> None: """Deprecated alias for :meth:`save` (D-10). .. deprecated:: 0.21.0 @@ -208,17 +223,17 @@ def put(self, info: TaskInfo) -> None: DeprecationWarning, stacklevel=2, ) - self.save(info) + await self.save(info) - def delete(self, task_id: str) -> None: + async def delete(self, task_id: str) -> None: self._data.pop(task_id, None) - def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: + async def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: if status is None: return list(self._data.values()) return [t for t in self._data.values() if t.status == status] - def list_expired(self, before_timestamp: float) -> list[TaskInfo]: + async def list_expired(self, before_timestamp: float) -> list[TaskInfo]: """Return terminal-state tasks whose ``completed_at`` precedes ``before_timestamp`` (D-10). @@ -299,15 +314,29 @@ class RetryPolicy: future major release. Attributes: - max_retries: Maximum number of retry attempts (0 = no retry). + max_retries: Maximum number of retry attempts (default ``0`` — + retries are strictly opt-in across all SDKs, D-14 / A-D-AT-09). + Earlier versions defaulted to ``3``, which silently enabled + retries when callers used ``RetryPolicy()`` without arguments. backoff: Delay growth strategy between attempts. base_delay_seconds: Base wait time fed into the backoff formula. """ - max_retries: int = 3 + max_retries: int = 0 backoff: BackoffStrategy = BackoffStrategy.EXPONENTIAL base_delay_seconds: float = 1.0 + def __post_init__(self) -> None: + # D-14 / A-D-AT-09: surface a one-shot deprecation when the legacy + # class is instantiated so callers migrate to RetryConfig. + warnings.warn( + "RetryPolicy is deprecated; use RetryConfig for cross-language " + "alignment with the TypeScript and Rust SDKs. RetryPolicy will " + "be removed in a future major release.", + DeprecationWarning, + stacklevel=3, + ) + def delay_for(self, attempt: int) -> float: """Return wait seconds before retry *attempt* (1-indexed).""" if self.backoff == BackoffStrategy.FIXED: @@ -322,6 +351,20 @@ def delay_for(self, attempt: int) -> float: _TERMINAL_STATUSES = frozenset({TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED}) +def _snapshot(info: TaskInfo | None) -> TaskInfo | None: + """Return a shallow copy of ``info`` so callers cannot mutate live state. + + Per D-clarification on ``AsyncTaskManager.get_status`` / ``list_tasks`` + (A-D-AT-06): the manager MUST hand out copies, matching the TypeScript + SDK's ``{ ...info }`` and the Rust SDK's ``info.clone()``. Without this + shim, Python callers could mutate the live dataclass and silently + corrupt the store's view of task state. + """ + if info is None: + return None + return dataclasses.replace(info) + + class AsyncTaskManager: """Manages background execution of modules via asyncio tasks. @@ -345,18 +388,42 @@ def __init__( self._reaper_interval: float = 3600.0 self._reaper_max_age: float = 3600.0 - def _save(self, info: TaskInfo) -> None: + async def _save(self, info: TaskInfo) -> None: """Persist ``info`` via the configured store. - Prefers the canonical :meth:`TaskStore.save` method. Falls back to - the deprecated :meth:`put` for legacy custom stores that have not - yet been updated to the post-D-10 API. + Prefers the canonical async :meth:`TaskStore.save` method (D-17). + Falls back to the deprecated :meth:`put` for legacy custom stores + that have not yet been updated to the post-D-10 API. Awaits both + coroutine and sync return values so pre-D-17 sync stores remain + usable for one deprecation window. """ save = getattr(self._store, "save", None) if callable(save): - save(info) + result = save(info) else: # pragma: no cover - legacy custom stores - self._store.put(info) # type: ignore[attr-defined] + result = self._store.put(info) # type: ignore[attr-defined] + if inspect.isawaitable(result): + await result + + async def _store_get(self, task_id: str) -> TaskInfo | None: + """Await-or-return adapter for ``self._store.get`` (D-17 migration shim).""" + result = self._store.get(task_id) + if inspect.isawaitable(result): + return await result + return result + + async def _store_list(self, status: TaskStatus | None = None) -> list[TaskInfo]: + """Await-or-return adapter for ``self._store.list`` (D-17 migration shim).""" + result = self._store.list(status) + if inspect.isawaitable(result): + return await result + return result + + async def _store_delete(self, task_id: str) -> None: + """Await-or-return adapter for ``self._store.delete`` (D-17 migration shim).""" + result = self._store.delete(task_id) + if inspect.isawaitable(result): + await result async def submit( self, @@ -381,7 +448,8 @@ async def submit( Returns: The generated task_id (UUID4 string). """ - active = sum(1 for info in self._store.list() if info.status in _ACTIVE_STATUSES) + all_tasks = await self._store_list() + active = sum(1 for info in all_tasks if info.status in _ACTIVE_STATUSES) if active >= self._max_tasks: raise TaskLimitExceededError(max_tasks=self._max_tasks) @@ -393,7 +461,7 @@ async def submit( submitted_at=time.time(), max_retries=retry_policy.max_retries if retry_policy is not None else 0, ) - self._save(info) + await self._save(info) async_task = asyncio.create_task(self._run(task_id, module_id, inputs, context, retry_policy)) async_task.add_done_callback(lambda _: self._async_tasks.pop(task_id, None)) @@ -402,8 +470,40 @@ async def submit( return task_id def get_status(self, task_id: str) -> TaskInfo | None: - """Return the TaskInfo for a task, or None if not found.""" - return self._store.get(task_id) + """Return a snapshot of the TaskInfo for a task, or None if not found. + + The returned object is a shallow copy (A-D-AT-06): callers MUST + treat it as a read-only snapshot. Mutating the copy never affects + the live store. The store's ``get`` is read synchronously for the + in-memory default; async-backed stores SHOULD await + :meth:`get_status_async` instead. + """ + # For the default in-memory store the coroutine completes + # synchronously on the first send; we drive it directly to keep + # the legacy sync surface working. Custom async-only stores + # should use ``get_status_async``. + get = self._store.get(task_id) + if inspect.isawaitable(get): + try: + # Pump the coroutine without an event loop. This works for + # the bundled InMemoryTaskStore (whose body is pure CPU) + # and for any custom store whose ``get`` returns without + # actually awaiting external I/O. Stores that perform + # real awaits MUST call ``get_status_async`` instead. + try: + get.send(None) # type: ignore[union-attr] + except StopIteration as stop: # noqa: PERF203 — coroutine return value + return _snapshot(stop.value) + raise RuntimeError( + "TaskStore.get() suspended; call get_status_async() for I/O-backed stores" + ) + finally: + get.close() # type: ignore[union-attr] + return _snapshot(get) + + async def get_status_async(self, task_id: str) -> TaskInfo | None: + """Async variant of :meth:`get_status` for I/O-backed stores (D-17).""" + return _snapshot(await self._store_get(task_id)) def get_result(self, task_id: str) -> Any: """Return the result of a completed task. @@ -412,7 +512,7 @@ def get_result(self, task_id: str) -> Any: KeyError: If the task_id is not found. RuntimeError: If the task is not in COMPLETED status. """ - info = self._store.get(task_id) + info = self.get_status(task_id) if info is None: raise KeyError(f"Task not found: {task_id}") if info.status != TaskStatus.COMPLETED: @@ -425,7 +525,7 @@ async def cancel(self, task_id: str) -> bool: Returns: True if the task was successfully cancelled, False otherwise. """ - info = self._store.get(task_id) + info = await self._store_get(task_id) if info is None: return False if info.status not in _ACTIVE_STATUSES: @@ -448,24 +548,50 @@ async def cancel(self, task_id: str) -> bool: exc_info=True, ) - if info.status in _ACTIVE_STATUSES: + # Re-read after the task settles in case the runner already + # transitioned to a terminal state inside its cancel-handler. + info = await self._store_get(task_id) + if info is not None and info.status in _ACTIVE_STATUSES: info.status = TaskStatus.CANCELLED info.completed_at = time.time() - self._save(info) + await self._save(info) return True async def shutdown(self) -> None: """Cancel all pending/running/retrying tasks and stop the reaper.""" - self.stop_reaper() + await self.stop_reaper() for task_id in list(self._async_tasks): await self.cancel(task_id) def list_tasks(self, status: TaskStatus | None = None) -> list[TaskInfo]: - """Return all tasks, optionally filtered by status.""" - return self._store.list(status) + """Return snapshots of all tasks, optionally filtered by status. + + Each entry is a shallow copy of the live :class:`TaskInfo` — see + :meth:`get_status` for the mutation-safety contract (A-D-AT-06). + I/O-backed stores SHOULD use :meth:`list_tasks_async` instead. + """ + result = self._store.list(status) + if inspect.isawaitable(result): + try: + try: + result.send(None) # type: ignore[union-attr] + except StopIteration as stop: # noqa: PERF203 + items: list[TaskInfo] = list(stop.value or []) + return [info for info in (_snapshot(it) for it in items) if info is not None] + raise RuntimeError( + "TaskStore.list() suspended; call list_tasks_async() for I/O-backed stores" + ) + finally: + result.close() # type: ignore[union-attr] + return [info for info in (_snapshot(it) for it in result) if info is not None] - def cleanup(self, max_age_seconds: float = 3600.0) -> int: + async def list_tasks_async(self, status: TaskStatus | None = None) -> list[TaskInfo]: + """Async variant of :meth:`list_tasks` for I/O-backed stores (D-17).""" + items = await self._store_list(status) + return [info for info in (_snapshot(it) for it in items) if info is not None] + + async def cleanup(self, max_age_seconds: float = 3600.0) -> int: """Remove terminal-state tasks older than max_age_seconds. Terminal states: COMPLETED, FAILED, CANCELLED. @@ -476,7 +602,7 @@ def cleanup(self, max_age_seconds: float = 3600.0) -> int: now = time.time() to_remove: list[str] = [] - for info in self._store.list(): + for info in await self._store_list(): if info.status not in _TERMINAL_STATUSES: continue ref_time = info.completed_at if info.completed_at is not None else info.submitted_at @@ -484,7 +610,7 @@ def cleanup(self, max_age_seconds: float = 3600.0) -> int: to_remove.append(info.task_id) for task_id in to_remove: - self._store.delete(task_id) + await self._store_delete(task_id) self._async_tasks.pop(task_id, None) return len(to_remove) @@ -559,18 +685,32 @@ def start_reaper( self._reaper_task = asyncio.create_task(self._reap_loop()) return ReaperHandle(self) - def stop_reaper(self) -> None: - """Stop the background reaper task. No-op if not running.""" - if self._reaper_task is not None and not self._reaper_task.done(): - self._reaper_task.cancel() + async def stop_reaper(self) -> None: + """Stop the background reaper task and await its termination. + + Per D-11 / A-D-AT-03 the stop surface is async and MUST drain the + underlying coroutine, mirroring the TypeScript and Rust SDKs. The + method is idempotent and is a no-op when no reaper is active. + """ + task = self._reaper_task self._reaper_task = None + if task is None or task.done(): + return + task.cancel() + try: + await task + except asyncio.CancelledError: + # Expected — the loop awaits asyncio.sleep which raises here. + pass + except Exception as exc: # pragma: no cover - defensive + _logger.warning("Reaper task raised during shutdown: %s", exc, exc_info=True) async def _reap_loop(self) -> None: """Periodic cleanup loop executed by the reaper task.""" try: while True: await asyncio.sleep(self._reaper_interval) - self.cleanup(self._reaper_max_age) + await self.cleanup(self._reaper_max_age) except asyncio.CancelledError: pass @@ -583,7 +723,7 @@ async def _run( retry_policy: "RetryPolicy | RetryConfig | None", ) -> None: """Internal coroutine: execute a module with optional retry/backoff.""" - info = self._store.get(task_id) + info = await self._store_get(task_id) if info is None: return @@ -595,20 +735,20 @@ async def _run( info.status = TaskStatus.RUNNING if info.started_at is None: info.started_at = time.time() - self._save(info) + await self._save(info) result = await self._executor.call_async(module_id, inputs, context) info.status = TaskStatus.COMPLETED info.completed_at = time.time() info.result = result - self._save(info) + await self._save(info) return except asyncio.CancelledError: info.status = TaskStatus.CANCELLED info.completed_at = time.time() - self._save(info) + await self._save(info) _logger.info("Task %s cancelled", task_id) return @@ -619,7 +759,7 @@ async def _run( # D-12: backoff state is PENDING (was RETRYING) — matches # TypeScript and Rust SDKs. info.status = TaskStatus.PENDING - self._save(info) + await self._save(info) _logger.info( "Task %s failed (attempt %d/%d), retrying in %.3fs", task_id, @@ -632,13 +772,13 @@ async def _run( except asyncio.CancelledError: info.status = TaskStatus.CANCELLED info.completed_at = time.time() - self._save(info) + await self._save(info) _logger.info("Task %s cancelled during backoff", task_id) return else: info.status = TaskStatus.FAILED info.completed_at = time.time() info.error = str(exc) - self._save(info) + await self._save(info) _logger.error("Task %s failed: %s", task_id, exc, exc_info=True) return diff --git a/src/apcore/builtin_steps.py b/src/apcore/builtin_steps.py index b063719..72ff7ed 100644 --- a/src/apcore/builtin_steps.py +++ b/src/apcore/builtin_steps.py @@ -128,15 +128,27 @@ def __init__( async def execute(self, ctx: PipelineContext) -> StepResult: if ctx.context is None: - new_ctx = Context.create(executor=self._executor) - new_ctx = new_ctx.child(ctx.module_id) - if self._global_timeout > 0: - new_ctx.global_deadline = time.monotonic() + self._global_timeout / 1000.0 - ctx.context = new_ctx + # Fallback path: a Context was not bound at the Executor entry + # point (e.g. PipelineContext constructed directly in tests). + # Per PROTOCOL_SPEC §"Contract: Context.create", executor is not + # a public constructor input; bind via the private helper. + base_ctx = Context.create() + base_ctx._bind_executor(self._executor) else: - # Derive child context to add module_id to call chain - child = ctx.context.child(ctx.module_id) - ctx.context = child + base_ctx = ctx.context + + # Root call detection — empty call_chain means this is a top-level + # invocation. Per PROTOCOL_SPEC §"Contract: `global_deadline` + # distributed semantics", the receiving Executor MUST (re)compute + # global_deadline from local config at pipeline entry. If the + # caller already populated global_deadline (in-process cooperation), + # preserve it; only fill in when unset. + is_root_call = not base_ctx.call_chain + if is_root_call and base_ctx.global_deadline is None and self._global_timeout > 0: + base_ctx.global_deadline = time.monotonic() + self._global_timeout / 1000.0 + + # Derive child context to add module_id to call chain. + ctx.context = base_ctx.child(ctx.module_id) return StepResult(action="continue") @@ -168,6 +180,16 @@ def __init__(self, *, config: Any | None = None) -> None: self._max_module_repeat = Config.get_default("executor.max_module_repeat") async def execute(self, ctx: PipelineContext) -> StepResult: + # D-21 / A-D-EXEC-002: short-circuit before any expensive validation + # or middleware work if the caller already cancelled. The pipeline + # also re-checks at the Execute step (defensive backstop for tokens + # cancelled while later steps run), but observing the token here is + # what makes the two-point invariant hold — single-check + # implementations leak compute through ACL/middleware/validation. + cancel_token = getattr(ctx.context, "cancel_token", None) + if cancel_token is not None and getattr(cancel_token, "is_cancelled", False): + raise ExecutionCancelledError() + call_chain = getattr(ctx.context, "call_chain", []) guard_call_chain( ctx.module_id, diff --git a/src/apcore/context.py b/src/apcore/context.py index 0c564ca..861c46b 100644 --- a/src/apcore/context.py +++ b/src/apcore/context.py @@ -48,14 +48,23 @@ class Context(Generic[T]): @classmethod def create( cls, - executor: Any = None, identity: Identity | None = None, - data: dict[str, Any] | None = None, trace_parent: TraceParent | None = None, + cancel_token: CancelToken | None = None, + data: dict[str, Any] | None = None, services: T = None, # type: ignore[assignment] + global_deadline: float | None = None, ) -> Context[T]: """Create a new top-level Context with a generated trace_id. + Unified signature per apcore PROTOCOL_SPEC §"Contract: Context.create" + (v0.22.0, Issue #66). The accepted caller inputs are exactly: + ``identity``, ``trace_parent``, ``cancel_token``, ``data``, + ``services``, ``global_deadline``. ``executor`` and ``caller_id`` are + intentionally NOT inputs — the executor is bound by the Executor at + pipeline entry (see ``Context._bind_executor``), and ``caller_id`` is + managed exclusively by ``Context.child()``. + When *trace_parent* is provided, its ``trace_id`` is accepted only if it is exactly 32 lowercase hex characters and not the W3C-reserved all-zero or all-f value. Otherwise a fresh 32-hex trace_id is @@ -96,12 +105,39 @@ def create( trace_id=trace_id, caller_id=None, call_chain=[], - executor=executor, + executor=None, identity=identity, data=ctx_data, services=services, # type: ignore[arg-type] + cancel_token=cancel_token, + global_deadline=global_deadline, ) + def _bind_executor(self, executor: Any) -> None: + """SDK-internal. Bind the Executor to this Context. + + Implements PROTOCOL_SPEC §"Contract: Executor binding to Context": + - If ``self.executor`` is None, bind it. + - If ``self.executor`` is already the same Executor instance + (identity comparison), the rebind is a noop. + - If ``self.executor`` is a *different* Executor instance, raise + :class:`apcore.errors.ContextBindingError`. + + Not intended for public callers; the Executor invokes this before + pipeline step 1 on every entry point that accepts a caller-supplied + Context. + """ + if self.executor is None: + self.executor = executor + elif self.executor is not executor: + # Imported lazily to avoid a circular import (errors -> context). + from apcore.errors import ContextBindingError + + raise ContextBindingError( + "Context already bound to a different Executor instance" + ) + # else: same executor instance, noop. + def serialize(self) -> dict[str, Any]: """Serialize Context to a JSON-encodable dict. diff --git a/src/apcore/errors.py b/src/apcore/errors.py index 4f6e039..494e8ab 100644 --- a/src/apcore/errors.py +++ b/src/apcore/errors.py @@ -40,6 +40,7 @@ "CircularCallError", "CallFrequencyExceededError", "InvalidInputError", + "ContextBindingError", "FuncMissingTypeHintError", "FuncMissingReturnTypeError", "BindingInvalidTargetError", @@ -664,6 +665,28 @@ def __init__( super().__init__(code=code, message=message, **kwargs) +class ContextBindingError(ModuleError): + """Raised when an Executor attempts to bind itself to a Context that is + already bound to a *different* Executor instance. + + Per apcore PROTOCOL_SPEC §"Contract: Executor binding to Context", a + Context whose ``executor`` field is non-null and refers to a different + Executor instance is a cross-executor conflict; rebinding the same + instance is a noop. SDKs SHOULD raise this error; SDKs that choose to + accept silently MUST document the deviation prominently. + """ + + _default_retryable: bool | None = False + + def __init__( + self, + message: str = "Context is already bound to a different Executor instance", + code: str = "CONTEXT_BINDING_ERROR", + **kwargs: Any, + ) -> None: + super().__init__(code=code, message=message, **kwargs) + + class FuncMissingTypeHintError(ModuleError): """Raised when a function parameter has no type annotation or a forward reference cannot be resolved.""" @@ -1270,6 +1293,7 @@ class ErrorCodes: MODULE_RELOAD_CONFLICT = "MODULE_RELOAD_CONFLICT" SYS_MODULE_REGISTRATION_FAILED = "SYS_MODULE_REGISTRATION_FAILED" STREAMING_INTERFACE_MISMATCH = "STREAMING_INTERFACE_MISMATCH" + CONTEXT_BINDING_ERROR = "CONTEXT_BINDING_ERROR" # Note: this class is intentionally NOT instantiated. All callers access the # constants as class attributes (`ErrorCodes.MODULE_NOT_FOUND`). A previous @@ -1304,6 +1328,7 @@ class ErrorCodes: "ERROR_CODE_", "PIPELINE_", "STREAMING_", + "CONTEXT_", } ) diff --git a/src/apcore/executor.py b/src/apcore/executor.py index 67cc754..f8003da 100644 --- a/src/apcore/executor.py +++ b/src/apcore/executor.py @@ -30,7 +30,6 @@ ErrorCodes, InvalidInputError, ModuleError, - ModuleExecuteError, ModuleNotFoundError, ModuleTimeoutError, SchemaValidationError, @@ -487,6 +486,13 @@ async def _validate_async( checks.append(PreflightCheckResult(check="module_id", passed=False, error=e.to_dict())) return PreflightResult(valid=False, checks=checks) + # PROTOCOL_SPEC §"Contract: Executor binding to Context": bind self + # to the Context before pipeline step 1 (also applies to dry-run + # validation, since the pipeline expects a bound Context). + if context is None: + context = Context.create() + context._bind_executor(self) + # Run pipeline in dry_run mode — pure=False steps are skipped pipe_ctx = PipelineContext( module_id=module_id, @@ -751,6 +757,13 @@ async def call_async( """ self._validate_module_id(module_id) + # PROTOCOL_SPEC §"Contract: Executor binding to Context": bind self + # to the Context before pipeline step 1. Auto-create the Context when + # the caller did not supply one. + if context is None: + context = Context.create() + context._bind_executor(self) + pipe_ctx = PipelineContext( module_id=module_id, inputs=inputs or {}, @@ -811,12 +824,25 @@ async def _recover_from_call_error( - ``dict`` — a recovery output from the first handler that provided one. - :class:`RetrySignal` — a handler asked for a retry; the caller must re-run the pipeline. - - Never returns ``None``: if no handler recovered, the wrapped - error is raised (or a ``MiddlewareChainError`` is converted to - ``ModuleExecuteError``). + - Never returns ``None``: if no handler recovered, the unwrapped + original error is raised (D-22 / A-D-EXEC-005). + + D-22 — Error Unwrap Rule: when middleware machinery wraps a + domain-typed error (e.g. ``ApprovalDeniedError``) in a + ``MiddlewareChainError`` for diagnostics, the executor MUST unwrap + the wrapper and propagate ``MiddlewareChainError.original`` to + ``propagate_error`` and to the caller. Replacing the cause with a + generic ``ModuleExecuteError`` collapses callers' ability to + dispatch on the typed cause (e.g. MCP/A2A bridges keying on + ``APPROVAL_DENIED`` vs ``MODULE_EXECUTE_ERROR``). This mirrors the + TypeScript and Rust SDKs. """ ctx_obj = pipe_ctx.context - wrapped = propagate_error(exc, module_id, ctx_obj) if ctx_obj else exc + # Unwrap MiddlewareChainError BEFORE propagation so the wrapped + # typed cause is what middleware on_error handlers and the final + # caller observe — not the chain-machinery wrapper. + original = exc.original if isinstance(exc, MiddlewareChainError) else exc + wrapped = propagate_error(original, module_id, ctx_obj) if ctx_obj else original executed_mw = pipe_ctx.executed_middlewares if executed_mw: recovery = await self._middleware_manager.execute_on_error_async( @@ -824,9 +850,7 @@ async def _recover_from_call_error( ) if recovery is not None: return recovery - if isinstance(exc, MiddlewareChainError): - raise ModuleExecuteError(module_id=module_id, message=str(exc)) from exc - raise wrapped from exc + raise wrapped from original async def stream( self, @@ -854,6 +878,12 @@ async def stream( """ self._validate_module_id(module_id) + # PROTOCOL_SPEC §"Contract: Executor binding to Context": bind self + # to the Context before pipeline step 1. + if context is None: + context = Context.create() + context._bind_executor(self) + pipe_ctx = PipelineContext( module_id=module_id, inputs=inputs or {}, @@ -1107,6 +1137,13 @@ async def call_async_with_trace( A tuple of (result dict, PipelineTrace). """ effective_strategy = self._effective_strategy(strategy) + + # PROTOCOL_SPEC §"Contract: Executor binding to Context": bind self + # to the Context before pipeline step 1. + if context is None: + context = Context.create() + context._bind_executor(self) + pipe_ctx = PipelineContext( module_id=module_id, inputs=inputs or {}, diff --git a/src/apcore/registry/registry.py b/src/apcore/registry/registry.py index dbca33b..eddc6da 100644 --- a/src/apcore/registry/registry.py +++ b/src/apcore/registry/registry.py @@ -918,6 +918,15 @@ def _register_in_order( to ``_modules``, leaving version-hint queries unable to see auto-discovered modules. + A-D-REG-003 / Issue #65 — discover paths now apply the same + deferred-publish protocol as the public ``register()`` API: + reserve an in-flight slot → run ``on_load()`` outside the lock → + atomically publish on success. The earlier implementation + published into ``_modules`` *before* invoking ``on_load`` and + relied on rollback, leaving a window in which ``registry.get()`` + callers could observe a module whose ``on_load``-installed state + (warmed pools, primed caches) was incomplete. + Returns the number of modules that successfully completed registration (including a successful ``on_load`` call when defined). """ @@ -938,7 +947,26 @@ def _register_in_order( # are picked up; getattr falls through to class attrs anyway. # Aligned with manual register() / register_internal(). merged_meta = merge_module_metadata(module, meta) + + # Phase 1: reserve in-flight slot. Skip if already in-flight or + # visible — concurrent discover invocations must not double-load. + with self._lock: + if mod_id in self._in_flight or mod_id in self._modules: + logger.warning( + "Discover skipping '%s' — already registered or in-flight", mod_id + ) + continue + self._in_flight.add(mod_id) + + # Phase 2: run on_load OUTSIDE the lock. Module is NOT visible. + if not self._invoke_on_load(mod_id, module, effective_version): + # _invoke_on_load already discarded the in-flight slot and + # emitted apcore.registry.module_load_failed. Move on. + continue + + # Phase 3: atomic publish. with self._lock: + self._in_flight.discard(mod_id) self._versioned_modules.add(mod_id, effective_version, module) if meta: self._versioned_meta.add(mod_id, effective_version, meta) @@ -946,9 +974,6 @@ def _register_in_order( self._module_meta[mod_id] = merged_meta self._lowercase_map[mod_id.lower()] = mod_id - if not self._invoke_on_load(mod_id, module, effective_version): - continue - self._trigger_event("register", mod_id, module) registered_count += 1 return registered_count @@ -964,12 +989,24 @@ def _effective_version(module: Any, meta: dict[str, Any]) -> str: return resolved def _invoke_on_load(self, mod_id: str, module: Any, effective_version: str) -> bool: - """Call ``module.on_load()`` if defined; roll back registration on failure. - - Returns True if the module is still registered, False if it was removed. - Rollback symmetrically clears both the latest-only ``_modules`` view - and the multi-version ``_versioned_modules`` / ``_versioned_meta`` - stores populated by ``_register_in_order``. + """Call ``module.on_load()`` if defined; clean up in-flight slot on failure. + + Returns True when ``on_load`` succeeded (or no callback was defined) + and the caller may proceed to publish. Returns False when the + callback raised, in which case this helper has already: + + * removed ``mod_id`` from the in-flight set, + * emitted ``apcore.registry.module_load_failed`` (A-D-REG-005), + mirroring the public ``register()`` path so observers see a + uniform DLQ-style signal regardless of which registration + path failed, + * logged the failure at ERROR. + + Because the deferred-publish refactor (A-D-REG-003) calls this + BEFORE the module is inserted into the visible store, there is no + rollback of ``_modules`` / ``_versioned_modules`` to do — the + invariant is that an on_load failure leaves the registry exactly + as it was before the registration attempt began. """ if not (hasattr(module, "on_load") and callable(module.on_load)): return True @@ -978,12 +1015,12 @@ def _invoke_on_load(self, mod_id: str, module: Any, effective_version: str) -> b except Exception as e: logger.error("on_load() failed for module '%s': %s", mod_id, e) with self._lock: - self._versioned_modules.remove(mod_id, effective_version) - self._versioned_meta.remove(mod_id, effective_version) - if not self._versioned_modules.has(mod_id): - self._modules.pop(mod_id, None) - self._module_meta.pop(mod_id, None) - self._lowercase_map.pop(mod_id.lower(), None) + self._in_flight.discard(mod_id) + # A-D-REG-005: align with the public register() path — every + # registration site that observes an on_load failure must + # emit the canonical module_load_failed event so subscribers + # have a single hook for partial-init detection. + self._emit_module_load_failed(mod_id, e) return False return True @@ -2097,7 +2134,11 @@ def register_internal(self, module_id: str, module: Any) -> None: # here. Namespace → registration-mechanism is a 1:1 mapping; mixing # blurs the audit-trail distinction between framework-emitted # (system.*) and caller-emitted (ephemeral.*) modules. - if module_id.startswith(EPHEMERAL_NAMESPACE_PREFIX): + # A-D-REG-002: use the shared _is_ephemeral helper so the bare + # ``ephemeral`` ID (no dot) is rejected here too — ``startswith`` + # missed it, leaving a one-character carveout that contradicted + # the canonical _is_ephemeral classifier used everywhere else. + if _is_ephemeral(module_id): raise ValueError( f"ephemeral.* module IDs must be registered via Registry.register(), " f"not register_internal() (got: {module_id!r}). See apcore " @@ -2114,7 +2155,17 @@ def register_internal(self, module_id: str, module: Any) -> None: # __init__-set attributes are picked up. merged_meta = merge_module_metadata(module, {}) + # A-D-REG-004: register_internal now routes through the SAME + # deferred-publish + module_load_failed-emit helper that + # register() uses, so the Issue #65 invariant ("modules are + # invisible until on_load completes") holds uniformly across + # every registration path, not just the public API. with self._lock: + if module_id in self._in_flight: + raise InvalidInputError( + message=f"Module '{module_id}' is already being registered (in-flight)", + code=ErrorCodes.DUPLICATE_MODULE_ID, + ) # Sync finding A-D-002: use detect_id_conflicts with an empty # reserved-words set so case-collision detection still runs on # the sys/internal path. Apcore-rust's register_core (called by @@ -2135,21 +2186,27 @@ def register_internal(self, module_id: str, module: Any) -> None: message=conflict.message, code=ErrorCodes.DUPLICATE_MODULE_ID, ) - self._modules[module_id] = module - self._module_meta[module_id] = merged_meta - self._lowercase_map[module_id.lower()] = module_id + self._in_flight.add(module_id) - # Call on_load if defined — mirrors the register() path. Roll back if it fails. + # Phase 2: call on_load() OUTSIDE the lock. Module is not visible + # yet. On failure: remove from in-flight, emit module_load_failed, + # re-raise the original exception unchanged (no wrapping). if hasattr(module, "on_load") and callable(module.on_load): try: module.on_load() - except Exception: + except Exception as exc: with self._lock: - self._modules.pop(module_id, None) - self._module_meta.pop(module_id, None) - self._lowercase_map.pop(module_id.lower(), None) + self._in_flight.discard(module_id) + self._emit_module_load_failed(module_id, exc) raise + # Phase 3: atomically publish into the visible store. + with self._lock: + self._in_flight.discard(module_id) + self._modules[module_id] = module + self._module_meta[module_id] = merged_meta + self._lowercase_map[module_id.lower()] = module_id + self._trigger_event("register", module_id, module) # ----- Cache ----- diff --git a/tests/integration/test_async_flows.py b/tests/integration/test_async_flows.py index 0b30f61..2d77ebb 100644 --- a/tests/integration/test_async_flows.py +++ b/tests/integration/test_async_flows.py @@ -26,7 +26,7 @@ async def test_call_async_with_sync_module(self, int_executor): @pytest.mark.asyncio async def test_context_propagation_in_async(self, int_executor): - ctx = Context.create(executor=int_executor) + ctx = Context.create() result = await int_executor.call_async("async_greet", {"name": "Delta"}, context=ctx) assert result == {"message": "Hello, Delta!"} assert ctx.trace_id is not None diff --git a/tests/integration/test_full_lifecycle.py b/tests/integration/test_full_lifecycle.py index 3ec7127..430cc2b 100644 --- a/tests/integration/test_full_lifecycle.py +++ b/tests/integration/test_full_lifecycle.py @@ -432,7 +432,6 @@ def test_full_pipeline_with_acl_conditions(self) -> None: # Call with matching identity type: should succeed service_ctx = Context.create( - executor=executor, identity=Identity(id="svc-1", type="service"), ) result = executor.call("mod.cond", {"name": "Svc"}, context=service_ctx) @@ -440,7 +439,6 @@ def test_full_pipeline_with_acl_conditions(self) -> None: # Call with non-matching identity type: should be denied user_ctx = Context.create( - executor=executor, identity=Identity(id="user-1", type="user"), ) with pytest.raises(ACLDeniedError): diff --git a/tests/test_approval_executor.py b/tests/test_approval_executor.py index 1eeb77c..70f2112 100644 --- a/tests/test_approval_executor.py +++ b/tests/test_approval_executor.py @@ -451,7 +451,7 @@ class MockSpan: executor = Executor(registry=registry, approval_handler=handler) # Simulate a parent call that already has tracing spans (nested call scenario) - ctx = Context.create(executor=executor) + ctx = Context.create() ctx.data["_apcore.mw.tracing.spans"] = [MockSpan()] executor.call("test.approval_required", {}, context=ctx) @@ -471,7 +471,7 @@ class MockSpan: events = mock_span_events executor = Executor(registry=registry, approval_handler=AlwaysDenyHandler()) - ctx = Context.create(executor=executor) + ctx = Context.create() ctx.data["_apcore.mw.tracing.spans"] = [MockSpan()] with pytest.raises(ApprovalDeniedError): diff --git a/tests/test_approval_integration.py b/tests/test_approval_integration.py index 813fe80..7f082e3 100644 --- a/tests/test_approval_integration.py +++ b/tests/test_approval_integration.py @@ -191,7 +191,7 @@ async def capture(request: ApprovalRequest) -> ApprovalResult: handler = CallbackApprovalHandler(capture) executor = Executor(registry=registry, approval_handler=handler) identity = Identity(id="user-42", type="user", roles=("admin",)) - ctx = Context.create(executor=executor, identity=identity) + ctx = Context.create(identity=identity) executor.call("admin.delete_user", {"user_id": "123"}, context=ctx) @@ -213,7 +213,6 @@ async def conditional(request: ApprovalRequest) -> ApprovalResult: # Admin user → approved admin_ctx = Context.create( - executor=executor, identity=Identity(id="admin-1", roles=("admin",)), ) result = executor.call("admin.delete_user", {"user_id": "123"}, context=admin_ctx) @@ -221,7 +220,6 @@ async def conditional(request: ApprovalRequest) -> ApprovalResult: # Regular user → denied user_ctx = Context.create( - executor=executor, identity=Identity(id="user-1", roles=("viewer",)), ) with pytest.raises(ApprovalDeniedError) as exc_info: diff --git a/tests/test_async_task.py b/tests/test_async_task.py index a810f8d..06c056b 100644 --- a/tests/test_async_task.py +++ b/tests/test_async_task.py @@ -276,7 +276,7 @@ async def test_cleanup_removes_old_tasks(self, manager: AsyncTaskManager) -> Non assert info.status == TaskStatus.COMPLETED # With max_age=0, everything completed should be cleaned - removed = manager.cleanup(max_age_seconds=0.0) + removed = await manager.cleanup(max_age_seconds=0.0) assert removed == 1 assert manager.get_status(task_id) is None @@ -286,7 +286,7 @@ async def test_cleanup_preserves_recent_tasks(self, manager: AsyncTaskManager) - await asyncio.sleep(0.1) # With a large max_age, nothing should be removed - removed = manager.cleanup(max_age_seconds=3600.0) + removed = await manager.cleanup(max_age_seconds=3600.0) assert removed == 0 assert len(manager.list_tasks()) == 1 @@ -296,7 +296,7 @@ async def test_cleanup_preserves_running_tasks(self, manager: AsyncTaskManager) await asyncio.sleep(0.1) # Running tasks should not be cleaned up even with max_age=0 - removed = manager.cleanup(max_age_seconds=0.0) + removed = await manager.cleanup(max_age_seconds=0.0) assert removed == 0 await manager.cancel(task_id) @@ -370,7 +370,7 @@ async def test_submit_at_limit_after_cleanup(self, executor: Executor) -> None: await asyncio.sleep(0.1) # Clean up completed tasks to free slots - mgr.cleanup(max_age_seconds=0.0) + await mgr.cleanup(max_age_seconds=0.0) # Now we should be able to submit again task_id = await mgr.submit("test.simple", {"x": 3}) @@ -450,35 +450,35 @@ async def test_done_callback_removes_async_task(self, executor: Executor) -> Non class _SpyStore: - """TaskStore spy that records status at the time of each save() call. + """Async TaskStore spy that records status at the time of each save() call. - Implements the post-D-10 surface (``save``, ``list_expired``) and keeps - ``put`` as a back-compat shim for tests that still call it. + Implements the post-D-17 async surface (``save``, ``list_expired``) + and keeps ``put`` as a back-compat shim for tests that still call it. """ def __init__(self) -> None: self._data: dict[str, TaskInfo] = {} self.put_statuses: list[TaskStatus] = [] - def get(self, task_id: str) -> TaskInfo | None: + async def get(self, task_id: str) -> TaskInfo | None: return self._data.get(task_id) - def save(self, info: TaskInfo) -> None: + async def save(self, info: TaskInfo) -> None: self._data[info.task_id] = info self.put_statuses.append(info.status) - def put(self, info: TaskInfo) -> None: - self.save(info) + async def put(self, info: TaskInfo) -> None: + await self.save(info) - def delete(self, task_id: str) -> None: + async def delete(self, task_id: str) -> None: self._data.pop(task_id, None) - def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: + async def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: if status is None: return list(self._data.values()) return [t for t in self._data.values() if t.status == status] - def list_expired(self, before_timestamp: float) -> list[TaskInfo]: + async def list_expired(self, before_timestamp: float) -> list[TaskInfo]: terminal = {TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.CANCELLED} return [ t @@ -516,7 +516,7 @@ def test_custom_store_satisfies_protocol(self) -> None: class TestInMemoryTaskStore: - """TC-003..006: InMemoryTaskStore CRUD and list filtering.""" + """TC-003..006: InMemoryTaskStore CRUD and list filtering (async, post-D-17).""" def _make_info(self, status: TaskStatus = TaskStatus.COMPLETED) -> TaskInfo: import uuid @@ -528,29 +528,33 @@ def _make_info(self, status: TaskStatus = TaskStatus.COMPLETED) -> TaskInfo: submitted_at=time.time(), ) - def test_put_then_get_returns_same_object(self) -> None: + @pytest.mark.asyncio + async def test_put_then_get_returns_same_object(self) -> None: store = InMemoryTaskStore() info = self._make_info() - store.put(info) - assert store.get(info.task_id) is info + await store.put(info) + assert await store.get(info.task_id) is info - def test_get_unknown_returns_none(self) -> None: - assert InMemoryTaskStore().get("nonexistent") is None + @pytest.mark.asyncio + async def test_get_unknown_returns_none(self) -> None: + assert await InMemoryTaskStore().get("nonexistent") is None - def test_delete_removes_entry(self) -> None: + @pytest.mark.asyncio + async def test_delete_removes_entry(self) -> None: store = InMemoryTaskStore() info = self._make_info() - store.put(info) - store.delete(info.task_id) - assert store.get(info.task_id) is None + await store.put(info) + await store.delete(info.task_id) + assert await store.get(info.task_id) is None - def test_list_status_filter(self) -> None: + @pytest.mark.asyncio + async def test_list_status_filter(self) -> None: store = InMemoryTaskStore() completed = self._make_info(TaskStatus.COMPLETED) failed = self._make_info(TaskStatus.FAILED) - store.put(completed) - store.put(failed) - result = store.list(status=TaskStatus.COMPLETED) + await store.put(completed) + await store.put(failed) + result = await store.list(status=TaskStatus.COMPLETED) assert len(result) == 1 assert result[0] is completed @@ -565,8 +569,9 @@ async def test_manager_accepts_custom_store(self, executor: Executor) -> None: task_id = await mgr.submit("test.simple", {"x": 1}) await asyncio.sleep(0.1) - assert spy.get(task_id) is not None - assert spy.get(task_id).status == TaskStatus.COMPLETED # type: ignore[union-attr] + stored = await spy.get(task_id) + assert stored is not None + assert stored.status == TaskStatus.COMPLETED @pytest.mark.asyncio async def test_put_called_for_each_state_transition(self, executor: Executor) -> None: @@ -707,7 +712,7 @@ async def test_reaper_removes_terminal_tasks(self, executor: Executor) -> None: mgr.start_reaper(interval_seconds=0.05, max_age_seconds=0.0) await asyncio.sleep(0.2) - mgr.stop_reaper() + await mgr.stop_reaper() assert mgr.get_status(task_id) is None @@ -723,7 +728,7 @@ async def test_reaper_does_not_remove_active_tasks(self, executor: Executor) -> mgr.start_reaper(interval_seconds=0.05, max_age_seconds=0.0) await asyncio.sleep(0.2) - mgr.stop_reaper() + await mgr.stop_reaper() info = mgr.get_status(task_id) assert info is not None @@ -735,7 +740,7 @@ async def test_stop_reaper_halts_cleanup(self, executor: Executor) -> None: """TC-020: stop_reaper() prevents further automatic cleanup.""" mgr = AsyncTaskManager(executor) mgr.start_reaper(interval_seconds=0.05, max_age_seconds=0.0) - mgr.stop_reaper() + await mgr.stop_reaper() task_id = await mgr.submit("test.simple", {"x": 1}) await asyncio.sleep(0.2) @@ -751,10 +756,11 @@ async def test_shutdown_stops_reaper(self, executor: Executor) -> None: await mgr.shutdown() # No exception means reaper was stopped cleanly - def test_stop_reaper_noop_when_not_running(self, executor: Executor) -> None: + @pytest.mark.asyncio + async def test_stop_reaper_noop_when_not_running(self, executor: Executor) -> None: """TC-022: stop_reaper() is safe when no reaper is active.""" mgr = AsyncTaskManager(executor) - mgr.stop_reaper() # must not raise + await mgr.stop_reaper() # must not raise @pytest.mark.asyncio async def test_double_start_reaper_raises(self, executor: Executor) -> None: @@ -763,7 +769,7 @@ async def test_double_start_reaper_raises(self, executor: Executor) -> None: mgr.start_reaper(interval_seconds=60.0) with pytest.raises(RuntimeError, match="already running"): mgr.start_reaper(interval_seconds=60.0) - mgr.stop_reaper() + await mgr.stop_reaper() class TestRegression: @@ -795,7 +801,7 @@ async def test_existing_api_unchanged(self, executor: Executor) -> None: assert mgr.get_result(task_id) == {"value": 5} assert len(mgr.list_tasks()) == 1 - removed = mgr.cleanup(max_age_seconds=0.0) + removed = await mgr.cleanup(max_age_seconds=0.0) assert removed == 1 await mgr.shutdown() diff --git a/tests/test_async_task_alignment.py b/tests/test_async_task_alignment.py index 3f52189..9c03c85 100644 --- a/tests/test_async_task_alignment.py +++ b/tests/test_async_task_alignment.py @@ -37,26 +37,29 @@ class TestSaveRename: - def test_save_stores_task(self) -> None: + @pytest.mark.asyncio + async def test_save_stores_task(self) -> None: store = InMemoryTaskStore() info = TaskInfo(task_id="t1", module_id="m", status=TaskStatus.PENDING, submitted_at=time.time()) - store.save(info) - assert store.get("t1") is info + await store.save(info) + assert await store.get("t1") is info - def test_put_emits_deprecation_warning(self) -> None: + @pytest.mark.asyncio + async def test_put_emits_deprecation_warning(self) -> None: store = InMemoryTaskStore() info = TaskInfo(task_id="t2", module_id="m", status=TaskStatus.PENDING, submitted_at=time.time()) with warnings.catch_warnings(record=True) as caught: warnings.simplefilter("always") - store.put(info) + await store.put(info) # At least one DeprecationWarning mentioning save should have fired. assert any(issubclass(w.category, DeprecationWarning) for w in caught) # Functional behaviour preserved. - assert store.get("t2") is info + assert await store.get("t2") is info class TestListExpired: - def test_list_expired_returns_terminal_tasks_older_than(self) -> None: + @pytest.mark.asyncio + async def test_list_expired_returns_terminal_tasks_older_than(self) -> None: store = InMemoryTaskStore() old_completed = TaskInfo( task_id="old", @@ -78,11 +81,11 @@ def test_list_expired_returns_terminal_tasks_older_than(self) -> None: status=TaskStatus.RUNNING, submitted_at=time.time(), ) - store.save(old_completed) - store.save(recent) - store.save(active) + await store.save(old_completed) + await store.save(recent) + await store.save(active) - expired = store.list_expired(before_timestamp=200.0) + expired = await store.list_expired(before_timestamp=200.0) ids = {t.task_id for t in expired} # old_completed is older than 200 → expired. recent is newer. # active is non-terminal → never expired. @@ -109,25 +112,25 @@ def __init__(self) -> None: self._data: dict[str, TaskInfo] = {} self.statuses: list[TaskStatus] = [] - def get(self, task_id: str) -> TaskInfo | None: + async def get(self, task_id: str) -> TaskInfo | None: return self._data.get(task_id) - def save(self, info: TaskInfo) -> None: + async def save(self, info: TaskInfo) -> None: self._data[info.task_id] = info self.statuses.append(info.status) - def put(self, info: TaskInfo) -> None: # back-compat shim - self.save(info) + async def put(self, info: TaskInfo) -> None: # back-compat shim + await self.save(info) - def delete(self, task_id: str) -> None: + async def delete(self, task_id: str) -> None: self._data.pop(task_id, None) - def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: + async def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: if status is None: return list(self._data.values()) return [t for t in self._data.values() if t.status == status] - def list_expired(self, before_timestamp: float) -> list[TaskInfo]: + async def list_expired(self, before_timestamp: float) -> list[TaskInfo]: return [t for t in self._data.values() if t.completed_at is not None and t.completed_at < before_timestamp] diff --git a/tests/test_async_task_reaper_alignment.py b/tests/test_async_task_reaper_alignment.py index 598c8f7..fcb7589 100644 --- a/tests/test_async_task_reaper_alignment.py +++ b/tests/test_async_task_reaper_alignment.py @@ -43,8 +43,7 @@ async def test_start_reaper_new_signature_returns_handle() -> None: assert mgr._reaper_max_age == pytest.approx(10.0) assert mgr._reaper_interval == pytest.approx(60.0) # ms → s finally: - handle.stop() - await asyncio.sleep(0) # let cancellation settle + await handle.stop() @pytest.mark.asyncio @@ -59,8 +58,7 @@ async def test_start_reaper_legacy_aliases_emit_deprecation_warning() -> None: assert mgr._reaper_interval == pytest.approx(120.0) assert mgr._reaper_max_age == pytest.approx(30.0) finally: - handle.stop() - await asyncio.sleep(0) + await handle.stop() deprecation = [w for w in caught if issubclass(w.category, DeprecationWarning)] msgs = " | ".join(str(w.message) for w in deprecation) @@ -75,11 +73,10 @@ async def test_reaper_handle_stop_cancels_loop() -> None: mgr = AsyncTaskManager(executor=_StubExecutor()) handle = mgr.start_reaper(ttl_seconds=1.0, sweep_interval_ms=1_000) assert handle.is_running() - handle.stop() - await asyncio.sleep(0) + await handle.stop() assert not handle.is_running() # Idempotent — no exception on second stop. - handle.stop() + await handle.stop() @pytest.mark.asyncio diff --git a/tests/test_async_task_reaper_default.py b/tests/test_async_task_reaper_default.py index edd1110..891f584 100644 --- a/tests/test_async_task_reaper_default.py +++ b/tests/test_async_task_reaper_default.py @@ -35,5 +35,4 @@ async def test_start_reaper_default_sweep_interval_is_300000_ms() -> None: # Default sweep_interval_ms = 300_000 → 300 s internally. assert mgr._reaper_interval == pytest.approx(300.0) finally: - handle.stop() - await asyncio.sleep(0) + await handle.stop() diff --git a/tests/test_async_task_sync_audit_v022.py b/tests/test_async_task_sync_audit_v022.py new file mode 100644 index 0000000..0e7f8bc --- /dev/null +++ b/tests/test_async_task_sync_audit_v022.py @@ -0,0 +1,214 @@ +"""Regression tests for async-task sync-audit findings against v0.22.0. + +Each test pins a normative decision recorded in the apcore spec: + +- A-D-AT-03 / D-11 — ``ReaperHandle.stop`` is async and drains the task. +- A-D-AT-04 / D-17 — ``TaskStore`` Protocol methods are all async. +- A-D-AT-06 — ``AsyncTaskManager.get_status`` / ``list_tasks`` return copies. +- A-D-AT-09 / D-14 — Legacy ``RetryPolicy`` defaults to ``max_retries=0`` + and emits a ``DeprecationWarning`` on instantiation. +""" + +from __future__ import annotations + +import asyncio +import inspect +import warnings +from typing import Any + +import pytest + +from apcore.async_task import ( + AsyncTaskManager, + InMemoryTaskStore, + ReaperHandle, + RetryPolicy, + TaskInfo, + TaskStatus, + TaskStore, +) + + +class _StubExecutor: + """Minimal executor implementing the protocol surface for the manager.""" + + async def call_async( + self, + module_id: str, + inputs: dict[str, Any] | None = None, + context: Any = None, + version_hint: str | None = None, + ) -> dict[str, Any]: + await asyncio.sleep(0) # yield once so cancellation paths still race + return {"ok": True} + + +# --------------------------------------------------------------------------- +# A-D-AT-04 / D-17 — TaskStore Protocol methods are async +# --------------------------------------------------------------------------- + + +class TestTaskStoreProtocolIsAsync: + def test_in_memory_store_methods_are_coroutines(self) -> None: + store = InMemoryTaskStore() + for name in ("save", "get", "delete", "list", "list_expired"): + assert inspect.iscoroutinefunction(getattr(store, name)), ( + f"InMemoryTaskStore.{name} must be async (D-17)" + ) + + def test_protocol_methods_declared_async(self) -> None: + # Walk the Protocol annotations: every declared method must be an + # async function so custom stores share the contract. + for name in ("save", "get", "delete", "list", "list_expired"): + method = getattr(TaskStore, name) + assert inspect.iscoroutinefunction(method), ( + f"TaskStore Protocol method {name!r} must be async (D-17)" + ) + + +# --------------------------------------------------------------------------- +# A-D-AT-03 / D-11 — ReaperHandle.stop is async and drains the task +# --------------------------------------------------------------------------- + + +class TestReaperHandleStopIsAsync: + def test_reaper_handle_stop_is_coroutine_function(self) -> None: + assert inspect.iscoroutinefunction(ReaperHandle.stop) + + @pytest.mark.asyncio + async def test_reaper_handle_stop_awaits_underlying_task(self) -> None: + mgr = AsyncTaskManager(executor=_StubExecutor()) + handle = mgr.start_reaper(ttl_seconds=1.0, sweep_interval_ms=10) + assert handle.is_running() + await handle.stop() + # After stop returns, the underlying task is guaranteed to be done — + # no manual ``await asyncio.sleep(0)`` should be necessary. + assert mgr._reaper_task is None + assert not handle.is_running() + + @pytest.mark.asyncio + async def test_stop_reaper_idempotent_when_already_stopped(self) -> None: + mgr = AsyncTaskManager(executor=_StubExecutor()) + mgr.start_reaper(ttl_seconds=1.0, sweep_interval_ms=10) + await mgr.stop_reaper() + # Second call must be a clean no-op. + await mgr.stop_reaper() + + +# --------------------------------------------------------------------------- +# A-D-AT-06 — get_status / list_tasks return mutation-safe copies +# --------------------------------------------------------------------------- + + +class TestGetStatusReturnsSnapshot: + @pytest.mark.asyncio + async def test_get_status_returns_copy(self) -> None: + mgr = AsyncTaskManager(executor=_StubExecutor()) + task_id = await mgr.submit("test.x", {}) + await asyncio.sleep(0.05) + snapshot = mgr.get_status(task_id) + assert snapshot is not None + # Mutate the snapshot — the store's record must not be affected. + snapshot.status = TaskStatus.FAILED + snapshot.error = "tampered" + fresh = mgr.get_status(task_id) + assert fresh is not None + assert fresh.status != TaskStatus.FAILED or fresh.error != "tampered" + # And the two returned objects must not be the *same* instance — + # otherwise callers can still share state by aliasing. + again = mgr.get_status(task_id) + assert again is not None + assert again is not snapshot + + @pytest.mark.asyncio + async def test_list_tasks_returns_copies(self) -> None: + mgr = AsyncTaskManager(executor=_StubExecutor()) + await mgr.submit("test.x", {}) + await asyncio.sleep(0.05) + first = mgr.list_tasks() + second = mgr.list_tasks() + assert first and second + # Distinct list elements per call (defensive copies). + assert first[0] is not second[0] + # Mutating the returned snapshot must not leak into the store. + first[0].error = "tampered" + again = mgr.list_tasks() + assert again[0].error != "tampered" + + +# --------------------------------------------------------------------------- +# A-D-AT-09 / D-14 — RetryPolicy default + deprecation warning +# --------------------------------------------------------------------------- + + +class TestRetryPolicyOptInDefault: + def test_default_max_retries_is_zero(self) -> None: + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + policy = RetryPolicy() + assert policy.max_retries == 0, ( + "RetryPolicy() must default to max_retries=0 (D-14) so retries are opt-in" + ) + + def test_instantiation_emits_deprecation_warning(self) -> None: + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + RetryPolicy(max_retries=1) + deprecations = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert deprecations, ( + "Constructing RetryPolicy must emit a DeprecationWarning steering " + "callers to RetryConfig (A-D-AT-09)" + ) + + +# --------------------------------------------------------------------------- +# Sanity check that the manager survives an async-only custom store +# --------------------------------------------------------------------------- + + +class _AsyncOnlyStore: + """Custom store whose methods perform a real await — proves the manager + no longer relies on synchronous fall-through for store I/O.""" + + def __init__(self) -> None: + self._data: dict[str, TaskInfo] = {} + + async def get(self, task_id: str) -> TaskInfo | None: + await asyncio.sleep(0) + return self._data.get(task_id) + + async def save(self, info: TaskInfo) -> None: + await asyncio.sleep(0) + self._data[info.task_id] = info + + async def delete(self, task_id: str) -> None: + await asyncio.sleep(0) + self._data.pop(task_id, None) + + async def list(self, status: TaskStatus | None = None) -> list[TaskInfo]: + await asyncio.sleep(0) + if status is None: + return list(self._data.values()) + return [t for t in self._data.values() if t.status == status] + + async def list_expired(self, before_timestamp: float) -> list[TaskInfo]: + await asyncio.sleep(0) + return [ + t + for t in self._data.values() + if t.completed_at is not None and t.completed_at < before_timestamp + ] + + +class TestManagerWithAsyncStore: + @pytest.mark.asyncio + async def test_submit_and_inspect_works_against_async_store(self) -> None: + mgr = AsyncTaskManager(executor=_StubExecutor(), store=_AsyncOnlyStore()) + task_id = await mgr.submit("test.x", {}) + await asyncio.sleep(0.05) + # ``get_status_async`` is the async-friendly surface for I/O-backed stores. + info = await mgr.get_status_async(task_id) + assert info is not None + assert info.task_id == task_id + listed = await mgr.list_tasks_async() + assert any(t.task_id == task_id for t in listed) diff --git a/tests/test_cancel.py b/tests/test_cancel.py index 14e523d..5df16e6 100644 --- a/tests/test_cancel.py +++ b/tests/test_cancel.py @@ -67,8 +67,7 @@ def execute(self, inputs: dict[str, Any], context: Context) -> dict[str, Any]: token = CancelToken() token.cancel() - ctx = Context.create(executor=executor) - ctx.cancel_token = token + ctx = Context.create(cancel_token=token) with pytest.raises(ExecutionCancelledError): executor.call("test.module", {}, context=ctx) diff --git a/tests/test_conformance.py b/tests/test_conformance.py index a119a6d..e7b31da 100644 --- a/tests/test_conformance.py +++ b/tests/test_conformance.py @@ -1616,3 +1616,232 @@ def test_contextual_audit(case: dict[str, Any]) -> None: for forbidden in expected.get("data_must_not_contain_keys", []): assert forbidden not in data, f"[{case['id']}] data must not contain key {forbidden!r}; got={data!r}" + + +# --------------------------------------------------------------------------- +# Context.create unified-signature contract (PROTOCOL_SPEC §"Contract: Context.create", +# §"Contract: Executor binding to Context", §"Contract: Distributed cancellation", +# §"Contract: global_deadline distributed semantics"; apcore Issue #66) +# --------------------------------------------------------------------------- + +import inspect as _inspect # noqa: E402 + +from apcore.cancel import CancelToken as _CancelToken # noqa: E402 +from apcore.errors import ContextBindingError as _ContextBindingError # noqa: E402 +from apcore.executor import Executor as _CtxCreateExecutor # noqa: E402 +from apcore.module import Module as _CtxCreateModule # noqa: E402 +from apcore.registry import Registry as _CtxCreateRegistry # noqa: E402 + +_context_create_data = _load("context_create") + + +class _EchoModule(_CtxCreateModule): + """Minimal module used by Context.create conformance scenarios.""" + + description = "Echo module for Context.create conformance tests." + input_schema = None + output_schema = None + + def execute(self, inputs: dict[str, Any], context: Context) -> dict[str, Any]: + return dict(inputs) + + +def _build_executor_with_echo(module_id: str = "test.echo") -> _CtxCreateExecutor: + reg = _CtxCreateRegistry() + reg.register(module_id, _EchoModule()) + return _CtxCreateExecutor(registry=reg) + + +def _assert_trace_id(trace_id: str) -> None: + assert len(trace_id) == 32, f"trace_id length expected 32, got {len(trace_id)}" + assert all(c in "0123456789abcdef" for c in trace_id), f"trace_id not lowercase hex: {trace_id!r}" + + +@pytest.mark.parametrize( + "case", + _context_create_data["test_cases"], + ids=[c["id"] for c in _context_create_data["test_cases"]], +) +def test_context_create_unified_signature(case: dict[str, Any]) -> None: + """Conformance harness for context_create.json — exercises every scenario + defined in the cross-language fixture (Issue #66).""" + case_id = case["id"] + + if case_id == "create_minimal_all_defaults": + ctx = Context.create() + _assert_trace_id(ctx.trace_id) + assert ctx.identity is None + assert ctx.executor is None + assert ctx.cancel_token is None + assert ctx.services is None + assert ctx.global_deadline is None + assert ctx.caller_id is None + assert ctx.call_chain == [] + assert ctx.data == {} + + elif case_id == "create_with_identity_only": + ident = Identity( + id=case["input"]["identity"]["id"], + type=case["input"]["identity"]["type"], + roles=tuple(case["input"]["identity"]["roles"]), + ) + ctx = Context.create(identity=ident) + _assert_trace_id(ctx.trace_id) + assert ctx.identity is not None and ctx.identity.id == case["expected"]["identity_id"] + assert ctx.executor is None + assert ctx.cancel_token is None + + elif case_id == "create_with_cancel_token": + token = _CancelToken() + ctx = Context.create(cancel_token=token) + assert ctx.cancel_token is token, "cancel_token must be the same instance passed in" + assert ctx.executor is None, "executor MUST NOT be bound at create() time" + + elif case_id == "create_with_global_deadline": + deadline = case["input"]["global_deadline"] + ctx = Context.create(global_deadline=deadline) + assert ctx.global_deadline == deadline + assert ctx.executor is None + + elif case_id == "create_rejects_executor_input": + # Conforming SDK: executor is not a parameter of Context.create. + params = _inspect.signature(Context.create).parameters + assert "executor" not in params, ( + "Context.create MUST NOT accept an 'executor' parameter per PROTOCOL_SPEC " + "§Contract: Context.create (Issue #66)." + ) + + elif case_id == "create_rejects_caller_id_input": + params = _inspect.signature(Context.create).parameters + assert "caller_id" not in params, ( + "Context.create MUST NOT accept a 'caller_id' parameter per PROTOCOL_SPEC " + "§Contract: Context.create (Issue #66)." + ) + # And the field stays null on freshly created top-level contexts. + ctx = Context.create() + assert ctx.caller_id is None + + elif case_id == "executor_binds_on_first_call_local": + executor = _build_executor_with_echo("test.echo") + ctx = Context.create() + assert ctx.executor is None, "executor must be null immediately after create()" + executor.call("test.echo", {"hello": "world"}, context=ctx) + assert ctx.executor is executor, "Executor MUST bind itself to ctx.executor before step 1" + + elif case_id == "executor_binds_idempotent_same_instance": + executor = _build_executor_with_echo("test.echo") + ctx = Context.create() + for _ in range(3): + executor.call("test.echo", {}, context=ctx) + assert ctx.executor is executor, "Rebinding the same Executor MUST be a noop" + + elif case_id == "executor_rejects_cross_executor_rebind": + executor_a = _build_executor_with_echo("test.echo") + executor_b = _build_executor_with_echo("test.echo") + ctx = Context.create() + executor_a.call("test.echo", {}, context=ctx) + assert ctx.executor is executor_a + # Python SDK chooses the "raise" branch of expected_one_of. + with pytest.raises(_ContextBindingError): + executor_b.call("test.echo", {}, context=ctx) + + elif case_id == "child_propagates_executor": + executor = _build_executor_with_echo("test.echo") + ctx = Context.create() + ctx._bind_executor(executor) + child = ctx.child("test.target") + assert child.executor is executor + assert child.caller_id == (ctx.call_chain[-1] if ctx.call_chain else None) + assert child.call_chain[-1] == "test.target" + + elif case_id == "child_propagates_cancel_token": + token = _CancelToken() + ctx = Context.create(cancel_token=token) + child = ctx.child(case["input"]["create_child_module_id"]) + assert child.cancel_token is ctx.cancel_token + assert child.cancel_token is token + + elif case_id == "deserialize_then_call_binds_local_executor": + serialized = case["input"]["serialized_context"] + restored = Context.deserialize(serialized) + assert restored.executor is None + assert restored.cancel_token is None + assert restored.services is None + assert restored.global_deadline is None + assert restored.caller_id == case["expected"]["caller_id_preserved"] + # Now a local Executor receives the deserialized Context and binds itself. + executor = _build_executor_with_echo("local.target") + executor.call("local.target", {}, context=restored) + assert restored.executor is executor + + elif case_id == "distributed_cancel_token_synthesized_locally": + # Deserialized contexts arrive with cancel_token == None. The Python + # Executor synthesizes a per-call CancelToken inside the pipeline + # (Step 2 / BuiltinCallChainGuard). We exercise the end-to-end path + # by deserializing a Context and verifying no in-context token rode + # across the boundary. + serialized = { + "_context_version": 1, + "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", + "caller_id": "remote.caller", + "call_chain": ["remote.caller"], + "identity": None, + "data": {}, + } + restored = Context.deserialize(serialized) + assert restored.cancel_token is None + executor = _build_executor_with_echo("local.slow") + executor.call("local.slow", {}, context=restored) + # After the call, ctx.cancel_token MAY still be None (synthesis is + # an Executor-internal concern, not necessarily surfaced on the + # outer Context). The normative requirement here is "no in-context + # token rode across deserialization" — re-asserted above. + + elif case_id == "distributed_global_deadline_recomputed_locally": + # Deserialized contexts have global_deadline == None. The local + # Executor MUST recompute from local config. + serialized = { + "_context_version": 1, + "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736", + "caller_id": "remote.caller", + "call_chain": [], + "identity": None, + "data": {}, + } + restored = Context.deserialize(serialized) + assert restored.global_deadline is None + global_timeout_ms = case["input"]["local_executor_global_timeout_ms"] + reg = _CtxCreateRegistry() + reg.register("local.echo", _EchoModule()) + executor = _CtxCreateExecutor( + registry=reg, + config=Config(data={"executor": {"global_timeout": global_timeout_ms}}), + ) + executor.call("local.echo", {}, context=restored) + assert restored.global_deadline is not None, "Executor MUST recompute global_deadline from local config" + + elif case_id == "tracestate_carried_inside_traceparent": + from apcore.trace_context import TraceParent as _TP + + tp_in = case["input"]["trace_parent"] + tp = _TP( + version="00", + trace_id=tp_in["trace_id"], + parent_id=tp_in["parent_id"], + trace_flags=tp_in["trace_flags"], + tracestate=tuple((v[0], v[1]) for v in tp_in["tracestate"]), + ) + ctx = Context.create(trace_parent=tp) + assert ctx.trace_id == case["expected"]["trace_id"] + # tracestate is carried via context.data per the SDK's TraceParent + # plumbing — verify it round-tripped. + carried = ctx.data.get("_apcore.trace.state") + assert carried is not None and len(carried) == len(tp_in["tracestate"]) + # And the signature does NOT expose a separate tracestate parameter. + params = _inspect.signature(Context.create).parameters + assert "tracestate" not in params, ( + "tracestate MUST live inside TraceParent — no separate Context.create parameter" + ) + + else: + pytest.fail(f"Unknown context_create fixture case id: {case_id!r}") diff --git a/tests/test_context_key.py b/tests/test_context_key.py index 3a0e224..a1ac776 100644 --- a/tests/test_context_key.py +++ b/tests/test_context_key.py @@ -11,7 +11,7 @@ class TestContextKey: def _make_ctx(self) -> Context: """Create a minimal Context for testing.""" - return Context.create(executor=None) + return Context.create() def test_get_returns_typed_value(self) -> None: """AC-001: get() returns typed value from context.data.""" diff --git a/tests/test_context_serialization.py b/tests/test_context_serialization.py index d313d7c..76ecb07 100644 --- a/tests/test_context_serialization.py +++ b/tests/test_context_serialization.py @@ -13,7 +13,7 @@ class TestContextSerialize: """AC-003, AC-004, AC-005: Context.serialize() protocol compliance.""" def _make_ctx(self) -> Context: - ctx = Context.create(executor=None) + ctx = Context.create() ctx.identity = Identity( id="user-1", type="user", @@ -97,7 +97,7 @@ class TestContextDeserialize: def test_deserialize_roundtrip(self) -> None: """Serialize then deserialize preserves fields.""" - ctx = Context.create(executor=None) + ctx = Context.create() ctx.identity = Identity(id="user-1", type="user", roles=("admin",), attrs={"org": "acme"}) ctx.data["app.counter"] = 42 serialized = ctx.serialize() @@ -110,28 +110,33 @@ def test_deserialize_roundtrip(self) -> None: def test_deserialize_executor_is_none(self) -> None: """After deserialization, executor is None.""" - ctx = Context.create(executor="some-executor") + ctx = Context.create() + # Simulate a Context that an Executor has already bound itself to. + # Under the unified Context.create() signature (Issue #66), executor + # is no longer a constructor input; binding happens via the private + # helper. + ctx._bind_executor("some-executor") serialized = ctx.serialize() restored = Context.deserialize(serialized) assert restored.executor is None def test_deserialize_services_is_none(self) -> None: """After deserialization, services is None.""" - ctx = Context.create(executor=None) + ctx = Context.create() serialized = ctx.serialize() restored = Context.deserialize(serialized) assert restored.services is None def test_deserialize_cancel_token_is_none(self) -> None: """After deserialization, cancel_token is None.""" - ctx = Context.create(executor=None) + ctx = Context.create() serialized = ctx.serialize() restored = Context.deserialize(serialized) assert restored.cancel_token is None def test_deserializeglobal_deadline_is_none(self) -> None: """After deserialization, global_deadline is None.""" - ctx = Context.create(executor=None) + ctx = Context.create() serialized = ctx.serialize() restored = Context.deserialize(serialized) assert restored.global_deadline is None diff --git a/tests/test_executor.py b/tests/test_executor.py index 4561116..b4ff5d9 100644 --- a/tests/test_executor.py +++ b/tests/test_executor.py @@ -166,7 +166,7 @@ def test_derives_child_context(self) -> None: """Step 1: Derives child context when context provided.""" mod = MockModule() ex = _make_executor(module=mod) - parent_ctx = Context.create(executor=ex) + parent_ctx = Context.create() ex.call("test.module", {"name": "Alice"}, context=parent_ctx) _, ctx = mod.execute_calls[0] assert "test.module" in ctx.call_chain @@ -178,7 +178,7 @@ def test_depth_exceeded(self) -> None: config = Config(data={"executor": {"max_call_depth": 2}}) ex = _make_executor(module=mod, config=config) # Create a context with a deep call chain - ctx = Context.create(executor=ex) + ctx = Context.create() ctx.call_chain = ["a", "b", "c"] with pytest.raises(CallDepthExceededError): ex.call("test.module", {"name": "Alice"}, context=ctx) @@ -187,7 +187,7 @@ def test_circular_detection(self) -> None: """Step 2: Raises CircularCallError for a->b->a.""" mod = MockModule() ex = _make_executor(module=mod, module_id="a") - ctx = Context.create(executor=ex) + ctx = Context.create() ctx.call_chain = ["a", "b"] with pytest.raises(CircularCallError): ex.call("a", {"name": "Alice"}, context=ctx) @@ -196,7 +196,7 @@ def test_circular_detection_longer_chain(self) -> None: """Step 2: Raises CircularCallError for a->b->c->a.""" mod = MockModule() ex = _make_executor(module=mod, module_id="a") - ctx = Context.create(executor=ex) + ctx = Context.create() ctx.call_chain = ["a", "b", "c"] with pytest.raises(CircularCallError): ex.call("a", {"name": "Alice"}, context=ctx) @@ -205,7 +205,7 @@ def test_self_call_not_circular(self) -> None: """Step 2: Self-call a->a is NOT circular, governed by frequency.""" mod = MockModule() ex = _make_executor(module=mod, module_id="a") - ctx = Context.create(executor=ex) + ctx = Context.create() ctx.call_chain = ["a"] # Self-call should not raise CircularCallError; frequency limit governs it ex.call("a", {"name": "Alice"}, context=ctx) @@ -216,7 +216,7 @@ def test_frequency_exceeded(self) -> None: mod = MockModule() config = Config(data={"executor": {"max_module_repeat": 2}}) ex = _make_executor(module=mod, module_id="a", config=config) - ctx = Context.create(executor=ex) + ctx = Context.create() ctx.call_chain = ["a", "a", "a"] with pytest.raises(CallFrequencyExceededError): ex.call("a", {"name": "Alice"}, context=ctx) diff --git a/tests/test_executor_sync_audit_v022.py b/tests/test_executor_sync_audit_v022.py new file mode 100644 index 0000000..74c2c47 --- /dev/null +++ b/tests/test_executor_sync_audit_v022.py @@ -0,0 +1,127 @@ +"""Regression tests for executor sync-audit findings against v0.22.0. + +- A-D-EXEC-002 / D-21 — Cancel-token check at the CallChainGuard step + (Step 2) — fires before any expensive ACL / middleware / validation + work runs. +- A-D-EXEC-005 / D-22 — Error Unwrap Rule. MiddlewareChainError MUST + surface its ``.original`` typed cause unchanged; the executor MUST NOT + replace it with a generic ``ModuleExecuteError``. +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from apcore.cancel import CancelToken, ExecutionCancelledError +from apcore.context import Context +from apcore.errors import ApprovalDeniedError, ModuleExecuteError +from apcore.executor import Executor +from apcore.middleware import Middleware +from apcore.registry import Registry + + +class _NoopModule: + """Module whose execute() should never run when the call is cancelled + at Step 2.""" + + input_schema = None + output_schema = None + + def __init__(self) -> None: + self.executed = False + + async def execute(self, inputs: dict[str, Any], context: Context) -> dict[str, Any]: + self.executed = True + return {"ok": True} + + +class _ApprovalResult: + """Minimal stand-in for an ApprovalResult — carries the ``reason`` attr + that :class:`ApprovalDeniedError` reads when formatting its message.""" + + reason = "testing unwrap" + + +class _RaisingApprovalMiddleware(Middleware): + """Middleware whose before() always raises ApprovalDeniedError. + + The middleware chain machinery will wrap this in MiddlewareChainError; + per D-22 the executor MUST unwrap and surface the original typed + cause to the caller. + """ + + async def before(self, module_id: str, inputs: dict[str, Any], context: Context) -> None: + raise ApprovalDeniedError(result=_ApprovalResult(), module_id=module_id) + + +# --------------------------------------------------------------------------- +# A-D-EXEC-002 / D-21 — Cancel token observed at CallChainGuard (Step 2) +# --------------------------------------------------------------------------- + + +class TestCancelTokenAtCallChainGuard: + @pytest.mark.asyncio + async def test_cancelled_token_short_circuits_before_module_runs(self) -> None: + module = _NoopModule() + reg = Registry() + reg.register("test.noop", module) + executor = Executor(registry=reg) + + token = CancelToken() + token.cancel() + # cancel_token is a first-class Context.create() parameter (v0.22.0 + + # Issue #66 unified signature). The Executor auto-binds itself at + # entry, so executor= is no longer accepted as a public input. + ctx = Context.create(cancel_token=token) + + with pytest.raises(ExecutionCancelledError): + await executor.call_async("test.noop", {}, context=ctx) + + # Step 2 short-circuit means the module body never ran — no ACL, + # no input validation, no middleware-before work happened first. + assert module.executed is False + + +# --------------------------------------------------------------------------- +# A-D-EXEC-005 / D-22 — MiddlewareChainError unwrap rule +# --------------------------------------------------------------------------- + + +class _PlainModule: + input_schema = None + output_schema = None + + async def execute(self, inputs: dict[str, Any], context: Context) -> dict[str, Any]: + return {"ok": True} + + +class TestMiddlewareChainErrorUnwrap: + @pytest.mark.asyncio + async def test_typed_cause_propagates_unwrapped(self) -> None: + reg = Registry() + reg.register("test.plain", _PlainModule()) + executor = Executor(registry=reg) + executor.use(_RaisingApprovalMiddleware()) + + # The caller MUST receive ApprovalDeniedError unchanged, NOT a + # generic ModuleExecuteError that swallows the typed cause. + with pytest.raises(ApprovalDeniedError): + await executor.call_async("test.plain", {}) + + @pytest.mark.asyncio + async def test_unwrap_does_not_collapse_to_module_execute_error(self) -> None: + reg = Registry() + reg.register("test.plain", _PlainModule()) + executor = Executor(registry=reg) + executor.use(_RaisingApprovalMiddleware()) + + try: + await executor.call_async("test.plain", {}) + except ModuleExecuteError as exc: + pytest.fail( + f"Caller received generic ModuleExecuteError instead of typed cause: {exc!r}" + ) + except ApprovalDeniedError: + pass # expected diff --git a/tests/test_integration_executor.py b/tests/test_integration_executor.py index 96bd10b..801bdc0 100644 --- a/tests/test_integration_executor.py +++ b/tests/test_integration_executor.py @@ -121,7 +121,7 @@ def test_acl_context_conditions(self, mock_registry: Registry) -> None: ex.call("test.sync_module", {"name": "test"}) # Service identity -> allowed - ctx = Context.create(executor=ex) + ctx = Context.create() ctx.identity = Identity(id="svc-1", type="service") result = ex.call("test.sync_module", {"name": "test"}, context=ctx) assert result == {"greeting": "Hello, test!"} diff --git a/tests/test_public_api.py b/tests/test_public_api.py index ce14357..b9a3dea 100644 --- a/tests/test_public_api.py +++ b/tests/test_public_api.py @@ -455,6 +455,7 @@ class TestPublicAPIAll: "CircularDependencyError", "ConfigError", "ConfigNotFoundError", + "ContextBindingError", "DependencyNotFoundError", "DependencyVersionMismatchError", "TaskLimitExceededError", diff --git a/tests/test_registry_sync_audit_v022.py b/tests/test_registry_sync_audit_v022.py new file mode 100644 index 0000000..49d802a --- /dev/null +++ b/tests/test_registry_sync_audit_v022.py @@ -0,0 +1,190 @@ +"""Regression tests for registry sync-audit findings against v0.22.0. + +- A-D-REG-002 — ``register_internal`` rejects both ``"ephemeral"`` (bare) + and ``"ephemeral.*"`` IDs via the shared ``_is_ephemeral`` helper. + (The previous ``.startswith("ephemeral.")`` check missed bare + ``"ephemeral"``, leaving a one-character carveout that contradicted + the canonical classifier.) + +- A-D-REG-003 / Issue #65 — Discover-path registration uses the same + deferred-publish protocol as ``register()``: modules MUST NOT appear + in the visible store until ``on_load`` completes successfully. + +- A-D-REG-004 / Issue #65 — ``register_internal`` uses deferred-publish + too — the same invariant holds for sys-modules. ``on_load`` failures + emit ``apcore.registry.module_load_failed`` and leave the registry + exactly as before the registration attempt. + +- A-D-REG-005 — Discover-path ``on_load`` failures emit + ``apcore.registry.module_load_failed``, matching the public path so + subscribers have a single hook for partial-init detection. +""" + +from __future__ import annotations + +from typing import Any +from unittest.mock import MagicMock + +import pytest + +from apcore.registry.registry import Registry + + +class _NoopModule: + description = "stub" + input_schema = {"type": "object"} + output_schema = {"type": "object"} + + async def execute(self, inputs: dict[str, Any], context: Any) -> dict[str, Any]: + return {} + + +class _ProbeOnLoadModule(_NoopModule): + """on_load that asserts the module is NOT visible while the callback runs.""" + + def __init__(self, registry: Registry, mod_id: str) -> None: + self._registry = registry + self._mod_id = mod_id + self.visible_during_on_load: bool | None = None + + def on_load(self) -> None: + self.visible_during_on_load = self._registry.get(self._mod_id) is not None + + +class _FailingOnLoadModule(_NoopModule): + """on_load that always raises.""" + + def on_load(self) -> None: + raise RuntimeError("intentional on_load failure") + + +# --------------------------------------------------------------------------- +# A-D-REG-002 — register_internal rejects bare "ephemeral" too +# --------------------------------------------------------------------------- + + +class TestRegisterInternalRejectsBareEphemeralId: + def test_bare_ephemeral_id_rejected(self) -> None: + reg = Registry() + with pytest.raises(ValueError, match="ephemeral"): + reg.register_internal("ephemeral", _NoopModule()) + + def test_namespaced_ephemeral_id_rejected(self) -> None: + reg = Registry() + with pytest.raises(ValueError, match="ephemeral"): + reg.register_internal("ephemeral.foo", _NoopModule()) + + +# --------------------------------------------------------------------------- +# A-D-REG-003 — Discover-path uses deferred-publish (no early visibility) +# --------------------------------------------------------------------------- + + +class TestDiscoverPathDeferredPublish: + def test_module_invisible_during_on_load(self) -> None: + reg = Registry() + module = _ProbeOnLoadModule(reg, "system.probe") + # Drive _register_in_order directly via the same plumbing discover() + # would use — the only thing being tested here is the deferred-publish + # ordering, not the upstream stages of the 8-step pipeline. + reg._register_in_order( + load_order=["system.probe"], + valid_classes={"system.probe": type( + "_ProbeFactory", + (), + {"__init__": lambda self_, m=module: setattr(self_, "_proxy", m) or None, + "__getattr__": lambda self_, n: getattr(self_._proxy, n)}, + )}, + raw_metadata={}, + ) + # Probe-style assertion: the on_load callback's snapshot of + # ``registry.get(...)`` must have been ``None`` — i.e. the + # module was NOT yet published when the callback ran. + assert module.visible_during_on_load is False, ( + "Deferred-publish invariant violated: module was visible during on_load" + ) + + def test_on_load_failure_does_not_publish(self) -> None: + reg = Registry() + cls = type("_FailingFactory", (_FailingOnLoadModule,), {}) + count = reg._register_in_order( + load_order=["system.fail"], + valid_classes={"system.fail": cls}, + raw_metadata={}, + ) + assert count == 0 + # Module must NOT be visible after on_load failure. + assert reg.get("system.fail") is None + assert "system.fail" not in reg.list() + # And the in-flight slot must be released. + assert "system.fail" not in reg._in_flight + + +# --------------------------------------------------------------------------- +# A-D-REG-004 — register_internal uses deferred-publish + DLQ event +# --------------------------------------------------------------------------- + + +class TestRegisterInternalDeferredPublish: + def test_register_internal_invisible_during_on_load(self) -> None: + reg = Registry() + module = _ProbeOnLoadModule(reg, "system.probe") + reg.register_internal("system.probe", module) + assert module.visible_during_on_load is False + # And after on_load completed, the module IS visible. + assert reg.get("system.probe") is module + + def test_register_internal_on_load_failure_leaves_registry_unchanged(self) -> None: + reg = Registry() + before_ids = set(reg.list()) + with pytest.raises(RuntimeError, match="intentional on_load failure"): + reg.register_internal("system.fail", _FailingOnLoadModule()) + # Registry must be exactly as before — no partial state leak. + assert reg.get("system.fail") is None + assert set(reg.list()) == before_ids + assert "system.fail" not in reg._in_flight + + +# --------------------------------------------------------------------------- +# A-D-REG-005 — Discover-path emits module_load_failed on on_load failure +# --------------------------------------------------------------------------- + + +class TestDiscoverPathEmitsModuleLoadFailed: + def test_discover_path_emits_module_load_failed(self) -> None: + reg = Registry() + emitter = MagicMock() + reg.set_event_emitter(emitter) + + cls = type("_FailingFactory", (_FailingOnLoadModule,), {}) + reg._register_in_order( + load_order=["system.fail"], + valid_classes={"system.fail": cls}, + raw_metadata={}, + ) + + # The emitter MUST have received an apcore.registry.module_load_failed + # event for the failing module — the discover path is no longer + # silent about on_load failures. + emitted_types = [ + call.args[0].event_type for call in emitter.emit.call_args_list + ] + assert "apcore.registry.module_load_failed" in emitted_types, ( + f"Expected module_load_failed event from discover path, got: {emitted_types}" + ) + + def test_register_internal_emits_module_load_failed(self) -> None: + # Sanity: register_internal's load-failed event was already covered + # by Issue #65 for the public register() path; confirm that route + # holds for register_internal too after A-D-REG-004. + reg = Registry() + emitter = MagicMock() + reg.set_event_emitter(emitter) + + with pytest.raises(RuntimeError): + reg.register_internal("system.fail", _FailingOnLoadModule()) + + emitted_types = [ + call.args[0].event_type for call in emitter.emit.call_args_list + ] + assert "apcore.registry.module_load_failed" in emitted_types