diff --git a/open_strix/app.py b/open_strix/app.py index ec92643..1ba3859 100644 --- a/open_strix/app.py +++ b/open_strix/app.py @@ -680,6 +680,64 @@ async def _run_post_turn_git_sync(self, event: AgentEvent) -> str: ) return git_result + async def _run_post_turn_watchers(self, event: AgentEvent) -> None: + """Fire ``turn_complete`` watchers and route their findings. + + Best-effort: if ``fire_watchers`` raises, a ``warning`` event is logged + and the method returns without routing anything. Otherwise every + finding is logged as ``watcher_signal``, and findings whose ``route`` + is ``"agent"`` (with a non-empty message) are also enqueued as a new + ``watcher`` event on ``event.channel_id``. + """ + try: + findings = await self.fire_watchers( + "turn_complete", + session_id=self.session_id, + events_path=str(self.layout.events_log), + ) + except Exception as exc: + self.log_event( + "warning", + where="post_turn_watchers", + warning_type="watcher_fire_failed", + error=str(exc), + ) + return + + if not findings: + return + + # Route findings based on the ``route`` field each watcher emits. + for finding in findings: + # ``or`` (not a ``.get`` default) so an explicit JSON ``null`` or + # empty value falls back instead of becoming the string "None". + route = str(finding.get("route") or "log").strip() + severity = str(finding.get("severity") or "info").strip() + message = str(finding.get("message") or "").strip() + watcher_name = str(finding.get("watcher") or "unknown").strip() + + # Always log every finding. + self.log_event( + "watcher_signal", + watcher=watcher_name, + route=route, + severity=severity, + message=message[:2000], + ) + + if route == "agent" and message: + # Route back to the agent as a new event. 
+ # NOTE(review): the resulting turn fires these watchers again; + # presumably ``dedupe_key`` stops a persistent finding looping — confirm. + await self.enqueue_event( + AgentEvent( + event_type="watcher", + prompt=f"[watcher:{watcher_name}] {message}", + channel_id=event.channel_id, + dedupe_key=f"watcher:{watcher_name}:{self.session_id}", + ), + ) + def _render_prompt(self, event: AgentEvent) -> str: journal_entries = _tail_jsonl( self.layout.journal_log, @@ -810,6 +868,7 @@ async def _process_event(self, event: AgentEvent) -> None: final_text=final_text, ) await self._run_post_turn_git_sync(event) + await self._run_post_turn_watchers(event) finally: self._reset_send_message_circuit_breaker() self._current_turn_sent_messages = None diff --git a/open_strix/builtin_skills/introspection/SKILL.md b/open_strix/builtin_skills/introspection/SKILL.md index d452489..d29faca 100644 --- a/open_strix/builtin_skills/introspection/SKILL.md +++ b/open_strix/builtin_skills/introspection/SKILL.md @@ -118,7 +118,7 @@ PY ## Cross-Referencing with Memory Skill -The memory skill (`/.open_strix_builtin_skills/memory/SKILL.md`) covers: +The memory skill covers: - **When and how to write memory blocks** — criteria for block vs file storage - **Maintenance** — block size monitoring, pruning, file frequency analysis - **File organization** — cross-references between blocks and state files @@ -126,7 +126,7 @@ The memory skill (`/.open_strix_builtin_skills/memory/SKILL.md`) covers: Use introspection to find problems. Use memory to fix the persistent ones (update blocks, reorganize files, add cross-references). -The file frequency report (`/.open_strix_builtin_skills/scripts/file_frequency_report.py`) +The file frequency report (`scripts/file_frequency_report.py`) bridges both skills — it reads events.jsonl to find which files you access most, informing both debugging (are you reading the same file repeatedly?) and memory optimization (should hot files become blocks?). 
@@ -135,19 +135,39 @@ optimization (should hot files become blocks?). 
For specific debugging workflows, read these files: -- **Scheduled job issues?** → Read `/.open_strix_builtin_skills/introspection/debugging-jobs.md` +- **Scheduled job issues?** → Read [`debugging-jobs.md`](debugging-jobs.md) Covers: job not firing, firing at wrong time, cron vs time_of_day, timezone traps, validation errors, prompt failures -- **Communication pattern issues?** → Read `/.open_strix_builtin_skills/introspection/debugging-communication.md` +- **Communication pattern issues?** → Read [`debugging-communication.md`](debugging-communication.md) Covers: messages not sending, circuit breaker triggers, silent failures, duplicate messages, channel confusion, engagement pattern analysis -- **Behavioral drift after model changes or block edits?** → Read `/.open_strix_builtin_skills/introspection/debugging-drift.md` +- **Behavioral drift after model changes or block edits?** → Read [`debugging-drift.md`](debugging-drift.md) Covers: response rate tracking, cross-platform routing audit, model change before/after comparison, silence rate trends, topic engagement shifts -- **Identity or operational drift?** → Read `/.open_strix_builtin_skills/onboarding/SKILL.md` +- **Watcher configuration and debugging?** → Read [`debugging-watchers.md`](debugging-watchers.md) + Covers: `watchers.json` format, event triggers (`turn_complete`, + `session_start`, `session_end`), input/output contracts, routing, environment + variables, debugging checklist. + Suggested patterns: cron review, multi-signal, stateful, threshold escalation, + baseline comparison, frequency anomaly, cross-session consistency. + Example watchers: + [`silent_session.py`](watcher-examples/silent_session.py), + [`codex_bypass.py`](watcher-examples/codex_bypass.py). + Skill structure validation via + [`dag_lint.py`](../scripts/dag_lint.py). 
+ +- **When and why to use algedonic signals?** → Read [`debugging-algedonic.md`](debugging-algedonic.md) + Covers: VSM context for behavioral monitoring, eight anti-pattern categories + (agreement violations, silent sessions, behavioral shift, metric gaming, + performative compliance, scope creep, context hoarding, premature escalation), + detection shapes, in-process vs out-of-process trade-offs, what to do with + findings (immediate fixes, persistent patterns, structural escalation), + watcher design decisions + +- **Identity or operational drift?** → Read the onboarding skill's `SKILL.md` Recovery from drift is structurally the same as onboarding. If introspection reveals stale blocks, broken schedules, or behavior that doesn't match your persona, the onboarding skill provides the framework for re-establishing each component. diff --git a/open_strix/builtin_skills/introspection/debugging-algedonic.md b/open_strix/builtin_skills/introspection/debugging-algedonic.md new file mode 100644 index 0000000..2bf4f65 --- /dev/null +++ b/open_strix/builtin_skills/introspection/debugging-algedonic.md @@ -0,0 +1,242 @@ +# Algedonic Signals — When and Why to Monitor Your Own Behavior + +Algedonic signals bypass the management hierarchy to report pain/pleasure directly +to the operator. In Stafford Beer's Viable System Model, they are the channel that +lets S5 (identity/policy) hear from S1 (operations) without every layer in between +filtering the signal. + +For AI agents, this means: **monitoring that the agent cannot suppress, rationalize, +or choose to ignore.** + +## Why This Exists + +Every agent develops behavioral patterns. Some patterns serve the operator. Some +serve the agent's own comfort (easier, less confrontational, avoids hard questions). +Without external monitoring, the agent has no structural check on which patterns +are which — introspection alone is insufficient because the same system that +drifted is the one evaluating the drift. 
+ +Watchers provide that structural check. They run **outside** the agent's reasoning +loop, read the same event logs the agent produces, and flag discrepancies between +what the agent claims to do and what it actually does. + +## When to Add a Watcher + +Add a watcher when you notice any of these conditions: + +### 1. Agreement Violations + +**The pattern:** The agent agreed to a behavioral contract (e.g., "always delegate +code edits to Codex") but the contract isn't enforced mechanically. + +**Why it matters:** Verbal agreements decay. The agent may genuinely intend to +comply but gradually rationalize exceptions ("this was a small edit," "Codex was +slow"). Without a watcher, the decay is invisible until the operator notices. + +**Detection shape:** Session-level — specific action present + expected delegation +absent. The [`codex_bypass.py`](watcher-examples/codex_bypass.py) example +implements exactly this pattern. + +**When to use `turn_complete`:** Always — you want to catch each violation as it +happens, not discover a pattern weeks later. + +### 2. Silent Sessions + +**The pattern:** The agent processes many events, makes many tool calls, but +produces no visible output (no messages, no reactions, no file changes the +operator can see). + +**Why it matters:** Silent sessions can mean the agent is doing useful background +work. They can also mean the agent is stuck in a loop, confused, or deliberately +avoiding communication. The operator can't tell from the outside. + +**Detection shape:** Session-level — high tool-call count, zero communication +tool calls. The [`silent_session.py`](watcher-examples/silent_session.py) example +implements this with a configurable threshold. + +**Calibration:** The right threshold depends on the agent's role. An agent that +primarily does background processing (indexing, analysis) will have legitimately +silent sessions. An agent that primarily communicates should rarely be silent for +10+ tool calls. + +### 3. 
Behavioral Shift After Changes + +**The pattern:** After a model upgrade, block edit, or skill addition, the agent's +behavior changes in ways that weren't intended. + +**Why it matters:** Model upgrades can shift response distributions. Block edits +can have cascading effects. The operator changed one thing but three other behaviors +moved. Without cross-session comparison, these shifts are invisible until they +compound into a visible problem. + +**Detection shape:** Cross-session — compare behavioral metrics (response rate, +silence rate, topic distribution, tool usage patterns) across a time boundary +(before/after the change). + +**When to use cron vs trigger:** Use a cron-based watcher that runs daily or +weekly, comparing the last N sessions against the N sessions before the change. +This is too expensive to run on every turn. + +### 4. Metric Gaming + +**The pattern:** The agent optimizes a metric that was supposed to be a proxy for +a goal, but the metric improves while the goal doesn't. + +**Why it matters:** This is Goodhart's Law applied to agent behavior. If the agent +is evaluated on "number of messages sent," it can send more messages without being +more helpful. If evaluated on "tasks completed," it can redefine tasks to be +smaller. + +**Detection shape:** Cross-session — metric trending upward but correlated quality +signals flat or declining. Hard to detect mechanically without a second metric +that's harder to game. + +**When to use cron:** Always — this requires trend analysis across many sessions. + +### 5. Performative Compliance + +**The pattern:** The agent raises a concern and then immediately resolves it in +the operator's favor, performing the appearance of independent thought without +actually pushing back. + +**Why it matters:** This is the subtlest anti-pattern because it looks like healthy +behavior (the agent is "considering alternatives"). 
But if the concern is always +resolved without any behavioral change, it's theater — cognitive work that produces +no signal. + +**Detection shape:** Session-level — concern-raising language followed immediately +by agreement language, with no change in the agent's subsequent actions. This is +the hardest pattern to detect mechanically because it requires understanding +intent, not just actions. + +**Honest caveat:** A watcher for this pattern will have high false positive rates. +Sometimes the agent genuinely considers and then agrees. The watcher's value is in +flagging frequency — if it fires every session, something is wrong regardless of +individual false positives. + +## In-Process vs Out-of-Process + +Two complementary monitoring approaches: + +**Out-of-process watchers (this system):** +- Structurally independent — can't be suppressed by the agent +- Fire after the action (observe and report) +- Best for: behavioral drift, agreement violations, operator alerting +- Trade-off: can only detect, not prevent + +**In-process hooks (e.g., Claude Code PreToolUse):** +- Fast — can block actions before they happen +- Part of the agent process (agent can potentially rationalize around them) +- Best for: known dangerous actions, safety gates +- Trade-off: the agent controls the process they're embedded in + +Use both. Watchers for detection, in-process hooks for prevention. The watcher +catches what the hook missed; the hook prevents what the watcher would have flagged +too late. + +## What to Do With Findings + +When a watcher fires, the response depends on the `route` and `severity`: + +### Immediate Fixes (route: agent) + +The agent receives the finding as a new event. It should: + +1. **Acknowledge** the finding in its next response +2. **Correct** the specific behavior if possible +3. **Update memory** if the finding reveals a pattern worth tracking + +### Persistent Patterns (route: log → cron review) + +If the same signal appears across multiple sessions: + +1. 
**Update a memory block** to explicitly address the pattern +2. **Add or tighten the relevant behavioral contract** +3. **Escalate to operator** if the pattern persists after memory updates + +### Structural Escalation (route: operator) + +For findings that indicate architectural problems: + +1. **The agent cannot fix this alone** — the finding represents a structural + limitation, not a behavioral choice +2. **The operator needs to decide** — change the agent's tools, model, or + constraints +3. **Document the limitation** — so the finding isn't a surprise when it recurs + +### 6. Scope Creep + +**The pattern:** The agent gradually takes on responsibilities beyond its +defined role, doing work that belongs to other agents or to the human. + +**Why it matters:** Scope creep feels productive — more tasks completed, more +value delivered. But it erodes the division of labor that makes multi-agent +systems work. An agent that does everything is an agent that can't be replaced, +updated, or debugged in isolation. + +**Detection shape:** Cross-session — track which tool types the agent uses over +time. A sudden increase in tool categories (e.g., an analysis agent starting to +send messages, or a communication agent starting to edit code) is the signal. + +**When to use cron:** Always — this is a trend, not a per-turn event. + +### 7. Context Hoarding + +**The pattern:** The agent reads the same files or memory blocks repeatedly +without acting on them, or accumulates context that it never uses in its +responses. + +**Why it matters:** Reading is not free — it consumes tokens, slows response +time, and can push out genuinely relevant context. An agent that reads 20 files +per turn but only uses information from 2 of them is wasting 90% of its reads, +along with whatever context budget those reads consumed. 
+ +**Detection shape:** Session-level — compare files read (from `file_read` +events) against files referenced in the agent's output. High read-to-reference +ratio suggests hoarding. 
+ +**When to use `turn_complete`:** Good for per-turn detection. Combine with a +cron watcher that tracks the ratio over time to distinguish one-off research +turns from habitual hoarding. + +### 8. Premature Escalation + +**The pattern:** The agent routes too many findings to the operator, treating +every anomaly as urgent. + +**Why it matters:** This is the inverse of silent sessions. If the operator +gets 20 notifications per day, they stop reading them. The algedonic channel +loses its bypass property because the operator treats it like noise. + +**Detection shape:** Cross-session — count operator-routed findings per day. +If the rate exceeds a threshold (e.g., 5/day), the watchers themselves need +recalibration. + +**This is a meta-watcher** — a watcher that watches the other watchers. Deploy +it as a daily cron job that reads `events.jsonl` for `watcher_signal` events +with `route: operator`. + +## Writing Your Own Algedonic Watcher + +See [debugging-watchers.md](debugging-watchers.md) for the mechanical details +(watchers.json format, input/output contracts, environment variables, +[suggested patterns](debugging-watchers.md#suggested-patterns)). + +The key design decisions for an algedonic watcher: + +1. **What specific behavior am I watching for?** Vague watchers produce vague + signals. "The agent is doing something wrong" is not actionable. +2. **What's the baseline?** Every detection needs a comparison — against a + threshold, a historical average, or an explicit contract. +3. **What should happen when it fires?** If you can't articulate the response, + the signal isn't useful yet. +4. **What's the false positive rate?** If the watcher fires every session, the + operator will ignore it. Calibrate thresholds before deploying. +5. **Is this a per-turn or cross-session signal?** Per-turn signals use + `trigger: turn_complete`. 
Cross-session signals use `cron` with state + tracking (see [Pattern 3](debugging-watchers.md#pattern-3-stateful-watcher) + and [Pattern 5](debugging-watchers.md#pattern-5-baseline-comparison-beforeafter)). +6. **Should this escalate?** Not every finding needs to reach the operator. + Start with `route: log`, promote to `route: operator` only after confirming + the signal is reliable (see + [Pattern 4](debugging-watchers.md#pattern-4-threshold-escalation)). diff --git a/open_strix/builtin_skills/introspection/debugging-watchers.md b/open_strix/builtin_skills/introspection/debugging-watchers.md new file mode 100644 index 0000000..5bc0496 --- /dev/null +++ b/open_strix/builtin_skills/introspection/debugging-watchers.md @@ -0,0 +1,298 @@ +# Watchers — Configuration & Mechanics + +How to declare, configure, and debug the watcher system. For understanding +**when and why** to use watchers for behavioral monitoring, see +[debugging-algedonic.md](debugging-algedonic.md). + +## watchers.json + +Skills declare watchers in a `watchers.json` file alongside `SKILL.md`: + +```json +{ + "watchers": [ + { + "name": "codex-bypass", + "command": "python check_codex_usage.py", + "trigger": "turn_complete" + }, + { + "name": "daily-health", + "command": "python health_check.py", + "cron": "0 12 * * *" + } + ] +} +``` + +Each watcher must have `name`, `command`, and exactly one of: +- **`cron`** — fires on a schedule (same syntax as pollers/scheduler jobs) +- **`trigger`** — fires on an agent event + +Having both `cron` and `trigger` is invalid. Having neither is also invalid. +The scheduler rejects malformed entries at discovery time. 
+ +### Valid Triggers + +| Trigger | When It Fires | Latency Impact | +|---------|--------------|----------------| +| `turn_complete` | After the agent finishes processing an event | None on the reply — fires after the response is already sent, but the turn is not finished until the watchers return | +| `session_start` | When the agent session begins | Zero — runs alongside startup | +| `session_end` | When the agent session ends | Zero — runs during teardown | + +### Input Contract + +Event-triggered watchers receive a JSON object on **stdin**: + +```json +{ + "trigger": "turn_complete", + "trace_id": "20260326T213000Z-a1b2c3d4", + "events_path": "/home/user/agent/logs/events.jsonl" +} +``` + +The watcher then: +1. Reads `events_path` +2. Filters by `trace_id` to scope to the current turn +3. Runs its analysis +4. Emits JSONL findings to **stdout** + +This is deliberately minimal — the watcher has full access to `events.jsonl` and +can read as much historical context as it needs. + +### Output Contract + +Each line of stdout is a JSON finding: + +```json +{"signal": "codex_bypass", "severity": "warn", "message": "Edited 3 code files without delegating to Codex", "route": "operator"} +``` + +Fields: +- **`signal`** — identifier for the finding type +- **`severity`** — `info`, `warn`, or `error` +- **`message`** — human-readable description +- **`route`** — where the signal goes: `"log"` (default), `"agent"`, `"operator"` + +### Routing + +| Route | Behavior | +|-------|----------| +| `log` | Written to events.jsonl as `watcher_signal`. Passive — operator checks when they want. | +| `agent` | Logged as `watcher_signal`, then enqueued as a new agent event. The agent sees the watcher's message in its next turn. | +| `operator` | Logged. (Operator notification channel is deployment-specific — configure via env vars.) 
| + +### Environment Variables + +Watchers receive the same env vars as pollers: + +| Variable | Description | +|----------|-------------| +| `STATE_DIR` | The skill directory (writable, for state files) | +| `WATCHER_NAME` | The watcher's name from watchers.json | + +Plus custom env vars from the `env` field and the agent's existing environment. + +## Debugging Watchers + +### Watcher Not Firing + +1. **Is the watcher discovered?** Check startup logs for `"Discovered N watchers"`. +2. **Is watchers.json valid?** Must have exactly one of `cron` or `trigger`. +3. **Is the trigger correct?** Only `turn_complete`, `session_start`, `session_end`. +4. **Is the command path correct?** Relative to the skill directory. + +### Watcher Fires But No Output + +1. **Does the script read stdin?** Event-triggered watchers MUST read `sys.stdin.readline()`. +2. **Does it exit cleanly?** Non-zero exit codes are logged but don't crash the agent. +3. **Is the JSON output valid?** Each line must be valid JSON. Invalid lines are skipped. + +### Watcher Errors + +Check events.jsonl for `watcher_error` events: + +```bash +jq -s 'map(select(.type == "watcher_error")) | sort_by(.timestamp) | .[-10:]' logs/events.jsonl +``` + +## Backward Compatibility + +`pollers.json` files continue to work. The `watchers.json` format is the preferred +way to declare both scheduled and event-triggered monitors going forward. Internally, +pollers and watchers use the same `WatcherConfig` class. + +## Suggested Patterns + +### Pattern 1: Cron-Based Review Watcher + +Weekly behavioral audit — scans the last 7 days of events for trends: + +```json +{ + "watchers": [ + { + "name": "weekly-behavior-review", + "command": "python weekly_review.py", + "cron": "0 9 * * 1" + } + ] +} +``` + +Good for: behavioral drift detection, metric tracking, regression checks after +model upgrades. See [debugging-algedonic.md](debugging-algedonic.md) §3 +(Behavioral Shift After Changes) for what to measure. 
+ +### Pattern 2: Multi-Signal Watcher + +A single watcher can emit multiple findings per turn: + +```python +for finding in findings: + print(json.dumps(finding)) +``` + +This avoids the overhead of spawning separate processes for related checks. +Group logically related detections (e.g., one watcher for all communication +anomalies: silent session + excessive messaging + channel confusion). + +### Pattern 3: Stateful Watcher + +Watchers can maintain state across invocations using files in `STATE_DIR`: + +```python +state_dir = Path(os.environ["STATE_DIR"]) +history_file = state_dir / "watcher_state.json" +``` + +Use cases: tracking rolling averages, counting consecutive violations, +maintaining baselines for before/after comparison. State files persist between +invocations — the watcher builds a picture over time rather than evaluating +each turn in isolation. + +### Pattern 4: Threshold Escalation + +Start with logging, escalate to operator notification if a pattern persists: + +```python +state = load_state() +state["consecutive_violations"] = state.get("consecutive_violations", 0) + 1 +save_state(state) + +route = "log" +if state["consecutive_violations"] >= 3: + route = "operator" + +print(json.dumps({ + "signal": "repeated_violation", + "severity": "warn" if route == "log" else "error", + "message": f"Violation #{state['consecutive_violations']}: ...", + "route": route, +})) +``` + +This prevents alert fatigue — the operator only sees findings that persist, +not one-off anomalies. 
+ +### Pattern 5: Baseline Comparison (Before/After) + +For detecting behavioral changes after model upgrades or block edits: + +```python +# Load historical baseline from STATE_DIR +baseline = json.loads((state_dir / "baseline.json").read_text()) + +# Compute current metrics from recent events +current = compute_metrics(events, window_days=7) + +# Compare +for metric, baseline_val in baseline.items(): + current_val = current.get(metric, 0) + delta = current_val - baseline_val + if abs(delta) > baseline_val * 0.3: # >30% change + print(json.dumps({ + "signal": "behavioral_shift", + "severity": "warn", + "message": f"{metric}: {baseline_val:.1f} → {current_val:.1f} ({delta:+.1f})", + "route": "log", + })) +``` + +Create the baseline file manually or with a separate setup script. Update it +intentionally when you *want* behavior to change (model upgrade, new block). + +### Pattern 6: Event Frequency Anomaly + +Detect unusual spikes or drops in specific event types: + +```python +# Count events by type in last N turns +recent_counts = Counter(e.get("type") for e in recent_events) +historical_avg = load_historical_averages() + +for event_type, count in recent_counts.items(): + avg = historical_avg.get(event_type, count) + if avg > 0 and count > avg * 3: # 3x spike + print(json.dumps({ + "signal": "frequency_anomaly", + "severity": "info", + "message": f"{event_type} count {count} vs avg {avg:.0f} (3x+)", + "route": "log", + })) +``` + +Good for: detecting tool call loops, send_message floods, or unexplained drops +in file_read events (agent stopped consulting its own memory). 
+ +### Pattern 7: Cross-Session Consistency + +Cron watcher that checks whether the agent's behavior is consistent across +sessions within a time window: + +```json +{ + "watchers": [ + { + "name": "consistency-check", + "command": "python consistency.py", + "cron": "0 20 * * *", + "env": {"WINDOW_DAYS": "3", "MIN_SESSIONS": "5"} + } + ] +} +``` + +Measures variance in key metrics (tool call count, response length, silence +rate) across sessions. High variance suggests the agent is behaving differently +depending on context in ways that may not be intentional. + +## Example Watchers + +Working examples in this skill's +[`watcher-examples/`](watcher-examples/) directory: + +- [`watcher-examples/silent_session.py`](watcher-examples/silent_session.py) — + Detects turns with 10+ tool calls but no `send_message` or `react` calls. + Configurable threshold via `TOOL_CALL_THRESHOLD`. Implements the silent + session pattern from [debugging-algedonic.md](debugging-algedonic.md) §2. + +- [`watcher-examples/codex_bypass.py`](watcher-examples/codex_bypass.py) — + Detects code edits without delegation to a coding agent. Configurable via + `DELEGATION_TOOL` (default: `"codex"`) and `CODE_EXTENSIONS` (default: + `.py,.js,.ts,.rs,.go,.java,.rb,.cpp,.c,.h`). Implements the agreement + violation pattern from [debugging-algedonic.md](debugging-algedonic.md) §1. + +## Validating Skill Structure + +Use the DAG lint script to verify all files in a skill are reachable from +the root: + +```bash +python builtin_skills/scripts/dag_lint.py builtin_skills/introspection --root SKILL.md --strict +``` + +This catches unreferenced files (dead subtrees) that were added but never +linked from any document. Run it after adding new watcher examples or +companion guides. The `--strict` flag exits with code 1 if orphans are found. 
diff --git a/open_strix/builtin_skills/introspection/watcher-examples/codex_bypass.py b/open_strix/builtin_skills/introspection/watcher-examples/codex_bypass.py new file mode 100644 index 0000000..0f7070c --- /dev/null +++ b/open_strix/builtin_skills/introspection/watcher-examples/codex_bypass.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +"""Watcher: detect code edits without delegation to a coding agent. + +Usage in watchers.json: + {"name": "codex-bypass", "command": "python watcher-examples/codex_bypass.py", "trigger": "turn_complete"} + +Configurable via environment variables: + DELEGATION_TOOL — tool name for delegating code work (default: "codex") + CODE_EXTENSIONS — comma-separated file extensions (default: ".py,.js,.ts,.rs,.go,.java,.rb,.cpp,.c,.h") + +Flags turns where the agent edited code files without using the delegation tool. +""" + +import json +import os +import sys +from pathlib import PurePosixPath + +DELEGATION_TOOL = os.environ.get("DELEGATION_TOOL", "codex") +# Strip whitespace and drop empty entries so ".py, .js" or a trailing comma +# in CODE_EXTENSIONS still matches the intended suffixes. +CODE_EXTENSIONS = { + ext.strip() + for ext in os.environ.get( + "CODE_EXTENSIONS", + ".py,.js,.ts,.rs,.go,.java,.rb,.cpp,.c,.h", + ).split(",") + if ext.strip() +} + + +def main() -> None: + """Read the watcher context from stdin and print at most one finding.""" + context = json.loads(sys.stdin.readline()) + trace_id = context["trace_id"] + events_path = context["events_path"] + + with open(events_path, encoding="utf-8") as f: + events = [json.loads(line) for line in f if line.strip()] + + # NOTE(review): events are matched on their "session_id" field against + # the stdin "trace_id" value — confirm against the events.jsonl schema. 
+ turn_events = [e for e in events if e.get("session_id") == trace_id] + tool_calls = [e for e in turn_events if e.get("type") == "tool_call"] + + code_edits: list[str] = [] + delegated = False + + for tc in tool_calls: + tool = tc.get("tool", "") + + if tool == DELEGATION_TOOL: + delegated = True + continue + + # Check for file write/edit tools targeting code files. 
+ if tool in ("write_file", "edit_file"): + path = tc.get("path", "") or tc.get("file_path", "") + if path: + suffix = PurePosixPath(path).suffix + if suffix in CODE_EXTENSIONS: + code_edits.append(path) + + if code_edits and not delegated: + print( + json.dumps( + { + "signal": "codex_bypass", + "severity": "warn", + "message": ( + f"Edited {len(code_edits)} code file(s) without " + f"delegating to {DELEGATION_TOOL}: " + f"{', '.join(code_edits[:5])}" + ), + "route": "operator", + } + ) + ) + + +if __name__ == "__main__": + main() diff --git a/open_strix/builtin_skills/introspection/watcher-examples/silent_session.py b/open_strix/builtin_skills/introspection/watcher-examples/silent_session.py new file mode 100644 index 0000000..f9f327a --- /dev/null +++ b/open_strix/builtin_skills/introspection/watcher-examples/silent_session.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +"""Watcher: detect turns with many tool calls but no communication. + +Usage in watchers.json: + {"name": "silent-session", "command": "python watcher-examples/silent_session.py", "trigger": "turn_complete"} + +Configurable via environment variables: + TOOL_CALL_THRESHOLD — minimum tool calls before a silent turn is flagged (default: 10) + +Reads events.jsonl, filters to the current turn by trace_id, and flags +sessions with at least TOOL_CALL_THRESHOLD tool calls but no send_message or react calls. 
+""" + +import json +import os +import sys + +TOOL_CALL_THRESHOLD = int(os.environ.get("TOOL_CALL_THRESHOLD", "10")) +COMMUNICATION_TOOLS = {"send_message", "react"} + + +def main() -> None: + """Read the watcher context from stdin and print at most one finding.""" + context = json.loads(sys.stdin.readline()) + trace_id = context["trace_id"] + events_path = context["events_path"] + + with open(events_path, encoding="utf-8") as f: + events = [json.loads(line) for line in f if line.strip()] + + # NOTE(review): events are matched on their "session_id" field against + # the stdin "trace_id" value — confirm against the events.jsonl schema. + turn_events = [e for e in events if e.get("session_id") == trace_id] + tool_calls = [e for e in turn_events if e.get("type") == "tool_call"] + comms = [e for e in tool_calls if e.get("tool") in COMMUNICATION_TOOLS] + + if len(tool_calls) >= TOOL_CALL_THRESHOLD and not comms: + print( + json.dumps( + { + "signal": "silent_session", + "severity": "warn", + "message": ( + f"Session had {len(tool_calls)} tool calls but " + f"sent no messages or reactions" + ), + "route": "log", + } + ) + ) + + +if __name__ == "__main__": + main() diff --git a/open_strix/builtin_skills/scripts/dag_lint.py b/open_strix/builtin_skills/scripts/dag_lint.py new file mode 100644 index 0000000..01c8d0c --- /dev/null +++ b/open_strix/builtin_skills/scripts/dag_lint.py @@ -0,0 +1,271 @@ +#!/usr/bin/env python3 +"""Walk file references from a root file and report the dependency graph. + +Given a directory and a root file (e.g., SKILL.md, a memory block YAML, or a +job config), this script: + +1. Parses all markdown/YAML/JSON files for file references (relative paths, + markdown links, YAML values). +2. Builds a directed graph of which files reference which. +3. Outputs the graph in mermaid (human-readable) or JSON (LLM-readable). +4. Flags unreferenced files (present in the directory but not reachable from + the root) as potential dead subtrees. 
+ +Usage: + python dag_lint.py /path/to/skill --root SKILL.md + python dag_lint.py /path/to/skill --root SKILL.md --format json + python dag_lint.py /path/to/memory --root blocks.yaml --format mermaid +""" + +from __future__ import annotations + +import argparse +import json +import re +import sys +from pathlib import Path + + +# Patterns that match file references in various formats +REFERENCE_PATTERNS = [ + # Markdown links: [text](path) — excluding http(s) URLs and #anchors + re.compile(r"\[(?:[^\]]*)\]\((?!https?://|#)([^)]+)\)"), + # Backtick-quoted path ending in an extension: `path/to/file.md` + re.compile(r"`((?:\./|\.\./)?\S+\.\w{1,6})`"), + # Bare relative paths that look like files (word/word.ext) + re.compile(r"(?:^|\s)((?:\./|\.\./)?\w[\w./-]*\.\w{1,6})(?:\s|$|[,;)])", re.MULTILINE), + # YAML/JSON string values that look like paths + re.compile(r'["\'](\./[^"\']+|\.\.?/[^"\']+)["\']'), + # Python/shell: "examples/foo.py", "watcher-examples/bar.py" + re.compile(r'"([\w./-]+\.(?:py|sh|md|yaml|yml|json|txt))"'), + # open-strix builtin skill paths: /.open_strix_builtin_skills/skill-name/file.ext + re.compile(r"`?/\.open_strix_builtin_skills/\w[\w-]*/([^`\s)]+)`?"), +] + +# Path components and file suffixes that discover_files() skips +SKIP_NAMES = {"__pycache__", ".git", "node_modules", ".mypy_cache", ".pytest_cache"} +SKIP_EXTENSIONS = {".pyc", ".pyo"} + + +def find_references(file_path: Path, base_dir: Path) -> list[str]: + """Return sorted base_dir-relative reference candidates found in file_path ([] if unreadable).""" + try: + content = file_path.read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + return [] + + refs: set[str] = set() + for pattern in REFERENCE_PATTERNS: + for match in pattern.finditer(content): + candidate = match.group(1).strip() + # Clean up anchors and query params + candidate = candidate.split("#")[0].split("?")[0] + if not candidate: + continue + # Resolve relative to the file's directory + 
resolved = (file_path.parent / candidate).resolve() + try: + rel = resolved.relative_to(base_dir.resolve()) + refs.add(str(rel)) + except ValueError: + 
# Reference points outside base_dir — skip + continue + + return sorted(refs) + + +def discover_files(base_dir: Path) -> list[str]: + """List files under base_dir, skipping SKIP_NAMES path components and SKIP_EXTENSIONS suffixes.""" + files: list[str] = [] + for p in sorted(base_dir.rglob("*")): + if any(part in SKIP_NAMES for part in p.parts): + continue + if p.suffix in SKIP_EXTENSIONS: + continue + if p.is_file(): + try: + files.append(str(p.relative_to(base_dir))) + except ValueError: + continue + return files + + +def build_dag( + base_dir: Path, root_file: str +) -> tuple[dict[str, list[str]], set[str], set[str]]: + """Build the reference DAG starting from root_file. + + Returns: + (edges, reachable, all_files) + - edges: {source_file: [referenced_files]} + - reachable: root_file plus every file reachable from it + - all_files: set of all files discovered in the directory + """ + all_files = set(discover_files(base_dir)) + edges: dict[str, list[str]] = {} + + # Walk from root using BFS + visited: set[str] = set() + queue = [root_file] + + while queue: + current = queue.pop(0) + if current in visited: + continue + visited.add(current) + + current_path = base_dir / current + if not current_path.exists(): + continue + + refs = find_references(current_path, base_dir) + # Only keep refs that point to actual files in the directory + valid_refs = [r for r in refs if r in all_files and r != current] + if valid_refs: + edges[current] = valid_refs + + for ref in valid_refs: + if ref not in visited: + queue.append(ref) + + return edges, visited, all_files + + +def sanitize_mermaid_id(path: str) -> str: + """Convert a file path to a mermaid node ID ("/", ".", "-" and " " become "_").""" + return path.replace("/", "_").replace(".", "_").replace("-", "_").replace(" ", "_") + + +def format_mermaid( + edges: dict[str, list[str]], + reachable: set[str], + all_files: set[str], + root_file: str, +) -> str: + """Format the DAG as a mermaid graph.""" + lines = ["graph 
TD"] + + # Style the root node + root_id = sanitize_mermaid_id(root_file) + lines.append(f" 
{root_id}[\"📄 {root_file}\"]") + + # Add edges + for source, targets in sorted(edges.items()): + src_id = sanitize_mermaid_id(source) + for target in targets: + tgt_id = sanitize_mermaid_id(target) + lines.append(f" {src_id} --> {tgt_id}") + + # Flag unreferenced files + unreferenced = sorted(all_files - reachable) + if unreferenced: + lines.append("") + lines.append(" subgraph unreferenced[\"⚠️ Unreferenced Files\"]") + for f in unreferenced: + fid = sanitize_mermaid_id(f) + lines.append(f" {fid}[\"{f}\"]") + lines.append(" end") + lines.append(" style unreferenced fill:#fff3cd,stroke:#ffc107") + + return "\n".join(lines) + + +def format_json( + edges: dict[str, list[str]], + reachable: set[str], + all_files: set[str], + root_file: str, +) -> str: + """Format the DAG as JSON.""" + unreferenced = sorted(all_files - reachable) + return json.dumps( + { + "root": root_file, + "edges": {k: sorted(v) for k, v in sorted(edges.items())}, + "reachable": sorted(reachable), + "unreferenced": unreferenced, + "stats": { + "total_files": len(all_files), + "reachable_files": len(reachable), + "unreferenced_files": len(unreferenced), + "coverage_pct": round( + len(reachable) / len(all_files) * 100, 1 + ) + if all_files + else 100.0, + }, + }, + indent=2, + ) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Walk file references from a root file and report the dependency DAG.", + ) + parser.add_argument( + "directory", + help="Directory to scan (e.g., a skill directory, memory directory, or repo root).", + ) + parser.add_argument( + "--root", + default="SKILL.md", + help="Root file to start walking from (default: SKILL.md).", + ) + parser.add_argument( + "--format", + choices=["mermaid", "json"], + default="mermaid", + help="Output format (default: mermaid).", + ) + parser.add_argument( + "--strict", + action="store_true", + help="Exit with code 1 if unreferenced files are found.", + ) + return parser + + +def main(argv: 
list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + + base_dir = Path(args.directory).resolve() + if not base_dir.is_dir(): + print(f"Error: {args.directory} is not a directory", file=sys.stderr) + return 1 + + root_path = base_dir / args.root + if not root_path.exists(): + print(f"Error: root file {args.root} not found in {args.directory}", file=sys.stderr) + return 1 + + edges, reachable, all_files = build_dag(base_dir, args.root) + unreferenced = all_files - reachable + + if args.format == "mermaid": + print(format_mermaid(edges, reachable, all_files, args.root)) + else: + print(format_json(edges, reachable, all_files, args.root)) + + # Summary to stderr so it doesn't pollute the graph output + n_total = len(all_files) + n_reach = len(reachable) + n_unref = len(unreferenced) + print( + f"\n{n_reach}/{n_total} files reachable from {args.root} " + f"({n_unref} unreferenced)", + file=sys.stderr, + ) + + if args.strict and unreferenced: + print("Unreferenced files:", file=sys.stderr) + for f in sorted(unreferenced): + print(f" - {f}", file=sys.stderr) + return 1 + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/open_strix/scheduler.py b/open_strix/scheduler.py index f3949ea..2441d47 100644 --- a/open_strix/scheduler.py +++ b/open_strix/scheduler.py @@ -35,13 +35,23 @@ def to_dict(self) -> dict[str, Any]: return data +# Valid event triggers for watchers (non-cron). +VALID_WATCHER_TRIGGERS = frozenset({"turn_complete", "session_start", "session_end"}) + + @dataclass -class PollerConfig: - """A poller declared in a skill's pollers.json.""" +class WatcherConfig: + """Unified config for cron-triggered pollers and event-triggered hooks. + + A watcher has *either* a ``cron`` schedule or an event ``trigger`` — + never both. Legacy ``pollers.json`` entries are loaded as watchers + with ``trigger=None``. 
+ """ name: str command: str - cron: str + cron: str | None + trigger: str | None env: dict[str, str] skill_dir: Path @@ -86,9 +96,13 @@ def _save_scheduler_jobs(self, jobs: list[SchedulerJob]) -> None: encoding="utf-8", ) - def _discover_pollers(self) -> list[PollerConfig]: - """Scan skill directories for pollers.json files.""" - pollers: list[PollerConfig] = [] + def _discover_pollers(self) -> list[WatcherConfig]: + """Scan skill directories for legacy pollers.json files. + + Returns WatcherConfig instances with ``trigger=None`` for backward + compatibility. New skills should use ``watchers.json`` instead. + """ + pollers: list[WatcherConfig] = [] skills_dir = self.layout.skills_dir if not skills_dir.exists(): return pollers @@ -139,16 +153,104 @@ def _discover_pollers(self) -> list[PollerConfig]: if not isinstance(env, dict): env = {} pollers.append( - PollerConfig( + WatcherConfig( name=name, command=command, cron=cron, + trigger=None, env={str(k): str(v) for k, v in env.items()}, skill_dir=skill_dir, ), ) return pollers + def _discover_watchers(self) -> list[WatcherConfig]: + """Scan skill directories for watchers.json files. + + Each entry must have ``name``, ``command``, and exactly one of + ``cron`` (schedule-triggered) or ``trigger`` (event-triggered). 
+ """ + watchers: list[WatcherConfig] = [] + skills_dir = self.layout.skills_dir + if not skills_dir.exists(): + return watchers + + for watchers_file in sorted(skills_dir.rglob("watchers.json")): + skill_dir = watchers_file.parent + try: + raw = json.loads(watchers_file.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError) as exc: + self.log_event( + "watcher_invalid_json", + path=str(watchers_file), + error=str(exc), + ) + continue + + if not isinstance(raw, dict): + self.log_event( + "watcher_invalid_format", + path=str(watchers_file), + error="expected a JSON object with 'watchers' key", + ) + continue + + entries = raw.get("watchers", []) + if not isinstance(entries, list): + self.log_event( + "watcher_invalid_format", + path=str(watchers_file), + error="'watchers' key must be an array", + ) + continue + + for entry in entries: + if not isinstance(entry, dict): + continue + name = str(entry.get("name", "")).strip() + command = str(entry.get("command", "")).strip() + if not name or not command: + self.log_event( + "watcher_missing_fields", + path=str(watchers_file), + entry=entry, + ) + continue + cron = str(entry.get("cron", "")).strip() or None + trigger = str(entry.get("trigger", "")).strip() or None + # Must have exactly one of cron or trigger. 
+ if bool(cron) == bool(trigger): + self.log_event( + "watcher_missing_fields", + path=str(watchers_file), + entry=entry, + error="watcher must have exactly one of 'cron' or 'trigger'", + ) + continue + if trigger and trigger not in VALID_WATCHER_TRIGGERS: + self.log_event( + "watcher_invalid_trigger", + path=str(watchers_file), + name=name, + trigger=trigger, + valid_triggers=sorted(VALID_WATCHER_TRIGGERS), + ) + continue + env = entry.get("env", {}) + if not isinstance(env, dict): + env = {} + watchers.append( + WatcherConfig( + name=name, + command=command, + cron=cron, + trigger=trigger, + env={str(k): str(v) for k, v in env.items()}, + skill_dir=skill_dir, + ), + ) + return watchers + def _reload_scheduler_jobs(self) -> None: for job in self.scheduler.get_jobs(): if job.id.startswith("open_strix:"): @@ -193,7 +295,7 @@ def _reload_scheduler_jobs(self) -> None: max_instances=1, ) - # Register pollers from skills/*/pollers.json. + # Register pollers from skills/*/pollers.json (backward compat). pollers = self._discover_pollers() for poller in pollers: try: @@ -217,11 +319,47 @@ def _reload_scheduler_jobs(self) -> None: max_instances=1, ) + # Register watchers from skills/*/watchers.json. + all_watchers = self._discover_watchers() + cron_watchers = [w for w in all_watchers if w.cron] + event_watchers = [w for w in all_watchers if w.trigger] + + # Cron-based watchers get scheduled just like pollers. + for watcher in cron_watchers: + try: + trigger = CronTrigger.from_crontab(watcher.cron, timezone=UTC) # type: ignore[arg-type] + except ValueError as exc: + self.log_event( + "watcher_invalid_cron", + name=watcher.name, + cron=watcher.cron, + error=str(exc), + ) + continue + + self.scheduler.add_job( + self._on_watcher_cron_fire, + trigger=trigger, + kwargs={"watcher": watcher}, + id=f"open_strix:watcher:{watcher.name}", + replace_existing=True, + coalesce=True, + max_instances=1, + ) + + # Event-triggered watchers are stored for runtime dispatch. 
+ self._event_watchers: dict[str, list[WatcherConfig]] = {} + for watcher in event_watchers: + assert watcher.trigger is not None + self._event_watchers.setdefault(watcher.trigger, []).append(watcher) + scheduler_count = len(self._load_scheduler_jobs()) self.log_event( "scheduler_reloaded", jobs=scheduler_count, pollers=len(pollers), + watchers_cron=len(cron_watchers), + watchers_event=len(event_watchers), ) async def _on_scheduler_fire(self, name: str, prompt: str, channel_id: str | None = None) -> None: @@ -236,7 +374,7 @@ async def _on_scheduler_fire(self, name: str, prompt: str, channel_id: str | Non ), ) - async def _on_poller_fire(self, poller: PollerConfig) -> None: + async def _on_poller_fire(self, poller: WatcherConfig) -> None: """Run a poller subprocess and enqueue events from its stdout.""" env = {**os.environ, **poller.env} env["STATE_DIR"] = str(poller.skill_dir) @@ -335,3 +473,148 @@ async def _on_poller_fire(self, poller: PollerConfig) -> None: name=poller.name, events_emitted=event_count, ) + + async def _on_watcher_cron_fire(self, watcher: WatcherConfig) -> None: + """Run a cron-based watcher — same execution model as pollers.""" + await self._on_poller_fire(watcher) + + async def _run_watcher_subprocess( + self, + watcher: WatcherConfig, + stdin_data: dict[str, Any], + ) -> list[dict[str, Any]]: + """Run an event-triggered watcher, sending context on stdin. + + Returns parsed JSONL lines from stdout. 
+ """ + env = {**os.environ, **watcher.env} + env["STATE_DIR"] = str(watcher.skill_dir) + env["WATCHER_NAME"] = watcher.name + + stdin_bytes = (json.dumps(stdin_data) + "\n").encode() + + try: + proc = await asyncio.create_subprocess_shell( + watcher.command, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + cwd=str(watcher.skill_dir), + env=env, + ) + stdout_bytes, stderr_bytes = await asyncio.wait_for( + proc.communicate(input=stdin_bytes), + timeout=60, + ) + except asyncio.TimeoutError: + self.log_event( + "watcher_timeout", + name=watcher.name, + trigger=watcher.trigger, + timeout_seconds=60, + ) + try: + proc.kill() + except ProcessLookupError: + pass + return [] + except Exception as exc: + self.log_event( + "watcher_exec_error", + name=watcher.name, + trigger=watcher.trigger, + error=str(exc), + ) + return [] + + stderr_text = stderr_bytes.decode("utf-8", errors="replace").strip() + if stderr_text: + self.log_event( + "watcher_stderr", + name=watcher.name, + stderr=stderr_text[:2000], + ) + + if proc.returncode != 0: + self.log_event( + "watcher_nonzero_exit", + name=watcher.name, + trigger=watcher.trigger, + returncode=proc.returncode, + ) + return [] + + stdout_text = stdout_bytes.decode("utf-8", errors="replace").strip() + if not stdout_text: + return [] + + results: list[dict[str, Any]] = [] + for line in stdout_text.splitlines(): + line = line.strip() + if not line: + continue + try: + parsed = json.loads(line) + except json.JSONDecodeError: + self.log_event( + "watcher_invalid_line", + name=watcher.name, + line=line[:500], + ) + continue + if isinstance(parsed, dict): + results.append(parsed) + return results + + async def fire_watchers( + self, + trigger_type: str, + *, + session_id: str, + events_path: str, + ) -> list[dict[str, Any]]: + """Fire all event-triggered watchers registered for *trigger_type*. + + Each watcher receives minimal context on stdin per the watcher + contract: + + .. 
code-block:: json + + {"trigger": "turn_complete", "trace_id": "...", "events_path": "/..."} + + Returns all parsed JSONL findings from all watchers. + """ + watchers = getattr(self, "_event_watchers", {}).get(trigger_type, []) + if not watchers: + return [] + + stdin_data = { + "trigger": trigger_type, + "trace_id": session_id, + "events_path": events_path, + } + + all_findings: list[dict[str, Any]] = [] + for watcher in watchers: + findings = await self._run_watcher_subprocess(watcher, stdin_data) + for finding in findings: + finding.setdefault("watcher", watcher.name) + all_findings.append(finding) + + if findings: + self.log_event( + "watcher_findings", + name=watcher.name, + trigger=trigger_type, + finding_count=len(findings), + ) + + if all_findings: + self.log_event( + "watchers_complete", + trigger=trigger_type, + total_findings=len(all_findings), + watchers_run=len(watchers), + ) + + return all_findings diff --git a/tests/test_builtin_skills.py b/tests/test_builtin_skills.py index f2e81ca..3e1c7ea 100644 --- a/tests/test_builtin_skills.py +++ b/tests/test_builtin_skills.py @@ -293,6 +293,109 @@ def test_disable_builtin_skills_via_bootstrap(tmp_path: Path) -> None: assert (builtin_dir / "scripts" / "prediction_review_log.py").exists() +def test_dag_lint_finds_unreferenced_files(tmp_path: Path) -> None: + """DAG lint flags files not reachable from the root.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + + # Root references guide.md but not orphan.md + (skill_dir / "SKILL.md").write_text( + "# My Skill\n\nSee [the guide](guide.md) for details.\n", + encoding="utf-8", + ) + (skill_dir / "guide.md").write_text( + "# Guide\n\nStep-by-step instructions.\n", + encoding="utf-8", + ) + (skill_dir / "orphan.md").write_text( + "# Orphan\n\nNobody references me.\n", + encoding="utf-8", + ) + + script = Path(__file__).parent.parent / "open_strix" / "builtin_skills" / "scripts" / "dag_lint.py" + proc = subprocess.run( + [sys.executable, str(script), 
str(skill_dir), "--root", "SKILL.md", "--format", "json"], + capture_output=True, + text=True, + check=False, + ) + assert proc.returncode == 0, proc.stderr + result = json.loads(proc.stdout) + + assert result["root"] == "SKILL.md" + assert "guide.md" in result["reachable"] + assert "orphan.md" in result["unreferenced"] + assert result["stats"]["total_files"] == 3 + assert result["stats"]["reachable_files"] == 2 + assert result["stats"]["unreferenced_files"] == 1 + + +def test_dag_lint_strict_mode_exits_nonzero_on_unreferenced(tmp_path: Path) -> None: + """DAG lint --strict exits 1 when unreferenced files exist.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + + (skill_dir / "SKILL.md").write_text("# Skill\n", encoding="utf-8") + (skill_dir / "unused.md").write_text("# Unused\n", encoding="utf-8") + + script = Path(__file__).parent.parent / "open_strix" / "builtin_skills" / "scripts" / "dag_lint.py" + proc = subprocess.run( + [sys.executable, str(script), str(skill_dir), "--root", "SKILL.md", "--strict"], + capture_output=True, + text=True, + check=False, + ) + assert proc.returncode == 1 + assert "unreferenced" in proc.stderr.lower() + + +def test_dag_lint_full_coverage_exits_zero(tmp_path: Path) -> None: + """DAG lint --strict exits 0 when all files are reachable.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + + (skill_dir / "SKILL.md").write_text( + "# Skill\n\nRead [details](details.md).\n", + encoding="utf-8", + ) + (skill_dir / "details.md").write_text("# Details\n", encoding="utf-8") + + script = Path(__file__).parent.parent / "open_strix" / "builtin_skills" / "scripts" / "dag_lint.py" + proc = subprocess.run( + [sys.executable, str(script), str(skill_dir), "--root", "SKILL.md", "--strict", "--format", "json"], + capture_output=True, + text=True, + check=False, + ) + assert proc.returncode == 0, proc.stderr + result = json.loads(proc.stdout) + assert result["stats"]["unreferenced_files"] == 0 + + +def 
test_dag_lint_mermaid_output_includes_edges(tmp_path: Path) -> None: + """DAG lint mermaid output includes graph edges.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + + (skill_dir / "SKILL.md").write_text( + "# Skill\n\nSee [guide](guide.md).\n", + encoding="utf-8", + ) + (skill_dir / "guide.md").write_text("# Guide\n", encoding="utf-8") + + script = Path(__file__).parent.parent / "open_strix" / "builtin_skills" / "scripts" / "dag_lint.py" + proc = subprocess.run( + [sys.executable, str(script), str(skill_dir), "--root", "SKILL.md", "--format", "mermaid"], + capture_output=True, + text=True, + check=False, + ) + assert proc.returncode == 0, proc.stderr + assert "graph TD" in proc.stdout + assert "SKILL_md" in proc.stdout + assert "-->" in proc.stdout + + def test_bootstrap_cleans_legacy_builtin_script_copies(tmp_path: Path) -> None: home = tmp_path / "agent-home" home.mkdir(parents=True, exist_ok=True) diff --git a/tests/test_scheduler_pollers.py b/tests/test_scheduler_pollers.py index a0f03d6..3436218 100644 --- a/tests/test_scheduler_pollers.py +++ b/tests/test_scheduler_pollers.py @@ -9,7 +9,7 @@ import pytest -from open_strix.scheduler import PollerConfig, SchedulerMixin +from open_strix.scheduler import WatcherConfig, SchedulerMixin class FakeLayout: @@ -179,10 +179,11 @@ async def test_successful_poller_with_output(self, tmp_home: Path) -> None: 'print(json.dumps({"poller": "test", "prompt": "something happened"}))\n' ) - poller = PollerConfig( + poller = WatcherConfig( name="test-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={}, skill_dir=skill_dir, ) @@ -202,10 +203,11 @@ async def test_poller_no_output(self, tmp_home: Path) -> None: skill_dir.mkdir(parents=True) (skill_dir / "poller.py").write_text("pass\n") - poller = PollerConfig( + poller = WatcherConfig( name="quiet-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={}, skill_dir=skill_dir, ) @@ -223,10 +225,11 @@ async def 
test_poller_nonzero_exit(self, tmp_home: Path) -> None: skill_dir.mkdir(parents=True) (skill_dir / "poller.py").write_text("import sys; sys.exit(1)\n") - poller = PollerConfig( + poller = WatcherConfig( name="fail-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={}, skill_dir=skill_dir, ) @@ -246,10 +249,11 @@ async def test_poller_env_vars(self, tmp_home: Path) -> None: 'print(json.dumps({"poller": "env-test", "prompt": os.environ.get("MY_VAR", "missing")}))\n' ) - poller = PollerConfig( + poller = WatcherConfig( name="env-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={"MY_VAR": "hello"}, skill_dir=skill_dir, ) @@ -271,10 +275,11 @@ async def test_poller_state_dir_and_poller_name_env(self, tmp_home: Path) -> Non 'print(json.dumps({"poller": pn, "prompt": f"dir={sd} name={pn}"}))\n' ) - poller = PollerConfig( + poller = WatcherConfig( name="state-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={}, skill_dir=skill_dir, ) @@ -296,10 +301,11 @@ async def test_poller_multiple_lines(self, tmp_home: Path) -> None: ' print(json.dumps({"poller": "multi", "prompt": f"event {i}"}))\n' ) - poller = PollerConfig( + poller = WatcherConfig( name="multi-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={}, skill_dir=skill_dir, ) @@ -322,10 +328,11 @@ async def test_poller_invalid_json_line_skipped(self, tmp_home: Path) -> None: 'print(json.dumps({"poller": "mixed", "prompt": "valid line"}))\n' ) - poller = PollerConfig( + poller = WatcherConfig( name="mixed-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={}, skill_dir=skill_dir, ) @@ -347,10 +354,11 @@ async def test_poller_source_platform_passthrough(self, tmp_home: Path) -> None: 'print(json.dumps({"poller": "bsky", "source_platform": "bluesky", "prompt": "new reply"}))\n' ) - poller = PollerConfig( + poller = WatcherConfig( name="platform-poller", command="python poller.py", cron="*/5 
* * * *", + trigger=None, env={}, skill_dir=skill_dir, ) @@ -372,10 +380,11 @@ async def test_poller_no_source_platform_defaults_none(self, tmp_home: Path) -> 'print(json.dumps({"poller": "test", "prompt": "event without platform"}))\n' ) - poller = PollerConfig( + poller = WatcherConfig( name="noplatform-poller", command="python poller.py", cron="*/5 * * * *", + trigger=None, env={}, skill_dir=skill_dir, ) diff --git a/tests/test_scheduler_watchers.py b/tests/test_scheduler_watchers.py new file mode 100644 index 0000000..90c4783 --- /dev/null +++ b/tests/test_scheduler_watchers.py @@ -0,0 +1,637 @@ +"""Tests for watchers.json discovery and execution in the scheduler.""" + +from __future__ import annotations + +import asyncio +import json +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +from open_strix.scheduler import ( + VALID_WATCHER_TRIGGERS, + SchedulerMixin, + WatcherConfig, +) + + +class FakeLayout: + """Minimal layout stub for testing.""" + + def __init__(self, home: Path) -> None: + self.home = home + + @property + def skills_dir(self) -> Path: + return self.home / "skills" + + @property + def scheduler_file(self) -> Path: + return self.home / "scheduler.yaml" + + +class FakeApp(SchedulerMixin): + """Minimal app stub that satisfies SchedulerMixin's protocol.""" + + def __init__(self, home: Path) -> None: + self.layout = FakeLayout(home) + self.events: list[dict] = [] + self.enqueued: list = [] + + def log_event(self, event_type: str, **payload) -> None: + self.events.append({"type": event_type, **payload}) + + async def enqueue_event(self, event) -> None: + self.enqueued.append(event) + + +@pytest.fixture +def tmp_home(tmp_path: Path) -> Path: + """Create a temporary home directory with skills dir.""" + skills_dir = tmp_path / "skills" + skills_dir.mkdir() + return tmp_path + + +# --- Discovery tests --- + + +class TestDiscoverWatchers: + def test_no_skills_dir(self, tmp_path: Path) -> None: + app = 
FakeApp(tmp_path) + assert app._discover_watchers() == [] + + def test_empty_skills_dir(self, tmp_home: Path) -> None: + app = FakeApp(tmp_home) + assert app._discover_watchers() == [] + + def test_valid_cron_watcher(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "monitoring" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + { + "name": "daily-health", + "command": "python health.py", + "cron": "0 12 * * *", + } + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert len(watchers) == 1 + assert watchers[0].name == "daily-health" + assert watchers[0].command == "python health.py" + assert watchers[0].cron == "0 12 * * *" + assert watchers[0].trigger is None + assert watchers[0].skill_dir == skill_dir + + def test_valid_event_watcher(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "drift" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + { + "name": "codex-bypass", + "command": "python check.py", + "trigger": "turn_complete", + } + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert len(watchers) == 1 + assert watchers[0].name == "codex-bypass" + assert watchers[0].trigger == "turn_complete" + assert watchers[0].cron is None + + def test_mixed_cron_and_event_watchers(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "mix" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + {"name": "scheduled", "command": "python a.py", "cron": "*/10 * * * *"}, + {"name": "event-driven", "command": "python b.py", "trigger": "turn_complete"}, + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert len(watchers) == 2 + cron_w = [w for w in watchers if w.cron] + event_w = [w for w in watchers if w.trigger] + assert len(cron_w) == 1 + assert 
len(event_w) == 1 + + def test_rejects_both_cron_and_trigger(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "bad" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + { + "name": "confused", + "command": "python x.py", + "cron": "* * * * *", + "trigger": "turn_complete", + } + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert watchers == [] + assert any(e["type"] == "watcher_missing_fields" for e in app.events) + + def test_rejects_neither_cron_nor_trigger(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "incomplete" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + {"name": "missing", "command": "python x.py"} + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert watchers == [] + assert any(e["type"] == "watcher_missing_fields" for e in app.events) + + def test_rejects_invalid_trigger(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "bad-trigger" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + { + "name": "invalid", + "command": "python x.py", + "trigger": "on_banana", + } + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert watchers == [] + assert any(e["type"] == "watcher_invalid_trigger" for e in app.events) + + def test_invalid_json(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "broken" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text("not json {{{") + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert watchers == [] + assert any(e["type"] == "watcher_invalid_json" for e in app.events) + + def test_bare_array_rejected(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "array" + skill_dir.mkdir(parents=True) + (skill_dir / 
"watchers.json").write_text(json.dumps([{"name": "x"}])) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert watchers == [] + assert any(e["type"] == "watcher_invalid_format" for e in app.events) + + def test_env_passthrough(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "envtest" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + { + "name": "with-env", + "command": "python x.py", + "trigger": "turn_complete", + "env": {"MY_VAR": "hello"}, + } + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert len(watchers) == 1 + assert watchers[0].env == {"MY_VAR": "hello"} + + def test_missing_name_rejected(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "noname" + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + {"command": "python x.py", "trigger": "turn_complete"} + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert watchers == [] + + def test_multiple_skills_with_watchers(self, tmp_home: Path) -> None: + for name in ["alpha", "beta"]: + skill_dir = tmp_home / "skills" / name + skill_dir.mkdir(parents=True) + (skill_dir / "watchers.json").write_text( + json.dumps( + { + "watchers": [ + { + "name": f"{name}-watcher", + "command": f"python {name}.py", + "trigger": "turn_complete", + } + ] + } + ) + ) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert len(watchers) == 2 + names = {w.name for w in watchers} + assert names == {"alpha-watcher", "beta-watcher"} + + def test_all_valid_triggers_accepted(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "alltriggers" + skill_dir.mkdir(parents=True) + entries = [ + {"name": f"w-{t}", "command": "python x.py", "trigger": t} + for t in sorted(VALID_WATCHER_TRIGGERS) + ] + (skill_dir / "watchers.json").write_text(json.dumps({"watchers": 
entries})) + + app = FakeApp(tmp_home) + watchers = app._discover_watchers() + assert len(watchers) == len(VALID_WATCHER_TRIGGERS) + + +# --- Subprocess execution tests --- + + +class TestRunWatcherSubprocess: + @pytest.mark.asyncio + async def test_successful_watcher_with_findings(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "test" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, sys\n" + "ctx = json.loads(sys.stdin.readline())\n" + "print(json.dumps({'signal': 'test', 'severity': 'warn', " + "'message': f\"trace={ctx['trace_id']}\", 'route': 'log'}))\n" + ) + + watcher = WatcherConfig( + name="test-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + findings = await app._run_watcher_subprocess( + watcher, {"trigger": "turn_complete", "trace_id": "abc123", "events_path": "/tmp/events.jsonl"} + ) + + assert len(findings) == 1 + assert findings[0]["signal"] == "test" + assert findings[0]["message"] == "trace=abc123" + + @pytest.mark.asyncio + async def test_watcher_no_output(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "quiet" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, sys\n" + "ctx = json.loads(sys.stdin.readline())\n" + "# no findings\n" + ) + + watcher = WatcherConfig( + name="quiet-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + findings = await app._run_watcher_subprocess( + watcher, {"trigger": "turn_complete", "trace_id": "abc", "events_path": "/tmp/e.jsonl"} + ) + + assert findings == [] + + @pytest.mark.asyncio + async def test_watcher_nonzero_exit(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "fail" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text("import sys; sys.exit(1)\n") + + watcher = 
WatcherConfig( + name="fail-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + findings = await app._run_watcher_subprocess( + watcher, {"trigger": "turn_complete", "trace_id": "x", "events_path": "/tmp/e.jsonl"} + ) + + assert findings == [] + assert any(e["type"] == "watcher_nonzero_exit" for e in app.events) + + @pytest.mark.asyncio + async def test_watcher_env_vars(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "envtest" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, os, sys\n" + "sys.stdin.readline() # consume stdin\n" + "sd = os.environ.get('STATE_DIR', '')\n" + "wn = os.environ.get('WATCHER_NAME', '')\n" + "mv = os.environ.get('MY_VAR', '')\n" + "print(json.dumps({'signal': 'env', 'message': f'{sd}|{wn}|{mv}', 'route': 'log'}))\n" + ) + + watcher = WatcherConfig( + name="env-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={"MY_VAR": "hello"}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + findings = await app._run_watcher_subprocess( + watcher, {"trigger": "turn_complete", "trace_id": "x", "events_path": "/tmp/e.jsonl"} + ) + + assert len(findings) == 1 + parts = findings[0]["message"].split("|") + assert parts[0] == str(skill_dir) + assert parts[1] == "env-watcher" + assert parts[2] == "hello" + + @pytest.mark.asyncio + async def test_watcher_multiple_findings(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "multi" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, sys\n" + "sys.stdin.readline()\n" + "for i in range(3):\n" + " print(json.dumps({'signal': f's{i}', 'message': f'finding {i}', 'route': 'log'}))\n" + ) + + watcher = WatcherConfig( + name="multi-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + 
findings = await app._run_watcher_subprocess( + watcher, {"trigger": "turn_complete", "trace_id": "x", "events_path": "/tmp/e.jsonl"} + ) + + assert len(findings) == 3 + assert findings[0]["signal"] == "s0" + assert findings[2]["signal"] == "s2" + + @pytest.mark.asyncio + async def test_watcher_invalid_json_line_skipped(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "mixed" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, sys\n" + "sys.stdin.readline()\n" + "print('not json')\n" + "print(json.dumps({'signal': 'valid', 'message': 'ok', 'route': 'log'}))\n" + ) + + watcher = WatcherConfig( + name="mixed-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + findings = await app._run_watcher_subprocess( + watcher, {"trigger": "turn_complete", "trace_id": "x", "events_path": "/tmp/e.jsonl"} + ) + + assert len(findings) == 1 + assert findings[0]["signal"] == "valid" + assert any(e["type"] == "watcher_invalid_line" for e in app.events) + + @pytest.mark.asyncio + async def test_watcher_stderr_logged(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "stderr" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, sys\n" + "sys.stdin.readline()\n" + "print('debug info', file=sys.stderr)\n" + "print(json.dumps({'signal': 'ok', 'message': 'fine', 'route': 'log'}))\n" + ) + + watcher = WatcherConfig( + name="stderr-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + findings = await app._run_watcher_subprocess( + watcher, {"trigger": "turn_complete", "trace_id": "x", "events_path": "/tmp/e.jsonl"} + ) + + assert len(findings) == 1 + assert any(e["type"] == "watcher_stderr" for e in app.events) + + +# --- fire_watchers integration tests --- + + +class TestFireWatchers: + @pytest.mark.asyncio 
+ async def test_fire_watchers_no_registered(self, tmp_home: Path) -> None: + app = FakeApp(tmp_home) + app._event_watchers = {} + + findings = await app.fire_watchers( + "turn_complete", + session_id="abc", + events_path="/tmp/e.jsonl", + ) + assert findings == [] + + @pytest.mark.asyncio + async def test_fire_watchers_with_findings(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "test" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, sys\n" + "ctx = json.loads(sys.stdin.readline())\n" + "print(json.dumps({'signal': 'test', 'severity': 'warn', " + "'message': 'found something', 'route': 'log'}))\n" + ) + + watcher = WatcherConfig( + name="test-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + app._event_watchers = {"turn_complete": [watcher]} + + findings = await app.fire_watchers( + "turn_complete", + session_id="abc123", + events_path="/tmp/events.jsonl", + ) + + assert len(findings) == 1 + assert findings[0]["signal"] == "test" + assert findings[0]["watcher"] == "test-watcher" + assert any(e["type"] == "watcher_findings" for e in app.events) + assert any(e["type"] == "watchers_complete" for e in app.events) + + @pytest.mark.asyncio + async def test_fire_watchers_wrong_trigger_type(self, tmp_home: Path) -> None: + app = FakeApp(tmp_home) + app._event_watchers = { + "turn_complete": [ + WatcherConfig( + name="x", command="echo", cron=None, trigger="turn_complete", + env={}, skill_dir=tmp_home, + ) + ] + } + + findings = await app.fire_watchers( + "session_start", + session_id="abc", + events_path="/tmp/e.jsonl", + ) + assert findings == [] + + @pytest.mark.asyncio + async def test_fire_watchers_adds_watcher_name(self, tmp_home: Path) -> None: + skill_dir = tmp_home / "skills" / "named" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + "import json, sys\n" + "sys.stdin.readline()\n" + 
"print(json.dumps({'signal': 'x', 'message': 'y', 'route': 'log'}))\n" + ) + + watcher = WatcherConfig( + name="my-watcher", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + + app = FakeApp(tmp_home) + app._event_watchers = {"turn_complete": [watcher]} + + findings = await app.fire_watchers( + "turn_complete", + session_id="s", + events_path="/tmp/e.jsonl", + ) + + assert findings[0]["watcher"] == "my-watcher" + + @pytest.mark.asyncio + async def test_fire_watchers_multiple_watchers(self, tmp_home: Path) -> None: + watchers = [] + for i in range(2): + skill_dir = tmp_home / "skills" / f"w{i}" + skill_dir.mkdir(parents=True) + (skill_dir / "check.py").write_text( + f"import json, sys\n" + f"sys.stdin.readline()\n" + f"print(json.dumps({{'signal': 'w{i}', 'message': 'from {i}', 'route': 'log'}}))\n" + ) + watchers.append( + WatcherConfig( + name=f"watcher-{i}", + command="python check.py", + cron=None, + trigger="turn_complete", + env={}, + skill_dir=skill_dir, + ) + ) + + app = FakeApp(tmp_home) + app._event_watchers = {"turn_complete": watchers} + + findings = await app.fire_watchers( + "turn_complete", + session_id="s", + events_path="/tmp/e.jsonl", + ) + + assert len(findings) == 2 + signals = {f["signal"] for f in findings} + assert signals == {"w0", "w1"}