diff --git a/docs/action-log-helper.md b/docs/action-log-helper.md new file mode 100644 index 0000000..9136afe --- /dev/null +++ b/docs/action-log-helper.md @@ -0,0 +1,66 @@ +# Action Log Helper + +`Action_Log_Helper` is the public integration surface for writing and reading action/event logs. + +- Class: `ClawPress\Helpers\Action_Log_Helper` +- DB store behind it: `ClawPress\Stores\Action_Log_Store` + +Use the helper from controllers/services. Do not call the store directly unless you are working on persistence internals. + +## Common Usage + +```php +use ClawPress\Helpers\Action_Log_Helper; + +$log_helper = Action_Log_Helper::get_instance(); + +$log_helper->log_event( + 'tool.execute', + [ + 'event_type' => 'tool_call', + 'status' => 'success', + 'message' => 'Workspace created.', + 'requesting_user_id' => get_current_user_id(), + 'execution_user_id' => 123, + 'context' => [ 'ability' => 'create_workspace' ], + ] +); +``` + +## API + +### `get_instance(): Action_Log_Helper` +Singleton accessor. + +### `log_event( string $action_name, array $args = [] ): bool` +Writes one log row. + +Supported `$args` keys: + +- `event_type` (string, default: `event`) +- `status` (string, allowed: `debug`, `info`, `success`, `warning`, `error`; invalid values become `info`) +- `message` (string) +- `requesting_user_id` (int; defaults to current user when available) +- `execution_user_id` (int) +- `context` (array; JSON-encoded for storage) + +### `get_recent_logs( array $args = [] ): array` +Reads recent rows and returns normalized records. + +Supported filters: + +- `event_type` (string) +- `status` (string) +- `requesting_user_id` (int) +- `execution_user_id` (int) +- `limit` (int, default `50`, max `500`) +- `offset` (int, default `0`) + +Returned `context` is decoded back to an array. + +## Notes + +- Input is sanitized by the helper before persistence. +- Status and event/action identifiers are normalized to predictable values. +- The helper is safe to use across admin, REST, and background execution paths. +- Schema/table management for action logs is owned by `Action_Log_Store` and called from plugin activation. diff --git a/docs/agent-loop-spec-edits.md b/docs/agent-loop-spec-edits.md deleted file mode 100644 index b4771d9..0000000 --- a/docs/agent-loop-spec-edits.md +++ /dev/null @@ -1,270 +0,0 @@ -## Issue #12 Spec Update: Non-Streaming Now, Streaming Later (Transport-Agnostic + Time-Sliced Runner) - -### Background / Constraint (NEW) - -* Current LLM integration uses **WP AI Client**, which **does not support streaming yet**. -* Therefore, long-running “run-to-completion” turns inside a single web request are **not reliable** (timeouts / proxy buffering / request aborts). -* The agent loop must support: - - 1. **non-streaming execution** with **polling-based progress**, and - 2. an **upgrade path** to streaming **without rewriting** core loop logic. - -### Non-Negotiable Design Rule (NEW) - -> The **agent loop logic** must be deterministic and resumable **independent of delivery transport**. -> Streaming is an optimization layer (transport), not a control-plane rewrite. - ---- - -## Updated Goal - -Keep the original goal (“one core loop used by chat + heartbeat + spawning”). ([GitHub][1]) -**Add**: the loop must support **time-sliced execution** and **event persistence** so it can run safely under Action Scheduler with non-streaming providers. - ---- - -## Architecture Changes / Additions - -### 0) Introduce explicit layers (NEW) - -1. **Loop Engine** (pure logic; no DB; no HTTP; no Action Scheduler assumptions) -2. **Session Store** (DB-backed, already implied by issue) -3. **Runner** (Action Scheduler tick executor; uses store + lock) -4. **Transport** (how progress is delivered: polling now, streaming later) - -This clarifies “transport-agnostic” in a concrete way. - ---- - -### 1) Agent Loop Helper becomes a “Loop Engine” (UPDATED) - -Current proposal: `class-agent-loop-helper.php` extracted from `Chat_Helper`. ([GitHub][1]) -Adjust it to: - -#### A) Provide **step-based execution** (NEW) - -Instead of “run loop until done”, expose: - -* `run_turn(TurnRequest $req, AgentSession $session, LoopOptions $opts, AgentTransport $transport): TurnResult` -* `run_slice(RunSliceRequest $req): RunSliceResult` - - * Executes **bounded work** (one LLM call OR a limited batch of tool calls), then returns a resumable state marker. - -**Why:** enables safe background execution even when LLM calls/tool runs are slow. - -#### B) Make the engine emit events (NEW) - -The engine must emit structured events as it progresses (even non-streaming): - -* `agent_start`, `turn_start`, `message_start`, `message_end` -* `tool_execution_start`, `tool_execution_update`, `tool_execution_end` -* `turn_end`, `agent_end` -* optionally `message_update` (no-op for non-streaming today, real deltas later) - -These events go to the **Transport** layer. - ---- - -### 2) Expand TurnRequest / TurnResult contracts (UPDATED) - -Existing contracts are good. ([GitHub][1]) -Add the fields needed for slicing + resumability: - -#### `TurnRequest` additions - -* `run_id` (unique per attempt) -* `attempt` (int) -* `slice_budget_ms` (hard per-tick time budget; e.g., 2000–5000ms) -* `max_steps_per_slice` (hard cap; e.g., 1 LLM call or N tool calls) -* `transport_mode` (`polling` | `streaming`) -* `resume_cursor` (opaque engine cursor/state token; optional) - -#### `TurnResult` additions - -* `status` expands to include: - - * `success`, `requires_confirmation`, `error`, `timeout`, - * **`in_progress`** (paused due to slice budget) -* `next_action` expands to: - - * `continue_now` (enqueue next tick ASAP) - * `continue_later` (backoff/retry scheduling) - * `stop` -* `resume_cursor` (present when `status=in_progress`) -* `events_cursor` (cursor for UI polling; optional) - ---- - -### 3) Formalize Store Models: Thread vs Run vs Event (NEW) - -Issue already calls for thread/session state + locking. ([GitHub][1]) -Make it explicit: - -#### A) `agent_threads` (long-lived) - -* `thread_id` -* `status` (idle|running|paused|error|dead) -* `policy_profile` (by trigger) -* scheduling: `last_run_at`, `next_run_at` -* lock fields (or separate lock table; see below) - -#### B) `agent_runs` (per attempt) - -* `run_id`, `thread_id` -* `trigger` (chat|heartbeat|spawned_agent|...) -* `status` (queued|running|waiting_llm|waiting_tools|paused|done|error) -* `attempt`, `retry_at`, `error_code`, `error_message` -* `resume_cursor` (opaque engine cursor) -* usage totals (tokens/cost if available) - -#### C) `agent_events` (append-only log for polling + debugging) - -* `event_id` (monotonic) -* `run_id`, `thread_id` -* `type` -* `payload` (json) -* `created_at` - -**Polling UI reads events**: `GET /runs/{run_id}/events?after={event_id}` - ---- - -### 4) Transport abstraction (NEW) - -Add an internal interface: - -* `AgentTransport::emit(AgentEvent $event): void` -* `AgentTransport::close(): void` - -Implementations: - -1. **PollingTransport** (default today): writes events to `agent_events` table -2. **StreamingTransport** (future): emits SSE/websocket updates *and optionally also persists events* (debug mode) - -**Core loop engine must never care which transport is in use.** - ---- - -### 5) Runner: Action Scheduler ticks become first-class (UPDATED) - -Issue already proposes a heartbeat consumer that claims runnable threads and runs the loop helper. ([GitHub][1]) -Adjust so that the heartbeat worker executes **run slices**: - -#### Runner algorithm (per tick) - -1. Claim runnable `agent_threads` -2. Acquire lock/lease (existing requirement) -3. Load or create `agent_run` in `running` state -4. Execute **one slice**: - - * time budget enforcement - * either: perform the next LLM call OR next tool batch -5. Persist: - - * updated `resume_cursor` - * `agent_run.status` + `next_action` - * emitted events (via PollingTransport) -6. If `next_action=continue_now`, enqueue another AS action immediately -7. Release lock/lease - ---- - -## Lock/Lease Semantics (UPDATED) - -Issue already calls for lock/lease + stale recovery. ([GitHub][1]) -Clarify: - -* Lock covers a **single slice execution**, not “the entire multi-slice run”. -* Lease must be renewed each tick; stale lease recovery should requeue the run safely. -* Store must support idempotency: - - * if a tick repeats (duplicate AS run), it should detect already-advanced `resume_cursor`/status and no-op. - ---- - -## UI / API Implications (NEW) - -Because WP AI Client is non-streaming today, “real time” must be achieved via polling: - -### Endpoints (internal or REST) - -* `POST /agent/runs` (chat/spawn) -> returns `run_id` -* `POST /agent/runs/{run_id}/enqueue` (optional) -> enqueue tick -* `GET /agent/runs/{run_id}` -> status + summary -* `GET /agent/runs/{run_id}/events?after=...` -> incremental event feed - -Chat can remain synchronous for small turns, but must have a fallback: - -* if request budget exceeded, return `{ run_id, status: in_progress }` and client switches to polling. - ---- - -## Streaming Upgrade Path (NEW) - -When WP AI Client supports streaming: - -* Implement `StreamingTransport` that emits `message_update` events live. -* Update the LLM adapter to emit deltas to the transport. -* **No change** to: - - * Run/session store schemas - * Lock/lease mechanism - * Tool execution logic - * State machine - * Runner (still valid; streaming can be used for UI only) - -Optional: allow a “streaming-only” immediate path for chat if hosting supports it, but do not remove the slice runner. - ---- - -## Updated Implementation Phases - -### Phase 1: Extract Loop Engine + Events (behavior-preserving) - -* Extract core loop out of `Chat_Helper` into Loop Engine -* Introduce `AgentTransport` and implement `PollingTransport` -* Emit events, even if chat endpoint doesn’t use them yet - -### Phase 2: Session Store + Run/Event tables + Lock manager - -* Implement DB-backed thread/run/event storage -* Implement lock/lease with stale recovery + idempotency keys -* Wire minimal admin debugging (at least inspect by run_id) - -### Phase 3: Runner (Action Scheduler) with slice execution - -* Implement slice/tick runner -* Enqueue follow-up ticks until run completes -* Ensure time budget enforcement + retry/backoff - -### Phase 4: Spawn adapter + hardening - -* Spawn endpoint creates thread + run and enqueues tick -* Dead-letter state after N failures -* Policy profiles by trigger (chat vs heartbeat vs spawned) - -### Phase 5: Streaming transport (future) - -* Add `StreamingTransport` + LLM delta emission when WP AI Client supports it -* Keep polling mode as config fallback - ---- - -## Acceptance Criteria (UPDATED) - -Keep existing acceptance criteria, and add: - -* Engine supports **time-sliced** execution (`status=in_progress` + `resume_cursor`). -* Background runner can complete multi-step runs without a long-lived HTTP request. -* Event log exists and UI can poll incremental progress (minimum viable observability). -* Transport is configurable: `polling` now; `streaming` later, without loop rewrite. -* Lock/lease is safe across multiple slices and resilient to duplicate scheduler invocations. - ---- - -### Notes / Definitions - -* “Slice” = bounded unit of work (one LLM call OR bounded tool batch). -* “Transport” = how progress events are delivered (persisted polling vs streaming). - -[1]: https://github.com/bradvin/clawpress/issues/12 "Refactor to reusable Agent Loop Helper for chat + heartbeat + future spawning · Issue #12 · bradvin/clawpress · GitHub" diff --git a/docs/agent-loop-spec-final.md b/docs/agent-loop-spec-final.md new file mode 100644 index 0000000..51ac005 --- /dev/null +++ b/docs/agent-loop-spec-final.md @@ -0,0 +1,334 @@ +## Agent Loop Spec (Final) + +## Summary +Refactor the current chat-bound agent execution into a reusable, transport-agnostic **Agent Loop runtime** that can be called from: + +1. synchronous chat requests, +2. heartbeat/background jobs, +3. future spawned-agent APIs. + +The loop must work reliably with **non-streaming providers now** and support **streaming later** without rewriting core execution logic. + +--- + +## Current State + +### What exists +- Chat execution path currently drives model/tool loop: + - `includes/rest/class-chat-controller.php` -> `Chat_Helper::generate_ai_reply()` + - core loop logic currently in `includes/helpers/class-chat-helper.php` +- Heartbeat scheduler exists: + - `includes/class-heartbeat.php` + - schedules `clawpress_heartbeat_tick` every 15 minutes via Action Scheduler + - tick triggers: `do_action( 'clawpress_run_scheduled_tasks' )` + +### What is missing +- No production-grade consumer attached to `clawpress_run_scheduled_tasks` for autonomous runs. +- No fully independent runtime loop surface shared by chat/background/spawn adapters. +- No completed run coordinator across triggers with robust claim/lock/retry lifecycle for all paths. + +Result: execution is still effectively request-bound in practice. + +--- + +## Constraints + +### Non-streaming first +- Current LLM integration uses WP AI Client, which does not yet provide streaming support. +- Long run-to-completion turns inside a single HTTP request are unreliable. + +### Non-negotiable design rule +The agent loop must be deterministic and resumable independent of delivery transport. + +- Streaming is a transport optimization. +- Streaming must not require control-plane rewrites. + +--- + +## Final Goal +Build one reusable loop runtime that provides: + +- identical core behavior across chat, heartbeat, and spawned agents, +- safe asynchronous/background execution via time-sliced runs, +- durable state and event persistence for polling-based progress now, +- a direct upgrade path to streaming transport later. + +--- + +## Final Architecture + +### 1) Explicit layers +Implement clear layers: + +1. **Loop Engine**: pure execution logic; no DB, no HTTP, no scheduler coupling. +2. **Stores**: DB-backed persistence for sessions/runs/events and locking metadata. + - session store: `ClawPress\Stores\Agent_Session_Store` + - run store: `ClawPress\Stores\Agent_Run_Store` + - event store: `ClawPress\Stores\Agent_Event_Store` for append-only run/session events +3. **Helpers**: orchestration-facing APIs over stores. + - session helper: `ClawPress\Helpers\Agent_Session_Helper` + - run helper: `ClawPress\Helpers\Agent_Run_Helper` + - event helper: `ClawPress\Helpers\Agent_Event_Helper` + - rule: loop/runner/transport/controllers call helpers; helpers call stores; stores own DB logic. +4. **Runner**: Action Scheduler tick executor that claims work and runs slices. +5. **Transport**: delivery channel for progress/events (`polling` now, `streaming` later). + +### 2) Loop Engine responsibilities +Extract loop responsibilities from `Chat_Helper` into `includes/helpers/class-agent-loop-helper.php` (name flexible): + +- provider/model resolution, +- context assembly and prompt preparation, +- model call orchestration, +- tool-call loop with bounded limits, +- confirmation batching behavior, +- usage/context metadata collection, +- normalized result payloads, +- event emission hooks. + +`Chat_Helper` becomes a thin adapter. + +### 3) Step-based and slice-based execution +Support both: + +- `run_turn(...)`: full turn execution for synchronous contexts when budget permits. +- `run_slice(...)`: bounded execution chunk (for runner/background). + +A slice should do bounded work (for example, one model call or a limited tool batch), then return resumable state. + +### 4) Transport abstraction +Define internal transport interface: + +- `emit( AgentEvent $event ): void` +- `close(): void` + +Implementations: + +- **PollingTransport** (default now): append events via `ClawPress\Helpers\Agent_Event_Helper` backed by `ClawPress\Stores\Agent_Event_Store`. +- **StreamingTransport** (future): emits deltas live; may still persist events for observability. + +The loop engine must not branch by transport-specific behavior. + +--- + +## Contracts + +### TurnRequest +Required/common fields: + +- `session_id` +- `trigger` (`chat`, `heartbeat`, `spawned_agent`, ...) +- `message` (optional for heartbeat-driven turns) +- `requesting_user_id` +- `execution_user_id` +- policy knobs (`allow_tools`, `require_confirmation`, limits/timeouts) + +Additions for resumability and slicing: + +- `run_id` (unique per attempt) +- `attempt` (int) +- `slice_budget_ms` (hard per-tick budget) +- `max_steps_per_slice` (hard cap per slice) +- `transport_mode` (`polling` | `streaming`) +- `resume_cursor` (opaque engine state token; optional) + +### TurnResult +Core fields: + +- `assistant_text` +- `tool_calls` trace +- optional `card` +- usage/context metadata + +Status values: + +- `success` +- `requires_confirmation` +- `error` +- `timeout` +- `in_progress` (slice paused due to budget) + +Next-action hints: + +- `continue_now` +- `continue_later` +- `stop` + +Additions for resumability and UI polling: + +- `resume_cursor` (when `status=in_progress`) +- `events_cursor` (optional incremental cursor) + +--- + +## Persistence Model + +### agent_sessions (long-lived) +Backed by: `ClawPress\Stores\Agent_Session_Store` +Accessed via: `ClawPress\Helpers\Agent_Session_Helper` + +- `session_id` +- `status` (`idle|running|paused|error|dead`) +- policy profile +- schedule fields (`last_run_at`, `next_run_at`) +- lock/lease metadata (or lock table) + +### agent_runs (per attempt) +Backed by: `ClawPress\Stores\Agent_Run_Store` +Accessed via: `ClawPress\Helpers\Agent_Run_Helper` + +- `run_id`, `session_id` +- `trigger` +- `status` (`queued|running|waiting_llm|waiting_tools|paused|done|error`) +- `attempt`, retry/backoff fields +- `resume_cursor` +- usage totals and error classification + +### agent_events (append-only) +Backed by: `ClawPress\Stores\Agent_Event_Store` +Accessed via: `ClawPress\Helpers\Agent_Event_Helper` + +- monotonic `event_id` +- `run_id`, `session_id` +- `type` +- JSON `payload` +- `created_at` + +UI polls event stream incrementally via cursor. + +--- + +## Runner (Action Scheduler) + +Runner algorithm per tick: + +1. Find and claim runnable sessions/runs. +2. Acquire lock/lease. +3. Load or create run state. +4. Execute one bounded slice. +5. Persist updated run/session state and emitted events. +6. If needed, enqueue follow-up tick immediately or with backoff. +7. Release lock/lease. + +### Lock semantics +- Lock scope is one slice execution, not entire multi-slice lifecycle. +- Lease renewed per tick. +- Stale lease recovery requeues safely. +- Idempotency checks prevent duplicate progress on repeated ticks. + +--- + +## Adapter Behavior + +### Chat adapter +Keep synchronous for small turns, but remain thin: + +1. persist inbound user message, +2. call loop runtime, +3. persist outputs, +4. return response. + +If request budget is exceeded, return `run_id` + `in_progress`, and client switches to polling. + +### Heartbeat/background adapter +Consume `clawpress_run_scheduled_tasks` and execute slices through runner. + +### Spawn adapter +Spawn endpoint should: + +1. create session, +2. seed initial context/message, +3. enqueue first run, +4. return session/run identifiers. + +No loop internals in spawn endpoint. + +--- + +## Policy and Safety + +### Policy by trigger +- `chat`: interactive confirmation behavior. +- `heartbeat` / `spawned`: destructive tools denied or queued by default. +- optional per-session policy profiles. + +### Guardrails +- max wall time per run/slice, +- max tool calls per run, +- bounded retries with exponential backoff, +- dead-letter terminal state after N failures, +- idempotency keys for run attempts. + +--- + +## Observability +Log structured run data (reuse/extend action log + run/event records): + +- `run_id`, `session_id`, trigger, +- tool trace + statuses, +- provider/model and usage, +- error type + retry count, +- final outcome. + +Polling endpoint must expose incremental run events. + +--- + +## API Implications +Minimum endpoints: + +- `POST /agent/runs` -> create run and return `run_id` +- `POST /agent/runs/{run_id}/enqueue` (optional) +- `GET /agent/runs/{run_id}` -> status summary +- `GET /agent/runs/{run_id}/events?after={event_id}` -> incremental events + +--- + +## Implementation Phases + +### Phase 1: Internal extraction (no behavior regression) +- Extract Loop Engine from `Chat_Helper`. +- Add transport interface with polling implementation. +- Emit structured events. + +### Phase 2: Persistence + lock manager +- Extend/finalize `Agent_Session_Helper` + `Agent_Session_Store` for full session requirements. +- Extend/finalize `Agent_Run_Helper` + `Agent_Run_Store` for full run requirements. +- Use `Agent_Event_Helper` + `Agent_Event_Store` for append-only run/session events. +- Finalize lock/lease handling across stores. +- Ensure stale recovery and idempotency. + +### Phase 3: Time-sliced runner +- Implement Action Scheduler slice executor. +- Enforce time/step budgets. +- Re-enqueue until completion. + +### Phase 4: Spawn support + hardening +- Add spawn adapter/endpoint. +- Add retry/backoff/dead-letter policies. +- Add trigger-based policy profiles. + +### Phase 5: Streaming transport (future) +- Add `StreamingTransport` once provider supports deltas. +- Keep polling as fallback. +- No loop/state-machine rewrite required. + +--- + +## Acceptance Criteria + +- Chat uses shared loop runtime (no duplicated loop logic). +- Runner supports multi-slice execution with `in_progress` and `resume_cursor`. +- Background runs complete safely without long-lived HTTP requests. +- Per-session concurrency safety: no duplicate concurrent execution. +- Event persistence supports incremental UI polling. +- Destructive tool behavior is explicitly policy-controlled by trigger. +- Transport mode is pluggable (`polling` now, `streaming` later) without core loop rewrite. + +--- + +## Why this is worth doing + +- One agent brain across all transports/triggers. +- Clean async/autonomous execution path. +- Safe path to spawned parallel sessions. +- Lower long-term maintenance cost than duplicating chat logic in background adapters. diff --git a/docs/agent-loop-spec.md b/docs/agent-loop-spec.md deleted file mode 100644 index fae18cf..0000000 --- a/docs/agent-loop-spec.md +++ /dev/null @@ -1,201 +0,0 @@ -## Summary -Refactor current chat-bound agent execution into a reusable **Agent Loop Helper** (runtime service) that can be called from: - -1. current synchronous chat requests, -2. heartbeat/background jobs, -3. future agent spawning APIs. - -This should make agent execution transport-agnostic and enable true async/background runs without duplicating core loop logic. - ---- - -## Current State (as of now) - -### What exists -- Chat execution path currently drives model/tool loop: - - `includes/rest/class-chat-controller.php` → `Chat_Helper::generate_ai_reply()` - - Core loop currently in `includes/helpers/class-chat-helper.php` -- Heartbeat scheduler exists: - - `includes/class-heartbeat.php` - - Schedules `clawpress_heartbeat_tick` every 15 min using Action Scheduler - - Tick triggers: `do_action( 'clawpress_run_scheduled_tasks' )` - -### What is missing -- No consumer/handler currently attached to `clawpress_run_scheduled_tasks` in plugin code. -- No independent agent thread/session runtime (outside chat request path). -- No spawn manager / spawn endpoint that creates and runs separate agent threads. -- No async run coordinator (claim/lock/retry/failure lifecycle) for autonomous runs. - -Result: loop logic is effectively request-bound to chat right now. - ---- - -## Goal -Create a reusable **Agent Loop Helper** so one core loop can be used by multiple entry points (chat, heartbeat, spawning) with consistent behavior and policy. - ---- - -## Proposed Architecture - -### 1) Extract a core runtime service -Create e.g. `includes/helpers/class-agent-loop-helper.php` (name flexible) and move loop responsibilities out of `Chat_Helper`: - -- provider/model resolution -- context assembly + prompt prep -- model call -- tool-call loop (`MAX_TOOL_ROUNDS`, `MAX_TOOL_CALLS_PER_ROUND`) -- confirmation batching behavior -- context/token usage collection -- normalized result payload - -`Chat_Helper` should become an adapter that invokes this service. - ---- - -### 2) Define canonical request/response contracts -Introduce internal DTO-like arrays/classes: - -#### `TurnRequest` -- `thread_id` (or session id) -- `trigger` (`chat`, `heartbeat`, `spawned_agent`, etc.) -- `message` (optional for heartbeat-driven turns) -- `requesting_user_id` -- `execution_user_id` -- policy knobs: `allow_tools`, `require_confirmation`, limits/timeouts - -#### `TurnResult` -- `assistant_text` -- `tool_calls` trace -- `card` payload (optional) -- `status` (`success`, `requires_confirmation`, `error`, `timeout`) -- usage/context metadata -- optional `next_action` hint (`continue`, `stop`, `reschedule`) - -This contract is key to reusability across adapters. - ---- - -### 3) Separate state persistence from execution -Add explicit agent-thread/session state (rather than implicitly relying on chat history only). - -Minimum state needed: -- thread/session identity -- lifecycle status -- `last_run_at`, `next_run_at` -- lock/lease metadata (owner + expiry) -- failure/retry counters -- trigger metadata - -Storage can be CPT-based or custom table (table likely better for concurrency/locking). - ---- - -### 4) Add background runner adapter (heartbeat path) -Implement a consumer for `clawpress_run_scheduled_tasks` that: - -1. finds/claims runnable agent threads -2. acquires lock/lease -3. builds `TurnRequest` -4. calls Agent Loop Helper -5. persists run outputs/logs/state -6. schedules follow-up if required -7. releases lock - -This enables async operation without rewriting loop logic. - ---- - -### 5) Keep chat path synchronous but thin -`Chat_Controller` flow should be: -1. persist inbound user message -2. call Agent Loop Helper synchronously -3. persist assistant response/meta -4. return response - -No duplicated loop logic in chat layer. - ---- - -### 6) Add spawn entry point later as another adapter -Future spawn endpoint should: -- create a new thread/session record, -- seed initial context/message, -- enqueue first run via Action Scheduler, -- return spawned thread id. - -Spawn endpoint should not contain loop internals. - ---- - -## Policy & Safety Considerations - -### Confirmation/destructive tools by trigger -Define policy by trigger source: -- `chat`: current confirmation behavior acceptable -- `heartbeat` / `spawned`: likely deny or queue destructive calls by default -- optionally allow policy profiles per agent/thread - -### Runtime guardrails -- max wall time per run -- max tool calls per run -- bounded retries + exponential backoff -- dead-letter/failure terminal state after N failures -- idempotency key per run attempt - -### Concurrency controls -- at-most-one active run per thread/session (lock/lease) -- stale lock recovery -- avoid duplicate processing by concurrent scheduler invocations - ---- - -## Observability / Debuggability -Add structured run logging (reuse/extend action log): -- `run_id`, `thread_id`, trigger source -- tool trace + per-call status -- provider/model + token/context usage -- error classification + retry count -- final run outcome - -Without this, async failures will be hard to diagnose. - ---- - -## Suggested Implementation Phases - -### Phase 1: Internal refactor (no behavior change) -- Introduce Agent Loop Helper -- Move loop logic from `Chat_Helper` into helper -- Keep current chat behavior identical - -### Phase 2: Background execution wiring -- Implement `clawpress_run_scheduled_tasks` consumer -- Add minimal thread/run state + locking -- Run one background thread safely - -### Phase 3: Spawn support -- Add spawn API/service -- Create separate threads and schedule independent runs - -### Phase 4: Hardening -- retries/backoff/dead-letter -- policy profiles by trigger -- richer observability and admin inspection UI - ---- - -## Acceptance Criteria -- Chat uses Agent Loop Helper (no duplicated loop logic in chat layer). -- Heartbeat can run at least one agent thread asynchronously. -- Background runs are lock-safe (no duplicate concurrent turn execution per thread). -- Destructive tool behavior is explicitly policy-controlled per trigger. -- Run status/errors are inspectable via logs. - ---- - -## Why this is worth doing -This makes ClawPress extensible: -- one “agent brain,” many transports/triggers, -- clean path to autonomous agents, -- clean path to true spawned parallel threads, -- less tech debt than cloning chat logic into heartbeat/spawn flows. diff --git a/docs/agent-run-helper.md b/docs/agent-run-helper.md new file mode 100644 index 0000000..802cf03 --- /dev/null +++ b/docs/agent-run-helper.md @@ -0,0 +1,83 @@ +# Agent Run Helper + +`Agent_Run_Helper` manages run lifecycle and lock semantics for background agent execution. + +- Class: `ClawPress\Helpers\Agent_Run_Helper` +- DB store behind it: `ClawPress\Stores\Agent_Run_Store` + +Use this helper from workers/executors. It handles claim rules, stale lock reclaim, completion, and session rollup orchestration. + +## Common Usage + +```php +use ClawPress\Helpers\Agent_Run_Helper; + +$run_helper = Agent_Run_Helper::get_instance(); + +$run_id = $run_helper->create_run( $session_id ); +$claim = $run_helper->claim_run( $run_id, 'worker-a', 120 ); + +if ( ! empty( $claim['claimed'] ) ) { + $run_helper->complete_run( + $run_id, + (string) $claim['lock_token'], + 'success', + [ + 'meta' => [ 'tools' => 3 ], + 'next_run_at_gmt' => null, + ] + ); +} +``` + +## API + +### `get_instance(): Agent_Run_Helper` +Singleton accessor. + +### `create_run( int $session_id ): int` +Creates a queued run row and returns the run ID (or `0` on failure). + +### `claim_run( int $run_id, string $worker_id, int $lease_ttl_seconds = 120 ): array` +Attempts to claim a queued run (or reclaim a stale running lock). + +Success payload includes: + +- `claimed` (`true`) +- `run_id` +- `lock_token` +- `attempt` +- `reclaimed` (`true` when stale lock was reclaimed) + +Failure payload includes `claimed => false` and a `reason`: + +- `run_not_found` +- `not_claimable` +- `claim_collision` + +### `complete_run( int $run_id, string $lock_token, string $status, array $args = [] ): bool` +Completes a claimed run and updates session rollup state in one transaction. + +Allowed terminal statuses: + +- `success` +- `failed` +- `cancelled` +- `canceled` + +Supported `$args` keys: + +- `error_code` (string|null) +- `error_message` (string|null) +- `meta` (array; JSON-encoded) +- `next_run_at_gmt` (string|null; passed to session helper) + +### `get_run( int $run_id ): array` +Returns a run row as an associative array, or `[]` when not found. + +## Notes + +- `complete_run()` enforces lock-token ownership before writing completion state. +- Completion rolls back if run or session update fails, preventing partial state. +- For new behavior, put workflow logic in the helper and keep SQL-only logic in the store. +- Schema/table management for runs is owned by `Agent_Run_Store` and called from plugin activation. diff --git a/docs/agent-session-helper.md b/docs/agent-session-helper.md new file mode 100644 index 0000000..f894b23 --- /dev/null +++ b/docs/agent-session-helper.md @@ -0,0 +1,63 @@ +# Agent Session Helper + +`Agent_Session_Helper` manages session-level lifecycle state for the agent runtime. + +- Class: `ClawPress\Helpers\Agent_Session_Helper` +- DB store behind it: `ClawPress\Stores\Agent_Session_Store` + +Use this helper for session creation and run-completion rollups. It encapsulates defaults and delegates persistence to the store. + +## Common Usage + +```php +use ClawPress\Helpers\Agent_Session_Helper; + +$session_helper = Agent_Session_Helper::get_instance(); + +$session_id = $session_helper->create_session( + [ + 'trigger_type' => 'chat', + 'requesting_user_id' => get_current_user_id(), + 'execution_user_id' => 123, + 'policy_profile' => 'default', + ] +); +``` + +## API + +### `get_instance(): Agent_Session_Helper` +Singleton accessor. + +### `create_session( array $args = [] ): int` +Creates one session row and returns the session ID (or `0` on failure). + +Supported `$args` keys: + +- `uuid` (string; auto-generated when omitted) +- `status` (string; default `active`) +- `trigger_type` (string; default `chat`) +- `requesting_user_id` (int|null) +- `execution_user_id` (int|null) +- `policy_profile` (string|null) +- `next_run_at_gmt` (string|null) + +### `apply_run_completion( int $session_id, string $run_status, ?string $next_run_at_gmt = null ): bool` +Updates session rollup fields after a run completes: + +- `last_run_at_gmt` +- `last_run_status` +- `next_run_at_gmt` +- `updated_at_gmt` +- `consecutive_failures` + +Failure counter behavior: + +- resets to `0` when `$run_status === 'success'` +- increments by `1` for all other statuses + +## Notes + +- This helper is intended to be called by `Agent_Run_Helper` after run completion. +- Session persistence details are isolated in `includes/stores/class-agent-session-store.php`. +- Schema/table management for sessions is owned by `Agent_Session_Store` and called from plugin activation. diff --git a/includes/class-plugin.php b/includes/class-plugin.php index e398fc0..cba76e8 100644 --- a/includes/class-plugin.php +++ b/includes/class-plugin.php @@ -12,11 +12,14 @@ use ClawPress\Abilities\Abilities; use ClawPress\AdminPage\Admin_Page; use ClawPress\Heartbeat\Heartbeat; -use ClawPress\Helpers\Action_Log_Helper; use ClawPress\Helpers\Panel_Helper; use ClawPress\Panel\Panel; use ClawPress\PostTypes\Post_Types; use ClawPress\RestAPI\Rest_API; +use ClawPress\Stores\Action_Log_Store; +use ClawPress\Stores\Agent_Event_Store; +use ClawPress\Stores\Agent_Run_Store; +use ClawPress\Stores\Agent_Session_Store; defined( 'ABSPATH' ) || exit; @@ -60,7 +63,10 @@ public static function get_instance(): self { * Plugin activation callback. */ public static function activate(): void { - Action_Log_Helper::get_instance()->create_table(); + Action_Log_Store::get_instance()->create_table(); + Agent_Session_Store::get_instance()->create_table(); + Agent_Run_Store::get_instance()->create_table(); + Agent_Event_Store::get_instance()->create_table(); $user_id = get_current_user_id(); if ( $user_id <= 0 ) { diff --git a/includes/helpers/class-abilities-helper.php b/includes/helpers/class-abilities-helper.php index 998a592..47452b2 100644 --- a/includes/helpers/class-abilities-helper.php +++ b/includes/helpers/class-abilities-helper.php @@ -58,11 +58,11 @@ final class Abilities_Helper { private Security $security; /** - * Action log helper. + * Agent event helper. * - * @var Action_Log_Helper + * @var Agent_Event_Helper */ - private Action_Log_Helper $action_log_helper; + private Agent_Event_Helper $agent_event_helper; /** * Policy helper. @@ -75,10 +75,10 @@ final class Abilities_Helper { * Constructor. */ private function __construct() { - $this->settings_helper = Settings_Helper::get_instance(); - $this->security = Security::get_instance(); - $this->action_log_helper = Action_Log_Helper::get_instance(); - $this->policy_helper = Policy_Helper::get_instance(); + $this->settings_helper = Settings_Helper::get_instance(); + $this->security = Security::get_instance(); + $this->agent_event_helper = Agent_Event_Helper::get_instance(); + $this->policy_helper = Policy_Helper::get_instance(); } /** @@ -202,6 +202,10 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e $allowed_confirmation_tokens = $this->normalize_allowed_confirmation_tokens( $execution_context['allowed_confirmation_tokens'] ?? null ); + $event_context = [ + 'session_id' => isset( $execution_context['session_id'] ) ? (int) $execution_context['session_id'] : 0, + 'run_id' => isset( $execution_context['run_id'] ) ? (int) $execution_context['run_id'] : 0, + ]; $trigger_type = isset( $execution_context['trigger_type'] ) ? (string) $execution_context['trigger_type'] : 'chat'; @@ -213,8 +217,8 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e ? $execution_context['session_metadata'] : [], isset( $execution_context['policy_overrides'] ) && is_array( $execution_context['policy_overrides'] ) - ? $execution_context['policy_overrides'] - : [] + ? $execution_context['policy_overrides'] + : [] ); $args_json = wp_json_encode( $args ); @@ -229,7 +233,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e ], 'tool' => $normalized_tool_name, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload, $event_context ); return $payload; } @@ -243,7 +247,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e 'tool' => $normalized_tool_name, 'ability' => $ability_name, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload, $event_context ); return $payload; } @@ -258,7 +262,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e 'tool' => $normalized_tool_name, 'ability' => $ability_name, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload, $event_context ); return $payload; } @@ -273,7 +277,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e 'tool' => $normalized_tool_name, 'ability' => $ability_name, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload, $event_context ); return $payload; } @@ -290,7 +294,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e $runtime_policy, 'deny_tools' ); - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload, $event_context ); return $payload; } @@ -304,7 +308,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e $runtime_policy, 'deny_destructive_tools' ); - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload, $event_context ); return $payload; } @@ -318,7 +322,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e $runtime_policy, 'deny_file_delete' ); - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload, $event_context ); return $payload; } @@ -335,7 +339,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e 'ability' => $ability_name, 'safety_class' => $safety_class, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload, $event_context ); return $payload; } @@ -362,7 +366,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e 'ability' => $ability_name, 'safety_class' => $safety_class, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'warning', $args_hash, $payload, $event_context ); return $payload; } } @@ -385,7 +389,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e 'ability' => $ability_name, 'safety_class' => $safety_class, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'error', $args_hash, $payload, $event_context ); return $payload; } @@ -397,7 +401,7 @@ public function execute_tool_call( string $tool_name, $raw_args = null, array $e 'result' => $result, ]; - $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'success', $args_hash, $payload ); + $this->log_tool_call( $normalized_tool_name, $ability_name, $requesting_user_id, $execution_user_id, 'success', $args_hash, $payload, $event_context ); return $payload; } @@ -586,7 +590,7 @@ private function build_policy_violation_payload( } /** - * Write one tool-call action ledger row. + * Emit one tool-call event row. * * @param string $tool_name Tool name. * @param string $ability_name Ability ID. @@ -595,6 +599,7 @@ private function build_policy_violation_payload( * @param string $status Log status. * @param string $args_hash Hash of arguments. * @param array $payload Tool payload. + * @param array $event_context Optional run/session context. */ private function log_tool_call( string $tool_name, @@ -603,27 +608,18 @@ private function log_tool_call( int $execution_user_id, string $status, string $args_hash, - array $payload + array $payload, + array $event_context ): void { - $this->action_log_helper->log_event( - 'tool/' . $tool_name, - [ - 'event_type' => 'tool_call', - 'status' => $status, - 'message' => isset( $payload['error']['message'] ) - ? (string) $payload['error']['message'] - : __( 'Tool execution completed.', 'clawpress' ), - 'requesting_user_id' => $requesting_user_id > 0 ? $requesting_user_id : null, - 'execution_user_id' => $execution_user_id > 0 ? $execution_user_id : null, - 'context' => [ - 'tool_name' => $tool_name, - 'ability_name' => $ability_name, - 'args_hash' => $args_hash, - 'success' => ! empty( $payload['success'] ), - 'result' => isset( $payload['result'] ) ? $payload['result'] : null, - 'error' => isset( $payload['error'] ) ? $payload['error'] : null, - ], - ] + $this->agent_event_helper->emit_tool_call( + $tool_name, + $ability_name, + $requesting_user_id, + $execution_user_id, + $status, + $args_hash, + $payload, + $event_context ); } } diff --git a/includes/helpers/class-action-log-helper.php b/includes/helpers/class-action-log-helper.php index 799a782..c10c27f 100644 --- a/includes/helpers/class-action-log-helper.php +++ b/includes/helpers/class-action-log-helper.php @@ -9,17 +9,14 @@ namespace ClawPress\Helpers; +use ClawPress\Stores\Action_Log_Store; + defined( 'ABSPATH' ) || exit; /** * Central action log helper for writing/querying action/event records. */ final class Action_Log_Helper { - /** - * Database table suffix. - */ - private const TABLE_SUFFIX = 'clawpress_action_logs'; - /** * Supported log status values. * @@ -34,10 +31,19 @@ final class Action_Log_Helper { */ private static ?self $instance = null; + /** + * Store instance for DB access. + * + * @var Action_Log_Store + */ + private Action_Log_Store $store; + /** * Constructor. */ - private function __construct() {} + private function __construct() { + $this->store = Action_Log_Store::get_instance(); + } /** * Get singleton instance. @@ -50,65 +56,6 @@ public static function get_instance(): self { return self::$instance; } - /** - * Resolve full action log table name. - */ - public function get_table_name(): string { - global $wpdb; - - if ( ! $this->is_wpdb_ready( $wpdb ) ) { - return self::TABLE_SUFFIX; - } - - return $wpdb->prefix . self::TABLE_SUFFIX; - } - - /** - * Create/update action log table schema. - */ - public function create_table(): bool { - global $wpdb; - - if ( ! $this->is_wpdb_ready( $wpdb ) ) { - return false; - } - - $table_name = $this->get_table_name(); - $charset_collate = method_exists( $wpdb, 'get_charset_collate' ) - ? (string) $wpdb->get_charset_collate() - : ''; - - $sql = "CREATE TABLE {$table_name} ( - id bigint(20) unsigned NOT NULL AUTO_INCREMENT, - event_type varchar(64) NOT NULL DEFAULT 'event', - action_name varchar(191) NOT NULL, - status varchar(20) NOT NULL DEFAULT 'info', - message text NULL, - requesting_user_id bigint(20) unsigned NULL, - execution_user_id bigint(20) unsigned NULL, - context longtext NULL, - created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (id), - KEY event_type (event_type), - KEY action_name (action_name), - KEY status (status), - KEY requesting_user_id (requesting_user_id), - KEY execution_user_id (execution_user_id), - KEY created_at (created_at) - ) {$charset_collate};"; - - if ( ! function_exists( 'dbDelta' ) ) { - require_once ABSPATH . 'wp-admin/includes/upgrade.php'; - } - - if ( ! function_exists( 'dbDelta' ) ) { - return false; - } - - dbDelta( $sql ); - return true; - } - /** * Persist one action/event log record. * @@ -116,12 +63,6 @@ public function create_table(): bool { * @param array $args Optional log payload. */ public function log_event( string $action_name, array $args = [] ): bool { - global $wpdb; - - if ( ! $this->is_wpdb_ready( $wpdb ) || ! method_exists( $wpdb, 'insert' ) ) { - return false; - } - $normalized_action_name = $this->sanitize_action_name( $action_name ); if ( '' === $normalized_action_name ) { return false; @@ -156,9 +97,7 @@ public function log_event( string $action_name, array $args = [] ): bool { } } - // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery -- centralized logging insert for plugin action ledger. - $inserted = $wpdb->insert( - $this->get_table_name(), + return $this->store->insert_log( [ 'event_type' => $event_type, 'action_name' => $normalized_action_name, @@ -167,19 +106,8 @@ public function log_event( string $action_name, array $args = [] ): bool { 'requesting_user_id' => $requesting_user_id > 0 ? $requesting_user_id : null, 'execution_user_id' => $execution_user_id > 0 ? $execution_user_id : null, 'context' => $encoded_context, - ], - [ - '%s', - '%s', - '%s', - '%s', - '%d', - '%d', - '%s', ] ); - - return false !== $inserted; } /** @@ -189,68 +117,39 @@ public function log_event( string $action_name, array $args = [] ): bool { * @return array> */ public function get_recent_logs( array $args = [] ): array { - global $wpdb; - - if ( ! $this->is_wpdb_ready( $wpdb ) || ! method_exists( $wpdb, 'prepare' ) || ! method_exists( $wpdb, 'get_results' ) ) { - return []; - } - $limit = isset( $args['limit'] ) ? (int) $args['limit'] : 50; $offset = isset( $args['offset'] ) ? (int) $args['offset'] : 0; $limit = $limit > 0 ? min( $limit, 500 ) : 50; $offset = $offset >= 0 ? $offset : 0; - $where_clauses = []; - $where_values = []; + $query_args = [ + 'limit' => $limit, + 'offset' => $offset, + ]; if ( isset( $args['event_type'] ) ) { $event_type = $this->sanitize_event_type( (string) $args['event_type'] ); if ( '' !== $event_type ) { - $where_clauses[] = 'event_type = %s'; - $where_values[] = $event_type; + $query_args['event_type'] = $event_type; } } if ( isset( $args['status'] ) ) { $status = $this->sanitize_status( (string) $args['status'] ); if ( '' !== $status ) { - $where_clauses[] = 'status = %s'; - $where_values[] = $status; + $query_args['status'] = $status; } } if ( isset( $args['requesting_user_id'] ) && (int) $args['requesting_user_id'] > 0 ) { - $where_clauses[] = 'requesting_user_id = %d'; - $where_values[] = (int) $args['requesting_user_id']; + $query_args['requesting_user_id'] = (int) $args['requesting_user_id']; } if ( isset( $args['execution_user_id'] ) && (int) $args['execution_user_id'] > 0 ) { - $where_clauses[] = 'execution_user_id = %d'; - $where_values[] = (int) $args['execution_user_id']; - } - - $where_sql = [] !== $where_clauses ? 'WHERE ' . implode( ' AND ', $where_clauses ) : ''; - $where_values[] = $limit; - $where_values[] = $offset; - - // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared -- table name is fixed plugin-owned identifier. - $query = "SELECT id, event_type, action_name, status, message, requesting_user_id, execution_user_id, context, created_at - FROM {$this->get_table_name()} - {$where_sql} - ORDER BY id DESC - LIMIT %d OFFSET %d"; - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- query is prepared via `$wpdb->prepare()` on this line. - $prepared_query = $wpdb->prepare( $query, $where_values ); - if ( ! is_string( $prepared_query ) || '' === $prepared_query ) { - return []; + $query_args['execution_user_id'] = (int) $args['execution_user_id']; } - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared,WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- log queries are intentional and already bounded. - $rows = $wpdb->get_results( $prepared_query, 'ARRAY_A' ); - if ( ! is_array( $rows ) ) { - return []; - } + $rows = $this->store->get_recent_logs( $query_args ); return array_values( array_map( [ $this, 'normalize_log_row' ], $rows ) @@ -325,13 +224,4 @@ private function sanitize_status( string $status ): string { return 'info'; } - - /** - * Check whether a usable `$wpdb` object is present. - * - * @param mixed $wpdb Candidate wpdb object. - */ - private function is_wpdb_ready( $wpdb ): bool { - return is_object( $wpdb ) && isset( $wpdb->prefix ); - } } diff --git a/includes/helpers/class-agent-event-helper.php b/includes/helpers/class-agent-event-helper.php new file mode 100644 index 0000000..dd2cae4 --- /dev/null +++ b/includes/helpers/class-agent-event-helper.php @@ -0,0 +1,227 @@ +store = Agent_Event_Store::get_instance(); + } + + /** + * Get singleton instance. + */ + public static function get_instance(): self { + if ( null === self::$instance ) { + self::$instance = new self(); + } + + return self::$instance; + } + + /** + * Emit one append-only event. + * + * @param string $event_type Event type name. + * @param array $args Optional event payload. + */ + public function emit( string $event_type, array $args = [] ): int { + $normalized_event_type = $this->sanitize_event_type( $event_type ); + if ( '' === $normalized_event_type ) { + return 0; + } + + $payload = isset( $args['payload'] ) && is_array( $args['payload'] ) + ? $args['payload'] + : []; + $encoded_payload = null; + + if ( [] !== $payload ) { + $payload_json = wp_json_encode( $payload ); + if ( false !== $payload_json ) { + $encoded_payload = $payload_json; + } + } + + $run_id = isset( $args['run_id'] ) ? (int) $args['run_id'] : 0; + $session_id = isset( $args['session_id'] ) ? (int) $args['session_id'] : 0; + + return $this->store->insert_event( + [ + 'run_id' => $run_id > 0 ? $run_id : null, + 'session_id' => $session_id > 0 ? $session_id : null, + 'event_type' => $normalized_event_type, + 'payload_json' => $encoded_payload, + 'created_at_gmt' => isset( $args['created_at_gmt'] ) ? (string) $args['created_at_gmt'] : gmdate( 'Y-m-d H:i:s' ), + ] + ); + } + + /** + * Emit a standardized tool-call event. + * + * @param string $tool_name Tool name. + * @param string $ability_name Ability name. + * @param int $requesting_user_id Requesting user ID. + * @param int $execution_user_id Execution user ID. + * @param string $status Tool execution status. + * @param string $args_hash Hash of tool arguments. + * @param array $payload Tool result payload. + * @param array $context Optional event context. + */ + public function emit_tool_call( + string $tool_name, + string $ability_name, + int $requesting_user_id, + int $execution_user_id, + string $status, + string $args_hash, + array $payload, + array $context = [] + ): int { + return $this->emit( + 'tool_call', + [ + 'run_id' => isset( $context['run_id'] ) ? (int) $context['run_id'] : 0, + 'session_id' => isset( $context['session_id'] ) ? (int) $context['session_id'] : 0, + 'payload' => [ + 'tool_name' => $tool_name, + 'ability_name' => $ability_name, + 'status' => strtolower( trim( sanitize_text_field( $status ) ) ), + 'args_hash' => $args_hash, + 'success' => ! empty( $payload['success'] ), + 'result' => $payload['result'] ?? null, + 'error' => $payload['error'] ?? null, + 'requesting_user_id' => $requesting_user_id > 0 ? $requesting_user_id : null, + 'execution_user_id' => $execution_user_id > 0 ? $execution_user_id : null, + ], + ] + ); + } + + /** + * Get run-scoped incremental events. + * + * @param int $run_id Run identifier. + * @param int $after_event_id Cursor of last delivered event. + * @param int $limit Maximum rows to return. + * @return array> + */ + public function get_run_events( int $run_id, int $after_event_id = 0, int $limit = 100 ): array { + if ( $run_id <= 0 ) { + return []; + } + + return $this->normalize_event_rows( + $this->store->get_events( + [ + 'run_id' => $run_id, + 'after_event_id' => $after_event_id, + 'limit' => $limit, + ] + ) + ); + } + + /** + * Get session-scoped incremental events. + * + * @param int $session_id Session identifier. + * @param int $after_event_id Cursor of last delivered event. + * @param int $limit Maximum rows to return. + * @return array> + */ + public function get_session_events( int $session_id, int $after_event_id = 0, int $limit = 100 ): array { + if ( $session_id <= 0 ) { + return []; + } + + return $this->normalize_event_rows( + $this->store->get_events( + [ + 'session_id' => $session_id, + 'after_event_id' => $after_event_id, + 'limit' => $limit, + ] + ) + ); + } + + /** + * Normalize one event row. + * + * @param array $row DB row. + * @return array + */ + private function normalize_event_row( array $row ): array { + $payload = []; + if ( isset( $row['payload_json'] ) && is_string( $row['payload_json'] ) && '' !== trim( $row['payload_json'] ) ) { + $decoded = json_decode( $row['payload_json'], true ); + if ( is_array( $decoded ) ) { + $payload = $decoded; + } + } + + return [ + 'event_id' => isset( $row['id'] ) ? (int) $row['id'] : 0, + 'run_id' => isset( $row['run_id'] ) && null !== $row['run_id'] ? (int) $row['run_id'] : null, + 'session_id' => isset( $row['session_id'] ) && null !== $row['session_id'] ? (int) $row['session_id'] : null, + 'event_type' => isset( $row['event_type'] ) ? (string) $row['event_type'] : 'event', + 'payload' => $payload, + 'created_at_gmt' => isset( $row['created_at_gmt'] ) ? (string) $row['created_at_gmt'] : '', + ]; + } + + /** + * Normalize DB rows into API-facing event payloads. + * + * @param array> $rows Raw DB rows. + * @return array> + */ + private function normalize_event_rows( array $rows ): array { + return array_values( + array_map( [ $this, 'normalize_event_row' ], $rows ) + ); + } + + /** + * Normalize event type. + * + * @param string $event_type Raw event type. + */ + private function sanitize_event_type( string $event_type ): string { + $event_type = strtolower( trim( sanitize_text_field( $event_type ) ) ); + $event_type = preg_replace( '/[^a-z0-9._:-]/', '', $event_type ); + return '' !== (string) $event_type ? (string) $event_type : ''; + } +} diff --git a/includes/helpers/class-agent-run-helper.php b/includes/helpers/class-agent-run-helper.php new file mode 100644 index 0000000..cf74e32 --- /dev/null +++ b/includes/helpers/class-agent-run-helper.php @@ -0,0 +1,236 @@ + + */ + private const TERMINAL_STATUSES = [ 'success', 'failed', 'cancelled', 'canceled' ]; + + /** + * Singleton instance. + * + * @var ?self + */ + private static ?self $instance = null; + + /** + * Run store instance for DB access. + * + * @var Agent_Run_Store + */ + private Agent_Run_Store $store; + + /** + * Constructor. + */ + private function __construct() { + $this->store = Agent_Run_Store::get_instance(); + } + + /** + * Get singleton instance. + */ + public static function get_instance(): self { + if ( null === self::$instance ) { + self::$instance = new self(); + } + + return self::$instance; + } + + /** + * Create a queued run. + * + * @param int $session_id Parent session identifier. + */ + public function create_run( int $session_id ): int { + $now = gmdate( 'Y-m-d H:i:s' ); + return $this->store->insert_run( $session_id, $this->generate_uuid(), $now ); + } + + /** + * Claim one run for a worker. + * + * @param int $run_id Run identifier. + * @param string $worker_id Worker claim id. + * @param int $lease_ttl_seconds Lease TTL in seconds. + * @return array + */ + public function claim_run( int $run_id, string $worker_id, int $lease_ttl_seconds = 120 ): array { + $run = $this->get_run( $run_id ); + if ( [] === $run ) { + return [ + 'claimed' => false, + 'reason' => 'run_not_found', + ]; + } + + $now = gmdate( 'Y-m-d H:i:s' ); + $lock_token = hash( 'sha256', uniqid( $worker_id . ':', true ) ); + $expires_at = gmdate( 'Y-m-d H:i:s', strtotime( $now ) + max( 1, $lease_ttl_seconds ) ); + + $is_stale = 'running' === (string) $run['status'] + && isset( $run['lock_expires_at_gmt'] ) + && is_string( $run['lock_expires_at_gmt'] ) + && '' !== $run['lock_expires_at_gmt'] + && strtotime( $run['lock_expires_at_gmt'] ) < strtotime( $now ); + + if ( 'queued' !== (string) $run['status'] && ! $is_stale ) { + return [ + 'claimed' => false, + 'reason' => 'not_claimable', + ]; + } + + $next_attempt = (int) ( $run['attempt'] ?? 1 ); + if ( $is_stale ) { + ++$next_attempt; + } + + $updated = $this->store->update_claim( + $run_id, + (string) $run['status'], + $is_stale ? (string) $run['lock_expires_at_gmt'] : null, + [ + 'status' => 'running', + 'claimed_by' => $worker_id, + 'lock_token' => $lock_token, + 'lock_acquired_at_gmt' => $now, + 'lock_expires_at_gmt' => $expires_at, + 'attempt' => $next_attempt, + 'started_at_gmt' => $now, + 'updated_at_gmt' => $now, + ], + $is_stale + ); + + if ( false === $updated || 0 === $updated ) { + return [ + 'claimed' => false, + 'reason' => 'claim_collision', + ]; + } + + return [ + 'claimed' => true, + 'run_id' => $run_id, + 'lock_token' => $lock_token, + 'attempt' => $next_attempt, + 'reclaimed' => $is_stale, + ]; + } + + /** + * Complete a run and update parent session state. + * + * @param int $run_id Run identifier. + * @param string $lock_token Lock token from claim response. + * @param string $status Terminal status. + * @param array $args Optional completion details. + */ + public function complete_run( int $run_id, string $lock_token, string $status, array $args = [] ): bool { + if ( ! in_array( $status, self::TERMINAL_STATUSES, true ) ) { + return false; + } + + if ( ! $this->store->begin_transaction() ) { + return false; + } + + $run = $this->get_run( $run_id ); + if ( [] === $run || 'running' !== (string) $run['status'] ) { + $this->store->rollback_transaction(); + return false; + } + + if ( (string) ( $run['lock_token'] ?? '' ) !== $lock_token ) { + $this->store->rollback_transaction(); + return false; + } + + $meta_json = null; + if ( isset( $args['meta'] ) && is_array( $args['meta'] ) ) { + $encoded = wp_json_encode( $args['meta'] ); + $meta_json = false === $encoded ? null : $encoded; + } + + $updated = $this->store->update_completion( + $run_id, + $lock_token, + [ + 'status' => $status, + 'finished_at_gmt' => gmdate( 'Y-m-d H:i:s' ), + 'error_code' => isset( $args['error_code'] ) ? (string) $args['error_code'] : null, + 'error_message' => isset( $args['error_message'] ) ? (string) $args['error_message'] : null, + 'meta_json' => $meta_json, + 'updated_at_gmt' => gmdate( 'Y-m-d H:i:s' ), + ] + ); + + if ( false === $updated || 0 === $updated ) { + $this->store->rollback_transaction(); + return false; + } + + $session_updated = Agent_Session_Helper::get_instance()->apply_run_completion( + (int) $run['session_id'], + $status, + isset( $args['next_run_at_gmt'] ) ? (string) $args['next_run_at_gmt'] : null + ); + + if ( ! $session_updated ) { + $this->store->rollback_transaction(); + return false; + } + + if ( ! $this->store->commit_transaction() ) { + $this->store->rollback_transaction(); + return false; + } + + return true; + } + + /** + * Fetch one run by id. + * + * @param int $run_id Run identifier. + * @return array + */ + public function get_run( int $run_id ): array { + return $this->store->get_run( $run_id ); + } + + /** + * Generate uuid-like id without WP dependency. + */ + private function generate_uuid(): string { + $seed = md5( uniqid( '', true ) ); + return sprintf( + '%s-%s-%s-%s-%s', + substr( $seed, 0, 8 ), + substr( $seed, 8, 4 ), + substr( $seed, 12, 4 ), + substr( $seed, 16, 4 ), + substr( $seed, 20, 12 ) + ); + } +} diff --git a/includes/helpers/class-agent-session-helper.php b/includes/helpers/class-agent-session-helper.php new file mode 100644 index 0000000..3946a2e --- /dev/null +++ b/includes/helpers/class-agent-session-helper.php @@ -0,0 +1,108 @@ +store = Agent_Session_Store::get_instance(); + } + + /** + * Get singleton instance. + */ + public static function get_instance(): self { + if ( null === self::$instance ) { + self::$instance = new self(); + } + + return self::$instance; + } + + /** + * Create one session row. + * + * @param array $args Session payload. + */ + public function create_session( array $args = [] ): int { + $now = gmdate( 'Y-m-d H:i:s' ); + + return $this->store->insert_session( + [ + 'uuid' => isset( $args['uuid'] ) ? (string) $args['uuid'] : $this->generate_uuid(), + 'status' => isset( $args['status'] ) ? (string) $args['status'] : 'active', + 'trigger_type' => isset( $args['trigger_type'] ) ? (string) $args['trigger_type'] : 'chat', + 'requesting_user_id' => isset( $args['requesting_user_id'] ) ? (int) $args['requesting_user_id'] : null, + 'execution_user_id' => isset( $args['execution_user_id'] ) ? (int) $args['execution_user_id'] : null, + 'policy_profile' => isset( $args['policy_profile'] ) ? (string) $args['policy_profile'] : null, + 'last_run_at_gmt' => null, + 'next_run_at_gmt' => isset( $args['next_run_at_gmt'] ) ? (string) $args['next_run_at_gmt'] : null, + 'last_run_status' => null, + 'consecutive_failures' => 0, + 'created_at_gmt' => $now, + 'updated_at_gmt' => $now, + ] + ); + } + + /** + * Update parent session state after run completion. + * + * @param int $session_id Session identifier. + * @param string $run_status Terminal run status. + * @param string|null $next_run_at_gmt Optional next run timestamp. + */ + public function apply_run_completion( int $session_id, string $run_status, ?string $next_run_at_gmt = null ): bool { + return $this->store->update_run_completion( + $session_id, + $run_status, + $next_run_at_gmt, + gmdate( 'Y-m-d H:i:s' ) + ); + } + + /** + * Generate uuid-like id without WP dependency. + */ + private function generate_uuid(): string { + $seed = md5( uniqid( '', true ) ); + return sprintf( + '%s-%s-%s-%s-%s', + substr( $seed, 0, 8 ), + substr( $seed, 8, 4 ), + substr( $seed, 12, 4 ), + substr( $seed, 16, 4 ), + substr( $seed, 20, 12 ) + ); + } +} diff --git a/includes/stores/class-action-log-store.php b/includes/stores/class-action-log-store.php new file mode 100644 index 0000000..e658523 --- /dev/null +++ b/includes/stores/class-action-log-store.php @@ -0,0 +1,214 @@ +is_wpdb_ready( $wpdb ) ) { + return self::TABLE_SUFFIX; + } + + return $wpdb->prefix . self::TABLE_SUFFIX; + } + + /** + * Create/update action log table schema. + */ + public function create_table(): bool { + global $wpdb; + + if ( ! $this->is_wpdb_ready( $wpdb ) ) { + return false; + } + + $table_name = $this->get_table_name(); + $charset_collate = method_exists( $wpdb, 'get_charset_collate' ) + ? (string) $wpdb->get_charset_collate() + : ''; + + $sql = "CREATE TABLE {$table_name} ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + event_type varchar(64) NOT NULL DEFAULT 'event', + action_name varchar(191) NOT NULL, + status varchar(20) NOT NULL DEFAULT 'info', + message text NULL, + requesting_user_id bigint(20) unsigned NULL, + execution_user_id bigint(20) unsigned NULL, + context longtext NULL, + created_at datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (id), + KEY event_type (event_type), + KEY action_name (action_name), + KEY status (status), + KEY requesting_user_id (requesting_user_id), + KEY execution_user_id (execution_user_id), + KEY created_at (created_at) + ) {$charset_collate};"; + + if ( ! function_exists( 'dbDelta' ) ) { + require_once ABSPATH . 'wp-admin/includes/upgrade.php'; + } + + if ( ! function_exists( 'dbDelta' ) ) { + return false; + } + + dbDelta( $sql ); + return true; + } + + /** + * Persist one action/event log record. + * + * @param array $data Insert payload. + */ + public function insert_log( array $data ): bool { + global $wpdb; + + if ( ! $this->is_wpdb_ready( $wpdb ) || ! method_exists( $wpdb, 'insert' ) ) { + return false; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery -- centralized logging insert for plugin action ledger. + $inserted = $wpdb->insert( + $this->get_table_name(), + [ + 'event_type' => isset( $data['event_type'] ) ? (string) $data['event_type'] : 'event', + 'action_name' => isset( $data['action_name'] ) ? (string) $data['action_name'] : '', + 'status' => isset( $data['status'] ) ? (string) $data['status'] : 'info', + 'message' => $data['message'] ?? null, + 'requesting_user_id' => $data['requesting_user_id'] ?? null, + 'execution_user_id' => $data['execution_user_id'] ?? null, + 'context' => $data['context'] ?? null, + ], + [ + '%s', + '%s', + '%s', + '%s', + '%d', + '%d', + '%s', + ] + ); + + return false !== $inserted; + } + + /** + * Fetch recent log rows. + * + * @param array $args Query filters. + * @return array> + */ + public function get_recent_logs( array $args = [] ): array { + global $wpdb; + + if ( ! $this->is_wpdb_ready( $wpdb ) || ! method_exists( $wpdb, 'prepare' ) || ! method_exists( $wpdb, 'get_results' ) ) { + return []; + } + + $limit = isset( $args['limit'] ) ? (int) $args['limit'] : 50; + $offset = isset( $args['offset'] ) ? (int) $args['offset'] : 0; + $limit = $limit > 0 ? min( $limit, 500 ) : 50; + $offset = $offset >= 0 ? $offset : 0; + + $where_clauses = []; + $where_values = []; + + if ( isset( $args['event_type'] ) && '' !== (string) $args['event_type'] ) { + $where_clauses[] = 'event_type = %s'; + $where_values[] = (string) $args['event_type']; + } + + if ( isset( $args['status'] ) && '' !== (string) $args['status'] ) { + $where_clauses[] = 'status = %s'; + $where_values[] = (string) $args['status']; + } + + if ( isset( $args['requesting_user_id'] ) && (int) $args['requesting_user_id'] > 0 ) { + $where_clauses[] = 'requesting_user_id = %d'; + $where_values[] = (int) $args['requesting_user_id']; + } + + if ( isset( $args['execution_user_id'] ) && (int) $args['execution_user_id'] > 0 ) { + $where_clauses[] = 'execution_user_id = %d'; + $where_values[] = (int) $args['execution_user_id']; + } + + $where_sql = [] !== $where_clauses ? 'WHERE ' . implode( ' AND ', $where_clauses ) : ''; + $where_values[] = $limit; + $where_values[] = $offset; + + // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared -- table name is fixed plugin-owned identifier. + $query = "SELECT id, event_type, action_name, status, message, requesting_user_id, execution_user_id, context, created_at + FROM {$this->get_table_name()} + {$where_sql} + ORDER BY id DESC + LIMIT %d OFFSET %d"; + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- query is prepared via `$wpdb->prepare()` on this line. + $prepared_query = $wpdb->prepare( $query, $where_values ); + if ( ! is_string( $prepared_query ) || '' === $prepared_query ) { + return []; + } + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared,WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- log queries are intentional and already bounded. + $rows = $wpdb->get_results( $prepared_query, 'ARRAY_A' ); + return is_array( $rows ) ? array_values( $rows ) : []; + } + + /** + * Check whether a usable `$wpdb` object is present. + * + * @param mixed $wpdb Candidate wpdb object. + */ + private function is_wpdb_ready( $wpdb ): bool { + return is_object( $wpdb ) && isset( $wpdb->prefix ); + } +} diff --git a/includes/stores/class-agent-event-store.php b/includes/stores/class-agent-event-store.php new file mode 100644 index 0000000..79055fb --- /dev/null +++ b/includes/stores/class-agent-event-store.php @@ -0,0 +1,196 @@ +is_wpdb_ready( $wpdb ) ) { + return self::TABLE_SUFFIX; + } + + return $wpdb->prefix . self::TABLE_SUFFIX; + } + + /** + * Create/update event table schema. + */ + public function create_table(): bool { + global $wpdb; + + if ( ! $this->is_wpdb_ready( $wpdb ) ) { + return false; + } + + $charset_collate = method_exists( $wpdb, 'get_charset_collate' ) + ? (string) $wpdb->get_charset_collate() + : ''; + + $sql = "CREATE TABLE {$this->get_table_name()} ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + run_id bigint(20) unsigned NULL, + session_id bigint(20) unsigned NULL, + event_type varchar(64) NOT NULL, + payload_json longtext NULL, + created_at_gmt datetime NOT NULL, + PRIMARY KEY (id), + KEY run_id_id (run_id, id), + KEY session_id_id (session_id, id), + KEY event_type (event_type), + KEY created_at_gmt (created_at_gmt) + ) {$charset_collate};"; + + if ( ! function_exists( 'dbDelta' ) ) { + require_once ABSPATH . 'wp-admin/includes/upgrade.php'; + } + + if ( ! function_exists( 'dbDelta' ) ) { + return false; + } + + dbDelta( $sql ); + return true; + } + + /** + * Insert one event row. + * + * @param array $data Event payload. + */ + public function insert_event( array $data ): int { + global $wpdb; + + if ( ! $this->is_wpdb_ready( $wpdb ) || ! method_exists( $wpdb, 'insert' ) ) { + return 0; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery -- centralized append-only event insert. + $inserted = $wpdb->insert( + $this->get_table_name(), + [ + 'run_id' => $data['run_id'] ?? null, + 'session_id' => $data['session_id'] ?? null, + 'event_type' => isset( $data['event_type'] ) ? (string) $data['event_type'] : 'event', + 'payload_json' => $data['payload_json'] ?? null, + 'created_at_gmt' => isset( $data['created_at_gmt'] ) ? (string) $data['created_at_gmt'] : gmdate( 'Y-m-d H:i:s' ), + ], + [ '%d', '%d', '%s', '%s', '%s' ] + ); + + if ( false === $inserted || ! isset( $wpdb->insert_id ) ) { + return 0; + } + + return (int) $wpdb->insert_id; + } + + /** + * Fetch event rows incrementally. + * + * @param array $args Query filters. + * @return array> + */ + public function get_events( array $args = [] ): array { + global $wpdb; + + if ( ! $this->is_wpdb_ready( $wpdb ) || ! method_exists( $wpdb, 'prepare' ) || ! method_exists( $wpdb, 'get_results' ) ) { + return []; + } + + $limit = isset( $args['limit'] ) ? (int) $args['limit'] : 100; + $limit = $limit > 0 ? min( $limit, 500 ) : 100; + $after = isset( $args['after_event_id'] ) ? (int) $args['after_event_id'] : 0; + $after = $after > 0 ? $after : 0; + + $where_clauses = [ 'id > %d' ]; + $where_values = [ $after ]; + + if ( isset( $args['run_id'] ) && (int) $args['run_id'] > 0 ) { + $where_clauses[] = 'run_id = %d'; + $where_values[] = (int) $args['run_id']; + } + + if ( isset( $args['session_id'] ) && (int) $args['session_id'] > 0 ) { + $where_clauses[] = 'session_id = %d'; + $where_values[] = (int) $args['session_id']; + } + + if ( isset( $args['event_type'] ) && '' !== (string) $args['event_type'] ) { + $where_clauses[] = 'event_type = %s'; + $where_values[] = (string) $args['event_type']; + } + + $where_values[] = $limit; + $where_sql = 'WHERE ' . implode( ' AND ', $where_clauses ); + + // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared -- table name is fixed plugin-owned identifier. + $query = "SELECT id, run_id, session_id, event_type, payload_json, created_at_gmt + FROM {$this->get_table_name()} + {$where_sql} + ORDER BY id ASC + LIMIT %d"; + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- query is prepared via `$wpdb->prepare()` on this line. + $prepared_query = $wpdb->prepare( $query, $where_values ); + if ( ! is_string( $prepared_query ) || '' === $prepared_query ) { + return []; + } + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared,WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- bounded incremental event read. + $rows = $wpdb->get_results( $prepared_query, 'ARRAY_A' ); + return is_array( $rows ) ? array_values( $rows ) : []; + } + + /** + * Check whether a usable `$wpdb` object is present. + * + * @param mixed $wpdb Candidate wpdb object. + */ + private function is_wpdb_ready( $wpdb ): bool { + return is_object( $wpdb ) && isset( $wpdb->prefix ); + } +} diff --git a/includes/stores/class-agent-run-store.php b/includes/stores/class-agent-run-store.php new file mode 100644 index 0000000..36df87e --- /dev/null +++ b/includes/stores/class-agent-run-store.php @@ -0,0 +1,298 @@ +prefix ) ) { + return self::TABLE_SUFFIX; + } + + return $wpdb->prefix . self::TABLE_SUFFIX; + } + + /** + * Create/update run table schema. + */ + public function create_table(): bool { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! isset( $wpdb->prefix ) ) { + return false; + } + + $charset_collate = method_exists( $wpdb, 'get_charset_collate' ) + ? (string) $wpdb->get_charset_collate() + : ''; + + $sql = "CREATE TABLE {$this->get_table_name()} ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + session_id bigint(20) unsigned NOT NULL, + run_uuid char(36) NOT NULL, + status varchar(32) NOT NULL DEFAULT 'queued', + claimed_by varchar(64) NULL, + lock_token char(64) NULL, + lock_acquired_at_gmt datetime NULL, + lock_expires_at_gmt datetime NULL, + attempt int(11) NOT NULL DEFAULT 1, + started_at_gmt datetime NULL, + finished_at_gmt datetime NULL, + error_code varchar(128) NULL, + error_message text NULL, + meta_json longtext NULL, + created_at_gmt datetime NOT NULL, + updated_at_gmt datetime NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY run_uuid (run_uuid), + KEY session_status (session_id, status), + KEY status_lock_expires_at_gmt (status, lock_expires_at_gmt), + KEY claimed_by (claimed_by) + ) {$charset_collate};"; + + if ( ! function_exists( 'dbDelta' ) ) { + require_once ABSPATH . 'wp-admin/includes/upgrade.php'; + } + + if ( ! function_exists( 'dbDelta' ) ) { + return false; + } + + dbDelta( $sql ); + return true; + } + + /** + * Insert a queued run row. + * + * @param int $session_id Session identifier. + * @param string $run_uuid Run UUID. + * @param string $created_at_gmt Created-at timestamp (UTC). + */ + public function insert_run( int $session_id, string $run_uuid, string $created_at_gmt ): int { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'insert' ) ) { + return 0; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery -- centralized repository insert. + $wpdb->insert( + $this->get_table_name(), + [ + 'session_id' => $session_id, + 'run_uuid' => $run_uuid, + 'status' => 'queued', + 'attempt' => 1, + 'created_at_gmt' => $created_at_gmt, + 'updated_at_gmt' => $created_at_gmt, + ], + [ '%d', '%s', '%s', '%d', '%s', '%s' ] + ); + + return isset( $wpdb->insert_id ) ? (int) $wpdb->insert_id : 0; + } + + /** + * Compare-and-swap claim update for a run. + * + * @param int $run_id Run identifier. + * @param string $current_status Expected status. + * @param string|null $current_lock_expires_at_gmt Expected lock expiry when reclaiming. + * @param array $data Update data. + * @param bool $is_stale Whether this claim is a stale reclaim. + * @return int|false + */ + public function update_claim( + int $run_id, + string $current_status, + ?string $current_lock_expires_at_gmt, + array $data, + bool $is_stale + ) { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'update' ) ) { + return false; + } + + $where = [ + 'id' => $run_id, + 'status' => $current_status, + ]; + $where_format = [ '%d', '%s' ]; + + if ( $is_stale ) { + $where['lock_expires_at_gmt'] = $current_lock_expires_at_gmt; + $where_format[] = '%s'; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- bounded compare-and-swap style update. + return $wpdb->update( + $this->get_table_name(), + [ + 'status' => isset( $data['status'] ) ? (string) $data['status'] : 'running', + 'claimed_by' => isset( $data['claimed_by'] ) ? (string) $data['claimed_by'] : null, + 'lock_token' => isset( $data['lock_token'] ) ? (string) $data['lock_token'] : null, + 'lock_acquired_at_gmt' => isset( $data['lock_acquired_at_gmt'] ) ? (string) $data['lock_acquired_at_gmt'] : null, + 'lock_expires_at_gmt' => isset( $data['lock_expires_at_gmt'] ) ? (string) $data['lock_expires_at_gmt'] : null, + 'attempt' => isset( $data['attempt'] ) ? (int) $data['attempt'] : 1, + 'started_at_gmt' => isset( $data['started_at_gmt'] ) ? (string) $data['started_at_gmt'] : null, + 'updated_at_gmt' => isset( $data['updated_at_gmt'] ) ? (string) $data['updated_at_gmt'] : null, + ], + $where, + [ '%s', '%s', '%s', '%s', '%s', '%d', '%s', '%s' ], + $where_format + ); + } + + /** + * Complete a run with lock-token guard. + * + * @param int $run_id Run identifier. + * @param string $lock_token Lock token guard. + * @param array $data Completion data. + * @return int|false + */ + public function update_completion( int $run_id, string $lock_token, array $data ) { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'update' ) ) { + return false; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- bounded repository completion update. + return $wpdb->update( + $this->get_table_name(), + [ + 'status' => isset( $data['status'] ) ? (string) $data['status'] : null, + 'lock_token' => null, + 'claimed_by' => null, + 'lock_acquired_at_gmt' => null, + 'lock_expires_at_gmt' => null, + 'finished_at_gmt' => isset( $data['finished_at_gmt'] ) ? (string) $data['finished_at_gmt'] : null, + 'error_code' => $data['error_code'] ?? null, + 'error_message' => $data['error_message'] ?? null, + 'meta_json' => $data['meta_json'] ?? null, + 'updated_at_gmt' => isset( $data['updated_at_gmt'] ) ? (string) $data['updated_at_gmt'] : null, + ], + [ + 'id' => $run_id, + 'lock_token' => $lock_token, + ], + [ '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s' ], + [ '%d', '%s' ] + ); + } + + /** + * Fetch one run by id. + * + * @param int $run_id Run identifier. + * @return array + */ + public function get_run( int $run_id ): array { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'prepare' ) || ! method_exists( $wpdb, 'get_row' ) ) { + return []; + } + + $table_name = $this->get_table_name(); + // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared -- table name is fixed plugin-owned identifier. + $query = $wpdb->prepare( "SELECT * FROM {$table_name} WHERE id = %d", $run_id ); + if ( ! is_string( $query ) || '' === $query ) { + return []; + } + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared,WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- bounded primary-key lookup. + $row = $wpdb->get_row( $query, 'ARRAY_A' ); + return is_array( $row ) ? $row : []; + } + + /** + * Begin transaction. + */ + public function begin_transaction(): bool { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'query' ) ) { + return false; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- bounded transaction control statement. + return false !== $wpdb->query( 'START TRANSACTION' ); + } + + /** + * Commit transaction. + */ + public function commit_transaction(): bool { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'query' ) ) { + return false; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- bounded transaction control statement. + return false !== $wpdb->query( 'COMMIT' ); + } + + /** + * Roll back transaction. + */ + public function rollback_transaction(): void { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'query' ) ) { + return; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- bounded transaction control statement. + $wpdb->query( 'ROLLBACK' ); + } +} diff --git a/includes/stores/class-agent-session-store.php b/includes/stores/class-agent-session-store.php new file mode 100644 index 0000000..12384dd --- /dev/null +++ b/includes/stores/class-agent-session-store.php @@ -0,0 +1,203 @@ +prefix ) ) { + return self::TABLE_SUFFIX; + } + + return $wpdb->prefix . self::TABLE_SUFFIX; + } + + /** + * Create/update session table schema. + */ + public function create_table(): bool { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! isset( $wpdb->prefix ) ) { + return false; + } + + $charset_collate = method_exists( $wpdb, 'get_charset_collate' ) + ? (string) $wpdb->get_charset_collate() + : ''; + + $sql = "CREATE TABLE {$this->get_table_name()} ( + id bigint(20) unsigned NOT NULL AUTO_INCREMENT, + uuid char(36) NOT NULL, + status varchar(32) NOT NULL DEFAULT 'active', + trigger_type varchar(32) NOT NULL DEFAULT 'chat', + requesting_user_id bigint(20) unsigned NULL, + execution_user_id bigint(20) unsigned NULL, + policy_profile varchar(64) NULL, + last_run_at_gmt datetime NULL, + next_run_at_gmt datetime NULL, + last_run_status varchar(32) NULL, + consecutive_failures int(11) NOT NULL DEFAULT 0, + created_at_gmt datetime NOT NULL, + updated_at_gmt datetime NOT NULL, + PRIMARY KEY (id), + UNIQUE KEY uuid (uuid), + KEY status_next_run_at_gmt (status, next_run_at_gmt), + KEY trigger_type (trigger_type) + ) {$charset_collate};"; + + if ( ! function_exists( 'dbDelta' ) ) { + require_once ABSPATH . 'wp-admin/includes/upgrade.php'; + } + + if ( ! function_exists( 'dbDelta' ) ) { + return false; + } + + dbDelta( $sql ); + return true; + } + + /** + * Insert one session row. + * + * @param array $data Session payload. + */ + public function insert_session( array $data ): int { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'insert' ) ) { + return 0; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery -- Centralized repository insert. + $wpdb->insert( + $this->get_table_name(), + [ + 'uuid' => isset( $data['uuid'] ) ? (string) $data['uuid'] : '', + 'status' => isset( $data['status'] ) ? (string) $data['status'] : 'active', + 'trigger_type' => isset( $data['trigger_type'] ) ? (string) $data['trigger_type'] : 'chat', + 'requesting_user_id' => $data['requesting_user_id'] ?? null, + 'execution_user_id' => $data['execution_user_id'] ?? null, + 'policy_profile' => $data['policy_profile'] ?? null, + 'last_run_at_gmt' => $data['last_run_at_gmt'] ?? null, + 'next_run_at_gmt' => $data['next_run_at_gmt'] ?? null, + 'last_run_status' => $data['last_run_status'] ?? null, + 'consecutive_failures' => isset( $data['consecutive_failures'] ) ? (int) $data['consecutive_failures'] : 0, + 'created_at_gmt' => isset( $data['created_at_gmt'] ) ? (string) $data['created_at_gmt'] : gmdate( 'Y-m-d H:i:s' ), + 'updated_at_gmt' => isset( $data['updated_at_gmt'] ) ? (string) $data['updated_at_gmt'] : gmdate( 'Y-m-d H:i:s' ), + ], + [ + '%s', + '%s', + '%s', + '%d', + '%d', + '%s', + '%s', + '%s', + '%s', + '%d', + '%s', + '%s', + ] + ); + + if ( ! isset( $wpdb->insert_id ) ) { + return 0; + } + + return (int) $wpdb->insert_id; + } + + /** + * Update parent session state after run completion. + * + * @param int $session_id Session identifier. + * @param string $run_status Terminal run status. + * @param string|null $next_run_at_gmt Optional next-run timestamp. + * @param string $updated_at_gmt Update timestamp (UTC). + */ + public function update_run_completion( int $session_id, string $run_status, ?string $next_run_at_gmt, string $updated_at_gmt ): bool { + global $wpdb; + + if ( ! is_object( $wpdb ) || ! method_exists( $wpdb, 'prepare' ) || ! method_exists( $wpdb, 'query' ) ) { + return false; + } + + $table_name = $this->get_table_name(); + $query = $wpdb->prepare( + // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared -- table name is fixed plugin-owned identifier. + "UPDATE {$table_name} + SET + last_run_at_gmt = %s, + last_run_status = %s, + consecutive_failures = CASE + WHEN %s = 'success' THEN 0 + ELSE consecutive_failures + 1 + END, + next_run_at_gmt = %s, + updated_at_gmt = %s + WHERE id = %d", + $updated_at_gmt, + $run_status, + $run_status, + $next_run_at_gmt, + $updated_at_gmt, + $session_id + ); + + if ( ! is_string( $query ) || '' === $query ) { + return false; + } + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared,WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching -- prepared bounded update on primary-key id. + $updated = $wpdb->query( $query ); + + return false !== $updated; + } +} diff --git a/tests/Unit/AbilitiesHelperTest.php b/tests/Unit/AbilitiesHelperTest.php index 62a0758..8cb8569 100644 --- a/tests/Unit/AbilitiesHelperTest.php +++ b/tests/Unit/AbilitiesHelperTest.php @@ -15,7 +15,7 @@ use WordPress\AiClient\Tools\DTO\FunctionDeclaration; /** - * Minimal wpdb stub for abilities helper action-log writes. + * Minimal wpdb stub for abilities helper agent-event writes. */ final class AbilitiesHelperTestWpdb { /** @@ -101,9 +101,12 @@ public function test_tool_execution_logs_requesting_and_execution_actors(): void $this->assertTrue( $result['success'] ); $this->assertNotEmpty( $GLOBALS['wpdb']->insert_calls ); - $this->assertSame( 12, $GLOBALS['wpdb']->insert_calls[0]['data']['requesting_user_id'] ); - $this->assertSame( 9, $GLOBALS['wpdb']->insert_calls[0]['data']['execution_user_id'] ); + $this->assertSame( 'wp_clawpress_agent_events', $GLOBALS['wpdb']->insert_calls[0]['table'] ); $this->assertSame( 'tool_call', $GLOBALS['wpdb']->insert_calls[0]['data']['event_type'] ); + $payload = json_decode( (string) $GLOBALS['wpdb']->insert_calls[0]['data']['payload_json'], true ); + $this->assertIsArray( $payload ); + $this->assertSame( 12, $payload['requesting_user_id'] ); + $this->assertSame( 9, $payload['execution_user_id'] ); } public function test_destructive_tools_are_denied_for_heartbeat_trigger_policy(): void { diff --git a/tests/Unit/ActionLogHelperTest.php b/tests/Unit/ActionLogHelperTest.php index 0662076..46dba9e 100644 --- a/tests/Unit/ActionLogHelperTest.php +++ b/tests/Unit/ActionLogHelperTest.php @@ -30,6 +30,7 @@ function dbDelta( string $queries ): array { use ClawPress\Helpers\Action_Log_Helper; use ClawPress\Plugin; +use ClawPress\Stores\Action_Log_Store; use ClawPress\Tests\Support\TestCase; use ClawPress\Tests\Support\WordPress_Stubs; @@ -129,9 +130,8 @@ protected function tearDown(): void { parent::tearDown(); } - public function test_create_table_registers_clawpress_action_logs_schema(): void { - $helper = Action_Log_Helper::get_instance(); - $result = $helper->create_table(); + public function test_store_create_table_registers_clawpress_action_logs_schema(): void { + $result = Action_Log_Store::get_instance()->create_table(); $this->assertTrue( $result ); $this->assertIsArray( $GLOBALS['clawpress_test_dbdelta_queries'] ); @@ -140,6 +140,11 @@ public function test_create_table_registers_clawpress_action_logs_schema(): void $this->assertStringContainsString( 'action_name', (string) $GLOBALS['clawpress_test_dbdelta_queries'][0] ); } + public function test_helper_does_not_expose_schema_methods(): void { + $this->assertFalse( method_exists( Action_Log_Helper::class, 'create_table' ) ); + $this->assertFalse( method_exists( Action_Log_Helper::class, 'get_table_name' ) ); + } + public function test_plugin_activation_creates_action_log_table(): void { Plugin::activate(); diff --git a/tests/Unit/AgentEventHelperTest.php b/tests/Unit/AgentEventHelperTest.php new file mode 100644 index 0000000..ac81c71 --- /dev/null +++ b/tests/Unit/AgentEventHelperTest.php @@ -0,0 +1,216 @@ + + */ + function dbDelta( string $queries ): array { + if ( ! isset( $GLOBALS['clawpress_test_dbdelta_queries'] ) || ! is_array( $GLOBALS['clawpress_test_dbdelta_queries'] ) ) { + $GLOBALS['clawpress_test_dbdelta_queries'] = []; + } + + $GLOBALS['clawpress_test_dbdelta_queries'][] = $queries; + return []; + } + } +} + +namespace ClawPress\Tests\Unit { + +use ClawPress\Helpers\Agent_Event_Helper; +use ClawPress\Plugin; +use ClawPress\Stores\Agent_Event_Store; +use ClawPress\Tests\Support\TestCase; + +/** + * Minimal wpdb stub for agent event helper tests. + */ +final class AgentEventHelperTestWpdb { + /** + * Table prefix. + * + * @var string + */ + public string $prefix = 'wp_'; + + /** + * Captured insert calls. + * + * @var array> + */ + public array $insert_calls = []; + + /** + * Captured prepared args. + * + * @var array + */ + public array $last_prepare_args = []; + + /** + * Prepared query result set. + * + * @var array> + */ + public array $results = []; + + /** + * Insert id stub. + */ + public int $insert_id = 0; + + /** + * Get charset/collation SQL. + */ + public function get_charset_collate(): string { + return 'DEFAULT CHARSET=utf8mb4'; + } + + /** + * Capture insert operation. + * + * @param string $table Table name. + * @param array $data Insert row. + * @param array $format Insert formats. + */ + public function insert( string $table, array $data, array $format ) { + $this->insert_id++; + $this->insert_calls[] = [ + 'table' => $table, + 'data' => $data, + 'format' => $format, + ]; + + return 1; + } + + /** + * Capture prepared SQL call. + * + * @param string $query Query string. + * @param array|mixed ...$args Prepare args. + */ + public function prepare( string $query, ...$args ): string { + if ( 1 === count( $args ) && is_array( $args[0] ) ) { + $args = $args[0]; + } + + $this->last_prepare_args = $args; + return $query; + } + + /** + * Return prepared query rows. + * + * @param string $query Query string. + * @param string $output Output mode. + * @return array> + */ + public function get_results( string $query, string $output ): array { + unset( $query, $output ); + return $this->results; + } +} + +final class AgentEventHelperTest extends TestCase { + protected function setUp(): void { + parent::setUp(); + $GLOBALS['clawpress_test_dbdelta_queries'] = []; + $GLOBALS['wpdb'] = new AgentEventHelperTestWpdb(); + } + + protected function tearDown(): void { + unset( $GLOBALS['wpdb'], $GLOBALS['clawpress_test_dbdelta_queries'] ); + parent::tearDown(); + } + + public function test_store_create_table_registers_clawpress_agent_events_schema(): void { + $result = Agent_Event_Store::get_instance()->create_table(); + + $this->assertTrue( $result ); + $this->assertIsArray( $GLOBALS['clawpress_test_dbdelta_queries'] ); + $this->assertNotEmpty( $GLOBALS['clawpress_test_dbdelta_queries'] ); + $this->assertStringContainsString( 'clawpress_agent_events', (string) $GLOBALS['clawpress_test_dbdelta_queries'][0] ); + $this->assertStringContainsString( 'payload_json', (string) $GLOBALS['clawpress_test_dbdelta_queries'][0] ); + } + + public function test_helper_does_not_expose_schema_methods(): void { + $this->assertFalse( method_exists( Agent_Event_Helper::class, 'create_table' ) ); + $this->assertFalse( method_exists( Agent_Event_Helper::class, 'get_table_name' ) ); + } + + public function test_plugin_activation_creates_agent_event_table(): void { + Plugin::activate(); + + $this->assertNotEmpty( $GLOBALS['clawpress_test_dbdelta_queries'] ); + $all_queries = implode( "\n", array_map( 'strval', $GLOBALS['clawpress_test_dbdelta_queries'] ) ); + $this->assertStringContainsString( 'clawpress_agent_events', $all_queries ); + } + + public function test_emit_tool_call_persists_row_into_agent_event_table(): void { + $event_id = Agent_Event_Helper::get_instance()->emit_tool_call( + 'file_list', + 'clawpress/file-list', + 12, + 9, + 'success', + 'abc123', + [ + 'success' => true, + 'result' => [ 'items' => [ 'README.md' ] ], + ], + [ + 'run_id' => 77, + 'session_id' => 11, + ] + ); + + $this->assertSame( 1, $event_id ); + $this->assertCount( 1, $GLOBALS['wpdb']->insert_calls ); + $this->assertSame( 'wp_clawpress_agent_events', $GLOBALS['wpdb']->insert_calls[0]['table'] ); + $this->assertSame( 'tool_call', $GLOBALS['wpdb']->insert_calls[0]['data']['event_type'] ); + $this->assertSame( 77, $GLOBALS['wpdb']->insert_calls[0]['data']['run_id'] ); + $this->assertSame( 11, $GLOBALS['wpdb']->insert_calls[0]['data']['session_id'] ); + + $payload = json_decode( (string) $GLOBALS['wpdb']->insert_calls[0]['data']['payload_json'], true ); + $this->assertIsArray( $payload ); + $this->assertSame( 'file_list', $payload['tool_name'] ); + $this->assertSame( 'clawpress/file-list', $payload['ability_name'] ); + $this->assertSame( 12, $payload['requesting_user_id'] ); + $this->assertSame( 9, $payload['execution_user_id'] ); + } + + public function test_get_run_events_returns_normalized_incremental_rows(): void { + $GLOBALS['wpdb']->results = [ + [ + 'id' => '14', + 'run_id' => '77', + 'session_id' => '11', + 'event_type' => 'tool_call', + 'payload_json' => '{"tool_name":"file_list","success":true}', + 'created_at_gmt' => '2026-02-20 10:00:00', + ], + ]; + + $rows = Agent_Event_Helper::get_instance()->get_run_events( 77, 10, 25 ); + + $this->assertCount( 1, $rows ); + $this->assertSame( 14, $rows[0]['event_id'] ); + $this->assertSame( 77, $rows[0]['run_id'] ); + $this->assertSame( 11, $rows[0]['session_id'] ); + $this->assertSame( true, $rows[0]['payload']['success'] ); + $this->assertSame( [ 10, 77, 25 ], $GLOBALS['wpdb']->last_prepare_args ); + } +} +} diff --git a/tests/Unit/AgentRunHelperTest.php b/tests/Unit/AgentRunHelperTest.php new file mode 100644 index 0000000..8cb22b4 --- /dev/null +++ b/tests/Unit/AgentRunHelperTest.php @@ -0,0 +1,361 @@ + + */ + function dbDelta( string $queries ): array { + if ( ! isset( $GLOBALS['clawpress_test_dbdelta_queries'] ) || ! is_array( $GLOBALS['clawpress_test_dbdelta_queries'] ) ) { + $GLOBALS['clawpress_test_dbdelta_queries'] = []; + } + + $GLOBALS['clawpress_test_dbdelta_queries'][] = $queries; + return []; + } + } +} + +namespace ClawPress\Tests\Unit { + +use ClawPress\Helpers\Agent_Run_Helper; +use ClawPress\Helpers\Agent_Session_Helper; +use ClawPress\Plugin; +use ClawPress\Tests\Support\TestCase; + +/** + * Minimal in-memory wpdb stub for run/session store tests. + */ +final class AgentRunHelperTestWpdb { + public string $prefix = 'wp_'; + + public int $insert_id = 0; + + /** @var array> */ + public array $sessions = []; + + /** @var array> */ + public array $runs = []; + + /** @var array */ + public array $last_prepare_args = []; + + public bool $fail_session_update = false; + + private bool $in_transaction = false; + + /** @var array{sessions:array>,runs:array>,insert_id:int}|null */ + private ?array $transaction_snapshot = null; + + public function get_charset_collate(): string { + return 'DEFAULT CHARSET=utf8mb4'; + } + + /** + * @param string $table Table name. + * @param array $data Data payload. + * @param array $format Format list. + */ + public function insert( string $table, array $data, array $format ) { + unset( $format ); + + $this->insert_id++; + $data['id'] = $this->insert_id; + + if ( false !== strpos( $table, 'agent_sessions' ) ) { + $this->sessions[ $this->insert_id ] = $data; + return 1; + } + + if ( false !== strpos( $table, 'agent_runs' ) ) { + $this->runs[ $this->insert_id ] = $data; + return 1; + } + + return false; + } + + /** + * @param string $table Table name. + * @param array $data Data payload. + * @param array $where Where payload. + * @param array $format Formats. + * @param array $where_format Where formats. + */ + public function update( string $table, array $data, array $where, ?array $format = null, ?array $where_format = null ) { + unset( $format, $where_format ); + + $target = false !== strpos( $table, 'agent_sessions' ) ? 'sessions' : ( false !== strpos( $table, 'agent_runs' ) ? 'runs' : '' ); + if ( '' === $target ) { + return false; + } + if ( $this->fail_session_update && 'sessions' === $target ) { + return false; + } + + $rows = $this->{$target}; + $updated = 0; + + foreach ( $rows as $id => $row ) { + $matches = true; + foreach ( $where as $key => $value ) { + $current = $row[ $key ] ?? null; + if ( (string) $current !== (string) $value ) { + $matches = false; + break; + } + } + + if ( ! $matches ) { + continue; + } + + $this->{$target}[ $id ] = array_merge( $row, $data ); + ++$updated; + } + + return $updated; + } + + public function query( string $sql ) { + $sql = strtoupper( trim( $sql ) ); + + if ( 'START TRANSACTION' === $sql ) { + $this->in_transaction = true; + $this->transaction_snapshot = [ + 'sessions' => $this->sessions, + 'runs' => $this->runs, + 'insert_id' => $this->insert_id, + ]; + return 1; + } + + if ( 'COMMIT' === $sql ) { + $this->in_transaction = false; + $this->transaction_snapshot = null; + return 1; + } + + if ( 'ROLLBACK' === $sql ) { + if ( $this->in_transaction && is_array( $this->transaction_snapshot ) ) { + $this->sessions = $this->transaction_snapshot['sessions']; + $this->runs = $this->transaction_snapshot['runs']; + $this->insert_id = $this->transaction_snapshot['insert_id']; + } + + $this->in_transaction = false; + $this->transaction_snapshot = null; + return 1; + } + + if ( str_starts_with( $sql, 'UPDATE' ) && false !== strpos( $sql, 'AGENT_SESSIONS' ) ) { + if ( $this->fail_session_update ) { + return false; + } + + $session_id = isset( $this->last_prepare_args[5] ) ? (int) $this->last_prepare_args[5] : 0; + if ( $session_id <= 0 || ! isset( $this->sessions[ $session_id ] ) ) { + return 0; + } + + $run_status = isset( $this->last_prepare_args[1] ) ? (string) $this->last_prepare_args[1] : ''; + $failures = (int) ( $this->sessions[ $session_id ]['consecutive_failures'] ?? 0 ); + if ( 'success' === $run_status ) { + $failures = 0; + } else { + ++$failures; + } + + $this->sessions[ $session_id ]['last_run_at_gmt'] = isset( $this->last_prepare_args[0] ) ? (string) $this->last_prepare_args[0] : null; + $this->sessions[ $session_id ]['last_run_status'] = $run_status; + $this->sessions[ $session_id ]['consecutive_failures'] = $failures; + $this->sessions[ $session_id ]['next_run_at_gmt'] = $this->last_prepare_args[3] ?? null; + $this->sessions[ $session_id ]['updated_at_gmt'] = isset( $this->last_prepare_args[4] ) ? (string) $this->last_prepare_args[4] : null; + return 1; + } + + return false; + } + + /** + * @param string $query Query string. + * @param array|mixed ...$args Prepare args. + */ + public function prepare( string $query, ...$args ): string { + if ( 1 === count( $args ) && is_array( $args[0] ) ) { + $args = $args[0]; + } + + $this->last_prepare_args = $args; + return $query; + } + + /** + * @return array|null + */ + public function get_row( string $query, string $output ) { + unset( $output ); + + $id = 0; + if ( preg_match( '/WHERE id =\s*(\d+)/', $query, $matches ) ) { + $id = (int) $matches[1]; + } + if ( $id <= 0 ) { + $id = isset( $this->last_prepare_args[0] ) ? (int) $this->last_prepare_args[0] : 0; + } + + if ( $id <= 0 ) { + return null; + } + + if ( false !== strpos( $query, 'agent_sessions' ) ) { + return $this->sessions[ $id ] ?? null; + } + + if ( false !== strpos( $query, 'agent_runs' ) ) { + return $this->runs[ $id ] ?? null; + } + + return null; + } +} + +final class AgentRunHelperTest extends TestCase { + protected function setUp(): void { + parent::setUp(); + $GLOBALS['clawpress_test_dbdelta_queries'] = []; + $GLOBALS['wpdb'] = new AgentRunHelperTestWpdb(); + } + + protected function tearDown(): void { + unset( $GLOBALS['wpdb'], $GLOBALS['clawpress_test_dbdelta_queries'] ); + parent::tearDown(); + } + + public function test_plugin_activation_registers_session_and_run_tables(): void { + Plugin::activate(); + + $all_queries = implode( "\n", $GLOBALS['clawpress_test_dbdelta_queries'] ); + $this->assertStringContainsString( 'clawpress_agent_sessions', $all_queries ); + $this->assertStringContainsString( 'clawpress_agent_runs', $all_queries ); + } + + public function test_claim_run_success(): void { + $session_id = Agent_Session_Helper::get_instance()->create_session(); + $run_id = Agent_Run_Helper::get_instance()->create_run( $session_id ); + + $result = Agent_Run_Helper::get_instance()->claim_run( $run_id, 'worker-a', 120 ); + + $this->assertTrue( $result['claimed'] ); + $this->assertSame( 'running', $GLOBALS['wpdb']->runs[ $run_id ]['status'] ); + $this->assertSame( 'worker-a', $GLOBALS['wpdb']->runs[ $run_id ]['claimed_by'] ); + $this->assertNotEmpty( $GLOBALS['wpdb']->runs[ $run_id ]['lock_token'] ); + } + + public function test_claim_collision_fails_for_second_worker(): void { + $session_id = Agent_Session_Helper::get_instance()->create_session(); + $run_id = Agent_Run_Helper::get_instance()->create_run( $session_id ); + + $first = Agent_Run_Helper::get_instance()->claim_run( $run_id, 'worker-a', 120 ); + $second = Agent_Run_Helper::get_instance()->claim_run( $run_id, 'worker-b', 120 ); + + $this->assertTrue( $first['claimed'] ); + $this->assertFalse( $second['claimed'] ); + $this->assertSame( 'not_claimable', $second['reason'] ); + } + + public function test_stale_lock_can_be_reclaimed_and_attempt_increments(): void { + $session_id = Agent_Session_Helper::get_instance()->create_session(); + $run_id = Agent_Run_Helper::get_instance()->create_run( $session_id ); + $GLOBALS['wpdb']->runs[ $run_id ]['status'] = 'running'; + $GLOBALS['wpdb']->runs[ $run_id ]['attempt'] = 1; + $GLOBALS['wpdb']->runs[ $run_id ]['lock_expires_at_gmt'] = '2000-01-01 00:00:00'; + + $result = Agent_Run_Helper::get_instance()->claim_run( $run_id, 'worker-reclaim', 120 ); + + $this->assertTrue( $result['claimed'] ); + $this->assertTrue( $result['reclaimed'] ); + $this->assertSame( 2, $result['attempt'] ); + $this->assertSame( 2, (int) $GLOBALS['wpdb']->runs[ $run_id ]['attempt'] ); + } + + public function test_complete_run_clears_lock_and_updates_session_state(): void { + $session_id = Agent_Session_Helper::get_instance()->create_session(); + $run_id = Agent_Run_Helper::get_instance()->create_run( $session_id ); + $claim = Agent_Run_Helper::get_instance()->claim_run( $run_id, 'worker-a', 120 ); + + $completed = Agent_Run_Helper::get_instance()->complete_run( + $run_id, + (string) $claim['lock_token'], + 'success', + [ + 'meta' => [ 'tools' => 3 ], + ] + ); + + $this->assertTrue( $completed ); + $this->assertSame( 'success', $GLOBALS['wpdb']->runs[ $run_id ]['status'] ); + $this->assertNull( $GLOBALS['wpdb']->runs[ $run_id ]['lock_token'] ); + $this->assertSame( 'success', $GLOBALS['wpdb']->sessions[ $session_id ]['last_run_status'] ); + $this->assertSame( 0, (int) $GLOBALS['wpdb']->sessions[ $session_id ]['consecutive_failures'] ); + } + + public function test_complete_run_rolls_back_when_session_update_fails(): void { + $session_id = Agent_Session_Helper::get_instance()->create_session(); + $run_id = Agent_Run_Helper::get_instance()->create_run( $session_id ); + $claim = Agent_Run_Helper::get_instance()->claim_run( $run_id, 'worker-a', 120 ); + $lock_token = (string) $claim['lock_token']; + + $GLOBALS['wpdb']->fail_session_update = true; + $completed = Agent_Run_Helper::get_instance()->complete_run( $run_id, $lock_token, 'success' ); + + $this->assertFalse( $completed ); + $this->assertSame( 'running', $GLOBALS['wpdb']->runs[ $run_id ]['status'] ); + $this->assertSame( $lock_token, $GLOBALS['wpdb']->runs[ $run_id ]['lock_token'] ); + $this->assertNull( $GLOBALS['wpdb']->sessions[ $session_id ]['last_run_status'] ); + } + + public function test_apply_run_completion_increments_failures_and_resets_on_success(): void { + $session_id = Agent_Session_Helper::get_instance()->create_session(); + + $this->assertTrue( Agent_Session_Helper::get_instance()->apply_run_completion( $session_id, 'failed', null ) ); + $this->assertSame( 1, (int) $GLOBALS['wpdb']->sessions[ $session_id ]['consecutive_failures'] ); + + $this->assertTrue( Agent_Session_Helper::get_instance()->apply_run_completion( $session_id, 'failed', null ) ); + $this->assertSame( 2, (int) $GLOBALS['wpdb']->sessions[ $session_id ]['consecutive_failures'] ); + + $this->assertTrue( Agent_Session_Helper::get_instance()->apply_run_completion( $session_id, 'success', null ) ); + $this->assertSame( 0, (int) $GLOBALS['wpdb']->sessions[ $session_id ]['consecutive_failures'] ); + } + + public function test_complete_run_rejects_non_terminal_status(): void { + $session_id = Agent_Session_Helper::get_instance()->create_session(); + $run_id = Agent_Run_Helper::get_instance()->create_run( $session_id ); + $claim = Agent_Run_Helper::get_instance()->claim_run( $run_id, 'worker-a', 120 ); + $lock_token = (string) $claim['lock_token']; + + $completed = Agent_Run_Helper::get_instance()->complete_run( $run_id, $lock_token, 'running' ); + + $this->assertFalse( $completed ); + $this->assertSame( 'running', $GLOBALS['wpdb']->runs[ $run_id ]['status'] ); + $this->assertSame( $lock_token, $GLOBALS['wpdb']->runs[ $run_id ]['lock_token'] ); + } + + public function test_helpers_do_not_expose_schema_methods(): void { + $this->assertFalse( method_exists( Agent_Run_Helper::class, 'create_table' ) ); + $this->assertFalse( method_exists( Agent_Run_Helper::class, 'get_table_name' ) ); + $this->assertFalse( method_exists( Agent_Session_Helper::class, 'create_table' ) ); + $this->assertFalse( method_exists( Agent_Session_Helper::class, 'get_table_name' ) ); + } +} +}