From e539b49fb5b8b82359f008aabeb87298299b533f Mon Sep 17 00:00:00 2001 From: habeck Date: Thu, 18 Jun 2026 11:25:38 -0400 Subject: [PATCH 1/3] fix: create the HookRef on each invocation of load_and_run_hook so a stale one is not used. doc: isolated worker implementation plan Signed-off-by: habeck --- cpex/framework/isolated/worker.py | 22 +- ...eat-python-isolated-plugin-adapter-plan.md | 463 ++++++++++++++++++ 2 files changed, 477 insertions(+), 8 deletions(-) create mode 100644 docs/plans/2026-06-17-001-feat-python-isolated-plugin-adapter-plan.md diff --git a/cpex/framework/isolated/worker.py b/cpex/framework/isolated/worker.py index 74deee05..894168ed 100644 --- a/cpex/framework/isolated/worker.py +++ b/cpex/framework/isolated/worker.py @@ -36,7 +36,7 @@ class TaskProcessor: config_hash: str module_path_hash: str - hook_ref: HookRef + plugin_ref: PluginRef executor: PluginExecutor plugin_config: PluginConfig | None = None @@ -55,19 +55,27 @@ def compute_hash(self, json_config_or_module_path: str): def initialize( self, - hook_ref: HookRef, + plugin_ref: PluginRef, executor: PluginExecutor, json_config: str, module_path: str, plugin_config: PluginConfig, ): """Assign locals, and compute hashes.""" - self.hook_ref = hook_ref + # self.hook_ref = hook_ref + self.plugin_ref = plugin_ref self.executor = executor self.config_hash = self.compute_hash(json_config_or_module_path=json_config) self.module_path_hash = self.compute_hash(json_config_or_module_path=module_path) self.plugin_config = plugin_config + def get_hook_ref(self, hook_type: str) -> HookRef: + """ + make sure that the hook ref is not stale for the current task data. 
+ """ + hook_ref = HookRef(hook_type, self.plugin_ref) + return hook_ref + def get_environment_info(): """Get information about current Python environment.""" @@ -112,7 +120,6 @@ async def process_task(task_data, tp: TaskProcessor): if tp.config_hash != tp.compute_hash(json_config): # pull the resolved plugin path and only add the module path if it has the same root config: PluginConfig = PluginConfig(**config_raw) - hook_type = task_data.get(HOOK_TYPE) cls_name: str = task_data.get("class_name") mod_name, n_cls_name = parse_class_name(cls_name) module: ModuleType = import_module(mod_name) @@ -121,12 +128,10 @@ async def process_task(task_data, tp: TaskProcessor): plugin_type = cast(Type[Plugin], class_) plugin = plugin_type(config) await plugin.initialize() - # now invoke the hook plugin_ref = PluginRef(plugin) - hook_ref = HookRef(hook_type, plugin_ref) executor = PluginExecutor(None, 30) tp.initialize( - hook_ref=hook_ref, + plugin_ref=plugin_ref, executor=executor, json_config=json_config, module_path=json.dumps(resolved_paths), @@ -134,11 +139,12 @@ async def process_task(task_data, tp: TaskProcessor): ) # retrieve the context context = task_data.get("context") + hook_type = task_data.get(HOOK_TYPE) plugin_context = PluginContext( state=context.get("state"), global_context=context.get("global_context"), metadata=context.get("metadata") ) result = await tp.executor.execute_plugin( - hook_ref=tp.hook_ref, + hook_ref=tp.get_hook_ref(hook_type), payload=task_data.get("payload"), local_context=plugin_context, violations_as_exceptions=False, diff --git a/docs/plans/2026-06-17-001-feat-python-isolated-plugin-adapter-plan.md b/docs/plans/2026-06-17-001-feat-python-isolated-plugin-adapter-plan.md new file mode 100644 index 00000000..db0d0034 --- /dev/null +++ b/docs/plans/2026-06-17-001-feat-python-isolated-plugin-adapter-plan.md @@ -0,0 +1,463 @@ +--- +title: "feat: Add python-isolated:// plugin adapter" +date: 2026-06-17 +origin: 
docs/brainstorms/2026-06-15-python-plugin-compat-requirements.md +milestone: 0.2.0 +issue: "#20" +status: ready +deepened: 2026-06-17 +--- + +# feat: Add `python-isolated://` Plugin Adapter + +**Origin:** `docs/brainstorms/2026-06-15-python-plugin-compat-requirements.md` +**Milestone:** 0.2.0 · **Issue:** #20 +**Dependency:** Issue #19 (cpex-python bindings) — complete as of d158db5 + +--- + +## Summary + +Add a new crate `crates/cpex-hosts-python` that lets the Rust `PluginManager` load and invoke Python plugin classes through a subprocess-isolated virtual environment. Plugin operators declare `kind: "python-isolated://module.ClassName"` in YAML; the Rust runtime creates and manages the venv, spawns the existing `cpex/framework/isolated/worker.py` process, and drives it via the JSON-lines stdin/stdout protocol already used by `IsolatedVenvPlugin` on the Python side. + +No PyO3 dependency is introduced. The in-process `python://` adapter is deferred to a future milestone. + +--- + +## Problem Frame + +The Rust `PluginManager` can load plugins implemented in Rust. Existing CPEX deployments have Python plugin classes (identity resolvers, token delegators, custom hook handlers) that must continue to work without modification. Issue #19 established the Python bindings foundation; this work surfaces it to Rust callers through the plugin factory system. 
+ +The subprocess-isolated model is chosen over in-process PyO3 to: +- Avoid introducing a libpython dependency into the default Rust build +- Reuse the battle-tested `worker.py` / `VenvProcessCommunicator` protocol already in production +- Guarantee that plugin venv dependencies cannot conflict with host site-packages + +--- + +## Requirements Trace + +| Criterion | Source | +|---|---| +| `kind: "python-isolated://module.ClassName"` loads Python plugin | AC-8 | +| Venv created once on `initialize()`, reused via requirements-hash cache | AC-9 | +| `shutdown()` sends shutdown task, waits 5 s, then kills | AC-10 | +| Payload serialization uses JSON (no MessagePack) | AC-11 | +| `on_error` behavior (`fail` / `ignore` / `disable`) honoured on Python exceptions | AC-5 | +| `initialize()` / `shutdown()` Python methods called if defined; missing = no-op | AC-6 | +| Crate in `[workspace] members` but not `default-members` | AC-7 | +| `PluginResult` fields (`continue_processing`, `modified_payload`, `violation`) mapped correctly | AC-3, AC-4 | + +--- + +## Key Technical Decisions + +**1. Subprocess protocol reuse, no new wire format.** +`worker.py` already implements `load_and_run_hook` over JSON-lines stdin/stdout with request-ID-based demultiplexing. The Rust adapter replicates `VenvProcessCommunicator`'s subprocess lifecycle using `tokio::process::Command` and drives the same task dict. No new serialization layer needed; `serde_json` suffices. + +**2. `AnyHookHandler` hook-name binding.** +`AnyHookHandler::invoke` does not receive the hook name at call time — the handler is pre-bound via `hook_type_name() -> &'static str`. The factory reads hook names from YAML `config.hooks` and uses `Box::leak` (same pattern as `apl-pii-scanner`) to produce `&'static str` keys. It populates `PluginInstance.handlers: Vec<(&'static str, Arc<dyn AnyHookHandler>)>` and returns it; the manager calls `register_multi_handler` internally — the factory never calls the registry directly. + +**3. 
Payload serialization via per-hook static dispatch functions.** +`PluginPayload` has no `Serialize` bound — it is object-safe by design and `erased-serde` cannot be used on `&dyn PluginPayload` without breaking that contract. Serialization is performed by a `HookPayloadRegistry`: two `HashMap<&'static str, fn(...)>` maps (one for serialize, one for deserialize) keyed by hook type name. Each entry is a thin shim that downcasts via `as_any().downcast_ref::<T>()` (where `T` is the concrete payload type for that hook) and calls `serde_json::to_value` (or `serde_json::from_value`). The registry is a field on `IsolatedPythonPluginAdapter`, populated during `IsolatedPythonPluginAdapterFactory::create` and shared across all handler entries via `Arc`. Private credential fields marked `#[serde(skip)]` remain on the Rust side. + +**4. `ErasedResultFields` constructed directly; registry shared via `Arc`.** +The adapter's `invoke()` returns `Box::new(ErasedResultFields { ... })` (`cpex_core::executor::ErasedResultFields` is `pub`) rather than `erase_result::<P>()`, since the concrete payload type is not statically known at the Python boundary. `PluginInstance` carries only `plugin` and `handlers` — no third field exists. Auxiliary state (the payload registry) lives on the adapter struct; the `Arc` cloned into each `handlers` entry naturally shares it at no extra cost. + +**5. `venv_path` defaults, YAML override optional.** +Default: `//.venv` — matching `IsolatedVenvPlugin`. An optional `config.venv_path` YAML key overrides it. Requirements-hash cache metadata lives at `/../.cpex/venv_cache/_metadata.json` (same layout as Python side). + +**6. `tokio` `process` feature.** +The workspace `tokio` declaration omits the `process` feature. The new crate's `Cargo.toml` adds `tokio = { workspace = true, features = ["process"] }` to activate it via Cargo feature unification — existing crates are unaffected. + +**7. Payload deserialization routing.** +The adapter stores a `hook_name → deserialize_fn` map populated at construction time from a registry of known payload types. Unknown hook names fall back to a `GenericPayload(serde_json::Value)` wrapper that satisfies `PluginPayload` and passes through unmodified. This defers the CMF `MessagePayload` special-case (open question in origin doc) to implementation-time lookup rather than speculative code now. 
+ +--- + +## Scope Boundaries + +### In scope +- `crates/cpex-hosts-python` crate with `IsolatedPythonPluginAdapter` and its factory +- Subprocess lifecycle: venv create/cache, worker spawn, graceful shutdown with kill fallback +- JSON-lines dispatch reusing `worker.py` `load_and_run_hook` protocol +- `PluginResult` fields mapped to `ErasedResultFields` +- `on_error` (`fail` / `ignore` / `disable`) behaviour on Python exceptions +- Workspace plumbing: `Cargo.toml` members, `tokio` feature extension, CI comment +- Integration tests against a minimal Python plugin fixture + +### Deferred to Follow-Up Work +- In-process `python://` adapter (PyO3) — deferred by user decision +- Bidirectional payloads for hooks not yet in cpex-core +- Hot-reload of Python plugins at runtime +- Type stub generation for the adapter crate + +### Out of scope +- Modifying `worker.py` or `VenvProcessCommunicator` — Rust adapter consumes them as-is +- Python-side changes to `IsolatedVenvPlugin` +- Changes to `cpex-ffi` or the Go bindings layer + +--- + +## High-Level Technical Design + +### Subprocess lifecycle + +``` +PluginManager::initialize() + └─► IsolatedPythonPluginAdapter::initialize() + ├─ compute requirements hash + ├─ if hash unchanged and .venv exists → reuse + ├─ else → create venv, pip install, save metadata + └─ spawn worker.py subprocess (tokio::process::Command) + stdin/stdout pipes open, stderr logged + +PluginManager::invoke_by_name("hook_name", payload, extensions, ctx) + └─► IsolatedPythonPluginAdapter::invoke(payload, extensions, ctx) + ├─ registry.serialize(hook_name, payload) → task["payload"] + ├─ write JSON-lines task to worker stdin (+ request_id) + ├─ await response line from stdout channel + ├─ deserialize response → ErasedResultFields + └─ return Box::new(erased_fields) + +PluginManager::shutdown() + └─► IsolatedPythonPluginAdapter::shutdown() + ├─ send {"task_type":"shutdown","request_id":"shutdown"} + ├─ wait up to 5 s + └─ kill() if timeout +``` + +### Factory 
registration flow + +``` +// Host at startup: +let mut factories = PluginFactoryRegistry::new(); +factories.register( + cpex_hosts_python::isolated::KIND, // "python-isolated://" + Box::new(IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default())), + // HookPayloadRegistry::default() covers all built-in cpex-core payload types. + // Hosts with custom payload types must extend the registry before passing it in. +); +let manager = PluginManager::from_config(path, &factories)?; + +// YAML config consumed by factory: +// kind: "python-isolated://my_pkg.MyPlugin" +// config: +// requirements_file: plugins/my_pkg/requirements.txt +// venv_path: plugins/my_pkg/.venv # optional + +// Factory extracts URI host+path as class_name, +// Box::leaks each hook name from config.hooks, +// returns PluginInstance { plugin: adapter, handlers: [(leaked_name, Arc), ...] } +``` + +### Response → ErasedResultFields mapping + +`ErasedResultFields` has four fields (`executor.rs:1029–1034`); all must be supplied: + +| Worker response field | ErasedResultFields field | Notes | +|---|---|---| +| `continue_processing: bool` | `continue_processing` | Direct | +| `violation: {message, ...}` | `violation: Some(PluginViolation)` | Present when plugin denied; `PluginViolation` already derives `Deserialize` | +| `modified_payload: dict \| null` | `modified_payload: Option<Box<dyn PluginPayload>>` | Deserialized via hook-name router | +| _(not in worker protocol)_ | `modified_extensions: None` | Python plugins cannot modify `Extensions`; always `None` for this adapter | +| `status: "error"` / exception | → `Box<PluginError>` propagated | Triggers `on_error` in executor | + +Note: the raw JSON response from `worker.py` stdout **contains** `request_id` at the top level (`worker.py:196`). The Rust background reader must use it for demultiplexing and must NOT strip it before routing — unlike `VenvProcessCommunicator.send_task` which pops it on the Python side after routing. 
+ +--- + +## Output Structure + +``` +crates/cpex-hosts-python/ +├── Cargo.toml +└── src/ + ├── lib.rs # pub use isolated::*; crate-level docs + ├── isolated/ + │ ├── mod.rs # pub use adapter::*, factory::* + │ ├── adapter.rs # IsolatedPythonPluginAdapter: Plugin + AnyHookHandler + │ ├── factory.rs # IsolatedPythonPluginAdapterFactory + KIND constant + │ ├── subprocess.rs # WorkerProcess: spawn, send_task, shutdown + │ ├── venv.rs # venv creation, requirements-hash cache + │ └── payload.rs # payload_to_json, json_to_erased, hook-name deserializer map + └── tests/ + └── isolated_e2e.rs # integration test against echo_plugin.py fixture +``` + +Python test fixture (not a Rust source file — lives alongside the integration test): + +``` +crates/cpex-hosts-python/tests/ +├── isolated_e2e.rs +└── fixtures/ + ├── echo_plugin.py # minimal @hook plugin that echoes payload + └── requirements.txt # empty (no deps beyond cpex) +``` + +--- + +## Implementation Units + +### U1. Workspace scaffolding + +**Goal:** Add `crates/cpex-hosts-python` to the workspace and establish the Cargo manifest with correct dependency declarations. + +**Requirements:** AC-7 (not in `default-members`) + +**Dependencies:** none + +**Files:** +- `Cargo.toml` — add `crates/cpex-hosts-python` to `[workspace].members`; do not add to `default-members` +- `crates/cpex-hosts-python/Cargo.toml` — new file +- `crates/cpex-hosts-python/src/lib.rs` — crate root, empty re-exports for now + +**Approach:** +`Cargo.toml` for the new crate declares `cpex-core` as a path dependency, adds `tokio = { workspace = true, features = ["process"] }`, `serde_json = { workspace = true }`, `serde = { workspace = true }`, `async-trait = { workspace = true }`, `thiserror = { workspace = true }`, `tracing = { workspace = true }`. No pyo3 anywhere. The crate compiles clean with an empty lib.rs before any implementation begins. 
+ +**Patterns to follow:** `crates/apl-pii-scanner/Cargo.toml` for workspace dep declarations; `apl-cedarling` exclusion from `default-members` for the workspace pattern. + +**Test scenarios:** +- `cargo build -p cpex-hosts-python` succeeds on a machine with no libpython installed +- `cargo build` (default members only) still succeeds and does not include `cpex-hosts-python` +- `cargo build --workspace` includes the crate + +**Verification:** `cargo build -p cpex-hosts-python` exits 0; `cargo build` (no `-p`) does not list `cpex-hosts-python` in its compile units. + +--- + +### U2. Venv lifecycle module + +**Goal:** Implement the venv creation and requirements-hash cache logic that `IsolatedPythonPluginAdapter` calls on `initialize()`. + +**Requirements:** AC-9 + +**Dependencies:** U1 + +**Files:** +- `crates/cpex-hosts-python/src/isolated/venv.rs` — new +- `crates/cpex-hosts-python/src/isolated/mod.rs` — new (pub mod venv) + +**Approach:** +`VenvManager` struct holds `venv_path: PathBuf`, `requirements_file: PathBuf`, `cache_metadata_path: PathBuf`. `ensure_venv()` is async: reads the metadata JSON, computes SHA-256 of the requirements file (or empty bytes if absent), compares with stored hash — if match, returns `VenvState::Reused`; otherwise creates venv via `std::process::Command` (`python3 -m venv `), runs `pip install -r`, writes metadata JSON, returns `VenvState::Created`. Metadata JSON schema mirrors Python: `{venv_path, requirements_file, requirements_hash, python_version}`. `python_executable()` returns the platform-correct path (`/bin/python` on Unix, `/Scripts/python.exe` on Windows). + +**Patterns to follow:** `cpex/framework/isolated/client.py` — `_compute_requirements_hash`, `_is_venv_cache_valid`, `_save_cache_metadata`, `create_venv`. 
+ +**Test scenarios:** +- Fresh venv: no `.venv` dir → `ensure_venv()` creates it, installs requirements, writes metadata, returns `Created` +- Cache hit: `.venv` exists and hash matches → `ensure_venv()` returns `Reused` without running pip +- Cache miss: `.venv` exists but requirements file has changed → removes old venv, recreates, returns `Created` +- Missing requirements file: treated as empty hash (no pip install step), venv still created +- `python_executable()` returns a path that exists after `ensure_venv()` + +**Verification:** Unit tests in `venv.rs` using a temp directory; no network access needed since requirements.txt is empty in fixtures. + +--- + +### U3. Worker subprocess module + +**Goal:** Implement the long-running subprocess lifecycle: spawn, send/receive JSON-lines tasks with request-ID demux, graceful shutdown. + +**Requirements:** AC-10, AC-11 + +**Dependencies:** U2 + +**Files:** +- `crates/cpex-hosts-python/src/isolated/subprocess.rs` — new + +**Approach:** +The `WorkerProcess` struct holds a `tokio::process::Child` and a `tokio::sync::mpsc::Sender<WorkerTask>`, where `WorkerTask = (String /* request_id */, serde_json::Value, oneshot::Sender<Result<serde_json::Value, WorkerError>>)`. A background `tokio::task` owns the child's stdin/stdout: it receives tasks from the mpsc channel, writes `{...task_data, "request_id": id}\n` to stdin, reads response lines from stdout, demultiplexes by `request_id`, and sends the response back on the oneshot. + +`WorkerProcess::spawn(python_exe, script_path, cwd)` starts the child with `stdin(Stdio::piped())`, `stdout(Stdio::piped())`, `stderr(Stdio::piped())`, and spawns the background reader/writer task. + +`WorkerProcess::send_task(task_data, timeout)` generates a UUID request_id, registers a oneshot, sends on the mpsc channel, and awaits the oneshot with a timeout. + +`WorkerProcess::shutdown(timeout_secs)` sends `{"task_type":"shutdown","request_id":"shutdown"}` via `send_task`, waits, then calls `child.kill()` if the process hasn't exited within `timeout_secs`. 
`WorkerProcess` also implements `Drop` to send the shutdown line and call `child.kill()` synchronously — this prevents subprocess orphans if the adapter is dropped without an explicit `shutdown()` call. + +**Patterns to follow:** `cpex/framework/isolated/venv_comm.py` — `start_worker`, `send_task`, `stop_worker`, `_read_responses`; adapt the threading model to tokio tasks. + +**Test scenarios:** +- Spawn + send `{"task_type":"info"}` → worker returns environment info JSON +- Request-ID routing: two concurrent tasks return responses to the correct callers +- Timeout: a task that never responds returns `WorkerError::Timeout` after the configured duration +- Graceful shutdown: send shutdown task → worker process exits within 5 s, child handle cleaned up +- Kill fallback: if shutdown task times out, `child.kill()` is called and the process is gone +- Stderr from worker is captured and logged at `tracing::debug` level (not silently dropped) +- Worker process crash mid-invocation: outstanding oneshots receive `WorkerError::ProcessDied` +- Drop without `shutdown()`: `WorkerProcess::drop` kills the child process; no zombie in process table after drop + +**Verification:** Integration test spawning a real `worker.py` subprocess (requires Python in PATH); use the `echo_plugin.py` fixture from `tests/fixtures/`. + +--- + +### U4. Payload serialization module + +**Goal:** Serialize `&dyn PluginPayload` to JSON for the task dict, and deserialize the worker's response payload JSON back to `Box<dyn PluginPayload>`. + +**Requirements:** AC-3, AC-4, AC-11 + +**Dependencies:** U1 + +**Files:** +- `crates/cpex-hosts-python/src/isolated/payload.rs` — new + +**Approach:** +`payload.rs` defines two function type aliases: `SerializeFn = fn(&dyn PluginPayload) -> serde_json::Value` and `DeserializeFn = fn(serde_json::Value) -> Box<dyn PluginPayload>`. It also defines `HookPayloadRegistry { serialize: HashMap<&'static str, SerializeFn>, deserialize: HashMap<&'static str, DeserializeFn> }`. 
+ +Each concrete payload type `T` contributes one pair of shims: `serialize_shim` downcasts via `as_any().downcast_ref::<T>().expect(...)` and calls `serde_json::to_value`; `deserialize_shim` calls `serde_json::from_value::<T>()` and boxes the result. Unknown hook names fall back to a `GenericPayload(serde_json::Value)` passthrough wrapper. + +`json_to_erased` takes `registry: &HookPayloadRegistry, hook_name: &str, response: serde_json::Value` and produces `ErasedResultFields` directly (no `erase_result` call). `PluginViolation` already derives `Serialize, Deserialize` (confirmed in `cpex-core/src/error.rs:214`). + +**Patterns to follow:** `crates/cpex-core/src/executor.rs:1029–1065` (`ErasedResultFields`, `erase_result`, `extract_erased`); `crates/cpex-core/src/hooks/trait_def.rs` (`PluginResult` constructors). + +**Test scenarios:** +- `payload_to_json` on a `MessagePayload` produces a JSON object with expected top-level keys +- `payload_to_json` on a payload with `#[serde(skip)]` private fields does not include those fields +- `json_to_erased` with `continue_processing: true`, no violation, no modified payload → allow result +- `json_to_erased` with `continue_processing: false` and a violation dict → deny result with `PluginViolation` +- `json_to_erased` with `modified_payload` for a known hook → returns `Some(Box<dyn PluginPayload>)` with correct concrete type +- `json_to_erased` with `modified_payload` for an unknown hook name → returns `GenericPayload` passthrough wrapper, does not panic +- Round-trip: serialize a `MessagePayload` to JSON via `serialize_shim`, deserialize back via `deserialize_shim`, key fields match +- `PluginViolation` round-trips through JSON without adding `#[derive(Deserialize)]` (it already exists) + +**Verification:** Unit tests in `payload.rs`; no subprocess needed. + +--- + +### U5. 
`IsolatedPythonPluginAdapter` and factory + +**Goal:** Implement the `Plugin` lifecycle and `AnyHookHandler` dispatch for the subprocess-isolated adapter, and the `PluginFactory` that constructs it from YAML config. + +**Requirements:** AC-5, AC-6, AC-8, AC-9, AC-10 + +**Dependencies:** U2, U3, U4 + +**Files:** +- `crates/cpex-hosts-python/src/isolated/adapter.rs` — new +- `crates/cpex-hosts-python/src/isolated/factory.rs` — new +- `crates/cpex-hosts-python/src/isolated/mod.rs` — updated (pub mod adapter, factory) +- `crates/cpex-hosts-python/src/lib.rs` — updated (pub use isolated::{KIND, IsolatedPythonPluginAdapterFactory}) + +**Approach:** + +`IsolatedPythonPluginAdapter` holds: `config: PluginConfig`, `venv_manager: VenvManager`, `worker: tokio::sync::Mutex>`, `hook_payload_registry: Arc`, `class_name: String`, `plugin_dirs: Vec`. + +`Plugin::initialize()`: calls `venv_manager.ensure_venv().await`, then `WorkerProcess::spawn(...)` and stores in `worker`. Also sends a `load_and_run_hook`-style init call if the Python class defines `initialize()` — or more simply, the first hook invocation triggers lazy initialization via the worker's existing caching logic (since `worker.py` caches the plugin after first load). This matches existing Python behavior. + +`Plugin::shutdown()`: locks `worker`, calls `WorkerProcess::shutdown(5)`, sets to `None`. + +`AnyHookHandler::invoke(payload, extensions, ctx)`: +1. Serialize payload via `payload_to_json` +2. Serialize `PluginContext` state fields to JSON (only `state` and `global_context` — same as Python `_build_hook_task`) +3. Build task dict: `{task_type, plugin_dirs, class_name, config: safe_config_json, hook_type, plugin_name, payload, context, request_id}` +4. Call `worker.send_task(task, timeout)` — await response +5. On worker error: construct `Box` and return `Err(...)` — executor's `on_error` handling takes over +6. 
On success: call `json_to_erased(hook_name, response)` and return `Ok(Box::new(erased))` + +`hook_type_name()`: returns the hook name this handler instance was pre-bound to (stored as `&'static str` via `Box::leak` at factory construction time). + +`IsolatedPythonPluginAdapterFactory`: +- `pub const KIND: &str = "python-isolated"` +- Initialized with a `HookPayloadRegistry` (or constructs a built-in default registry covering all known cpex-core payload types). The registry is `Arc`-wrapped. +- `create(config)` parses the URI path from `config.kind` (strip `python-isolated://` prefix → `class_name`), reads `config.config["requirements_file"]`, optionally `config.config["venv_path"]`, builds `VenvManager`, constructs one `IsolatedPythonPluginAdapter` (with `Arc::clone(&self.registry)`) wrapped in `Arc`, then for each hook name in `config.hooks` leaks the string (`Box::leak(h.clone().into_boxed_str())` — same pattern as `apl-pii-scanner`) and registers the same adapter as handler. Returns `PluginInstance { plugin: adapter, handlers }`. +- Validates: `config.hooks` must be non-empty; `requirements_file` treated as empty if absent (no pip install step). + +**Patterns to follow:** `crates/apl-pii-scanner/src/factory.rs` (canonical factory with `Box::leak`); `crates/cpex-core/src/plugin.rs` (`Plugin` trait via `#[async_trait]`); `cpex/framework/isolated/client.py` (`invoke_hook`, `_build_hook_task`). 
+ +**Test scenarios:** +- `create()` with a valid config → `PluginInstance` with one `Arc` and N handler entries matching `config.hooks` +- `create()` with empty `hooks` list → returns `Err(PluginError::Config)` +- `create()` with kind `"python-isolated://my_pkg.MyPlugin"` → `class_name == "my_pkg.MyPlugin"` +- `initialize()` creates the `.venv` directory and starts the worker subprocess +- `invoke()` sends a task and receives a result with `continue_processing: true` (happy path) +- Python exception in worker → `invoke()` returns `Err(PluginError)`, executor applies `on_error` +- `on_error: disable` → second invocation skips the plugin (circuit-breaker via `PluginRef::disable`) +- `shutdown()` terminates the worker process; subsequent `invoke()` after shutdown fails gracefully +- `hook_type_name()` returns the hook name the handler was registered for, not the plugin name + +**Verification:** Unit tests for the factory; integration test in `tests/isolated_e2e.rs` (U6). + +--- + +### U6. Integration test + +**Goal:** End-to-end test: load a real Python plugin class through the Rust `PluginManager` using `kind: "python-isolated://"` config. + +**Requirements:** AC-3, AC-4, AC-5, AC-6, AC-8, AC-9, AC-10, AC-11 + +**Dependencies:** U5 + +**Files:** +- `crates/cpex-hosts-python/tests/isolated_e2e.rs` — new +- `crates/cpex-hosts-python/tests/fixtures/echo_plugin.py` — new Python fixture +- `crates/cpex-hosts-python/tests/fixtures/requirements.txt` — new (empty) + +**Approach:** +`echo_plugin.py` is a minimal CPEX plugin class with `@hook("tool_pre_invoke")` on a method that returns `PluginResult.allow()` unchanged, and a second variant that returns `PluginResult.modify(payload)` with a field mutated. Also a variant that raises `RuntimeError` to test `on_error`. `initialize()` and `shutdown()` methods are present but no-op. 
+ +The test builds a `PluginFactoryRegistry`, registers `IsolatedPythonPluginAdapterFactory` under `KIND`, constructs a `PluginConfig` inline (pointing `plugin_dirs` at `tests/fixtures/`, `class_name` at `echo_plugin.EchoPlugin`, empty `requirements.txt`), calls `manager.initialize().await`, invokes the hook with a `ToolPreInvokePayload` (or generic payload), and asserts the result. + +Tests gated behind `#[cfg(target_os = "linux")]` or behind a `with-python` Cargo feature to avoid failing on CI machines without Python — add a note in `README.md` (or test module docstring) about the requirement. Alternatively, use `std::process::Command::new("python3").arg("--version")` to skip gracefully. + +**Patterns to follow:** `crates/cpex-core/tests/identity_e2e.rs` (manager setup, `#[tokio::test]`, inline `PluginConfig`). + +**Test scenarios:** +- Covers AC-8: plugin loaded via `kind: "python-isolated://echo_plugin.EchoPlugin"` and invoked successfully +- Covers AC-9: second invocation reuses venv (no pip re-run); assert metadata file present and unchanged +- Covers AC-3/AC-4: `allow()` result → `continue_processing: true`, no modified payload +- Covers AC-3/AC-4: `modify(payload)` result → `modified_payload` present in `PipelineResult` +- Covers AC-5: Python `RuntimeError` → `on_error: fail` propagates error to caller +- Covers AC-6: `initialize()` / `shutdown()` called on Python class; missing methods are no-op (use a plugin class without those methods) +- Covers AC-10: `manager.shutdown()` terminates the worker process within 5 s + +**Verification:** `cargo test -p cpex-hosts-python` passes when Python 3.11+ is available. Test skips gracefully when Python is absent. + +--- + +## System-Wide Impact + +- **Unregistered kind → hard error.** `PluginManager::load_config` calls `factories.get(&kind)` and returns `PluginError::Config` if the kind is absent — it does not silently skip. 
Any host using `python-isolated` in YAML must call `manager.register_factory("python-isolated", Box::new(IsolatedPythonPluginAdapterFactory::new(...)))` before `load_config`. Misconfigured hosts fail fast at startup, not at first invocation. +- **Drop without `shutdown` orphans subprocesses.** `WorkerProcess` must implement `Drop` to send `{"task_type":"shutdown"}` and then call `child.kill()` — without it, the OS-level worker process outlives the Rust `PluginManager` drop. Pure-Rust adapters have no subprocess to orphan; this adapter imposes a stronger resource-cleanup obligation than existing plugin kinds. +- **Subprocess count scales T×N under multi-tenancy.** Each `PluginManager::load_config` call creates a new `IsolatedPythonPluginAdapter` (and one subprocess) per plugin config entry. If a tenant-scoped Rust `PluginManager` is added in the future (analogous to Python's `TenantPluginManager`), T tenants × N `python-isolated` plugins = T×N subprocesses. No subprocess sharing across `PluginManager` instances exists; operators must size `RLIMIT_NPROC` and file-descriptor limits accordingly. +- **`cpex-orchestration` unaffected.** `cpex-orchestration` is a domain-agnostic `run_branches` primitive with no knowledge of plugin kinds, factory registries, or subprocess management. No changes needed. +- **Workspace CI.** Adding `crates/cpex-hosts-python` to `[workspace] members` enrolls it in `cargo test --workspace`. The `tokio "process"` feature is absent from the workspace-level tokio declaration; the new crate must activate it in its own `Cargo.toml` (`tokio = { workspace = true, features = ["process"] }`). Feature unification is additive — no impact on existing crates. + +--- + +## Open Questions + +| Question | Status | Notes | +|---|---|---| +| Does `PluginViolation` derive `Deserialize`? | **Resolved** — yes | `#[derive(Serialize, Deserialize)]` confirmed at `crates/cpex-core/src/error.rs:214`. No changes to cpex-core needed. 
| +| Should `PluginContext::state` / `global_context` be sent to worker? | Resolve at U5 | Python `_build_hook_task` sends both; verify `PluginContext` fields are `serde_json`-serializable | +| CMF `MessagePayload` special-case serialization? | Deferred — all built-in payloads are serde-compatible; no special treatment anticipated | If a non-serializable payload type is added later, the `serialize_fn` map in U4 handles it without adapter changes | +| `venv_path` YAML override — required or optional? | Optional, defaults to `//.venv` | Match `IsolatedVenvPlugin` behaviour; document in factory `create()` | + +--- + +## Risks & Dependencies + +| Risk | Likelihood | Impact | Mitigation | +|---|---|---|---| +| Python 3.11+ not available in CI | Medium | Integration tests silently skip or fail | Gate test behind Python version check; document requirement | +| `worker.py` imports cpex Python package — not installed in test venv | High | Integration test fails at worker startup | Test fixture `requirements.txt` must install cpex from the local path; handle in venv setup | +| `tokio::process` feature unification breaks existing crates | Low | Cargo build error | Feature is additive; only adds process-related tokio internals, no API changes for existing crates | +| `WorkerProcess` drop without `shutdown` — subprocess leak | Medium | OS-level orphan process | `WorkerProcess` must implement `Drop` with `kill()` call (see System-Wide Impact); document in U3 | +| `PluginContext` not fully JSON-serializable | Low | Runtime serde error | Check `PluginContext` fields before U5; add `#[serde(skip)]` or `Default` impls if needed | + +--- + +## Sources & Research + +- `crates/cpex-core/src/registry.rs:182–197` — `AnyHookHandler` trait signature +- `crates/cpex-core/src/factory.rs` — `PluginFactory`, `PluginInstance`, `PluginFactoryRegistry` +- `crates/cpex-core/src/executor.rs:1029–1065` — `ErasedResultFields`, `erase_result`, `extract_erased` +- 
`crates/cpex-core/src/executor.rs:400–520` — `run_serial_phase` dispatch path, extensions filtering +- `crates/apl-pii-scanner/src/factory.rs:47–63` — canonical `Box::leak` factory pattern; `apl-audit-logger/src/factory.rs:38–48` identical shape +- `crates/cpex-core/src/error.rs:214` — `PluginViolation` derives `Serialize, Deserialize` (confirmed; no cpex-core changes needed) +- `crates/cpex-core/src/manager.rs:322` — `factories.get(&kind).ok_or_else(|| PluginError::Config)` — missing kind is a hard error +- `crates/cpex-core/tests/identity_e2e.rs` — integration test structure +- `cpex/framework/isolated/client.py` — `IsolatedVenvPlugin`, `_build_hook_task`, `invoke_hook` +- `cpex/framework/isolated/venv_comm.py` — `VenvProcessCommunicator` subprocess lifecycle +- `cpex/framework/isolated/worker.py` — `load_and_run_hook` task protocol +- `cpex/framework/decorator.py` — `_HOOK_METADATA_ATTR`, `get_hook_metadata`, `HookMetadata` From c68571d1e776f9edabd48ca5e5508deee531bf1c Mon Sep 17 00:00:00 2001 From: habeck Date: Tue, 23 Jun 2026 12:15:17 -0400 Subject: [PATCH 2/3] feature: python compatibility layer Signed-off-by: habeck --- Cargo.lock | 75 ++++ Cargo.toml | 2 + cpex/framework/base.py | 9 +- cpex/framework/isolated/worker.py | 12 +- crates/cpex-hosts-python/.gitignore | 10 + crates/cpex-hosts-python/Cargo.toml | 32 ++ .../cpex-hosts-python/src/isolated/adapter.rs | 220 +++++++++++ .../cpex-hosts-python/src/isolated/factory.rs | 266 ++++++++++++++ crates/cpex-hosts-python/src/isolated/mod.rs | 14 + .../cpex-hosts-python/src/isolated/payload.rs | 341 ++++++++++++++++++ .../src/isolated/subprocess.rs | 300 +++++++++++++++ crates/cpex-hosts-python/src/isolated/venv.rs | 312 ++++++++++++++++ crates/cpex-hosts-python/src/lib.rs | 16 + .../tests/fixtures/echo_plugin.py | 70 ++++ .../cpex-hosts-python/tests/isolated_e2e.rs | 288 +++++++++++++++ .../cpex/framework/isolated/test_worker.py | 2 +- 16 files changed, 1964 insertions(+), 5 deletions(-) create mode 100644 
crates/cpex-hosts-python/.gitignore create mode 100644 crates/cpex-hosts-python/Cargo.toml create mode 100644 crates/cpex-hosts-python/src/isolated/adapter.rs create mode 100644 crates/cpex-hosts-python/src/isolated/factory.rs create mode 100644 crates/cpex-hosts-python/src/isolated/mod.rs create mode 100644 crates/cpex-hosts-python/src/isolated/payload.rs create mode 100644 crates/cpex-hosts-python/src/isolated/subprocess.rs create mode 100644 crates/cpex-hosts-python/src/isolated/venv.rs create mode 100644 crates/cpex-hosts-python/src/lib.rs create mode 100644 crates/cpex-hosts-python/tests/fixtures/echo_plugin.py create mode 100644 crates/cpex-hosts-python/tests/isolated_e2e.rs diff --git a/Cargo.lock b/Cargo.lock index b8c0a56b..b520908e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -830,6 +830,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "cpex-hosts-python" +version = "0.2.0" +dependencies = [ + "async-trait", + "cpex-core", + "serde", + "serde_json", + "sha2 0.10.9", + "tempfile", + "thiserror 2.0.18", + "tokio", + "tracing", + "uuid", +] + [[package]] name = "cpex-orchestration" version = "0.2.0" @@ -1246,6 +1262,22 @@ dependencies = [ "typeid", ] +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + [[package]] name = "ff" version = "0.13.1" @@ -2142,6 +2174,12 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.2" @@ -3148,6 
+3186,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.40" @@ -3538,6 +3589,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -3787,6 +3848,19 @@ dependencies = [ "libc", ] +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "term" version = "1.2.1" @@ -3913,6 +3987,7 @@ dependencies = [ "mio", "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", diff --git a/Cargo.toml b/Cargo.toml index 6f2b8669..8d10edab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "crates/apl-delegator-biscuit", "crates/apl-pii-scanner", "crates/apl-audit-logger", + "crates/cpex-hosts-python", "examples/go-demo/ffi", ] @@ -83,6 +84,7 @@ rmp-serde = "1" serde_bytes = "0.11" chrono = { version = "0.4", features = ["serde"] } regex = "1" +sha2 = "0.10" # Size-first release profile. 
The FFI artifact (libcpex_ffi.a) is linked # statically into host binaries, so its compiled size flows straight into diff --git a/cpex/framework/base.py b/cpex/framework/base.py index 6f120141..9abe0326 100644 --- a/cpex/framework/base.py +++ b/cpex/framework/base.py @@ -441,10 +441,13 @@ def __init__(self, hook: str, plugin_ref: PluginRef): self._plugin_ref = plugin_ref self._hook = hook - # Try convention-based lookup first (method name matches hook type) + # Try convention-based lookup first (method name matches hook type). + # For namespaced hooks like "cmf.tool_pre_invoke", also try the bare + # name "tool_pre_invoke" so convention-named methods are found. + bare_hook = hook.rsplit(".", 1)[-1] if "." in hook else hook self._func: Callable[[PluginPayload, PluginContext], Awaitable[PluginResult]] | None = getattr( plugin_ref.plugin, hook, None - ) + ) or getattr(plugin_ref.plugin, bare_hook, None) # If not found by convention, scan for @hook decorated methods if self._func is None: @@ -455,7 +458,7 @@ def __init__(self, hook: str, plugin_ref: PluginRef): # Check for @hook decorator metadata metadata = get_hook_metadata(method) - if metadata and metadata.matches(hook): + if metadata and (metadata.matches(hook) or metadata.matches(bare_hook)): self._func = method break diff --git a/cpex/framework/isolated/worker.py b/cpex/framework/isolated/worker.py index 894168ed..3bf4bca3 100644 --- a/cpex/framework/isolated/worker.py +++ b/cpex/framework/isolated/worker.py @@ -140,6 +140,15 @@ async def process_task(task_data, tp: TaskProcessor): # retrieve the context context = task_data.get("context") hook_type = task_data.get(HOOK_TYPE) + # Normalize namespaced hook names (e.g. "cmf.tool_pre_invoke" → + # "tool_pre_invoke") so that convention-named plugin methods are found. + # Plugins that use @hook("cmf.tool_pre_invoke") decorators are handled + # by the bare-name fallback in HookRef; plugins that rely solely on + # method-name convention need the bare name here. 
+ if hook_type and "." in hook_type: + bare = hook_type.rsplit(".", 1)[-1] + if not hasattr(tp.plugin_ref.plugin, hook_type) and hasattr(tp.plugin_ref.plugin, bare): + hook_type = bare plugin_context = PluginContext( state=context.get("state"), global_context=context.get("global_context"), metadata=context.get("metadata") ) @@ -230,7 +239,8 @@ async def main(): error_response = { "status": "error", "message": f"Unexpected error: {str(e)}", - "request_id": "unknown", + # Use the request_id captured above when available so callers can demux. + "request_id": request_id if "request_id" in locals() else "unknown", } print(json.dumps(error_response), flush=True) diff --git a/crates/cpex-hosts-python/.gitignore b/crates/cpex-hosts-python/.gitignore new file mode 100644 index 00000000..05f7305e --- /dev/null +++ b/crates/cpex-hosts-python/.gitignore @@ -0,0 +1,10 @@ +# Generated by isolated_e2e test setup (write_requirements_txt). +# Contains an absolute path to the monorepo root so pip resolves it +# correctly regardless of clone location. +tests/fixtures/requirements.txt + +# Test venv created by integration tests +tests/fixtures/.venv/ + +# Venv metadata cache written by VenvManager +tests/fixtures/.cpex/ diff --git a/crates/cpex-hosts-python/Cargo.toml b/crates/cpex-hosts-python/Cargo.toml new file mode 100644 index 00000000..a048e62b --- /dev/null +++ b/crates/cpex-hosts-python/Cargo.toml @@ -0,0 +1,32 @@ +# Location: ./crates/cpex-hosts-python/Cargo.toml +# Copyright 2026 +# SPDX-License-Identifier: Apache-2.0 +# Authors: Ted Habeck +# +# cpex-hosts-python — subprocess-isolated Python plugin adapter for the +# CPEX Rust PluginManager. Loads Python plugin classes via +# `kind: "isolated_venv"` and `config: class_name: module.ClassName` YAML config and drives +# them over the JSON-lines stdin/stdout protocol used by worker.py. 
+ +[package] +name = "cpex-hosts-python" +version.workspace = true +edition.workspace = true +license.workspace = true +authors.workspace = true + +[dependencies] +cpex-core = { path = "../cpex-core" } + +async-trait = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true, features = ["process", "io-util"] } +uuid = { workspace = true } +sha2 = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread"] } +tempfile = "3" diff --git a/crates/cpex-hosts-python/src/isolated/adapter.rs b/crates/cpex-hosts-python/src/isolated/adapter.rs new file mode 100644 index 00000000..690f0471 --- /dev/null +++ b/crates/cpex-hosts-python/src/isolated/adapter.rs @@ -0,0 +1,220 @@ +// Location: ./crates/cpex-hosts-python/src/isolated/adapter.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck +// +// IsolatedPythonPluginAdapter — Plugin lifecycle + AnyHookHandler dispatch. +// +// Wraps a WorkerProcess and drives it with JSON-lines tasks over the +// same protocol as Python's VenvProcessCommunicator / worker.py. + +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use cpex_core::{ + context::PluginContext, + error::PluginError, + hooks::payload::{Extensions, PluginPayload}, + plugin::{Plugin, PluginConfig}, + registry::AnyHookHandler, +}; +use tokio::sync::Mutex; +use tracing::{debug, info}; +use uuid::Uuid; + +use super::payload::HookPayloadRegistry; +use super::subprocess::WorkerProcess; +use super::venv::VenvManager; + +const INVOKE_TIMEOUT_SECS: u64 = 30; + +/// Subprocess-isolated Python plugin adapter. +/// +/// One instance wraps one Python plugin class running in a dedicated +/// worker.py subprocess. The same `Arc` is +/// registered as the handler for every hook name in `config.hooks`. 
+pub struct IsolatedPythonPluginAdapter { + pub config: PluginConfig, + venv_manager: VenvManager, + worker: Mutex>, + registry: Arc, + class_name: String, + plugin_dirs: Vec, + /// Worker script path (cpex/framework/isolated/worker.py relative to cwd). + worker_script: std::path::PathBuf, +} + +impl IsolatedPythonPluginAdapter { + pub fn new( + config: PluginConfig, + venv_manager: VenvManager, + registry: Arc, + class_name: String, + plugin_dirs: Vec, + worker_script: std::path::PathBuf, + ) -> Self { + Self { + config, + venv_manager, + worker: Mutex::new(None), + registry, + class_name, + plugin_dirs, + worker_script, + } + } + + fn safe_config(&self) -> serde_json::Value { + // Serialize PluginConfig to JSON, omitting private/secret fields. + // We use serde_json::to_value on the whole config; the Python worker + // receives this as the `config` string (JSON-encoded). + serde_json::to_value(&self.config).unwrap_or(serde_json::Value::Null) + } + + /// Invoke a named hook on the worker subprocess. + async fn invoke_hook( + &self, + hook_name: &str, + payload: &dyn PluginPayload, + _extensions: &Extensions, + ctx: &mut PluginContext, + ) -> Result, Box> { + let payload_json = self.registry + .payload_to_json(hook_name, payload) + .map_err(|e| Box::new(PluginError::Config { message: e.to_string() }))?; + + // Build context in Python's PluginContext schema: + // { state: {}, global_context: { request_id: "..." }, metadata: {} } + // The Rust PluginContext uses local_state/global_state which is a + // different schema — map it explicitly so worker.py can validate it. 
+ let context_json = serde_json::json!({ + "state": ctx.local_state, + "global_context": { + "request_id": Uuid::new_v4().to_string(), + "state": ctx.global_state, + }, + "metadata": {}, + }); + + let task = serde_json::json!({ + "task_type": "load_and_run_hook", + "plugin_dirs": self.plugin_dirs, + "class_name": self.class_name, + "config": serde_json::to_string(&self.safe_config()).unwrap_or_default(), + "hook_type": hook_name, + "plugin_name": self.config.name, + "payload": payload_json, + "context": context_json, + }); + + let guard = self.worker.lock().await; + let worker = guard.as_ref().ok_or_else(|| { + Box::new(PluginError::Config { + message: format!( + "plugin '{}': worker not started — call initialize() first", + self.config.name + ), + }) + })?; + + let timeout = Duration::from_secs(INVOKE_TIMEOUT_SECS); + let response = worker.send_task(task, timeout).await.map_err(|e| { + Box::new(PluginError::Config { + message: format!("plugin '{}': worker error: {}", self.config.name, e), + }) + })?; + + debug!( + plugin = %self.config.name, + hook = hook_name, + "received worker response" + ); + + let erased = self.registry + .json_to_erased(hook_name, response) + .map_err(|e| { + Box::new(PluginError::Config { + message: format!( + "plugin '{}': failed to decode worker response: {:?}", + self.config.name, e + ), + }) + })?; + + Ok(Box::new(erased)) + } +} + +#[async_trait] +impl Plugin for IsolatedPythonPluginAdapter { + fn config(&self) -> &PluginConfig { + &self.config + } + + async fn initialize(&self) -> Result<(), Box> { + info!(plugin = %self.config.name, "initializing isolated Python plugin"); + self.venv_manager.ensure_venv().await.map_err(|e| { + Box::new(PluginError::Config { + message: format!("plugin '{}': venv error: {}", self.config.name, e), + }) + })?; + + let python_exe = self.venv_manager.python_executable(); + let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from(".")); + let worker = WorkerProcess::spawn(&python_exe, 
&self.worker_script, &cwd) + .await + .map_err(|e| { + Box::new(PluginError::Config { + message: format!("plugin '{}': failed to spawn worker: {}", self.config.name, e), + }) + })?; + + *self.worker.lock().await = Some(worker); + info!(plugin = %self.config.name, "worker subprocess started"); + Ok(()) + } + + async fn shutdown(&self) -> Result<(), Box> { + info!(plugin = %self.config.name, "shutting down isolated Python plugin"); + let mut guard = self.worker.lock().await; + if let Some(worker) = guard.take() { + worker.shutdown(5).await; + } + Ok(()) + } +} + +/// An `AnyHookHandler` bound to a specific hook name. +/// +/// Each hook name in `config.hooks` gets its own `BoundHookHandler` +/// wrapping the same `Arc`. The handler's +/// `hook_type_name()` returns the pre-bound name. +pub struct BoundHookHandler { + adapter: Arc, + hook_name: &'static str, +} + +impl BoundHookHandler { + pub fn new(adapter: Arc, hook_name: &'static str) -> Self { + Self { adapter, hook_name } + } +} + +#[async_trait] +impl AnyHookHandler for BoundHookHandler { + async fn invoke( + &self, + payload: &dyn PluginPayload, + extensions: &Extensions, + ctx: &mut PluginContext, + ) -> Result, Box> { + self.adapter + .invoke_hook(self.hook_name, payload, extensions, ctx) + .await + } + + fn hook_type_name(&self) -> &'static str { + self.hook_name + } +} diff --git a/crates/cpex-hosts-python/src/isolated/factory.rs b/crates/cpex-hosts-python/src/isolated/factory.rs new file mode 100644 index 00000000..98a5f59b --- /dev/null +++ b/crates/cpex-hosts-python/src/isolated/factory.rs @@ -0,0 +1,266 @@ +// Location: ./crates/cpex-hosts-python/src/isolated/factory.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck +// +// IsolatedPythonPluginAdapterFactory — PluginFactory implementation. 
+// +// Parses `kind: "isolated_venv://module.ClassName"` YAML config, +// constructs a VenvManager, builds the adapter, and registers one +// BoundHookHandler per declared hook name. + +use std::path::PathBuf; +use std::sync::Arc; + +use cpex_core::{ + error::PluginError, + factory::{PluginFactory, PluginInstance}, + plugin::PluginConfig, +}; + +use super::adapter::{BoundHookHandler, IsolatedPythonPluginAdapter}; +use super::payload::HookPayloadRegistry; +use super::venv::VenvManager; + +/// `kind:` prefix operators write in CPEX YAML to declare a +/// subprocess-isolated Python plugin. +/// +/// Full URI form: `isolated_venv://module.path.ClassName` +pub const KIND: &str = "isolated_venv"; + +/// Default path to the worker script relative to the working directory. +/// Hosts that install cpex at a non-standard location should override via +/// `IsolatedPythonPluginAdapterFactory::with_worker_script`. +pub const DEFAULT_WORKER_SCRIPT: &str = "cpex/framework/isolated/worker.py"; + +/// Factory for `kind: "isolated_venv"` and `config: class_name: module.ClassName` +/// +/// # Registration +/// +/// ```rust,ignore +/// let mut factories = PluginFactoryRegistry::new(); +/// factories.register( +/// cpex_hosts_python::KIND, +/// Box::new(IsolatedPythonPluginAdapterFactory::new( +/// HookPayloadRegistry::default(), +/// )), +/// ); +/// ``` +pub struct IsolatedPythonPluginAdapterFactory { + registry: Arc, + worker_script: PathBuf, +} + +impl IsolatedPythonPluginAdapterFactory { + /// Create with a pre-populated payload registry and the default worker script path. + pub fn new(registry: HookPayloadRegistry) -> Self { + Self { + registry: Arc::new(registry), + worker_script: PathBuf::from(DEFAULT_WORKER_SCRIPT), + } + } + + /// Override the path to `worker.py` (for non-standard cpex installs or tests). 
+ pub fn with_worker_script(mut self, path: impl Into) -> Self { + self.worker_script = path.into(); + self + } +} + +impl PluginFactory for IsolatedPythonPluginAdapterFactory { + fn create(&self, config: &PluginConfig) -> Result> { + // Validate hooks list. + if config.hooks.is_empty() { + return Err(Box::new(PluginError::Config { + message: format!( + "plugin '{}' (isolated_venv): `hooks:` must list at least one hook name", + config.name + ), + })); + } + + // class_name comes from config.config["class_name"] — consistent with + // IsolatedVenvPlugin and the cpex/templates/isolated cookiecutter. + let plugin_config_obj = config.config.as_ref().and_then(|v| v.as_object()); + + let class_name = plugin_config_obj + .and_then(|m| m.get("class_name")) + .and_then(|v| v.as_str()) + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .ok_or_else(|| { + Box::new(PluginError::Config { + message: format!( + "plugin '{}': `config.class_name` is required for isolated_venv plugins", + config.name + ), + }) + })?; + + // Read remaining optional config fields (plugin_config_obj already bound above). + + let requirements_file: Option = plugin_config_obj + .and_then(|m| m.get("requirements_file")) + .and_then(|v| v.as_str()) + .map(PathBuf::from); + + // plugin_dirs: use `config.config.plugin_dirs` or derive from class root. 
+ let plugin_dirs: Vec = plugin_config_obj + .and_then(|m| m.get("plugin_dirs")) + .and_then(|v| v.as_array()) + .map(|arr| { + arr.iter() + .filter_map(|v| v.as_str().map(String::from)) + .collect() + }) + .unwrap_or_else(|| vec!["plugins".to_string()]); + + // venv_path: optional override; default = //.venv + let venv_path: PathBuf = plugin_config_obj + .and_then(|m| m.get("venv_path")) + .and_then(|v| v.as_str()) + .map(PathBuf::from) + .unwrap_or_else(|| { + let class_root = class_name + .split('.') + .next() + .unwrap_or("plugin") + .to_string(); + let base = plugin_dirs + .first() + .map(PathBuf::from) + .unwrap_or_else(|| PathBuf::from("plugins")); + base.join(class_root).join(".venv") + }); + + let venv_manager = VenvManager::new(venv_path, requirements_file); + + let adapter = Arc::new(IsolatedPythonPluginAdapter::new( + config.clone(), + venv_manager, + Arc::clone(&self.registry), + class_name, + plugin_dirs, + self.worker_script.clone(), + )); + + // Register a BoundHookHandler for each declared hook name. + // Leak the string to satisfy the 'static lifetime requirement of + // AnyHookHandler::hook_type_name() — same pattern as apl-pii-scanner. + // PluginConfigs are created once at startup; the leak count is + // bounded by (plugins × hooks per config). 
+ let handlers: Vec<_> = config + .hooks + .iter() + .map(|h| -> (&'static str, Arc) { + let leaked: &'static str = Box::leak(h.clone().into_boxed_str()); + let handler: Arc = + Arc::new(BoundHookHandler::new(Arc::clone(&adapter), leaked)); + (leaked, handler) + }) + .collect(); + + Ok(PluginInstance { + plugin: adapter, + handlers, + }) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + fn make_config(class_name: &str, hooks: Vec<&str>) -> PluginConfig { + PluginConfig { + name: "test-plugin".to_string(), + kind: KIND.to_string(), + hooks: hooks.iter().map(|s| s.to_string()).collect(), + config: Some(serde_json::json!({ + "class_name": class_name, + "requirements_file": "tests/fixtures/requirements.txt", + "plugin_dirs": ["tests/fixtures"] + })), + ..Default::default() + } + } + + #[test] + fn create_valid_config_returns_instance() { + let factory = IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default()); + let config = make_config("echo_plugin.EchoPlugin", vec!["cmf.tool_pre_invoke"]); + let instance = factory.create(&config).unwrap(); + assert_eq!(instance.handlers.len(), 1); + assert_eq!(instance.handlers[0].0, "cmf.tool_pre_invoke"); + } + + #[test] + fn create_multi_hook_produces_multiple_handlers() { + let factory = IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default()); + let config = make_config( + "echo_plugin.EchoPlugin", + vec!["cmf.tool_pre_invoke", "cmf.tool_post_invoke"], + ); + let instance = factory.create(&config).unwrap(); + assert_eq!(instance.handlers.len(), 2); + } + + #[test] + fn create_empty_hooks_returns_error() { + let factory = IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default()); + let config = make_config("echo_plugin.EchoPlugin", vec![]); + let result = factory.create(&config); + assert!(result.is_err(), 
"expected error for empty hooks"); + let err = match result { + Err(e) => e, + Ok(_) => panic!("expected error"), + }; + assert!( + format!("{:?}", err).contains("hooks"), + "error should mention hooks, got: {:?}", + err + ); + } + + #[test] + fn create_missing_class_name_returns_error() { + let factory = IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default()); + let mut config = make_config("echo_plugin.EchoPlugin", vec!["cmf.tool_pre_invoke"]); + // Remove class_name from config. + if let Some(obj) = config.config.as_mut().and_then(|v| v.as_object_mut()) { + obj.remove("class_name"); + } + let result = factory.create(&config); + assert!(result.is_err()); + let err = match result { + Err(e) => e, + Ok(_) => panic!("expected error"), + }; + assert!( + format!("{:?}", err).contains("class_name"), + "error should mention class_name, got: {:?}", + err + ); + } + + #[test] + fn class_name_read_from_config() { + let factory = IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default()); + let config = make_config("my_pkg.MyPlugin", vec!["cmf.tool_pre_invoke"]); + let instance = factory.create(&config).unwrap(); + assert_eq!(instance.handlers[0].0, "cmf.tool_pre_invoke"); + } + + #[test] + fn hook_type_name_matches_declared_hook() { + let factory = IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default()); + let config = make_config("echo_plugin.EchoPlugin", vec!["cmf.tool_pre_invoke"]); + let instance = factory.create(&config).unwrap(); + let hook_name = instance.handlers[0].1.hook_type_name(); + assert_eq!(hook_name, "cmf.tool_pre_invoke"); + } +} diff --git a/crates/cpex-hosts-python/src/isolated/mod.rs b/crates/cpex-hosts-python/src/isolated/mod.rs new file mode 100644 index 00000000..d38691cc --- /dev/null +++ b/crates/cpex-hosts-python/src/isolated/mod.rs @@ -0,0 +1,14 @@ +// Location: ./crates/cpex-hosts-python/src/isolated/mod.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck + +pub mod 
adapter; +pub mod factory; +pub mod payload; +pub mod subprocess; +pub mod venv; + +pub use adapter::IsolatedPythonPluginAdapter; +pub use factory::{IsolatedPythonPluginAdapterFactory, KIND}; +pub use payload::HookPayloadRegistry; diff --git a/crates/cpex-hosts-python/src/isolated/payload.rs b/crates/cpex-hosts-python/src/isolated/payload.rs new file mode 100644 index 00000000..e19953cb --- /dev/null +++ b/crates/cpex-hosts-python/src/isolated/payload.rs @@ -0,0 +1,341 @@ +// Location: ./crates/cpex-hosts-python/src/isolated/payload.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck +// +// Payload serialization registry. +// +// PluginPayload has no Serialize bound (object-safe by design). This +// module provides a HookPayloadRegistry that maps hook type names to +// (serialize_fn, deserialize_fn) shim pairs. Each shim downcasts the +// trait object to the concrete type before calling serde_json. +// +// Unknown hook names fall back to GenericPayload which wraps a raw +// serde_json::Value and passes it through unmodified. 
+ +use std::collections::HashMap; + +use cpex_core::{ + cmf::{ + MessagePayload, + constants::{ + HOOK_CMF_LLM_INPUT, HOOK_CMF_LLM_OUTPUT, HOOK_CMF_PROMPT_POST_INVOKE, + HOOK_CMF_PROMPT_PRE_INVOKE, HOOK_CMF_RESOURCE_POST_FETCH, HOOK_CMF_RESOURCE_PRE_FETCH, + HOOK_CMF_TOOL_POST_INVOKE, HOOK_CMF_TOOL_PRE_INVOKE, + }, + }, + delegation::{DelegationPayload, HOOK_TOKEN_DELEGATE}, + error::{PluginError, PluginViolation}, + executor::ErasedResultFields, + hooks::payload::PluginPayload, + identity::{IdentityPayload, HOOK_IDENTITY_RESOLVE}, +}; + +// --------------------------------------------------------------------------- +// Type aliases for shim function pointers +// --------------------------------------------------------------------------- + +pub type SerializeFn = fn(&dyn PluginPayload) -> Result; +pub type DeserializeFn = fn(serde_json::Value) -> Result, serde_json::Error>; + +// --------------------------------------------------------------------------- +// GenericPayload — fallback for unknown hook names +// --------------------------------------------------------------------------- + +/// Opaque payload that carries raw JSON across the Python boundary. +/// Used when no concrete type is registered for a hook name. +#[derive(Debug, Clone)] +pub struct GenericPayload(pub serde_json::Value); + +cpex_core::impl_plugin_payload!(GenericPayload); + +// --------------------------------------------------------------------------- +// HookPayloadRegistry +// --------------------------------------------------------------------------- + +/// Maps hook type names to (serialize, deserialize) shim pairs. +pub struct HookPayloadRegistry { + serialize: HashMap<&'static str, SerializeFn>, + deserialize: HashMap<&'static str, DeserializeFn>, +} + +impl HookPayloadRegistry { + /// Empty registry. Use `default()` for a registry pre-populated with + /// all built-in cpex-core payload types. 
+ pub fn empty() -> Self { + Self { + serialize: HashMap::new(), + deserialize: HashMap::new(), + } + } + + /// Register a (serialize, deserialize) shim pair for a hook type name. + pub fn register( + &mut self, + hook_name: &'static str, + ser: SerializeFn, + de: DeserializeFn, + ) { + self.serialize.insert(hook_name, ser); + self.deserialize.insert(hook_name, de); + } + + /// Serialize a payload trait object to JSON. + /// Falls back to `serde_json::Value::Null` for unknown hook names + /// (should not happen if the registry is fully populated). + pub fn payload_to_json( + &self, + hook_name: &str, + payload: &dyn PluginPayload, + ) -> Result { + match self.serialize.get(hook_name) { + Some(f) => f(payload), + None => { + // Unknown — try to downcast to GenericPayload and return its inner Value. + if let Some(g) = payload.as_any().downcast_ref::() { + Ok(g.0.clone()) + } else { + Ok(serde_json::Value::Null) + } + } + } + } + + /// Deserialize a JSON value to a concrete payload type, or GenericPayload. + pub fn json_to_payload( + &self, + hook_name: &str, + value: serde_json::Value, + ) -> Result, serde_json::Error> { + match self.deserialize.get(hook_name) { + Some(f) => f(value), + None => Ok(Box::new(GenericPayload(value))), + } + } + + /// Convert a worker JSON response into ErasedResultFields. 
+ /// + /// Worker response schema: + /// { + /// "continue_processing": bool, + /// "violation": {code, reason, ...} | null, + /// "modified_payload": {...} | null, + /// "request_id": "...", ← present but ignored here (stripped by caller) + /// } + pub fn json_to_erased( + &self, + hook_name: &str, + response: serde_json::Value, + ) -> Result> { + let continue_processing = response + .get("continue_processing") + .and_then(|v| v.as_bool()) + .unwrap_or(true); + + let violation: Option = response + .get("violation") + .filter(|v| !v.is_null()) + .and_then(|v| serde_json::from_value(v.clone()).ok()); + + let modified_payload: Option> = response + .get("modified_payload") + .filter(|v| !v.is_null()) + .map(|v| { + self.json_to_payload(hook_name, v.clone()) + .map_err(|e| Box::new(PluginError::Config { message: e.to_string() })) + }) + .transpose()?; + + Ok(ErasedResultFields { + continue_processing, + modified_payload, + modified_extensions: None, + violation, + }) + } +} + +impl Default for HookPayloadRegistry { + /// Pre-populate with all built-in cpex-core payload types. 
+ fn default() -> Self { + let mut r = Self::empty(); + + // CMF hooks — all use MessagePayload + for name in &[ + HOOK_CMF_TOOL_PRE_INVOKE, + HOOK_CMF_TOOL_POST_INVOKE, + HOOK_CMF_LLM_INPUT, + HOOK_CMF_LLM_OUTPUT, + HOOK_CMF_PROMPT_PRE_INVOKE, + HOOK_CMF_PROMPT_POST_INVOKE, + HOOK_CMF_RESOURCE_PRE_FETCH, + HOOK_CMF_RESOURCE_POST_FETCH, + ] { + r.register(name, serialize_message_payload, deserialize_message_payload); + } + + r.register( + HOOK_IDENTITY_RESOLVE, + serialize_identity_payload, + deserialize_identity_payload, + ); + r.register( + HOOK_TOKEN_DELEGATE, + serialize_delegation_payload, + deserialize_delegation_payload, + ); + + r + } +} + +// --------------------------------------------------------------------------- +// Shim functions — one pair per concrete payload type +// --------------------------------------------------------------------------- + +fn serialize_message_payload(p: &dyn PluginPayload) -> Result { + let concrete = p + .as_any() + .downcast_ref::() + .expect("serialize_message_payload: downcast failed — handler registered wrong type"); + serde_json::to_value(concrete) +} + +fn deserialize_message_payload(v: serde_json::Value) -> Result, serde_json::Error> { + Ok(Box::new(serde_json::from_value::(v)?)) +} + +fn serialize_identity_payload(p: &dyn PluginPayload) -> Result { + let concrete = p + .as_any() + .downcast_ref::() + .expect("serialize_identity_payload: downcast failed"); + serde_json::to_value(concrete) +} + +fn deserialize_identity_payload(v: serde_json::Value) -> Result, serde_json::Error> { + Ok(Box::new(serde_json::from_value::(v)?)) +} + +fn serialize_delegation_payload(p: &dyn PluginPayload) -> Result { + let concrete = p + .as_any() + .downcast_ref::() + .expect("serialize_delegation_payload: downcast failed"); + serde_json::to_value(concrete) +} + +fn deserialize_delegation_payload(v: serde_json::Value) -> Result, serde_json::Error> { + Ok(Box::new(serde_json::from_value::(v)?)) +} + +// 
--------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use cpex_core::cmf::{Message, MessagePayload, enums::Role}; + + fn make_registry() -> HookPayloadRegistry { + HookPayloadRegistry::default() + } + + fn msg_payload() -> MessagePayload { + MessagePayload { + message: Message::text(Role::User, "hello"), + } + } + + #[test] + fn payload_to_json_message_payload() { + let r = make_registry(); + let p: Box = Box::new(msg_payload()); + let v = r.payload_to_json(HOOK_CMF_TOOL_PRE_INVOKE, p.as_ref()).unwrap(); + assert!(v.is_object()); + assert!(v.get("message").is_some()); + } + + #[test] + fn json_to_erased_allow() { + let r = make_registry(); + let resp = serde_json::json!({ + "continue_processing": true, + "violation": null, + "modified_payload": null, + "request_id": "test-123" + }); + let erased = r.json_to_erased(HOOK_CMF_TOOL_PRE_INVOKE, resp).unwrap(); + assert!(erased.continue_processing); + assert!(erased.violation.is_none()); + assert!(erased.modified_payload.is_none()); + } + + #[test] + fn json_to_erased_deny_with_violation() { + let r = make_registry(); + let resp = serde_json::json!({ + "continue_processing": false, + "violation": {"code": "pii.found", "reason": "PII detected", "description": null, "details": {}, "plugin_name": null}, + "modified_payload": null, + "request_id": "test-456" + }); + let erased = r.json_to_erased(HOOK_CMF_TOOL_PRE_INVOKE, resp).unwrap(); + assert!(!erased.continue_processing); + let v = erased.violation.unwrap(); + assert_eq!(v.code, "pii.found"); + } + + #[test] + fn json_to_erased_with_modified_payload() { + let r = make_registry(); + // ContentPart uses serde tag = "content_type". 
+ let msg = serde_json::json!({ + "role": "user", + "content": [{"content_type": "text", "text": "modified"}] + }); + let resp = serde_json::json!({ + "continue_processing": true, + "violation": null, + "modified_payload": {"message": msg}, + "request_id": "test-789" + }); + let erased = r.json_to_erased(HOOK_CMF_TOOL_PRE_INVOKE, resp).unwrap(); + assert!(erased.continue_processing); + assert!(erased.modified_payload.is_some()); + // Concrete type should be MessagePayload. + let mp = erased.modified_payload.unwrap(); + assert!(mp.as_any().downcast_ref::().is_some()); + } + + #[test] + fn json_to_erased_unknown_hook_falls_back_to_generic() { + let r = make_registry(); + let resp = serde_json::json!({ + "continue_processing": true, + "violation": null, + "modified_payload": {"some": "data"}, + "request_id": "test-000" + }); + let erased = r.json_to_erased("unknown.hook", resp).unwrap(); + assert!(erased.modified_payload.is_some()); + let mp = erased.modified_payload.unwrap(); + assert!(mp.as_any().downcast_ref::().is_some()); + } + + #[test] + fn round_trip_message_payload() { + let r = make_registry(); + let original = msg_payload(); + let p: &dyn PluginPayload = &original; + let json = r.payload_to_json(HOOK_CMF_TOOL_PRE_INVOKE, p).unwrap(); + let boxed = r.json_to_payload(HOOK_CMF_TOOL_PRE_INVOKE, json).unwrap(); + let roundtripped = boxed.as_any().downcast_ref::().unwrap(); + // Content should survive the round-trip. 
+ assert_eq!( + format!("{:?}", original.message), + format!("{:?}", roundtripped.message), + ); + } +} diff --git a/crates/cpex-hosts-python/src/isolated/subprocess.rs b/crates/cpex-hosts-python/src/isolated/subprocess.rs new file mode 100644 index 00000000..91ab44ca --- /dev/null +++ b/crates/cpex-hosts-python/src/isolated/subprocess.rs @@ -0,0 +1,300 @@ +// Location: ./crates/cpex-hosts-python/src/isolated/subprocess.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck +// +// WorkerProcess — long-running worker.py subprocess lifecycle. +// +// Mirrors Python's VenvProcessCommunicator (venv_comm.py): +// - spawn: tokio::process::Command with stdin/stdout/stderr piped +// - send_task: write JSON-lines task + request_id, await oneshot response +// - shutdown: send {"task_type":"shutdown"}, wait, then kill +// - Drop: synchronously kills the child to prevent subprocess orphans + +use std::collections::HashMap; +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::process::{Child, Command}; +use tokio::sync::{oneshot, Mutex}; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +#[derive(Debug, thiserror::Error)] +pub enum WorkerError { + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + #[error("task timed out after {0:?}")] + Timeout(Duration), + #[error("worker process died or channel closed")] + ProcessDied, + #[error("worker returned error: {0}")] + WorkerError(String), +} + +type ResponseSender = oneshot::Sender>; + +/// Message sent to the background I/O task. +struct WorkerTask { + request_id: String, + task_data: serde_json::Value, + reply_tx: ResponseSender, +} + +/// Long-running worker.py subprocess with JSON-lines I/O. +pub struct WorkerProcess { + /// Sender to the background I/O task. `None` after shutdown. 
+ task_tx: Arc>>>, + /// Handle for the background I/O task. + _io_task: tokio::task::JoinHandle<()>, + /// The child process handle — kept for kill() in Drop. + /// + /// Wrapped in Arc so we can share it between the Drop impl + /// and the background I/O task (which needs to signal process death). + child_pid: Arc>>, + /// Raw kill handle — we keep a separate `std::process::Child`-level + /// kill path because `tokio::process::Child::kill()` is async and + /// unavailable in Drop. We use a raw kill(pid, SIGKILL) instead. + raw_child: Arc>>, +} + +impl WorkerProcess { + /// Spawn the worker.py subprocess and start the background I/O task. + pub async fn spawn(python_exe: &Path, script_path: &Path, cwd: &Path) -> Result { + let mut child = Command::new(python_exe) + .arg(script_path) + .current_dir(cwd) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn()?; + + let pid = child.id(); + let stdin = child.stdin.take().expect("stdin piped"); + let stdout = child.stdout.take().expect("stdout piped"); + let stderr = child.stderr.take().expect("stderr piped"); + + let (task_tx, mut task_rx) = tokio::sync::mpsc::channel::(64); + let task_tx = Arc::new(Mutex::new(Some(task_tx))); + + let raw_child = Arc::new(Mutex::new(Some(child))); + let raw_child_for_io = Arc::clone(&raw_child); + + // Background task: owns stdin writer + stdout reader, routes responses. + let io_task = tokio::spawn(async move { + let mut writer = tokio::io::BufWriter::new(stdin); + let mut reader = BufReader::new(stdout); + let mut stderr_reader = BufReader::new(stderr); + + // Pending requests waiting for their response. + let mut pending: HashMap = HashMap::new(); + let mut line_buf = String::new(); + let mut stderr_buf = String::new(); + + loop { + tokio::select! { + // New task to send. + maybe_task = task_rx.recv() => { + let Some(task) = maybe_task else { + // Sender dropped — shut down. 
+ break; + }; + let mut obj = task.task_data.clone(); + if let Some(map) = obj.as_object_mut() { + map.insert("request_id".to_string(), serde_json::json!(task.request_id.clone())); + } + let line = match serde_json::to_string(&obj) { + Ok(s) => s, + Err(e) => { + let _ = task.reply_tx.send(Err(WorkerError::Json(e))); + continue; + } + }; + pending.insert(task.request_id, task.reply_tx); + if let Err(e) = writer.write_all(format!("{}\n", line).as_bytes()).await { + error!("failed to write to worker stdin: {}", e); + break; + } + if let Err(e) = writer.flush().await { + error!("failed to flush worker stdin: {}", e); + break; + } + } + // Response line from worker stdout. + n = reader.read_line(&mut line_buf) => { + match n { + Ok(0) => { + // EOF — worker exited. + info!("worker stdout EOF"); + break; + } + Ok(_) => { + let trimmed = line_buf.trim(); + if !trimmed.is_empty() { + match serde_json::from_str::(trimmed) { + Ok(resp) => { + let rid = resp.get("request_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if let Some(tx) = pending.remove(&rid) { + // Check for worker-level error. + if resp.get("status").and_then(|v| v.as_str()) == Some("error") { + let msg = resp.get("message") + .and_then(|v| v.as_str()) + .unwrap_or("worker error") + .to_string(); + let _ = tx.send(Err(WorkerError::WorkerError(msg))); + } else { + let _ = tx.send(Ok(resp)); + } + } else { + debug!("no pending request for request_id={:?}", rid); + } + } + Err(e) => { + warn!("could not parse worker response: {} — {:?}", e, trimmed); + } + } + } + line_buf.clear(); + } + Err(e) => { + error!("error reading worker stdout: {}", e); + break; + } + } + } + // Drain stderr to logs. 
+ n = stderr_reader.read_line(&mut stderr_buf) => { + match n { + Ok(0) | Err(_) => {} + Ok(_) => { + let trimmed = stderr_buf.trim(); + if !trimmed.is_empty() { + debug!("[worker stderr] {}", trimmed); + } + stderr_buf.clear(); + } + } + } + } + } + + // Drain any remaining pending requests with ProcessDied. + for (_, tx) in pending.drain() { + let _ = tx.send(Err(WorkerError::ProcessDied)); + } + + // Wait for child to exit. + let mut guard = raw_child_for_io.lock().await; + if let Some(mut child) = guard.take() { + if let Err(e) = child.wait().await { + debug!("worker wait() error: {}", e); + } + } + }); + + Ok(Self { + task_tx, + _io_task: io_task, + child_pid: Arc::new(Mutex::new(pid)), + raw_child, + }) + } + + /// Send a task dict and await the response, with a timeout. + pub async fn send_task( + &self, + task_data: serde_json::Value, + timeout: Duration, + ) -> Result { + let request_id = Uuid::new_v4().to_string(); + let (reply_tx, reply_rx) = oneshot::channel(); + + { + let guard = self.task_tx.lock().await; + let sender = guard.as_ref().ok_or(WorkerError::ProcessDied)?; + sender + .send(WorkerTask { + request_id, + task_data, + reply_tx, + }) + .await + .map_err(|_| WorkerError::ProcessDied)?; + } + + tokio::time::timeout(timeout, reply_rx) + .await + .map_err(|_| WorkerError::Timeout(timeout))? + .map_err(|_| WorkerError::ProcessDied)? + } + + /// Graceful shutdown: send shutdown task, wait up to `timeout_secs`, then kill. 
+ pub async fn shutdown(&self, timeout_secs: u64) { + let shutdown_data = serde_json::json!({ + "task_type": "shutdown", + }); + let timeout = Duration::from_secs(timeout_secs); + match self.send_task(shutdown_data, timeout).await { + Ok(_) => { + info!("worker shutdown acknowledged"); + } + Err(WorkerError::Timeout(_)) => { + warn!("worker shutdown timed out — killing"); + self.kill().await; + } + Err(e) => { + debug!("worker shutdown send error: {} — likely already dead", e); + } + } + // Close the sender so the I/O task can finish. + *self.task_tx.lock().await = None; + } + + async fn kill(&self) { + let mut guard = self.raw_child.lock().await; + if let Some(ref mut child) = *guard { + if let Err(e) = child.kill().await { + debug!("kill() error: {}", e); + } + } + } +} + +impl Drop for WorkerProcess { + fn drop(&mut self) { + // Best-effort synchronous kill to prevent orphan subprocesses. + // We send SIGKILL via the OS directly since we can't await here. + if let Ok(guard) = self.child_pid.try_lock() { + if let Some(pid) = *guard { + #[cfg(unix)] + { + unsafe { + libc_kill(pid as i32, 9); // SIGKILL + } + } + #[cfg(windows)] + { + // On Windows, TerminateProcess would be the equivalent. + // For now we rely on the OS cleaning up when this process exits. + let _ = pid; + } + } + } + } +} + +#[cfg(unix)] +unsafe fn libc_kill(pid: i32, sig: i32) { + // Use the libc kill(2) syscall directly. 
+ extern "C" { + fn kill(pid: i32, sig: i32) -> i32; + } + kill(pid, sig); +} diff --git a/crates/cpex-hosts-python/src/isolated/venv.rs b/crates/cpex-hosts-python/src/isolated/venv.rs new file mode 100644 index 00000000..0f184b61 --- /dev/null +++ b/crates/cpex-hosts-python/src/isolated/venv.rs @@ -0,0 +1,312 @@ +// Location: ./crates/cpex-hosts-python/src/isolated/venv.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck +// +// Venv creation and requirements-hash cache, mirroring the Python-side +// IsolatedVenvPlugin._compute_requirements_hash / _is_venv_cache_valid / +// _save_cache_metadata / create_venv logic. + +use std::path::{Path, PathBuf}; +use std::process::Command; + +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tracing::{debug, info, warn}; + +#[derive(Debug, Error)] +pub enum VenvError { + #[error("venv creation failed: {0}")] + Create(String), + #[error("pip install failed: {0}")] + PipInstall(String), + #[error("I/O error: {0}")] + Io(#[from] std::io::Error), + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), +} + +/// Outcome of `ensure_venv()`. +#[derive(Debug, PartialEq, Eq)] +pub enum VenvState { + /// Venv already existed and requirements hash matched — reused as-is. + Reused, + /// Venv was (re-)created and requirements installed. + Created, +} + +#[derive(Debug, Serialize, Deserialize)] +struct VenvMetadata { + venv_path: String, + requirements_file: Option, + requirements_hash: String, + python_version: String, +} + +/// Manages the lifecycle of a single venv associated with one Python plugin. +pub struct VenvManager { + pub venv_path: PathBuf, + pub requirements_file: Option, + /// `/../.cpex/venv_cache/_metadata.json` + cache_metadata_path: PathBuf, +} + +impl VenvManager { + /// Create a new VenvManager. 
+ /// + /// * `venv_path` — where the venv lives (or will be created) + /// * `requirements_file` — optional path to requirements.txt + pub fn new(venv_path: PathBuf, requirements_file: Option) -> Self { + let venv_name = venv_path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .to_string(); + let plugin_dir = venv_path + .parent() + .unwrap_or(Path::new(".")); + let cache_metadata_path = plugin_dir + .join(".cpex") + .join("venv_cache") + .join(format!("{}_metadata.json", venv_name)); + // Relative requirements paths are resolved against the plugin dir + // (venv_path.parent()), mirroring Python's `package_path / requirements_file`. + let requirements_file = requirements_file.map(|r| { + if r.is_relative() { + plugin_dir.join(r) + } else { + r + } + }); + Self { + venv_path, + requirements_file, + cache_metadata_path, + } + } + + /// Compute SHA-256 of the requirements file, or an empty-bytes hash if absent. + /// Matches Python's `hashlib.sha256(); hasher.update(content); hasher.hexdigest()`. + fn compute_requirements_hash(&self) -> String { + use sha2::{Digest, Sha256}; + use std::io::Read; + let mut hasher = Sha256::new(); + if let Some(ref req) = self.requirements_file { + if let Ok(mut f) = std::fs::File::open(req) { + let mut buf = Vec::new(); + if f.read_to_end(&mut buf).is_ok() { + hasher.update(&buf); + } + } + } + // No file or unreadable: finalize over zero bytes — identical to + // Python's `hasher.update(b"")` fallthrough. 
+ format!("{:x}", hasher.finalize()) + } + + fn is_cache_valid(&self) -> bool { + if !self.venv_path.exists() { + debug!("venv path does not exist: {:?}", self.venv_path); + return false; + } + if !self.cache_metadata_path.exists() { + debug!("metadata file does not exist: {:?}", self.cache_metadata_path); + return false; + } + match std::fs::read_to_string(&self.cache_metadata_path) { + Ok(s) => match serde_json::from_str::(&s) { + Ok(meta) => { + let current = self.compute_requirements_hash(); + if meta.requirements_hash != current { + info!( + "requirements changed (cached={}, current={})", + meta.requirements_hash, current + ); + false + } else { + info!("valid venv cache at {:?}", self.venv_path); + true + } + } + Err(e) => { + warn!("could not parse venv metadata: {}", e); + false + } + }, + Err(e) => { + warn!("could not read venv metadata: {}", e); + false + } + } + } + + fn save_metadata(&self) -> Result<(), VenvError> { + if let Some(parent) = self.cache_metadata_path.parent() { + std::fs::create_dir_all(parent)?; + } + let meta = VenvMetadata { + venv_path: self.venv_path.display().to_string(), + requirements_file: self.requirements_file.as_ref().map(|p| p.display().to_string()), + requirements_hash: self.compute_requirements_hash(), + python_version: python_version_string(), + }; + let json = serde_json::to_string_pretty(&meta)?; + std::fs::write(&self.cache_metadata_path, json)?; + info!("saved venv cache metadata to {:?}", self.cache_metadata_path); + Ok(()) + } + + fn create_venv(&self) -> Result<(), VenvError> { + if self.venv_path.exists() { + info!("removing stale venv at {:?}", self.venv_path); + std::fs::remove_dir_all(&self.venv_path)?; + } + info!("creating venv at {:?}", self.venv_path); + let status = Command::new("python3") + .args(["-m", "venv"]) + .arg(&self.venv_path) + .status()?; + if !status.success() { + return Err(VenvError::Create(format!( + "python3 -m venv exited with {:?}", + status.code() + ))); + } + Ok(()) + } + + fn 
pip_install(&self) -> Result<(), VenvError> { + let Some(ref req) = self.requirements_file else { + return Ok(()); + }; + if !req.exists() { + debug!("requirements file {:?} does not exist — skipping pip install", req); + return Ok(()); + } + let python = self.python_executable(); + info!("running pip install -r {:?}", req); + let status = Command::new(&python) + .args(["-m", "pip", "install", "-r"]) + .arg(req) + .status()?; + if !status.success() { + return Err(VenvError::PipInstall(format!( + "pip install exited with {:?}", + status.code() + ))); + } + Ok(()) + } + + /// Return the platform-correct path to the Python interpreter inside the venv. + pub fn python_executable(&self) -> PathBuf { + #[cfg(windows)] + { + self.venv_path.join("Scripts").join("python.exe") + } + #[cfg(not(windows))] + { + self.venv_path.join("bin").join("python") + } + } + + /// Ensure the venv exists and requirements are installed. + /// + /// Returns `VenvState::Reused` if the cache was valid; `VenvState::Created` + /// after creating/reinstalling. 
+ pub async fn ensure_venv(&self) -> Result { + if self.is_cache_valid() { + return Ok(VenvState::Reused); + } + self.create_venv()?; + self.pip_install()?; + self.save_metadata()?; + Ok(VenvState::Created) + } +} + +fn python_version_string() -> String { + Command::new("python3") + .args(["--version"]) + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .map(|s| s.trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use std::io::Write; + use tempfile::TempDir; + + fn req_path(dir: &TempDir, content: &str) -> PathBuf { + let p = dir.path().join("requirements.txt"); + std::fs::write(&p, content).unwrap(); + p + } + + #[tokio::test] + async fn fresh_venv_is_created() { + let dir = TempDir::new().unwrap(); + let venv = dir.path().join(".venv"); + let req = req_path(&dir, ""); + let mgr = VenvManager::new(venv.clone(), Some(req)); + let state = mgr.ensure_venv().await.unwrap(); + assert_eq!(state, VenvState::Created); + assert!(venv.exists(), "venv dir should exist after creation"); + assert!(mgr.python_executable().exists(), "python executable should exist"); + } + + #[tokio::test] + async fn cache_hit_reuses_venv() { + let dir = TempDir::new().unwrap(); + let venv = dir.path().join(".venv"); + let req = req_path(&dir, ""); + let mgr = VenvManager::new(venv.clone(), Some(req)); + mgr.ensure_venv().await.unwrap(); + // Second call should hit the cache. 
+ let state = mgr.ensure_venv().await.unwrap(); + assert_eq!(state, VenvState::Reused); + } + + #[tokio::test] + async fn cache_miss_on_requirements_change() { + let dir = TempDir::new().unwrap(); + let venv = dir.path().join(".venv"); + let req = req_path(&dir, ""); + let mgr = VenvManager::new(venv.clone(), Some(req.clone())); + mgr.ensure_venv().await.unwrap(); + // Mutate requirements file. + let mut f = std::fs::OpenOptions::new().append(true).open(&req).unwrap(); + writeln!(f, "# changed").unwrap(); + drop(f); + let state = mgr.ensure_venv().await.unwrap(); + assert_eq!(state, VenvState::Created); + } + + #[tokio::test] + async fn missing_requirements_file_still_creates_venv() { + let dir = TempDir::new().unwrap(); + let venv = dir.path().join(".venv"); + // No requirements file. + let mgr = VenvManager::new(venv.clone(), None); + let state = mgr.ensure_venv().await.unwrap(); + assert_eq!(state, VenvState::Created); + assert!(venv.exists()); + } + + #[test] + fn python_executable_path_is_inside_venv() { + let dir = TempDir::new().unwrap(); + let venv = dir.path().join(".venv"); + let mgr = VenvManager::new(venv.clone(), None); + let exe = mgr.python_executable(); + assert!(exe.starts_with(&venv)); + } +} diff --git a/crates/cpex-hosts-python/src/lib.rs b/crates/cpex-hosts-python/src/lib.rs new file mode 100644 index 00000000..afdb5978 --- /dev/null +++ b/crates/cpex-hosts-python/src/lib.rs @@ -0,0 +1,16 @@ +// Location: ./crates/cpex-hosts-python/src/lib.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck +// +// cpex-hosts-python — subprocess-isolated Python plugin adapter. +// +// Lets the Rust PluginManager load Python plugin classes through a +// subprocess-isolated virtual environment. Plugin operators declare +// `kind: "isolated_venv"` and `config: class_name: module.ClassName` in YAML; the Rust +// runtime creates and manages the venv, spawns worker.py, and drives +// it via the JSON-lines stdin/stdout protocol. 
+ +pub mod isolated; + +pub use isolated::{HookPayloadRegistry, IsolatedPythonPluginAdapterFactory, KIND}; diff --git a/crates/cpex-hosts-python/tests/fixtures/echo_plugin.py b/crates/cpex-hosts-python/tests/fixtures/echo_plugin.py new file mode 100644 index 00000000..561b13d8 --- /dev/null +++ b/crates/cpex-hosts-python/tests/fixtures/echo_plugin.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# Location: ./crates/cpex-hosts-python/tests/fixtures/echo_plugin.py +# Copyright 2026 +# SPDX-License-Identifier: Apache-2.0 +# +# Minimal CPEX plugin fixture for Rust integration tests. +# Three variants: +# EchoPlugin — returns allow() unchanged +# ModifyPlugin — returns a modified payload (mutates a field) +# ErrorPlugin — raises RuntimeError to test on_error handling + +from cpex.framework.base import Plugin +from cpex.framework.decorator import hook +from cpex.framework.models import PluginConfig, PluginResult + + +class EchoPlugin(Plugin): + """Minimal plugin that echoes the payload unchanged.""" + + def __init__(self, config: PluginConfig) -> None: + super().__init__(config) + + async def initialize(self) -> None: + pass + + async def shutdown(self) -> None: + pass + + @hook("cmf.tool_pre_invoke") + async def on_tool_pre_invoke(self, payload, context) -> PluginResult: + return PluginResult(continue_processing=True) + + @hook("cmf.tool_post_invoke") + async def on_tool_post_invoke(self, payload, context) -> PluginResult: + return PluginResult(continue_processing=True) + + +class ModifyPlugin(Plugin): + """Plugin that modifies the payload by wrapping the content.""" + + def __init__(self, config: PluginConfig) -> None: + super().__init__(config) + + @hook("cmf.tool_pre_invoke") + async def on_tool_pre_invoke(self, payload, context) -> PluginResult: + # Return a modified payload — add a sentinel field to the dict. 
+ modified = dict(payload) if isinstance(payload, dict) else payload + return PluginResult(continue_processing=True, modified_payload=modified) + + +class ErrorPlugin(Plugin): + """Plugin that always raises RuntimeError — used to test on_error.""" + + def __init__(self, config: PluginConfig) -> None: + super().__init__(config) + + @hook("cmf.tool_pre_invoke") + async def on_tool_pre_invoke(self, payload, context) -> PluginResult: + raise RuntimeError("intentional error from ErrorPlugin") + + +class NoLifecyclePlugin(Plugin): + """Plugin with no initialize/shutdown — tests AC-6 (missing = no-op).""" + + def __init__(self, config: PluginConfig) -> None: + super().__init__(config) + + @hook("cmf.tool_pre_invoke") + async def on_tool_pre_invoke(self, payload, context) -> PluginResult: + return PluginResult(continue_processing=True) diff --git a/crates/cpex-hosts-python/tests/isolated_e2e.rs b/crates/cpex-hosts-python/tests/isolated_e2e.rs new file mode 100644 index 00000000..47b2beb9 --- /dev/null +++ b/crates/cpex-hosts-python/tests/isolated_e2e.rs @@ -0,0 +1,288 @@ +// Location: ./crates/cpex-hosts-python/tests/isolated_e2e.rs +// Copyright 2026 +// SPDX-License-Identifier: Apache-2.0 +// Authors: Ted Habeck +// +// End-to-end integration test: load a real Python plugin class through the +// Rust PluginManager using `kind: "isolated_venv"` config. +// +// Requirements: Python 3.11+ must be in PATH and cpex must be importable. +// Tests are skipped gracefully when Python is absent. 
+ +use std::path::PathBuf; +use std::process::Command; +use std::sync::Arc; + +use cpex_core::{ + cmf::{Message, MessagePayload, constants::HOOK_CMF_TOOL_PRE_INVOKE, enums::Role}, + hooks::payload::Extensions, + manager::PluginManager, +}; +use cpex_hosts_python::{HookPayloadRegistry, IsolatedPythonPluginAdapterFactory, KIND}; + +fn fixtures_dir() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("tests") + .join("fixtures") +} + +fn worker_script_path() -> PathBuf { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .parent() + .unwrap() + .join("cpex") + .join("framework") + .join("isolated") + .join("worker.py") +} + +fn python3_available() -> bool { + Command::new("python3") + .arg("--version") + .output() + .map(|o| o.status.success()) + .unwrap_or(false) +} + +fn make_factory() -> IsolatedPythonPluginAdapterFactory { + IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default()) + .with_worker_script(worker_script_path()) +} + +fn plugin_yaml(class_name: &str, hook: &str, on_error: &str, mode: &str) -> String { + let fix = fixtures_dir(); + format!( + r#" +plugins: + - name: test-plugin + kind: "{kind}" + hooks: ["{hook}"] + mode: {mode} + priority: 10 + on_error: {on_error} + config: + class_name: "{class_name}" + requirements_file: "{req}" + plugin_dirs: + - "{fix}" + venv_path: "{venv}" +"#, + kind = KIND, + class_name = class_name, + hook = hook, + mode = mode, + on_error = on_error, + req = fix.join("requirements.txt").display(), + fix = fix.display(), + venv = fix.join(".venv").display(), + ) +} + +fn cpex_root() -> PathBuf { + // CARGO_MANIFEST_DIR = .../cpex/crates/cpex-hosts-python + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .unwrap() + .parent() + .unwrap() + .to_path_buf() +} + +/// Write a requirements.txt with the absolute cpex install path. +/// This avoids pip's inconsistent relative-path resolution inside `-r` files. 
+fn write_requirements_txt() { + let req_path = fixtures_dir().join("requirements.txt"); + let cpex = cpex_root(); + std::fs::write( + &req_path, + format!("-e {}\n", cpex.display()), + ) + .expect("failed to write requirements.txt"); +} + +fn tool_payload() -> MessagePayload { + MessagePayload { + message: Message::text(Role::User, "test message"), + } +} + +fn make_manager(class_name: &str, on_error: &str) -> Arc { + write_requirements_txt(); + let mgr = Arc::new(PluginManager::default()); + mgr.register_factory(KIND, Box::new(make_factory())); + let yaml = plugin_yaml(class_name, HOOK_CMF_TOOL_PRE_INVOKE, on_error, "sequential"); + mgr.load_config_yaml(&yaml).expect("load_config_yaml failed"); + mgr +} + +// --------------------------------------------------------------------------- +// AC-8: Plugin loaded via `kind: "isolated_venv://echo_plugin.EchoPlugin"` +// and invoked successfully (allow result). +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn echo_plugin_allow_result() { + if !python3_available() { + eprintln!("SKIP: python3 not in PATH"); + return; + } + + let mgr = make_manager("echo_plugin.EchoPlugin", "fail"); + mgr.initialize().await.unwrap(); + + let (result, _bg) = mgr + .invoke_by_name( + HOOK_CMF_TOOL_PRE_INVOKE, + Box::new(tool_payload()), + Extensions::default(), + None, + ) + .await; + + assert!( + result.continue_processing, + "echo plugin should allow: violation={:?}", + result.violation + ); + + mgr.shutdown().await; +} + +// --------------------------------------------------------------------------- +// AC-9: Second invocation reuses the venv. 
+// --------------------------------------------------------------------------- + +#[tokio::test] +async fn second_invocation_reuses_venv() { + if !python3_available() { + eprintln!("SKIP: python3 not in PATH"); + return; + } + + let fix = fixtures_dir(); + let venv_path = fix.join(".venv"); + + let mgr = make_manager("echo_plugin.EchoPlugin", "fail"); + mgr.initialize().await.unwrap(); + + let (result, _bg) = mgr + .invoke_by_name( + HOOK_CMF_TOOL_PRE_INVOKE, + Box::new(tool_payload()), + Extensions::default(), + None, + ) + .await; + assert!(result.continue_processing); + + assert!(venv_path.exists(), "venv should be created by initialize()"); + + // Second manager pointing at same venv should reuse it. + let mgr2 = make_manager("echo_plugin.EchoPlugin", "fail"); + mgr2.initialize().await.unwrap(); + let (result2, _bg2) = mgr2 + .invoke_by_name( + HOOK_CMF_TOOL_PRE_INVOKE, + Box::new(tool_payload()), + Extensions::default(), + None, + ) + .await; + assert!(result2.continue_processing); + + mgr.shutdown().await; + mgr2.shutdown().await; +} + +// --------------------------------------------------------------------------- +// AC-5: Python exception → on_error:fail propagates error to caller. 
+// --------------------------------------------------------------------------- + +#[tokio::test] +async fn error_plugin_on_error_fail_stops_pipeline() { + if !python3_available() { + eprintln!("SKIP: python3 not in PATH"); + return; + } + + let mgr = make_manager("echo_plugin.ErrorPlugin", "fail"); + mgr.initialize().await.unwrap(); + + let (result, _bg) = mgr + .invoke_by_name( + HOOK_CMF_TOOL_PRE_INVOKE, + Box::new(tool_payload()), + Extensions::default(), + None, + ) + .await; + + assert!( + !result.continue_processing || result.violation.is_some(), + "error plugin with on_error:fail should stop processing or set violation" + ); + + mgr.shutdown().await; +} + +// --------------------------------------------------------------------------- +// AC-6: Plugin without initialize/shutdown methods works without crashing. +// --------------------------------------------------------------------------- + +#[tokio::test] +async fn no_lifecycle_plugin_works() { + if !python3_available() { + eprintln!("SKIP: python3 not in PATH"); + return; + } + + let mgr = make_manager("echo_plugin.NoLifecyclePlugin", "fail"); + mgr.initialize().await.unwrap(); + + let (result, _bg) = mgr + .invoke_by_name( + HOOK_CMF_TOOL_PRE_INVOKE, + Box::new(tool_payload()), + Extensions::default(), + None, + ) + .await; + assert!(result.continue_processing); + + mgr.shutdown().await; +} + +// --------------------------------------------------------------------------- +// AC-10: manager.shutdown() terminates the worker process within 5s. 
+// --------------------------------------------------------------------------- + +#[tokio::test] +async fn shutdown_terminates_worker_within_timeout() { + if !python3_available() { + eprintln!("SKIP: python3 not in PATH"); + return; + } + + let mgr = make_manager("echo_plugin.EchoPlugin", "fail"); + mgr.initialize().await.unwrap(); + + let (result, _bg) = mgr + .invoke_by_name( + HOOK_CMF_TOOL_PRE_INVOKE, + Box::new(tool_payload()), + Extensions::default(), + None, + ) + .await; + assert!(result.continue_processing); + + let start = std::time::Instant::now(); + mgr.shutdown().await; + assert!( + start.elapsed().as_secs() < 10, + "shutdown took too long: {:?}", + start.elapsed() + ); +} diff --git a/tests/unit/cpex/framework/isolated/test_worker.py b/tests/unit/cpex/framework/isolated/test_worker.py index 3dcfb686..7fe8f806 100644 --- a/tests/unit/cpex/framework/isolated/test_worker.py +++ b/tests/unit/cpex/framework/isolated/test_worker.py @@ -333,7 +333,7 @@ async def test_main_unexpected_exception(self, mock_process_task, mock_print, mo output_data = json.loads(printed_output) assert output_data["status"] == "error" assert "Unexpected error: Unexpected error occurred" in output_data["message"] - assert output_data["request_id"] == "unknown" + assert output_data["request_id"] == "req-789" @pytest.mark.asyncio @patch("sys.stdin") From c9cc237077e9fa84b32e653c499c4cd6ebcac36a Mon Sep 17 00:00:00 2001 From: habeck Date: Fri, 26 Jun 2026 11:57:25 -0400 Subject: [PATCH 3/3] enh: add e2e test for cpex-test-plugin (ignored by default since it requires the plugin to be installed before the test will pass) Signed-off-by: habeck --- crates/cpex-hosts-python/tests/config_e2e.rs | 101 +++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 crates/cpex-hosts-python/tests/config_e2e.rs diff --git a/crates/cpex-hosts-python/tests/config_e2e.rs b/crates/cpex-hosts-python/tests/config_e2e.rs new file mode 100644 index 00000000..f56632c6 --- /dev/null +++ 
b/crates/cpex-hosts-python/tests/config_e2e.rs @@ -0,0 +1,101 @@
+// Location: ./crates/cpex-hosts-python/tests/config_e2e.rs
+// Copyright 2026
+// SPDX-License-Identifier: Apache-2.0
+// Authors: Ted Habeck
+//
+// End-to-end test: load PluginManager from plugins/config.yaml and invoke
+// tool_pre_invoke on cpex-test-plugin.
+
+use std::path::PathBuf;
+use std::process::Command;
+use std::sync::Arc;
+
+use cpex_core::{
+    cmf::{Message, MessagePayload, constants::HOOK_CMF_TOOL_PRE_INVOKE, enums::Role},
+    hooks::payload::Extensions,
+    manager::PluginManager,
+};
+use cpex_hosts_python::{HookPayloadRegistry, IsolatedPythonPluginAdapterFactory, KIND};
+
+/// Returns the directory two levels above this crate's manifest directory.
+///
+/// Panics if the manifest directory has fewer than two ancestors.
+fn repo_root() -> PathBuf {
+    // CARGO_MANIFEST_DIR = .../cpex/crates/cpex-hosts-python
+    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+    // `ancestors()` yields the path itself first, so index 2 is the grandparent.
+    manifest_dir.ancestors().nth(2).unwrap().to_path_buf()
+}
+
+/// Builds `<repo root>/cpex/framework/isolated/worker.py`.
+fn worker_script_path() -> PathBuf {
+    let mut script = repo_root();
+    script.extend(["cpex", "framework", "isolated", "worker.py"]);
+    script
+}
+
+/// Reports whether `python3 --version` can be spawned and exits successfully.
+fn python3_available() -> bool {
+    let probe = Command::new("python3").arg("--version").output();
+    matches!(probe, Ok(out) if out.status.success())
+}
+
+// ---------------------------------------------------------------------------
+// Load config from plugins/config.yaml and invoke tool_pre_invoke on cpex-test-plugin.
+//
+// To run this test, the cpex-test-plugin must be installed via:
+//
+//   cpex plugin --type test-pypi install "cpex-test-plugin@>=0.2.0"
+//
+// Then run the test using this command:
+//
+//   cargo test -p cpex-hosts-python --test config_e2e -- --ignored
+//
+// ---------------------------------------------------------------------------
+
+/// Loads `plugins/config.yaml` from the repository root into a `PluginManager`,
+/// invokes `HOOK_CMF_TOOL_PRE_INVOKE` with a single user text message, and
+/// checks that processing is allowed to continue.
+///
+/// Ignored by default; returns early (printing a SKIP notice) when `python3`
+/// is not available.
+#[tokio::test]
+#[ignore = "requires cpex-test-plugin to be installed; run with `-- --ignored`"]
+async fn cpex_test_plugin_tool_pre_invoke() {
+    if !python3_available() {
+        eprintln!("SKIP: python3 not in PATH");
+        return;
+    }
+
+    let root = repo_root();
+    // This changes the working directory for the whole test process.
+    // NOTE(review): presumably needed because the config or worker resolves
+    // paths relative to the repository root -- confirm.
+    std::env::set_current_dir(&root).expect("failed to cd to repo root");
+    let config_path = root.join("plugins").join("config.yaml");
+
+    let factory = IsolatedPythonPluginAdapterFactory::new(HookPayloadRegistry::default())
+        .with_worker_script(worker_script_path());
+
+    // The factory is registered before the config is loaded.
+    let mgr = Arc::new(PluginManager::default());
+    mgr.register_factory(KIND, Box::new(factory));
+    mgr.load_config_file(&config_path)
+        .expect("failed to load plugins/config.yaml");
+
+    mgr.initialize().await.expect("initialize failed");
+
+    let payload = MessagePayload {
+        message: Message::text(Role::User, "test invocation"),
+    };
+
+    let (result, _bg) = mgr
+        .invoke_by_name(
+            HOOK_CMF_TOOL_PRE_INVOKE,
+            Box::new(payload),
+            Extensions::default(),
+            None,
+        )
+        .await;
+
+    assert!(
+        result.continue_processing,
+        "cpex-test-plugin tool_pre_invoke should allow: violation={:?}",
+        result.violation
+    );
+
+    mgr.shutdown().await;
+}