diff --git a/CLAUDE.md b/CLAUDE.md index 9ca3e97..67c695d 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -41,6 +41,12 @@ Bach is a CLI-first personal agentic development session launcher that turns dai - `uv run bach daemon install` - Install `com.bach.daemon` as a macOS LaunchAgent (RunAtLoad + KeepAlive); prints `bach daemon run` fallback if launchctl fails - `uv run bach daemon uninstall` - Unload and remove the LaunchAgent plist; idempotent +### PRD#17 — Scheduled Supervision Sweeps +- `uv run bach supervise [run] [--task ] [--force]` - One-shot sweep: enumerate live task-linked sessions → judge each once → apply verdicts → print summary table; `--task` restricts to one session; `--force` bypasses the active-hours window +- `uv run bach supervise install` - Install `com.bach.supervise` as a macOS LaunchAgent (calendar-scheduled, NOT keep-alive); fires N times/day inside the active-hours window (default N=5, hours [8,11,14,16,19]); prints `bach supervise` fallback if launchctl fails; re-run after changing sweep knobs +- `uv run bach supervise uninstall` - Unload and remove the sweep agent plist; idempotent; does not touch `com.bach.daemon` +- Sweep knobs (via `bach config set`): `supervise-sweeps-per-day` (default 5), `supervise-window-start-hour` (default 8), `supervise-window-end-hour` (default 22) + ## File References - `docs/internal/DISCOVERY.md` - real goal and scope - `docs/internal/PRD.md` - requirements diff --git a/docs/adr/018-scheduled-supervision-sweeps.md b/docs/adr/018-scheduled-supervision-sweeps.md new file mode 100644 index 0000000..4ca0a53 --- /dev/null +++ b/docs/adr/018-scheduled-supervision-sweeps.md @@ -0,0 +1,245 @@ +# ADR-018 — Scheduled Supervision Sweeps (PRD #17) + +**Status:** Accepted +**Date:** 2026-06-13 +**Extends:** ADR-015 (session observability, judge_session + verdict apply), + ADR-017 (persistent watcher daemon, always-on alternative). + +## Context + +ADR-017 shipped a persistent daemon (`com.bach.daemon`) that keeps one watcher +alive per live session. The watcher calls the judge on every `judge_interval_seconds` +tick (~300 s by default), producing roughly 180 codex calls per session per day +for a typical 15-hour active window — continuous cost paid regardless of whether +the user is actually monitoring. + +A lower-cost alternative emerged from two observations: + +1. Most users check in a few times per day rather than monitoring continuously. + Real-time observation is useful but not always worth its price. +2. A single sweep pass (enumerate sessions → judge each once → apply) costs one + LLM call per session. At 5 sweeps/day that is ~36× cheaper than the daemon's + continuous tick pace. + +PRD #17 asks: **can we make supervision opt-in at low flat cost, with a manual +escape hatch, while leaving the daemon available for users who genuinely need +near-real-time verdicts?** + +Two design questions needed explicit decisions: + +1. **New observer code path or shared path?** The sweep needs to read transcripts, + call the judge, and apply verdicts exactly as the watcher does. Forking a + second code path would create two places to keep in sync. + +2. **How to guard against waking up at 03:00?** A naive launchd `StartInterval` + agent fires on every reboot regardless of time-of-day. The sweep should be + a day-hours operation. + +## Decision + +### 1. One-shot sweep model + +`bach supervise` (bare) or `bach supervise run` runs **one sweep pass** and +exits. The sweep: + +1. Enumerates live, task-linked sessions via the same path as the daemon + (`scan_registry_artifact_paths` → `discover_sessions` → `list_sessions` → + `list_supervisable_artifacts`). +2. For each artifact, reads the transcript tail and calls the judge **once** + (one codex call per session — the cost-containment invariant). +3. Applies the verdict via the shared `verdict_apply.apply_verdict` path. +4. Prints a Rich per-session summary table. +5. Exits. + +There is no persistent process. The sweep is a dumb batch job: fire, iterate, +exit. All state after the sweep lives in the task artifacts. + +### 2. Shared judge + apply path — no new observer code + +The sweep reuses: + +- `judge_service.judge_session` — the same judge callable the watcher uses. +- `verdict_apply.apply_verdict` — the shared apply function (extracted in this + PR from `session_watcher.py` into its own module so both callers can import + it). This means the observer authority matrix (`source="observer"`), the + `observer_moves`/confidence-threshold guard, and the fallback log-append + on write failure all apply identically to sweeps and the continuous watcher. +- `list_supervisable_artifacts` — the same supervisable-artifacts filter. + +Rationale: one code path means one place to audit, one place to update when +the judge API changes, and guaranteed behavioral parity between the two +supervision modes. + +### 3. Active-hours window guard + +`run_sweep` checks `now.hour` against +`[supervise_window_start_hour, supervise_window_end_hour)` (defaults 08–22, +local time) before doing any work. If the current hour is outside the window +and `force=False`, the sweep returns a `SweepReport(skipped=True)` — no judge +calls, no disk writes, no output beyond a muted log line. + +`--force` bypasses the window entirely for manual at-any-hour use (e.g. +debugging, late-night emergency check). + +The window guard exists because launchd calendar agents can fire at unexpected +times (reboot at 02:00 catches up all missed intervals). The window makes +late-night sweeps a logged no-op rather than a surprise LLM charge. + +### 4. Per-session error isolation + +Each artifact is processed inside a `try/except` in `_sweep_one`. An +exception in one session is recorded as a `SweepResult` with `error` set and +the sweep continues to the next session. This mirrors the daemon's per-watcher +resilience: one bad artifact must not abort the entire batch. + +### 5. macOS launchd calendar agent with a distinct label + +`bach supervise install` writes a **calendar agent** plist at +`~/Library/LaunchAgents/com.bach.supervise.plist`. Key differences from the +daemon plist (`com.bach.daemon`): + +| Property | Daemon (`com.bach.daemon`) | Sweep agent (`com.bach.supervise`) | +|---|---|---| +| `RunAtLoad` | `true` | absent | +| `KeepAlive` | `true` | absent | +| `StartCalendarInterval` | absent | N `{Hour, Minute}` entries | + +`StartCalendarInterval` with an array of entries fires at each listed hour. +No `KeepAlive` or `RunAtLoad` — the sweep is not a persistent service; it is +a cron-like trigger for a one-shot command. + +The **distinct label** (`com.bach.supervise` vs `com.bach.daemon`) means: + +- The two agents are fully independent on disk and in launchd. Installing or + uninstalling one does not affect the other. +- A user can run `bach daemon install` and `bach supervise install` + simultaneously (both register successfully). Running both is allowed but + redundant: the sweep fires at calendar times while the daemon is already + judging continuously. The daemon's continuous verdicts are not harmed by the + periodic sweep calling the judge again; the apply path is idempotent. + +On `launchctl` failure (non-macOS, SIP restrictions, sandboxed environment), +the plist is still written and the fallback command `bach supervise` is printed. +This mirrors the daemon install pattern from ADR-017 §5. + +### 6. Sweep-hour spread formula + +The `_compute_sweep_hours` function computes N evenly-spaced integer hours: + +``` +entry[i].Hour = round(window_start + i * span / N) +``` + +`round()` (not `floor()`) is used so the last entry reaches near `window_end` +rather than clustering early. For N=5 over the default 08–22 window: + +``` +i=0: round(8 + 0 * 14/5) = round(8.0) = 8 +i=1: round(8 + 1 * 14/5) = round(10.8) = 11 +i=2: round(8 + 2 * 14/5) = round(13.6) = 14 +i=3: round(8 + 3 * 14/5) = round(16.4) = 16 +i=4: round(8 + 4 * 14/5) = round(19.2) = 19 +→ [8, 11, 14, 16, 19] +``` + +`floor()` would give `[8, 10, 13, 16, 18]` — the span is the same but the +distribution is less uniform (shorter first gap, longer last gap vs the full +14-hour window). `round()` spreads them closer to uniform. + +### 7. Cost comparison + +| Mode | Judge calls per session per 15-hour day | +|---|---| +| Daemon (continuous, 300 s tick) | ~180 | +| Sweeps at 5/day | 5 | +| **Ratio** | **~36× cheaper** | + +## Known limitations / accepted trade-offs + +*These are deliberate design choices, not bugs.* + +**(a) Staleness between sweeps.** +A session's status is observed at most N times per day. A blocked session +might stay undetected for hours between sweep fires. Users who need near-real- +time observation should use the daemon instead. + +**(b) Window is local-hour integer granularity.** +`supervise_window_start_hour` and `supervise_window_end_hour` are integers +(0–24). Sub-hour precision (e.g. "start at 08:30") is not supported. The +cost of adding minute-precision for a scheduling knob that changes at most +once a year is not worth the complexity. + +**(c) Sweep-hour spread is rounded integer hours.** +The `round()` formula produces near-uniform distribution but not perfectly +uniform — rounding creates small gaps (e.g. 14→16 is a 2-hour gap vs 11→14 +is 3 hours for N=5). This is accepted over floating-point scheduling, which +launchd `StartCalendarInterval` does not support anyway. + +**(d) Running both daemon and sweep agent is redundant.** +Bach does not prevent this configuration. The second call to `apply_verdict` +will find the verdict already applied and either do nothing (same status) or +see an `InvalidStatusError` caught and logged. No data is corrupted, but one +judge call per sweep fire is wasted. Users who install both should be aware +they gain no benefit from the sweep agent. + +## Consequences + +- **`bach supervise [run]` is a new CLI surface.** Implemented in + `cli/supervise_cmd.py`. Options: `--task `, `--force`. + +- **`bach supervise install` / `uninstall` are new CLI commands.** They manage + the `com.bach.supervise` LaunchAgent independently of `bach daemon`. + +- **Three new config knobs** (all via `bach config set`): + + | Key | Default | Effect | + |-----|---------|--------| + | `supervise-window-start-hour` | `8` | Active-hours window start (inclusive) | + | `supervise-window-end-hour` | `22` | Active-hours window end (exclusive) | + | `supervise-sweeps-per-day` | `5` | Calendar entries per day | + +- **`verdict_apply.py` is a new module** extracted from `session_watcher.py` so + both the watcher and the sweep can import the canonical apply path. Existing + behavior of the continuous watcher is unchanged. + +- **The daemon (`com.bach.daemon`) is unchanged.** No existing behavior was + modified. Scheduled sweeps are an additive, opt-in alternative. + +## Alternatives rejected + +- **Reuse the daemon with a low reconcile interval.** Lowering + `daemon_reconcile_seconds` makes supervision more frequent but keeps the + per-tick cost. You cannot make the daemon "cheap per fire" — it is always + on. A purpose-built one-shot sweep is the correct tool for calendar-triggered + batch supervision. + +- **Cron (`crontab -e`) instead of launchd.** cron is available on macOS + but is deprecated for user jobs and does not load automatically at login + without extra `launchctl` configuration. launchd is the correct macOS tool + and is already used for the daemon. Consistency over novelty. + +- **Per-sweep-hour minute-level control.** Supporting custom minute offsets + (e.g. `:30` instead of `:00`) would complicate the spread formula and the + config knob surface with negligible practical benefit. Top-of-hour scheduling + is standard and easy to reason about. + +- **Fork a new apply path for the sweep.** The sweep could have its own + apply logic tuned for batch use. Rejected — forking creates two sources of + truth for observer authority semantics. A shared path is the correct + architectural boundary. + +## References + +- ADR-015: `adr/015-session-observability-codex-parity.md` +- ADR-016: `adr/016-loop-control.md` +- ADR-017: `adr/017-persistent-watcher-daemon.md` +- Implementation: `src/bach/services/supervise_service.py` (run_sweep, SweepReport) +- Implementation: `src/bach/services/verdict_apply.py` (shared apply path) +- Implementation: `src/bach/cli/supervise_cmd.py` (sweep + install/uninstall) +- Implementation: `src/bach/services/launchd_service.py` + (`render_sweep_agent`, `install_sweep_agent`, `uninstall_sweep_agent`, + `_compute_sweep_hours`) +- Config: `src/bach/config/settings.py` + (`supervise_sweeps_per_day`, `supervise_window_start_hour`, + `supervise_window_end_hour`) +- How-to: `docs/how-to/run-supervision-sweeps.md` diff --git a/docs/how-to/run-supervision-sweeps.md b/docs/how-to/run-supervision-sweeps.md new file mode 100644 index 0000000..52c6d6c --- /dev/null +++ b/docs/how-to/run-supervision-sweeps.md @@ -0,0 +1,185 @@ +# Run Supervision Sweeps + +Set up Bach's low-cost scheduled supervision in about 2 minutes: one install +command, and Bach will judge every live task session 5 times per day during +your work hours — at roughly 36× lower cost than the always-on daemon. + +--- + +## Prerequisites + +- Bach installed and `uv run bach --help` works. +- At least one project registered (`bach project add `). +- Hooks installed so sessions are linked to tasks + (`bach hooks install claude-code --project `). +- macOS (the launchd path). Linux / other POSIX: use the manual fallback. + +--- + +## Step-by-step + +### 1. Run a manual sweep first (verify it works) + +Before installing the scheduled agent, check that the sweep finds your +sessions: + +```bash +bach supervise +``` + +You should see a table with one row per live, task-linked session. If you +see "nothing to supervise", confirm hooks are installed and a task is in +`executing` or `qa` status. + +Use `--task` to restrict the sweep to one session by task ID: + +```bash +bach supervise --task t47 +``` + +Use `--force` to bypass the active-hours window and sweep at any time +(useful for checking behavior outside business hours): + +```bash +bach supervise --force +``` + +--- + +### 2. Install the scheduled sweep agent + +```bash +bach supervise install +``` + +This writes `~/Library/LaunchAgents/com.bach.supervise.plist` and loads it +via `launchctl`. With the default config (5 sweeps/day, 08:00–22:00 window), +the agent fires at **08:00, 11:00, 14:00, 16:00, and 19:00** local time. + +If `launchctl` fails (sandboxed environment, non-macOS, etc.) the plist is +still written and Bach prints the manual fallback: + +``` +Run manually (or under a cron / launchctl): bach supervise +``` + +Keep that command handy — you can paste it into a cron job or a tmux session. + +Verify the agent loaded: + +```bash +launchctl list | grep com.bach.supervise +``` + +You should see the agent in the list with a PID of `-` (it is a calendar +agent, not a persistent daemon — it only runs at scheduled fire times). + +--- + +### 3. Check in over SSH + +The sweep writes observer verdicts to task artifacts and the board. There are +**no push notifications** — you pull status when you return. + +Open a new SSH session and run any of these: + +```bash +# Session list: linked tasks, status, liveness dots +bach sessions list + +# Kanban board with observer summary lines and ⚑ needs you markers +bach +bach > /board + +# Task detail with Observer section (summary, confidence, needs_human) +bach > /show t47 +``` + +If an artifact shows `⚑ needs you`, the last sweep flagged that the agent +is blocked. Resume the session to unblock it: + +```bash +bach sessions resume +``` + +--- + +## Tuning + +All three sweep knobs are set via `bach config set`: + +```bash +# How many times per day to sweep (default 5) +bach config set supervise-sweeps-per-day 3 + +# Start of the active-hours window — sweeps outside this window are skipped +bach config set supervise-window-start-hour 9 + +# End of the active-hours window +bach config set supervise-window-end-hour 21 +``` + +After changing `supervise-sweeps-per-day` or the window bounds, re-run +`bach supervise install` to update the plist with the new calendar entries. +The install is idempotent — it unloads the old agent before loading the new one. + +--- + +## When to use sweeps vs the daemon + +| | Scheduled sweeps | Persistent daemon | +|---|---|---| +| **Install command** | `bach supervise install` | `bach daemon install` | +| **LaunchAgent label** | `com.bach.supervise` | `com.bach.daemon` | +| **Observation frequency** | N times per day (default 5) | Continuously (~every 5 min) | +| **Cost (1 session/day)** | ~5 codex calls | ~180 codex calls | +| **Staleness** | Up to hours between sweeps | Up to 5 min | +| **Recommended for** | Most users — low daily cost, pull-based check-in | Users who need near-real-time verdicts and `⚑ needs you` alerts promptly | + +**Short answer:** start with sweeps. If you find yourself needing to know +within minutes when a session is blocked, switch to the daemon. + +Running both simultaneously is allowed but redundant — the daemon's continuous +verdicts are not harmed by the sweep also firing, but the sweep adds no extra +value when the daemon is running. + +--- + +## Stopping and removing the sweep agent + +Remove the sweep agent (stop + uninstall): + +```bash +bach supervise uninstall +``` + +After uninstall, `bach supervise install` re-registers it from scratch. + +To stop the daemon separately (if you have both installed): + +```bash +bach daemon uninstall +``` + +--- + +## Troubleshooting + +| Symptom | Check | +|---------|-------| +| "nothing to supervise" on manual run | Confirm hooks installed: `bach hooks status --project ` | +| Sweep skips outside expected hours | Default window is 08–22; check `bach config show` for custom values; use `--force` to override | +| `launchctl list` doesn't show `com.bach.supervise` after install | Check `~/.bach/logs/supervise.stderr.log` for startup errors | +| Verdicts land on wrong sessions | Confirm artifacts have `agent.runtime_session_id` set (hooks must fire at least once after session start) | +| Sweep fires but no `⚑ needs you` when agent is blocked | The judge confidence was below `judge-confidence-threshold` — lower it: `bach config set judge-confidence-threshold 0.5` | + +--- + +## Related docs + +- `enable-observer.md` — hooks, watch-on-launch, board liveness dots +- `run-the-daemon.md` — always-on alternative for real-time supervision +- `configure-user-settings.md` — all user config knobs +- `use-the-repl.md` — /board, /show, /open commands +- `../adr/018-scheduled-supervision-sweeps.md` — sweep architecture and cost trade-offs +- `../adr/017-persistent-watcher-daemon.md` — daemon architecture diff --git a/src/bach/cli/app.py b/src/bach/cli/app.py index b4b3f4e..b859fd2 100644 --- a/src/bach/cli/app.py +++ b/src/bach/cli/app.py @@ -12,6 +12,7 @@ from bach.cli.internal_sessions import register_internal_sessions from bach.cli.issue import issue_app from bach.cli.sessions import sessions_app +from bach.cli.supervise_cmd import supervise_app from bach.config.paths import bach_home from bach.config.projects import ProjectRegistry from bach.config.settings import ( @@ -76,6 +77,8 @@ app.add_typer(hooks_app, name="hooks") # Phase 17 (issue #12): daemon supervisor sub-app. app.add_typer(daemon_app, name="daemon") +# Issue #18: one-shot supervision sweep sub-app. +app.add_typer(supervise_app, name="supervise") # Internal hook-event + session-watch + session-stop commands. register_internal_sessions(internal_app) # Phase 16 L1: `bach afk run` sub-app. afk_runner.py (L2) provides the logic. diff --git a/src/bach/cli/supervise_cmd.py b/src/bach/cli/supervise_cmd.py new file mode 100644 index 0000000..2377f96 --- /dev/null +++ b/src/bach/cli/supervise_cmd.py @@ -0,0 +1,457 @@ +"""Typer sub-app for `bach supervise ...` — issue #18. + +Subcommands: + run — run a one-shot supervision sweep (judge all live, linked sessions). + install — install the sweep agent as a macOS launchd calendar agent. + uninstall — remove the sweep agent plist. + +The bare `bach supervise` (no subcommand) also runs the sweep via the group +callback (invoke_without_command=True, invoked when ctx.invoked_subcommand is None). + +Options on the sweep: + --task Restrict sweep to one task's session (resolved via find_artifact_for_ref). + --force Bypass the active-hours window guard. + +Architecture notes: + - Handler is THIN: loads config, wires seams, calls run_sweep, renders table. + - Enumeration reuses the same path as daemon_cmd.enumerate_desired. + - Session-qualified artifacts come from list_supervisable_artifacts. + - All seams (judge_fn, apply_fn, read_payload_fn, transcript_path_fn) are + wired in _run_sweep_with_real_seams; run_sweep itself is side-effect-free + so the service layer is unit-testable without touching disk or LLM. +""" + +import logging +from datetime import datetime +from pathlib import Path +from typing import Annotated + +import typer +from rich.console import Console +from rich.table import Table + +from bach.config.paths import bach_home +from bach.config.projects import ProjectRegistry +from bach.config.settings import load_config +from bach.config.themes import THEMES, ThemePalette, get_theme +from bach.runtimes.session_discovery import default_process_probe, discover_sessions +from bach.runtimes.transcript import read_tail_events +from bach.services.launchd_service import ( + install_sweep_agent, + uninstall_sweep_agent, +) +from bach.services.sessions_service import ( + find_artifact_for_ref, + list_sessions, + list_supervisable_artifacts, + scan_registry_artifact_paths, +) +from bach.services.supervise_service import SweepReport, SweepResult, run_sweep +from bach.services.verdict_apply import apply_verdict + +logger = logging.getLogger(__name__) + +# Typer GROUP — invoke_without_command=True so bare `bach supervise` runs the sweep. +supervise_app = typer.Typer( + name="supervise", + help="One-shot supervision sweep: judge all live, task-linked sessions once.", + invoke_without_command=True, +) + +_console = Console(highlight=False) + + +def _theme() -> ThemePalette: + """Load the active theme palette (same pattern as cli/sessions.py).""" + return get_theme(load_config().theme) or THEMES["default"] + + +# --------------------------------------------------------------------------- +# Enumeration helper — reuses the same logic as daemon_cmd.enumerate_desired +# --------------------------------------------------------------------------- + + +def _enumerate_supervisable_artifacts() -> list[Path]: + """Return a list of artifact Paths that need a sweep cycle. + + This is the same enumeration logic as daemon_cmd.enumerate_desired: + ProjectRegistry → scan_registry_artifact_paths + → discover_sessions → list_sessions → list_supervisable_artifacts + + Returns a sorted list (not a set) so the order is deterministic for the + Rich table output. + """ + from datetime import UTC + + cfg = load_config() + registry = ProjectRegistry.default() + artifact_paths = scan_registry_artifact_paths(registry) + + discovered = discover_sessions( + claude_projects_dir=Path.home() / ".claude" / "projects", + codex_home=Path.home() / ".codex", + process_probe=default_process_probe, + now=datetime.now(UTC), + running_threshold_s=cfg.liveness_running_seconds, + idle_threshold_s=cfg.liveness_idle_minutes * 60, + ) + + rows = list_sessions( + sessions_dir=bach_home() / "sessions", + artifacts_dir=bach_home(), + discover_fn=lambda: discovered, + read_events_fn=read_tail_events, + artifact_paths=artifact_paths, + max_ended_age_s=0, # exclude ended sessions immediately + ) + + supervisable: set[Path] = list_supervisable_artifacts(rows=rows) + return sorted(supervisable) + + +# --------------------------------------------------------------------------- +# Core sweep runner — wires all real seams +# --------------------------------------------------------------------------- + + +def _run_sweep_with_real_seams( + artifacts: list[Path], + *, + force: bool, +) -> SweepReport: + """Wire the real seams and call run_sweep. + + Extracts the wiring so both the bare callback and the `run` subcommand + share it without duplication. All heavyweight imports (judge_service, + session_watcher internals, status_service) are deferred to this call + site — run_sweep itself imports nothing optional at module level. + + The _apply_fn closure bridges the supervise_service.ApplyFn signature + (positional artifact + verdict) to verdict_apply.apply_verdict (which + also needs set_task_status_fn injected). This is the single place where + the real set_task_status is wired in for production use. + """ + from bach.services.judge_service import judge_session # noqa: PLC0415 + from bach.services.session_watcher import ( # noqa: PLC0415 + _read_artifact_payload, + _transcript_path_for, + ) + from bach.services.status_service import set_task_status # noqa: PLC0415 + + cfg = load_config() + + def _apply_fn( + a: Path, + verdict: object, + *, + config: object, + current_status: str, + session_id: str, + ) -> None: + """Wrap apply_verdict with the real set_task_status seam.""" + + apply_verdict( + a, + verdict, # type: ignore[arg-type] + config=config, # type: ignore[arg-type] + current_status=current_status, + session_id=session_id, + set_task_status_fn=set_task_status, + ) + + return run_sweep( + artifacts, + read_events_fn=read_tail_events, + judge_fn=judge_session, + apply_fn=_apply_fn, + read_payload_fn=_read_artifact_payload, + transcript_path_fn=_transcript_path_for, + config=cfg, + model=cfg.normalize_model, + now=datetime.now().astimezone(), + force=force, + ) + + +# --------------------------------------------------------------------------- +# Rich table rendering +# --------------------------------------------------------------------------- + + +def _render_sweep_report(report: SweepReport, t: ThemePalette) -> None: + """Render the SweepReport as a Rich table to the console.""" + if report.skipped: + _console.print( + f"[{t.muted}]sweep skipped — outside active-hours window. " + f"Use --force to bypass.[/{t.muted}]" + ) + return + + if not report.results: + _console.print( + f"[{t.muted}]nothing to supervise — no live, task-linked sessions.[/{t.muted}]" + ) + return + + table = Table(show_header=True, header_style="bold", highlight=False) + table.add_column("Artifact", style="dim", max_width=30) + table.add_column("Status") + table.add_column("Suggested") + table.add_column("Confidence") + table.add_column("Needs human?") + table.add_column("Moved?") + table.add_column("Error") + + for result in report.results: + _add_result_row(table, result, t) + + _console.print(table) + logger.info( + "event=supervise_sweep_rendered total=%d errors=%d", + len(report.results), + sum(1 for r in report.results if r.error is not None), + ) + + +def _add_result_row(table: Table, result: SweepResult, t: ThemePalette) -> None: + """Add one SweepResult to the Rich table.""" + artifact_name = result.artifact.name[:28] # truncate for readability + + if result.error is not None: + table.add_row( + artifact_name, + result.current_status or "—", + "—", + "—", + "—", + "—", + f"[{t.error}]{result.error}[/{t.error}]", + ) + return + + verdict = result.verdict + if verdict is None: + table.add_row( + artifact_name, + result.current_status or "—", + f"[{t.muted}]no verdict[/{t.muted}]", + "—", + "—", + "—", + "—", + ) + return + + # Determine if the status was actually moved (suggested != current). + moved = verdict.suggested_status != result.current_status + moved_str = ( + f"[{t.success}]yes[/{t.success}]" + if moved + else f"[{t.muted}]no[/{t.muted}]" + ) + needs_human_str = ( + f"[{t.warning}]yes[/{t.warning}]" + if verdict.needs_human + else f"[{t.muted}]no[/{t.muted}]" + ) + + table.add_row( + artifact_name, + result.current_status or "—", + verdict.suggested_status, + f"{verdict.confidence:.2f}", + needs_human_str, + moved_str, + "—", + ) + + +# --------------------------------------------------------------------------- +# Group callback — `bach supervise` (bare, no subcommand) runs the sweep +# --------------------------------------------------------------------------- + + +@supervise_app.callback(invoke_without_command=True) +def supervise_callback( + ctx: typer.Context, + task: Annotated[ + str | None, + typer.Option("--task", help="Restrict to one task's session (task ID or session prefix)."), + ] = None, + force: Annotated[ + bool, + typer.Option("--force", help="Bypass the active-hours window guard."), + ] = False, +) -> None: + """One-shot supervision sweep. + + With no subcommand, runs the sweep immediately. Options work on both + bare `bach supervise` and `bach supervise run`. + """ + if ctx.invoked_subcommand is not None: + # A subcommand was given (e.g. `supervise run`); let it handle things. + return + + _do_sweep(task=task, force=force) + + +# --------------------------------------------------------------------------- +# `bach supervise run` — explicit subcommand +# --------------------------------------------------------------------------- + + +@supervise_app.command("run") +def supervise_run( + task: Annotated[ + str | None, + typer.Option("--task", help="Restrict to one task's session (task ID or session prefix)."), + ] = None, + force: Annotated[ + bool, + typer.Option("--force", help="Bypass the active-hours window guard."), + ] = False, +) -> None: + """Run a one-shot supervision sweep. + + Judges every live, task-linked session once and prints a per-session + summary table. Use --task to restrict to one session; --force to + bypass the active-hours window. + + Examples: + bach supervise run + bach supervise run --task t47 + bach supervise run --force + """ + _do_sweep(task=task, force=force) + + +# --------------------------------------------------------------------------- +# Shared sweep logic +# --------------------------------------------------------------------------- + + +def _do_sweep(*, task: str | None, force: bool) -> None: + """Resolve artifacts, run sweep, render table — shared by callback and run.""" + t = _theme() + + # Build the artifact list: all supervisable OR just one if --task given. + if task is not None: + # Search the full registry (not just supervisable) so --task can + # find any artifact by task_id or session_id prefix. + from bach.config.projects import ProjectRegistry # noqa: PLC0415 + + registry = ProjectRegistry.default() + all_paths = scan_registry_artifact_paths(registry) + found = find_artifact_for_ref(task, all_paths) + if found is None: + _console.print( + f"[{t.error}]error[/{t.error}] no artifact found for {task!r}. " + "Use `bach sessions list` to find linked tasks." + ) + raise typer.Exit(code=1) + artifacts = [found] + else: + artifacts = _enumerate_supervisable_artifacts() + + if not artifacts: + _console.print( + f"[{t.muted}]nothing to supervise — no live, task-linked sessions.[/{t.muted}]" + ) + return + + report = _run_sweep_with_real_seams(artifacts, force=force) + _render_sweep_report(report, t) + + +# --------------------------------------------------------------------------- +# `bach supervise install` +# --------------------------------------------------------------------------- + + +@supervise_app.command("install") +def supervise_install() -> None: + """Install the sweep agent as a macOS LaunchAgent (calendar-scheduled). + + Writes a plist to ~/Library/LaunchAgents/ and loads it via launchctl so + `bach supervise` runs N times per day inside the configured active-hours + window. N is read from `supervise-sweeps-per-day` (default 5); the window + from `supervise-window-start-hour` / `supervise-window-end-hour`. + + This is an EXPLICIT install — nothing auto-installs the LaunchAgent. + Run `bach supervise uninstall` to remove it. + + If launchctl is unavailable (non-macOS, sandboxed environment, etc.) the + plist is still written and the manual fallback command is printed so you + always have a path to trigger sweeps. + + Examples: + bach supervise install + bach config set supervise-sweeps-per-day 3 + bach supervise install + """ + t = _theme() + cfg = load_config() + + result = install_sweep_agent( + sweeps_per_day=cfg.supervise_sweeps_per_day, + window_start_hour=cfg.supervise_window_start_hour, + window_end_hour=cfg.supervise_window_end_hour, + ) + + if not result.installed: + # Unexpected failure writing the plist itself — surface the fallback. + _console.print( + f"[{t.error}]error[/{t.error}] failed to write sweep agent plist." + ) + if result.fallback_command: + _console.print( + f"[{t.muted}]Run manually (or under a cron / launchctl):[/{t.muted}] " + f"{result.fallback_command}" + ) + logger.warning("event=supervise_install_failed") + raise typer.Exit(code=1) + + if result.fallback_command: + # Plist written but launchctl load failed — print fallback prominently. + _console.print( + f"[{t.accent}]installed[/{t.accent}] sweep agent plist written. " + "However, launchctl failed to load it automatically." + ) + _console.print( + f"[{t.muted}]Run manually:[/{t.muted}] {result.fallback_command}" + ) + logger.warning( + "event=supervise_install_launchctl_failed fallback=%r", result.fallback_command + ) + else: + _console.print( + f"[{t.success}]installed[/{t.success}] Bach sweep agent loaded successfully. " + f"Will run {cfg.supervise_sweeps_per_day} time(s) per day " + f"between {cfg.supervise_window_start_hour}:00 and " + f"{cfg.supervise_window_end_hour}:00." + ) + logger.info("event=supervise_install_ok") + + +# --------------------------------------------------------------------------- +# `bach supervise uninstall` +# --------------------------------------------------------------------------- + + +@supervise_app.command("uninstall") +def supervise_uninstall() -> None: + """Remove the Bach sweep agent LaunchAgent. + + Unloads the agent from launchctl and deletes the plist file from + ~/Library/LaunchAgents/. Idempotent — safe to call when the agent is + not installed. + + Examples: + bach supervise uninstall + """ + t = _theme() + uninstall_sweep_agent() + _console.print( + f"[{t.success}]uninstalled[/{t.success}] Bach sweep agent removed." + ) + logger.info("event=supervise_uninstall_ok") diff --git a/src/bach/config/settings.py b/src/bach/config/settings.py index 945293c..0540156 100644 --- a/src/bach/config/settings.py +++ b/src/bach/config/settings.py @@ -116,6 +116,23 @@ class BachConfig: # table scan overhead. 30s is a sensible default — new sessions are adopted # within one cycle, and one extra scan per 30s is negligible overhead. daemon_reconcile_seconds: int = 30 + # --------------------------------------------------------------------------- + # Issue #18 — bach supervise active-hours window + # --------------------------------------------------------------------------- + # `bach supervise` skips the sweep when the local clock is outside + # [supervise_window_start_hour, supervise_window_end_hour). Use + # `bach supervise run --force` to bypass the window. + # Range constraint: 0 <= start < end <= 24 (validated in set_observer_key). + supervise_window_start_hour: int = 8 + supervise_window_end_hour: int = 22 + # --------------------------------------------------------------------------- + # Issue #19 — launchd calendar sweep agent + # --------------------------------------------------------------------------- + # Number of times per day `bach supervise` is triggered by the launchd + # calendar agent. Must be > 0. The entries are evenly spaced across the + # active-hours window [supervise_window_start_hour, supervise_window_end_hour). + # Default 5 = roughly every ~2.8 hours in the default 8-22 window. + supervise_sweeps_per_day: int = 5 def _default_config_path() -> Path: @@ -190,6 +207,16 @@ def load_config(path: Path | None = None) -> BachConfig: daemon_reconcile_seconds = _coerce_positive_int( parsed.get("daemon_reconcile_seconds"), "daemon_reconcile_seconds", default=30 ) + supervise_window_start_hour = _coerce_window_hour( + parsed.get("supervise_window_start_hour"), "supervise_window_start_hour", default=8 + ) + supervise_window_end_hour = _coerce_window_hour( + parsed.get("supervise_window_end_hour"), "supervise_window_end_hour", default=22 + ) + # Issue #19 — number of sweeps per day for the launchd calendar agent. + supervise_sweeps_per_day = _coerce_positive_int( + parsed.get("supervise_sweeps_per_day"), "supervise_sweeps_per_day", default=5 + ) config = BachConfig( iterm_layout=layout, concurrency=concurrency, @@ -206,6 +233,9 @@ def load_config(path: Path | None = None) -> BachConfig: afk_max_turns=afk_max_turns, afk_time_budget_minutes=afk_time_budget_minutes, daemon_reconcile_seconds=daemon_reconcile_seconds, + supervise_window_start_hour=supervise_window_start_hour, + supervise_window_end_hour=supervise_window_end_hour, + supervise_sweeps_per_day=supervise_sweeps_per_day, ) logger.info( @@ -214,7 +244,9 @@ def load_config(path: Path | None = None) -> BachConfig: "watch_on_launch=%s observer_moves=%s judge_confidence_threshold=%s " "judge_interval_seconds=%d liveness_running_seconds=%d " "liveness_idle_minutes=%d afk_max_turns=%d afk_time_budget_minutes=%d " - "daemon_reconcile_seconds=%d", + "daemon_reconcile_seconds=%d " + "supervise_window_start_hour=%d supervise_window_end_hour=%d " + "supervise_sweeps_per_day=%d", config_path, config.iterm_layout.value, config.concurrency, @@ -231,6 +263,9 @@ def load_config(path: Path | None = None) -> BachConfig: config.afk_max_turns, config.afk_time_budget_minutes, config.daemon_reconcile_seconds, + config.supervise_window_start_hour, + config.supervise_window_end_hour, + config.supervise_sweeps_per_day, ) return config @@ -443,6 +478,27 @@ def _coerce_confidence(value: Any) -> float: return fval +def _coerce_window_hour(value: Any, field: str, *, default: int) -> int: + """Accept an integer in [0, 24]; anything else → default + warning. + + Used for supervise_window_start_hour (allows 0) and supervise_window_end_hour. + Cross-field validation (start < end) is NOT done here — load_config reads + each field independently. The constraint is enforced in set_observer_key + at write time. + """ + if value is None: + return default + if isinstance(value, bool) or not isinstance(value, int) or value < 0 or value > 24: + logger.warning( + "event=config_invalid field=%s value=%r using=%d", + field, + value, + default, + ) + return default + return value + + def _coerce_positive_int(value: Any, field: str, *, default: int) -> int: """Accept a positive (>0) integer; anything else → default + warning. @@ -519,6 +575,27 @@ def _parse_positive_int(raw: str) -> int: return val +def _parse_window_hour(raw: str) -> int: + """Accept an hour in [0, 24]; raise ValueError otherwise. + + Used for supervise-window-start-hour and supervise-window-end-hour. + Note: 0 is valid for start (midnight) and 24 for end (midnight next day). + Cross-field validation (start < end) is done in set_observer_key. + """ + try: + val = int(raw.strip()) + except ValueError: + raise ValueError( + f"Invalid integer value {raw!r} for window hour. Expected an integer in [0, 24]." + ) from None + if val < 0 or val > 24: + raise ValueError( + f"Value {val!r} is outside [0, 24] for window hour. " + "Use 0 for midnight (start), 24 for midnight (end)." + ) + return val + + # CLI-spelling (hyphen) → (parser, YAML field name) # The parsers raise ValueError with a human message on bad input so that # set_observer_key can re-raise cleanly without wrapping. @@ -533,6 +610,11 @@ def _parse_positive_int(raw: str) -> int: "afk-time-budget-minutes": _parse_positive_int, # Phase 17 — daemon supervisor cycle interval (issue #12) "daemon-reconcile-seconds": _parse_positive_int, + # Issue #18 — supervise active-hours window + "supervise-window-start-hour": _parse_window_hour, + "supervise-window-end-hour": _parse_window_hour, + # Issue #19 — number of launchd calendar sweeps per day + "supervise-sweeps-per-day": _parse_positive_int, } # Map CLI-spelling (hyphen) → YAML/dataclass field name (underscore). @@ -590,6 +672,30 @@ def set_observer_key(key: str, raw_value: str) -> Path: "Otherwise idle state is unreachable and the ◐ board signal vanishes." ) + # Cross-field validation for supervise window: 0 <= start < end <= 24. + # Without this guard, a start >= end makes the window logically inverted — + # the sweep would always skip (nothing is ever "inside" an inverted window). + if key in ("supervise-window-start-hour", "supervise-window-end-hour"): + current_cfg = load_config() + if key == "supervise-window-start-hour": + new_start: int = int(typed_value) + end_hour = current_cfg.supervise_window_end_hour + if new_start >= end_hour: + raise ValueError( + f"supervise-window-start-hour ({new_start}) must be less than " + f"supervise-window-end-hour ({end_hour}). " + "Window invariant: 0 <= start < end <= 24." + ) + else: # supervise-window-end-hour + new_end: int = int(typed_value) + start_hour = current_cfg.supervise_window_start_hour + if start_hour >= new_end: + raise ValueError( + f"supervise-window-end-hour ({new_end}) must be greater than " + f"supervise-window-start-hour ({start_hour}). " + "Window invariant: 0 <= start < end <= 24." + ) + field_name = _OBSERVER_KEY_TO_FIELD[key] path = write_config_field(field_name, typed_value) logger.info( diff --git a/src/bach/services/launchd_service.py b/src/bach/services/launchd_service.py index ed6298a..868088b 100644 --- a/src/bach/services/launchd_service.py +++ b/src/bach/services/launchd_service.py @@ -33,16 +33,24 @@ logger = logging.getLogger(__name__) -# Stable reverse-DNS label for the LaunchAgent. Never change this — it is -# used as the plist filename, the launchctl service name, and for idempotency. +# Stable reverse-DNS label for the daemon LaunchAgent. Never change this — it +# is used as the plist filename, the launchctl service name, and for idempotency. LAUNCH_AGENT_LABEL = "com.bach.daemon" +# Stable reverse-DNS label for the sweep (calendar) LaunchAgent. Kept distinct +# from the daemon label so installing/uninstalling one agent never touches the +# other — both can coexist in launchd without coordination. +SUPERVISE_AGENT_LABEL = "com.bach.supervise" + # Type alias: runner takes an argv list and returns (returncode, output). RunnerFn = Callable[[list[str]], tuple[int, str]] # The fallback command users can run manually if launchctl is unavailable. _FALLBACK_CMD = "bach daemon run" +# Fallback command for the sweep agent (calendar-scheduled). +_SWEEP_FALLBACK_CMD = "bach supervise" + @dataclass(frozen=True) class LaunchdInstallResult: @@ -236,6 +244,244 @@ def status_launch_agent( } +# --------------------------------------------------------------------------- +# Sweep agent (calendar-scheduled) — com.bach.supervise +# --------------------------------------------------------------------------- + + +def _compute_sweep_hours( + sweeps_per_day: int, + window_start_hour: int, + window_end_hour: int, +) -> list[int]: + """Compute N evenly-spaced integer hours across [window_start, window_end). + + Spacing rule (uniform distribution): + - Entry i = round(window_start + i * span / N) for i in 0..N-1. + - This spreads entries across the FULL window so the last entry + lands near window_end rather than clustering early. + - N=1 always produces a single entry at window_start_hour. + - All entries are clamped to [0, 23] and must be < window_end_hour. + - Duplicate hours (when span < N) are retained; launchd deduplicates. + + Examples: + N=1, [8,22) → [8] (only start) + N=2, [8,22) → [8, 15] (span=14, step=7) + N=3, [9,21) → [9, 13, 17] (span=12, step=4) + N=5, [8,22) → [8, 11, 14, 16, 19] (span=14, step≈2.8, rounded) + """ + span = window_end_hour - window_start_hour # total hours in active window + + hours: list[int] = [] + for i in range(sweeps_per_day): + # round() not integer-step: the original bug used a floored step + # (span // N), which truncates 14//5=2 and clusters entries toward the + # start of the window, leaving the back half uncovered. round() over a + # float step distributes N points uniformly across the FULL span so the + # last entry reaches near window_end. + # Example N=5, [8,22): floored step gives [8,10,12,14,16] (nothing past + # 16:00); round() gives [8,11,14,16,19]. + h = round(window_start_hour + i * span / sweeps_per_day) + # Clamp to valid hour range and keep within window. + h = max(0, min(h, 23)) + if h >= window_end_hour: + # Safety guard: never schedule beyond the window end. + h = window_end_hour - 1 + hours.append(h) + + return hours + + +def render_sweep_agent( + *, + sweeps_per_day: int, + window_start_hour: int, + window_end_hour: int, + log_dir: Path, +) -> str: + """Return a valid plist XML string for the Bach sweep agent LaunchAgent. + + The plist contains: + Label — SUPERVISE_AGENT_LABEL (com.bach.supervise) + ProgramArguments — [sys.executable, "-m", "bach", "supervise"] + StartCalendarInterval — array of N {Hour, Minute} dicts, evenly spaced + across [window_start_hour, window_end_hour) + where N = sweeps_per_day. + StandardOutPath — /supervise.stdout.log + StandardErrorPath — /supervise.stderr.log + + Note: no KeepAlive or RunAtLoad — this is a calendar-triggered agent, not + a keep-alive daemon. It fires at scheduled times only. Omitting KeepAlive + prevents launchd from restarting a sweep that exits cleanly (which is + the normal case — run_sweep always exits 0 on successful sweep or skip). + + Spacing rule (see _compute_sweep_hours): + entry[i].Hour = round(window_start + i * (window_end - window_start) / sweeps_per_day) + This distributes N entries across the FULL window (not floor-division clustered). + + Args: + sweeps_per_day: Number of times to sweep per day (>0). + window_start_hour: First hour of the active window (inclusive, 0-24). + window_end_hour: Last hour of the active window (exclusive, 0-24). + log_dir: Directory for stdout/stderr log files. + + Returns: + A plist XML string suitable for writing to disk. + """ + # Use sys.executable so the correct venv Python is invoked even when + # the launchd environment has no PATH — same rationale as render_launch_agent. + program_args = [sys.executable, "-m", "bach", "supervise"] + + # Compute the evenly-spaced hours for the calendar intervals. + hours = _compute_sweep_hours(sweeps_per_day, window_start_hour, window_end_hour) + + # Build each calendar entry as {Hour: h, Minute: 0}. + # Minute is always 0 — we schedule at the top of the hour for simplicity. + calendar_intervals: list[dict[str, int]] = [ + {"Hour": h, "Minute": 0} for h in hours + ] + + data: dict[str, Any] = { + "Label": SUPERVISE_AGENT_LABEL, + "ProgramArguments": program_args, + "StartCalendarInterval": calendar_intervals, + "StandardOutPath": str(log_dir / "supervise.stdout.log"), + "StandardErrorPath": str(log_dir / "supervise.stderr.log"), + } + + # plistlib.dumps produces valid Apple plist XML — no hand-rolled XML needed. + return plistlib.dumps(data, fmt=plistlib.FMT_XML).decode("utf-8") + + +def install_sweep_agent( + *, + sweeps_per_day: int, + window_start_hour: int, + window_end_hour: int, + launch_agents_dir: Path | None = None, + log_dir: Path | None = None, + runner: RunnerFn | None = None, +) -> LaunchdInstallResult: + """Write the sweep agent plist and load it via launchctl. + + Idempotent: if the plist already exists it is overwritten, then launchctl + unloads the old instance before loading the new one (mirrors install_launch_agent). + + On launchctl failure the plist IS still written and a `fallback_command` + ('bach supervise') is returned so the user always has a manual path. + + Does NOT touch the daemon agent (com.bach.daemon) — fully independent. + + Args: + sweeps_per_day: Number of sweeps per day for the calendar schedule. + window_start_hour: Active window start (inclusive). + window_end_hour: Active window end (exclusive). + launch_agents_dir: Override for ~/Library/LaunchAgents (tests use tmp_path). + log_dir: Override for log directory (defaults to bach_home()/logs). + runner: Injectable launchctl runner (argv) -> (returncode, output). + + Returns: + LaunchdInstallResult with installed=True on plist write success. + fallback_command is set when launchctl load failed. + """ + resolved_dir = launch_agents_dir or _default_launch_agents_dir() + resolved_log_dir = log_dir or _default_log_dir() + resolved_runner = runner or _subprocess_runner + + # Ensure directories exist. + resolved_dir.mkdir(parents=True, exist_ok=True) + resolved_log_dir.mkdir(parents=True, exist_ok=True) + + plist_path = resolved_dir / f"{SUPERVISE_AGENT_LABEL}.plist" + + # Render the calendar agent plist from the pure render function. + xml = render_sweep_agent( + sweeps_per_day=sweeps_per_day, + window_start_hour=window_start_hour, + window_end_hour=window_end_hour, + log_dir=resolved_log_dir, + ) + + # Idempotency: unload before overwriting so launchd doesn't see a duplicate label. + if plist_path.exists(): + _try_unload(plist_path=plist_path, runner=resolved_runner) + + # Write the plist to disk. + plist_path.write_text(xml, encoding="utf-8") + logger.info("event=sweep_plist_written path=%s", plist_path) + + # Load the agent via launchctl. + fallback: str | None = None + load_rc, load_out = _try_load(plist_path=plist_path, runner=resolved_runner) + if load_rc != 0: + fallback = _SWEEP_FALLBACK_CMD + logger.warning( + "event=sweep_load_failed plist=%s rc=%d output=%r fallback=%r", + plist_path, + load_rc, + load_out, + fallback, + ) + else: + logger.info("event=sweep_load_ok plist=%s", plist_path) + + return LaunchdInstallResult(installed=True, fallback_command=fallback) + + +def uninstall_sweep_agent( + *, + launch_agents_dir: Path | None = None, + runner: RunnerFn | None = None, +) -> None: + """Unload and remove the sweep agent plist. + + Idempotent: if the plist does not exist, this is a no-op (no error raised). + Does NOT touch the daemon agent — only the com.bach.supervise plist. + + Args: + launch_agents_dir: Override for ~/Library/LaunchAgents (tests use tmp_path). + runner: Injectable launchctl runner. + """ + resolved_dir = launch_agents_dir or _default_launch_agents_dir() + resolved_runner = runner or _subprocess_runner + + plist_path = resolved_dir / f"{SUPERVISE_AGENT_LABEL}.plist" + + if not plist_path.exists(): + logger.info("event=sweep_uninstall_noop plist=%s (not found)", plist_path) + return + + # Unload from launchd before removing (ignore return code — may not be loaded). + _try_unload(plist_path=plist_path, runner=resolved_runner) + + plist_path.unlink(missing_ok=True) + logger.info("event=sweep_plist_removed path=%s", plist_path) + + +def status_sweep_agent( + *, + launch_agents_dir: Path | None = None, +) -> dict[str, Any]: + """Return the installation status of the sweep agent. + + Best-effort check based on plist file presence on disk. + + Args: + launch_agents_dir: Override for ~/Library/LaunchAgents (tests use tmp_path). + + Returns: + dict with keys: + installed: bool — plist file is present at the expected path. + plist_path: str — the expected plist path (for display). + """ + resolved_dir = launch_agents_dir or _default_launch_agents_dir() + plist_path = resolved_dir / f"{SUPERVISE_AGENT_LABEL}.plist" + return { + "installed": plist_path.exists(), + "plist_path": str(plist_path), + } + + # --------------------------------------------------------------------------- # Internal helpers — launchctl wrappers # --------------------------------------------------------------------------- diff --git a/src/bach/services/session_watcher.py b/src/bach/services/session_watcher.py index 3cbbf1f..d9f0471 100644 --- a/src/bach/services/session_watcher.py +++ b/src/bach/services/session_watcher.py @@ -30,7 +30,6 @@ import logging import time from collections.abc import Callable -from datetime import UTC, datetime from pathlib import Path from typing import Any @@ -45,7 +44,15 @@ ) from bach.runtimes.transcript import follow_events as _default_follow_events from bach.runtimes.transcript import read_tail_events as _read_tail_events -from bach.services.status_service import OBSERVER_SOURCE, set_task_status +from bach.services.status_service import set_task_status + +# Import shared verdict-apply path and helpers from the canonical module. +# Both session_watcher (continuous) and supervise_service (one-shot sweep) +# use the same apply_verdict so observer_moves/confidence gating is centralised. +from bach.services.verdict_apply import ( + SetTaskStatusFn, # noqa: F401 — re-exported type + ) +from bach.services.verdict_apply import apply_verdict as _apply_verdict_impl from bach.storage.artifacts import TaskArtifactStore logger = logging.getLogger(__name__) @@ -77,8 +84,8 @@ # services/judge_service.judge_session signature JudgeFn = Callable[..., JudgeVerdict | None] -# services/status_service.set_task_status signature -SetTaskStatusFn = Callable[[Path, str, str], Any] +# SetTaskStatusFn is imported from verdict_apply (shared canonical type). +# Re-assigning it here would shadow the import — see the import block above. def _default_set_task_status( @@ -134,31 +141,6 @@ def _read_artifact_payload(artifact: Path) -> dict[str, Any] | None: return None -def _append_log_event(artifact: Path, entry: dict[str, Any]) -> None: - """Append one entry to the artifact's log array. Non-fatal on failure. - - Delegates to TaskArtifactStore so the frontmatter/body split is always - done with maxsplit=2 (safe for body sections that contain "---" horizontal - rules). We read the payload, mutate the log list, then write_payload back - — body is preserved verbatim by the store's _split / _write pair. - """ - try: - store = TaskArtifactStore.from_artifact_path(artifact) - payload = store.read_frontmatter(artifact) - log = payload.get("log", []) - if not isinstance(log, list): - log = [] - log.append(entry) - payload["log"] = log - store.write_payload(artifact, payload) - except Exception as exc: # noqa: BLE001 - logger.warning( - "event=observer_log_append_error path=%s reason=%s", - artifact, - type(exc).__name__, - ) - - def _apply_verdict( artifact: Path, verdict: JudgeVerdict, @@ -168,97 +150,21 @@ def _apply_verdict( session_id: str, set_task_status_fn: SetTaskStatusFn, ) -> None: - """ - Apply a judge verdict to the artifact, respecting observer authority rules. + """Thin wrapper that delegates to the shared verdict_apply.apply_verdict. - Decision tree: - 1. If observer_moves=False OR confidence < threshold → log suggestion only. - 2. Else → call set_task_status with source="observer". - Status service enforces the authority matrix (executing→qa, etc.). - Any exception is caught and logged; non-fatal. - - `session_id` is included in every log line so the full judge/verdict decision - trail can be grepped end-to-end for a single session. + Kept here so existing call-sites and test imports (which import + `_apply_verdict` directly from session_watcher) continue to work + without any change. Behavior is identical — all logic lives in + services/verdict_apply.py. """ - log_only = not config.observer_moves or verdict.confidence < config.judge_confidence_threshold - - if log_only: - logger.info( - "event=observer_suggestion session_id=%s artifact=%s suggested=%r confidence=%.2f " - "needs_human=%s observer_moves=%s threshold=%.2f", - session_id, - artifact, - verdict.suggested_status, - verdict.confidence, - verdict.needs_human, - config.observer_moves, - config.judge_confidence_threshold, - ) - # Append the suggestion to the artifact log so the user can see it. - _append_log_event( - artifact, - { - "timestamp": datetime.now(UTC).isoformat(), - "event": "observer_suggestion", - "suggested_status": verdict.suggested_status, - "confidence": verdict.confidence, - "needs_human": verdict.needs_human, - "summary": verdict.summary, - "reason": ( - "observer_moves_disabled" - if not config.observer_moves - else "confidence_below_threshold" - ), - }, - ) - return - - # Observer move: attempt to write the status. - try: - set_task_status_fn(artifact, verdict.suggested_status, OBSERVER_SOURCE) - logger.info( - "event=observer_status_written session_id=%s artifact=%s from=%r to=%r " - "confidence=%.2f needs_human=%s", - session_id, - artifact, - current_status, - verdict.suggested_status, - verdict.confidence, - verdict.needs_human, - ) - except Exception as exc: # noqa: BLE001 - # Status service may raise InvalidStatusError (observer authority violation) - # or OSError (disk issue). Both are non-fatal — log and continue. - logger.warning( - "event=observer_status_write_error session_id=%s artifact=%s target=%r reason=%s", - session_id, - artifact, - verdict.suggested_status, - type(exc).__name__, - ) - # The verdict must stay visible even when the move is rejected — - # the board summary line and /show read the artifact log, and an - # authority rejection is information the human wants ("judge thinks - # this needs me, but it's in a protected stage"). - try: - _append_log_event( - artifact, - { - "timestamp": datetime.now(UTC).isoformat(), - "event": "observer_suggestion", - "suggested_status": verdict.suggested_status, - "confidence": verdict.confidence, - "needs_human": verdict.needs_human, - "summary": verdict.summary, - "reason": f"write_rejected:{type(exc).__name__}", - }, - ) - except Exception: # noqa: BLE001 — best-effort; primary error already logged - logger.warning( - "event=observer_suggestion_fallback_failed session_id=%s artifact=%s", - session_id, - artifact, - ) + _apply_verdict_impl( + artifact, + verdict, + config=config, + current_status=current_status, + session_id=session_id, + set_task_status_fn=set_task_status_fn, + ) def _run_judge( diff --git a/src/bach/services/supervise_service.py b/src/bach/services/supervise_service.py new file mode 100644 index 0000000..9ccab4c --- /dev/null +++ b/src/bach/services/supervise_service.py @@ -0,0 +1,330 @@ +"""One-shot supervision sweep service — issue #18, `bach supervise`. + +`run_sweep` iterates a list of task artifact Paths, reads each one's payload +and transcript tail, judges once via the injected judge_fn, and applies the +verdict via the injected apply_fn (which should wrap verdict_apply.apply_verdict). + +All external I/O, the LLM call, and the clock are injected as seams so tests +run without touching disk, processes, or real time. + +Key design decisions: + - One judge call per session per sweep: the cost-containment invariant. + The continuous watcher ticks every judge_interval_seconds; the sweep + fires once and exits. At 5 sweeps/day this is ~36× cheaper than the + daemon's per-tick pace. + - Per-session error isolation: a failure on one session is caught, recorded + as a SweepResult with error set, and the sweep continues. One bad + artifact must not abort the whole batch — mirrors the daemon's per- + watcher resilience. + - Active-hours window guard: if now.hour is outside [start, end) AND + force=False, the sweep returns an empty skipped report immediately — + no judge calls, no disk writes. This prevents launchd calendar fires + at odd hours (e.g. reboot at 02:00 catching up missed intervals) from + generating surprise LLM charges. + - apply_fn is called ONLY when judge_fn returns a non-None verdict. + - read_payload_fn / transcript_path_fn are injectable for tests; in + production they wrap _read_artifact_payload and _transcript_path_for + from session_watcher.py. +""" + +import logging +from collections.abc import Callable +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any + +from bach.config.settings import BachConfig +from bach.domain.models import AgentRuntime, JudgeVerdict, SessionEvent + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Domain types for the sweep report +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class SweepResult: + """Result for one artifact in a supervision sweep. + + Frozen: a sweep result is a point-in-time record — nothing mutates it + after construction. Callers render these into the Rich summary table. + """ + + artifact: Path + current_status: str # status read from artifact at sweep time + verdict: JudgeVerdict | None # None when judge returned nothing or error occurred + error: str | None # non-None when an exception was caught; contains exc type name + + +@dataclass(frozen=True) +class SweepReport: + """Aggregate result of one `bach supervise` sweep. + + skipped=True means the sweep was a no-op because the current time was + outside the configured active-hours window (and force=False). + """ + + results: list[SweepResult] = field(default_factory=list) + skipped: bool = False # True → window guard fired; no sessions judged + + +# --------------------------------------------------------------------------- +# Type aliases for injected callables +# --------------------------------------------------------------------------- + +# runtimes/transcript.read_tail_events signature +ReadEventsFn = Callable[ + [Path, AgentRuntime], # path, runtime (positional) + list[SessionEvent], +] + +# services/judge_service.judge_session (keyword-only args) +JudgeFn = Callable[..., JudgeVerdict | None] + +# The apply path — wraps verdict_apply.apply_verdict with the same signature +ApplyFn = Callable[ + ..., # (artifact, verdict, *, config, current_status, session_id) -> None + None, +] + +# services/session_watcher._read_artifact_payload +ReadPayloadFn = Callable[[Path], dict[str, Any] | None] + +# services/session_watcher._transcript_path_for +TranscriptPathFn = Callable[..., Path | None] + + +# --------------------------------------------------------------------------- +# run_sweep — the core sweep logic +# --------------------------------------------------------------------------- + + +def run_sweep( + artifacts: list[Path], + *, + read_events_fn: ReadEventsFn, + judge_fn: JudgeFn, + apply_fn: ApplyFn, + read_payload_fn: ReadPayloadFn, + transcript_path_fn: TranscriptPathFn, + config: BachConfig, + model: str, + now: datetime, + force: bool = False, +) -> SweepReport: + """Run one supervision sweep over a list of task artifact Paths. + + Args: + artifacts: List of artifact Paths to sweep. Caller builds + this via the same enumeration path as the daemon + (scan_registry_artifact_paths + list_sessions + + list_supervisable_artifacts). + read_events_fn: Callable(path, runtime) → list[SessionEvent]. + Injected so tests skip real transcript I/O. + judge_fn: Callable(**kw) → JudgeVerdict | None. + Same interface as judge_service.judge_session. + apply_fn: Callable(artifact, verdict, *, config, + current_status, session_id) → None. + Should wrap verdict_apply.apply_verdict. + read_payload_fn: Callable(Path) → dict | None. Reads artifact YAML. + transcript_path_fn: Callable(runtime, session_id, project_path) + → Path | None. Resolves transcript file path. + config: BachConfig with observer knobs + window settings. + model: Model slug forwarded to judge_fn. + now: Current local time (injected so tests fake the clock). + force: When True, bypass the active-hours window guard. + + Returns: + SweepReport with one SweepResult per artifact, or a skipped report + when the window guard fires without --force. + """ + # Active-hours window guard. Use now.hour (local time, already converted by + # the caller via datetime.now().astimezone()). Window is half-open: [start, end). + # Without this guard, launchd calendar fires at reboot time (which can be + # midnight) would trigger judge calls outside the user's working hours. + # --force lets a manual invocation bypass the window at any hour. + if not force: + start = config.supervise_window_start_hour + end = config.supervise_window_end_hour + local_hour = now.hour + if not (start <= local_hour < end): + logger.info( + "event=supervise_sweep_skipped reason=outside_window " + "local_hour=%d window=[%d,%d)", + local_hour, + start, + end, + ) + return SweepReport(results=[], skipped=True) + + if not artifacts: + logger.info("event=supervise_sweep_empty reason=no_supervisable_artifacts") + return SweepReport(results=[], skipped=False) + + results: list[SweepResult] = [] + + for artifact in artifacts: + result = _sweep_one( + artifact, + read_events_fn=read_events_fn, + judge_fn=judge_fn, + apply_fn=apply_fn, + read_payload_fn=read_payload_fn, + transcript_path_fn=transcript_path_fn, + config=config, + model=model, + ) + results.append(result) + + logger.info( + "event=supervise_sweep_done total=%d errors=%d", + len(results), + sum(1 for r in results if r.error is not None), + ) + return SweepReport(results=results, skipped=False) + + +def _sweep_one( + artifact: Path, + *, + read_events_fn: ReadEventsFn, + judge_fn: JudgeFn, + apply_fn: ApplyFn, + read_payload_fn: ReadPayloadFn, + transcript_path_fn: TranscriptPathFn, + config: BachConfig, + model: str, +) -> SweepResult: + """Judge + apply for exactly one artifact; catch all errors. + + Per-session isolation boundary: any exception from _sweep_one_unsafe + (bad YAML, missing transcript, LLM error, disk write failure) is caught + here so the run_sweep loop continues to the remaining sessions. + The error is surfaced in the SweepResult so the Rich table can render + an error row — the user sees which sessions failed without losing the + results for sessions that succeeded. + """ + try: + return _sweep_one_unsafe( + artifact, + read_events_fn=read_events_fn, + judge_fn=judge_fn, + apply_fn=apply_fn, + read_payload_fn=read_payload_fn, + transcript_path_fn=transcript_path_fn, + config=config, + model=model, + ) + except Exception as exc: # noqa: BLE001 + logger.warning( + "event=supervise_session_error artifact=%s reason=%s", + artifact, + type(exc).__name__, + exc_info=True, + ) + return SweepResult( + artifact=artifact, + current_status="", + verdict=None, + error=type(exc).__name__, + ) + + +def _sweep_one_unsafe( + artifact: Path, + *, + read_events_fn: ReadEventsFn, + judge_fn: JudgeFn, + apply_fn: ApplyFn, + read_payload_fn: ReadPayloadFn, + transcript_path_fn: TranscriptPathFn, + config: BachConfig, + model: str, +) -> SweepResult: + """Core sweep logic for one artifact — no error catching (caller wraps it). + + Steps: + 1. Read the artifact payload (task_title, current_status, runtime, session_id). + 2. Resolve the transcript path via transcript_path_fn. + 3. Read transcript tail events via read_events_fn. + 4. Call judge_fn once. + 5. If verdict is not None, call apply_fn. + 6. Return a SweepResult. + """ + # --- Step 1: read payload --- + payload = read_payload_fn(artifact) + if payload is None: + raise ValueError(f"Could not read artifact payload from {artifact}") + + current_status = str(payload.get("status", "")) + task_title = str(payload.get("task", {}).get("title", "") or payload.get("id", "")) + agent = payload.get("agent") or {} + session_id = str( + agent.get("runtime_session_id", "") or agent.get("bach_session_id", "") + ) + runtime_str = str(agent.get("runtime", "claude-code")) + try: + runtime = AgentRuntime(runtime_str) + except ValueError: + runtime = AgentRuntime.claude_code + + # --- Step 2: resolve transcript path --- + project_path = Path(str(payload.get("project", {}).get("path", ""))) + transcript_path = transcript_path_fn(runtime, session_id, project_path) + + # --- Step 3: read events --- + events: list[SessionEvent] = [] + if transcript_path is not None: + try: + events = read_events_fn(transcript_path, runtime) + except Exception as exc: # noqa: BLE001 + # Non-fatal: proceed with empty events. The judge may still + # produce a verdict from the task context alone. + logger.warning( + "event=supervise_read_events_failed artifact=%s reason=%s", + artifact, + type(exc).__name__, + ) + + # --- Step 4: judge once --- + verdict: JudgeVerdict | None = judge_fn( + events=events, + task_title=task_title, + current_status=current_status, + model=model, + ) + + logger.info( + "event=supervise_judged artifact=%s session_id=%s status=%r " + "verdict=%s confidence=%s", + artifact, + session_id, + current_status, + verdict.suggested_status if verdict else "none", + f"{verdict.confidence:.2f}" if verdict else "n/a", + ) + + # --- Step 5: apply verdict (only when judge returned something) --- + # apply_fn wraps verdict_apply.apply_verdict — the same shared path + # the continuous watcher uses. Observer authority semantics (confidence + # threshold, observer_moves flag, status transition matrix) all apply + # identically here. We do not apply if the judge returned None (no + # opinion) to avoid spurious no-op writes. + if verdict is not None: + apply_fn( + artifact, + verdict, + config=config, + current_status=current_status, + session_id=session_id, + ) + + return SweepResult( + artifact=artifact, + current_status=current_status, + verdict=verdict, + error=None, + ) diff --git a/src/bach/services/verdict_apply.py b/src/bach/services/verdict_apply.py new file mode 100644 index 0000000..d135b0f --- /dev/null +++ b/src/bach/services/verdict_apply.py @@ -0,0 +1,166 @@ +"""Shared verdict-apply path for session observer and supervise sweep. + +This module holds the single canonical `apply_verdict` function that both +`session_watcher.py` (continuous loop) and `supervise_service.py` (one-shot +sweep) use to apply a JudgeVerdict to a task artifact. + +WHY a shared module: before this extraction the apply logic lived inside +session_watcher.py. When the sweep was added it needed the same observer +authority semantics (confidence threshold, observer_moves flag, transition +matrix enforcement). Duplicating would create two places to diverge. A single +module is the contract that "sweep verdict = watcher verdict, same rules". + +It also re-exports `_append_log_event` (private helper) so both callers can +log suggestions to the artifact without duplicating the logic. + +Decision tree in apply_verdict: + 1. observer_moves=False OR confidence < threshold → log suggestion only. + 2. Else → call set_task_status with source="observer". + Any exception from set_task_status is caught, logged, and a suggestion + log entry is appended so the board and /show always see the verdict. + +All failures are non-fatal — the caller's loop must continue after any error. +""" + +import logging +from collections.abc import Callable +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +from bach.config.settings import BachConfig +from bach.domain.models import JudgeVerdict +from bach.services.status_service import OBSERVER_SOURCE + +logger = logging.getLogger(__name__) + +# Type alias: matches status_service.set_task_status(artifact, new_status, source) +SetTaskStatusFn = Callable[[Path, str, str], Any] + + +def _append_log_event(artifact: Path, entry: dict[str, Any]) -> None: + """Append one entry to the artifact's log array. Non-fatal on failure. + + Delegates to TaskArtifactStore so the frontmatter/body split is always + done with maxsplit=2 (safe for body sections that contain "---" horizontal + rules). We read the payload, mutate the log list, then write_payload back + — body is preserved verbatim by the store's _split / _write pair. + """ + try: + from bach.storage.artifacts import TaskArtifactStore # noqa: PLC0415 + + store = TaskArtifactStore.from_artifact_path(artifact) + payload = store.read_frontmatter(artifact) + log = payload.get("log", []) + if not isinstance(log, list): + log = [] + log.append(entry) + payload["log"] = log + store.write_payload(artifact, payload) + except Exception as exc: # noqa: BLE001 + logger.warning( + "event=observer_log_append_error path=%s reason=%s", + artifact, + type(exc).__name__, + ) + + +def apply_verdict( + artifact: Path, + verdict: JudgeVerdict, + *, + config: BachConfig, + current_status: str, + session_id: str, + set_task_status_fn: SetTaskStatusFn, +) -> None: + """Apply a judge verdict to the artifact, respecting observer authority rules. + + Decision tree: + 1. If observer_moves=False OR confidence < threshold → log suggestion only. + 2. Else → call set_task_status with source="observer". + Status service enforces the authority matrix (executing→qa, etc.). + Any exception is caught and logged; non-fatal. + + `session_id` is included in every log line so the full judge/verdict decision + trail can be grepped end-to-end for a single session. + """ + log_only = not config.observer_moves or verdict.confidence < config.judge_confidence_threshold + + if log_only: + logger.info( + "event=observer_suggestion session_id=%s artifact=%s suggested=%r confidence=%.2f " + "needs_human=%s observer_moves=%s threshold=%.2f", + session_id, + artifact, + verdict.suggested_status, + verdict.confidence, + verdict.needs_human, + config.observer_moves, + config.judge_confidence_threshold, + ) + # Append the suggestion to the artifact log so the user can see it. + _append_log_event( + artifact, + { + "timestamp": datetime.now(UTC).isoformat(), + "event": "observer_suggestion", + "suggested_status": verdict.suggested_status, + "confidence": verdict.confidence, + "needs_human": verdict.needs_human, + "summary": verdict.summary, + "reason": ( + "observer_moves_disabled" + if not config.observer_moves + else "confidence_below_threshold" + ), + }, + ) + return + + # Observer move: attempt to write the status. + try: + set_task_status_fn(artifact, verdict.suggested_status, OBSERVER_SOURCE) + logger.info( + "event=observer_status_written session_id=%s artifact=%s from=%r to=%r " + "confidence=%.2f needs_human=%s", + session_id, + artifact, + current_status, + verdict.suggested_status, + verdict.confidence, + verdict.needs_human, + ) + except Exception as exc: # noqa: BLE001 + # Status service may raise InvalidStatusError (observer authority violation) + # or OSError (disk issue). Both are non-fatal — log and continue. + logger.warning( + "event=observer_status_write_error session_id=%s artifact=%s target=%r reason=%s", + session_id, + artifact, + verdict.suggested_status, + type(exc).__name__, + ) + # The verdict must stay visible even when the move is rejected. + # The board summary line and /show read the artifact log, and an + # authority rejection is information the human wants — they can + # decide whether to move the status manually via REPL or CLI. + try: + _append_log_event( + artifact, + { + "timestamp": datetime.now(UTC).isoformat(), + "event": "observer_suggestion", + "suggested_status": verdict.suggested_status, + "confidence": verdict.confidence, + "needs_human": verdict.needs_human, + "summary": verdict.summary, + "reason": f"write_rejected:{type(exc).__name__}", + }, + ) + except Exception: # noqa: BLE001 — best-effort; primary error already logged + logger.warning( + "event=observer_suggestion_fallback_failed session_id=%s artifact=%s", + session_id, + artifact, + ) diff --git a/tests/unit/test_config_set.py b/tests/unit/test_config_set.py index 87dcfe7..47e005c 100644 --- a/tests/unit/test_config_set.py +++ b/tests/unit/test_config_set.py @@ -35,12 +35,14 @@ # --------------------------------------------------------------------------- -def test_observer_settable_keys_has_nine_entries() -> None: - """The allowlist must contain exactly 9 observer knobs. +def test_observer_settable_keys_has_eleven_entries() -> None: + """The allowlist must contain exactly 12 observer knobs. Phase 17 added daemon-reconcile-seconds to the original 8. + Issue #18 added supervise-window-start-hour and supervise-window-end-hour. + Issue #19 added supervise-sweeps-per-day. """ - assert len(OBSERVER_SETTABLE_KEYS) == 9 + assert len(OBSERVER_SETTABLE_KEYS) == 12 def test_observer_settable_keys_contains_expected_keys() -> None: @@ -56,6 +58,11 @@ def test_observer_settable_keys_contains_expected_keys() -> None: "afk-time-budget-minutes", # Phase 17 (issue #12) — daemon supervisor cycle interval "daemon-reconcile-seconds", + # Issue #18 — supervise active-hours window + "supervise-window-start-hour", + "supervise-window-end-hour", + # Issue #19 — launchd calendar sweep agent + "supervise-sweeps-per-day", } assert set(OBSERVER_SETTABLE_KEYS.keys()) == expected diff --git a/tests/unit/test_supervise.py b/tests/unit/test_supervise.py new file mode 100644 index 0000000..6097c7e --- /dev/null +++ b/tests/unit/test_supervise.py @@ -0,0 +1,826 @@ +"""Tests for bach supervise — issue #18. + +Strategy: + - All external seams injected (no real I/O, LLM, clock, or processes). + - run_sweep covered: judge+apply per session, observer_moves on/off, + window skip, --force override, per-session error isolation, empty set. + - Config window validation tested separately. + - CLI routing (group + bare + --task + --force) tested via typer CliRunner. + +TDD: these tests are written RED before any implementation. +""" + +from datetime import UTC, datetime +from pathlib import Path +from typing import Any + +import pytest +import yaml +from typer.testing import CliRunner + +from bach.config.settings import BachConfig +from bach.domain.models import AgentRuntime, JudgeVerdict, SessionEvent, SessionEventKind + +# --------------------------------------------------------------------------- +# Helpers shared across tests +# --------------------------------------------------------------------------- + + +def _write_artifact( + tmp_path: Path, + task_id: str = "t1", + status: str = "executing", + session_id: str = "sess-abc", + runtime: str = "claude-code", +) -> Path: + """Create a minimal valid task artifact for testing.""" + project_dir = tmp_path / "proj" + date_dir = project_dir / ".bach" / "runs" / "2026-06-10" + date_dir.mkdir(parents=True) + + artifact = date_dir / f"task-{task_id}-test.md" + payload: dict[str, Any] = { + "id": f"task-{task_id}-test", + "task_id": task_id, + "date": "2026-06-10", + "status": status, + "project": {"name": "proj", "path": str(project_dir)}, + "agent": { + "runtime": runtime, + "bach_session_id": f"bach-{task_id}", + "runtime_session_id": session_id, + "runtime_session_name": f"bach-proj-{task_id}", + "launch_command": None, + "resume_command": None, + "restart_command": None, + }, + "skill": {"name": "grill-me", "status": "not_started"}, + "task": { + "title": f"Task {task_id}", + "original_description": f"desc {task_id}", + "objective": None, + "non_goals": [], + "constraints": [], + "likely_files_or_areas": [], + "acceptance_checks": [], + "risks_or_unknowns": [], + }, + "post_grill": {"readiness": "not_ready", "next_action": "undecided"}, + "log": [], + } + body = "\n## Notes\n\n" + artifact.write_text(f"---\n{yaml.safe_dump(payload, sort_keys=False)}---\n{body}") + return artifact + + +def _read_status(artifact: Path) -> str: + """Return the current status from artifact frontmatter.""" + raw = yaml.safe_load(artifact.read_text().split("---", 2)[1]) + return str(raw.get("status", "")) + + +def _make_verdict( + suggested_status: str = "qa", + confidence: float = 0.9, + needs_human: bool = False, +) -> JudgeVerdict: + return JudgeVerdict( + summary="session looks good", + suggested_status=suggested_status, + confidence=confidence, + needs_human=needs_human, + ) + + +def _make_event(kind: SessionEventKind = SessionEventKind.assistant_turn) -> SessionEvent: + return SessionEvent( + kind=kind, + timestamp=datetime(2026, 6, 10, 12, 0, 0, tzinfo=UTC), + text="hello", + raw_kind=kind.value, + ) + + +def _make_config( + observer_moves: bool = True, + judge_confidence_threshold: float = 0.7, + supervise_window_start_hour: int = 8, + supervise_window_end_hour: int = 22, +) -> BachConfig: + return BachConfig( + observer_moves=observer_moves, + judge_confidence_threshold=judge_confidence_threshold, + supervise_window_start_hour=supervise_window_start_hour, + supervise_window_end_hour=supervise_window_end_hour, + ) + + +def _fake_read_events( + path: Path, runtime: AgentRuntime, *, max_events: int = 50 +) -> list[SessionEvent]: + """Return a minimal non-empty event list (no real I/O).""" + return [_make_event()] + + +def _fake_transcript_path( + runtime: AgentRuntime, session_id: str, project_path: Path, **_kw: object +) -> Path | None: + return None # spool-only mode is fine for sweep tests + + +def _fake_read_payload(artifact: Path) -> dict[str, Any] | None: + """Parse artifact frontmatter without the TaskArtifactStore helper.""" + try: + raw_text = artifact.read_text() + parts = raw_text.split("---", 2) + if len(parts) < 2: + return None + return yaml.safe_load(parts[1]) or {} + except Exception: + return None + + +# --------------------------------------------------------------------------- +# 1. run_sweep — judge+apply per session (happy path) +# --------------------------------------------------------------------------- + + +class TestRunSweepHappyPath: + """SweepReport must contain one result per artifact with judge applied.""" + + def test_single_artifact_judged_and_applied(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1", status="executing") + cfg = _make_config() + + status_writes: list[tuple[Path, str, str]] = [] + + def _judge(**_kw: Any) -> JudgeVerdict: + return _make_verdict(suggested_status="qa", confidence=0.9) + + def _set_status(a: Path, new_status: str, source: str) -> None: + status_writes.append((a, new_status, source)) + + def _apply_fn( + a: Path, + verdict: JudgeVerdict, + *, + config: BachConfig, + current_status: str, + session_id: str, + ) -> None: + _set_status(a, verdict.suggested_status, "observer") + + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=_judge, + apply_fn=_apply_fn, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + assert len(report.results) == 1 + result = report.results[0] + assert result.artifact == artifact + assert result.error is None + assert result.verdict is not None + assert result.verdict.suggested_status == "qa" + + def test_multiple_artifacts_all_judged(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + artifacts = [ + _write_artifact( + tmp_path / f"p{i}", + task_id=f"t{i}", + status="executing", + session_id=f"sess-{i}", + ) + for i in range(1, 4) + ] + cfg = _make_config() + judged_count = [0] + + def _judge(**_kw: Any) -> JudgeVerdict: + judged_count[0] += 1 + return _make_verdict(suggested_status="qa") + + def _apply_fn(a: Path, verdict: JudgeVerdict, **_kw: Any) -> None: + pass + + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + artifacts, + read_events_fn=_fake_read_events, + judge_fn=_judge, + apply_fn=_apply_fn, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + assert len(report.results) == 3 + assert judged_count[0] == 3 + + def test_empty_artifact_list_returns_empty_report(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + cfg = _make_config() + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [], + read_events_fn=_fake_read_events, + judge_fn=lambda **_: None, + apply_fn=lambda *a, **k: None, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + assert report.results == [] + assert not report.skipped + + +# --------------------------------------------------------------------------- +# 2. run_sweep — observer_moves on/off +# --------------------------------------------------------------------------- + + +class TestRunSweepObserverMoves: + """apply_fn is called regardless; its behavior depends on config (tested via verdict_apply).""" + + def test_apply_fn_called_when_observer_moves_true(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1", status="executing") + cfg = _make_config(observer_moves=True) + apply_calls: list[Path] = [] + + def _apply_fn(a: Path, verdict: JudgeVerdict, **_kw: Any) -> None: + apply_calls.append(a) + + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=lambda **_: _make_verdict(), + apply_fn=_apply_fn, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + assert artifact in apply_calls + + def test_apply_fn_still_called_when_observer_moves_false(self, tmp_path: Path) -> None: + """With observer_moves=False the sweep still calls apply_fn; the apply + function itself (verdict_apply.apply_verdict) gates the actual write. + run_sweep's contract is to ALWAYS call apply_fn when a verdict is returned.""" + from bach.services.supervise_service import run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1", status="executing") + cfg = _make_config(observer_moves=False) + apply_calls: list[Path] = [] + + def _apply_fn(a: Path, verdict: JudgeVerdict, **_kw: Any) -> None: + apply_calls.append(a) + + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=lambda **_: _make_verdict(), + apply_fn=_apply_fn, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + assert artifact in apply_calls + + +# --------------------------------------------------------------------------- +# 3. run_sweep — active-hours window guard +# --------------------------------------------------------------------------- + + +class TestRunSweepWindowGuard: + """Outside window without --force → skipped report. With --force → runs.""" + + def test_outside_window_returns_skipped_report(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1") + # Window 8–22, time is 03:00 — outside + cfg = _make_config(supervise_window_start_hour=8, supervise_window_end_hour=22) + judged = [0] + + def _judge(**_kw: Any) -> JudgeVerdict: + judged[0] += 1 + return _make_verdict() + + now = datetime(2026, 6, 10, 3, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=_judge, + apply_fn=lambda *a, **k: None, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=False, # no force + ) + + assert report.skipped is True + assert judged[0] == 0 # judge must NOT have been called + + def test_force_bypasses_window(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1") + cfg = _make_config(supervise_window_start_hour=8, supervise_window_end_hour=22) + judged = [0] + + def _judge(**_kw: Any) -> JudgeVerdict: + judged[0] += 1 + return _make_verdict() + + now = datetime(2026, 6, 10, 3, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=_judge, + apply_fn=lambda *a, **k: None, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, # force bypasses + ) + + assert not report.skipped + assert judged[0] == 1 + + def test_inside_window_runs_normally(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1") + cfg = _make_config(supervise_window_start_hour=8, supervise_window_end_hour=22) + judged = [0] + + def _judge(**_kw: Any) -> JudgeVerdict: + judged[0] += 1 + return _make_verdict() + + # 14:00 is inside [8, 22) + now = datetime(2026, 6, 10, 14, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=_judge, + apply_fn=lambda *a, **k: None, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=False, + ) + + assert not report.skipped + assert judged[0] == 1 + + +# --------------------------------------------------------------------------- +# 4. run_sweep — per-session error isolation +# --------------------------------------------------------------------------- + + +class TestRunSweepErrorIsolation: + """One session failure must not abort the sweep.""" + + def test_judge_error_on_one_session_continues_to_next(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + a1 = _write_artifact(tmp_path / "p1", task_id="t1", session_id="sess-1") + a2 = _write_artifact(tmp_path / "p2", task_id="t2", session_id="sess-2") + cfg = _make_config() + call_count = [0] + + def _judge(**_kw: Any) -> JudgeVerdict: + call_count[0] += 1 + if call_count[0] == 1: + raise RuntimeError("LLM failed on first session") + return _make_verdict() + + apply_calls: list[Path] = [] + + def _apply_fn(a: Path, verdict: JudgeVerdict, **_kw: Any) -> None: + apply_calls.append(a) + + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [a1, a2], + read_events_fn=_fake_read_events, + judge_fn=_judge, + apply_fn=_apply_fn, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + # Both sessions processed (sweep did not abort) + assert len(report.results) == 2 + # First has error, second has verdict + error_results = [r for r in report.results if r.error is not None] + ok_results = [r for r in report.results if r.error is None] + assert len(error_results) == 1 + assert len(ok_results) == 1 + # apply_fn was called for the second session + assert a2 in apply_calls + + def test_apply_error_on_one_session_continues_to_next(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + a1 = _write_artifact(tmp_path / "p1", task_id="t1", session_id="sess-1") + a2 = _write_artifact(tmp_path / "p2", task_id="t2", session_id="sess-2") + cfg = _make_config() + apply_call_count = [0] + + def _apply_fn(a: Path, verdict: JudgeVerdict, **_kw: Any) -> None: + apply_call_count[0] += 1 + if apply_call_count[0] == 1: + raise OSError("disk write failed") + + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [a1, a2], + read_events_fn=_fake_read_events, + judge_fn=lambda **_: _make_verdict(), + apply_fn=_apply_fn, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + # apply was attempted for both + assert apply_call_count[0] == 2 + assert len(report.results) == 2 + + +# --------------------------------------------------------------------------- +# 5. run_sweep — None verdict (judge returns no info) +# --------------------------------------------------------------------------- + + +class TestRunSweepNoVerdict: + def test_none_verdict_does_not_call_apply(self, tmp_path: Path) -> None: + from bach.services.supervise_service import run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1") + cfg = _make_config() + apply_calls: list[Path] = [] + + def _apply_fn(a: Path, verdict: JudgeVerdict, **_kw: Any) -> None: + apply_calls.append(a) + + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=lambda **_: None, # no verdict + apply_fn=_apply_fn, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + assert len(report.results) == 1 + assert not apply_calls # apply_fn not called + + +# --------------------------------------------------------------------------- +# 6. SweepReport shape +# --------------------------------------------------------------------------- + + +class TestSweepReportShape: + def test_sweep_result_fields_present(self, tmp_path: Path) -> None: + from bach.services.supervise_service import SweepResult, run_sweep + + artifact = _write_artifact(tmp_path, task_id="t1", status="executing") + cfg = _make_config() + now = datetime(2026, 6, 10, 12, 0, tzinfo=UTC).astimezone() + + report = run_sweep( + [artifact], + read_events_fn=_fake_read_events, + judge_fn=lambda **_: _make_verdict(suggested_status="qa"), + apply_fn=lambda *a, **k: None, + read_payload_fn=_fake_read_payload, + transcript_path_fn=_fake_transcript_path, + config=cfg, + model="gpt-test", + now=now, + force=True, + ) + + assert len(report.results) == 1 + r: SweepResult = report.results[0] + # Required fields on SweepResult + assert r.artifact == artifact + assert r.current_status == "executing" + assert r.verdict is not None + assert r.verdict.suggested_status == "qa" + assert r.error is None + + +# --------------------------------------------------------------------------- +# 7. Config window knobs — settings.py +# --------------------------------------------------------------------------- + + +class TestConfigWindowKnobs: + def test_default_window_values(self) -> None: + cfg = BachConfig() + assert cfg.supervise_window_start_hour == 8 + assert cfg.supervise_window_end_hour == 22 + + def test_custom_window_values_accepted(self) -> None: + cfg = BachConfig(supervise_window_start_hour=6, supervise_window_end_hour=18) + assert cfg.supervise_window_start_hour == 6 + assert cfg.supervise_window_end_hour == 18 + + def test_window_knobs_in_observer_settable_keys(self) -> None: + from bach.config.settings import OBSERVER_SETTABLE_KEYS + + assert "supervise-window-start-hour" in OBSERVER_SETTABLE_KEYS + assert "supervise-window-end-hour" in OBSERVER_SETTABLE_KEYS + + def test_set_observer_key_window_start_valid(self, tmp_path: Path) -> None: + """set_observer_key accepts valid start hour.""" + from unittest.mock import patch + + from bach.config.settings import set_observer_key + + with patch("bach.config.settings.write_config_field") as mock_write: + with patch("bach.config.settings.load_config") as mock_load: + mock_load.return_value = BachConfig( + supervise_window_start_hour=8, supervise_window_end_hour=22 + ) + set_observer_key("supervise-window-start-hour", "6") + mock_write.assert_called_once() + + def test_set_observer_key_window_end_valid(self, tmp_path: Path) -> None: + """set_observer_key accepts valid end hour.""" + from unittest.mock import patch + + from bach.config.settings import set_observer_key + + with patch("bach.config.settings.write_config_field") as mock_write: + with patch("bach.config.settings.load_config") as mock_load: + mock_load.return_value = BachConfig( + supervise_window_start_hour=8, supervise_window_end_hour=22 + ) + set_observer_key("supervise-window-end-hour", "20") + mock_write.assert_called_once() + + def test_set_observer_key_start_gte_end_raises(self) -> None: + """start >= end violates invariant → ValueError.""" + from unittest.mock import patch + + from bach.config.settings import set_observer_key + + with patch("bach.config.settings.load_config") as mock_load: + mock_load.return_value = BachConfig( + supervise_window_start_hour=8, supervise_window_end_hour=22 + ) + # Setting start = 22 would make start == end → invalid + with pytest.raises(ValueError, match="start.*end|window"): + set_observer_key("supervise-window-start-hour", "22") + + def test_set_observer_key_end_lte_start_raises(self) -> None: + """Setting end <= start raises ValueError.""" + from unittest.mock import patch + + from bach.config.settings import set_observer_key + + with patch("bach.config.settings.load_config") as mock_load: + mock_load.return_value = BachConfig( + supervise_window_start_hour=10, supervise_window_end_hour=22 + ) + # Setting end = 5 would make end < start → invalid + with pytest.raises(ValueError, match="start.*end|window"): + set_observer_key("supervise-window-end-hour", "5") + + def test_window_loads_from_yaml(self, tmp_path: Path) -> None: + """Window knobs are loaded from config.yaml correctly.""" + from bach.config.settings import load_config + + cfg_file = tmp_path / "config.yaml" + cfg_file.write_text("supervise_window_start_hour: 7\nsupervise_window_end_hour: 20\n") + cfg = load_config(cfg_file) + assert cfg.supervise_window_start_hour == 7 + assert cfg.supervise_window_end_hour == 20 + + def test_window_invalid_yaml_falls_back_to_default(self, tmp_path: Path) -> None: + """Invalid window value in yaml falls back to default.""" + from bach.config.settings import load_config + + cfg_file = tmp_path / "config.yaml" + # start=0 is valid (0 <= start), end=30 is invalid (>24) + cfg_file.write_text("supervise_window_start_hour: 0\nsupervise_window_end_hour: 30\n") + cfg = load_config(cfg_file) + # start=0 is valid; end=30 > 24 → falls back to default 22 + assert cfg.supervise_window_start_hour == 0 + assert cfg.supervise_window_end_hour == 22 + + +# --------------------------------------------------------------------------- +# 8. verdict_apply module — shared apply path +# --------------------------------------------------------------------------- + + +class TestVerdictApplyModule: + """Verify the shared verdict-apply module exists and works identically to + the original _apply_verdict from session_watcher.""" + + def test_apply_verdict_moves_status_when_observer_moves_true(self, tmp_path: Path) -> None: + from bach.services.verdict_apply import apply_verdict + + artifact = _write_artifact(tmp_path, task_id="t1", status="executing") + cfg = BachConfig(observer_moves=True, judge_confidence_threshold=0.7) + verdict = _make_verdict(suggested_status="qa", confidence=0.9) + status_writes: list[tuple[str, str]] = [] + + def _set_status(a: Path, new_status: str, source: str) -> None: + status_writes.append((new_status, source)) + + apply_verdict( + artifact, + verdict, + config=cfg, + current_status="executing", + session_id="sess-1", + set_task_status_fn=_set_status, + ) + + assert ("qa", "observer") in status_writes + + def test_apply_verdict_logs_when_observer_moves_false(self, tmp_path: Path) -> None: + from bach.services.verdict_apply import apply_verdict + + artifact = _write_artifact(tmp_path, task_id="t1", status="executing") + cfg = BachConfig(observer_moves=False, judge_confidence_threshold=0.7) + verdict = _make_verdict(suggested_status="qa", confidence=0.9) + status_writes: list[tuple[str, str]] = [] + + def _set_status(a: Path, new_status: str, source: str) -> None: + status_writes.append((new_status, source)) + + apply_verdict( + artifact, + verdict, + config=cfg, + current_status="executing", + session_id="sess-1", + set_task_status_fn=_set_status, + ) + + assert not status_writes + text = artifact.read_text() + assert "observer_suggestion" in text + + def test_apply_verdict_logs_when_confidence_below_threshold(self, tmp_path: Path) -> None: + from bach.services.verdict_apply import apply_verdict + + artifact = _write_artifact(tmp_path, task_id="t1", status="executing") + cfg = BachConfig(observer_moves=True, judge_confidence_threshold=0.8) + verdict = _make_verdict(suggested_status="qa", confidence=0.5) + status_writes: list[str] = [] + + def _set_status(a: Path, new_status: str, source: str) -> None: + status_writes.append(new_status) + + apply_verdict( + artifact, + verdict, + config=cfg, + current_status="executing", + session_id="sess-1", + set_task_status_fn=_set_status, + ) + + assert not status_writes + text = artifact.read_text() + assert "observer_suggestion" in text + + def test_session_watcher_still_imports_apply_verdict(self) -> None: + """session_watcher._apply_verdict still exists (re-exported) for the + existing test imports to work without change.""" + from bach.services.session_watcher import _apply_verdict # noqa: F401 + + assert callable(_apply_verdict) + + +# --------------------------------------------------------------------------- +# 9. CLI routing — `bach supervise` +# --------------------------------------------------------------------------- + + +class TestSuperviseCliRouting: + """Verify the CLI group is registered and basic routing works.""" + + def test_supervise_is_registered_in_app(self) -> None: + """bach supervise is a registered typer group.""" + from bach.cli.app import app + + # Walk the registered commands — supervise must appear + command_names = {c.name for c in app.registered_commands} + group_names = {g.name for g in app.registered_groups} + assert "supervise" in group_names or "supervise" in command_names + + def test_supervise_help_does_not_crash(self) -> None: + """bach supervise --help exits 0.""" + from bach.cli.app import app + + runner = CliRunner() + result = runner.invoke(app, ["supervise", "--help"]) + assert result.exit_code == 0 + + def test_supervise_run_help_does_not_crash(self) -> None: + """bach supervise run --help exits 0.""" + from bach.cli.app import app + + runner = CliRunner() + result = runner.invoke(app, ["supervise", "run", "--help"]) + assert result.exit_code == 0 + + def test_supervise_bare_exits_zero_with_no_linked_sessions( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """bach supervise with no linked sessions → 'nothing to supervise', exit 0.""" + from bach.cli import supervise_cmd + from bach.cli.app import app + + # Patch the enumeration to return empty list + monkeypatch.setattr(supervise_cmd, "_enumerate_supervisable_artifacts", lambda: []) + runner = CliRunner() + result = runner.invoke(app, ["supervise"]) + assert result.exit_code == 0 + output = result.output.lower() + assert "nothing" in output or "no" in output + + def test_supervise_run_exits_zero_with_no_linked_sessions( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """bach supervise run with no linked sessions → exit 0.""" + from bach.cli import supervise_cmd + from bach.cli.app import app + + monkeypatch.setattr(supervise_cmd, "_enumerate_supervisable_artifacts", lambda: []) + runner = CliRunner() + result = runner.invoke(app, ["supervise", "run"]) + assert result.exit_code == 0 diff --git a/tests/unit/test_supervise_launchd_cmd.py b/tests/unit/test_supervise_launchd_cmd.py new file mode 100644 index 0000000..db4c5f7 --- /dev/null +++ b/tests/unit/test_supervise_launchd_cmd.py @@ -0,0 +1,147 @@ +"""Tests for `bach supervise install` and `bach supervise uninstall` CLI commands. + +Strategy (mirrors test_daemon_launchd_cmd.py): + - All launchd service calls are patched — no real launchctl or + ~/Library/LaunchAgents touched. + - Tests verify: subcommand registration, success output, fallback output + when launchctl fails, and idempotent-call behaviour. +""" + +from unittest.mock import MagicMock, patch + +from typer.testing import CliRunner + +from bach.cli.app import app +from bach.services.launchd_service import LaunchdInstallResult + +runner = CliRunner() + +# Patch targets inside supervise_cmd module. +_INSTALL_PATCH = "bach.cli.supervise_cmd.install_sweep_agent" +_UNINSTALL_PATCH = "bach.cli.supervise_cmd.uninstall_sweep_agent" +_LOAD_CONFIG_PATCH = "bach.cli.supervise_cmd.load_config" + + +# --------------------------------------------------------------------------- +# Subcommand registration +# --------------------------------------------------------------------------- + + +def test_supervise_install_is_registered() -> None: + """bach supervise install must appear in --help output.""" + result = runner.invoke(app, ["supervise", "--help"]) + assert result.exit_code == 0, result.output + assert "install" in result.output + + +def test_supervise_uninstall_is_registered() -> None: + """bach supervise uninstall must appear in --help output.""" + result = runner.invoke(app, ["supervise", "--help"]) + assert result.exit_code == 0, result.output + assert "uninstall" in result.output + + +# --------------------------------------------------------------------------- +# `bach supervise install` — success path +# --------------------------------------------------------------------------- + + +def test_supervise_install_calls_service() -> None: + """supervise install must call install_sweep_agent once.""" + mock_install = MagicMock( + return_value=LaunchdInstallResult(installed=True, fallback_command=None) + ) + with patch(_INSTALL_PATCH, mock_install): + result = runner.invoke(app, ["supervise", "install"]) + + assert mock_install.call_count == 1 + assert result.exit_code == 0, result.output + + +def test_supervise_install_prints_success_message() -> None: + """On success, supervise install must print a success-indicating message.""" + mock_install = MagicMock( + return_value=LaunchdInstallResult(installed=True, fallback_command=None) + ) + with patch(_INSTALL_PATCH, mock_install): + result = runner.invoke(app, ["supervise", "install"]) + + assert any( + word in result.output.lower() + for word in ("installed", "success", "loaded", "install") + ), f"Expected success message, got: {result.output!r}" + + +# --------------------------------------------------------------------------- +# `bach supervise install` — fallback path (launchctl failure) +# --------------------------------------------------------------------------- + + +def test_supervise_install_prints_fallback_on_launchctl_failure() -> None: + """When launchctl fails, supervise install must print the manual fallback command.""" + mock_install = MagicMock( + return_value=LaunchdInstallResult( + installed=True, + fallback_command="bach supervise", + ) + ) + with patch(_INSTALL_PATCH, mock_install): + result = runner.invoke(app, ["supervise", "install"]) + + # The fallback command must appear in the output. + assert "bach supervise" in result.output, ( + f"Fallback command not printed. Output: {result.output!r}" + ) + + +def test_supervise_install_exit_code_nonzero_on_install_failure() -> None: + """When install_sweep_agent returns installed=False, exit with non-zero code.""" + mock_install = MagicMock( + return_value=LaunchdInstallResult( + installed=False, fallback_command="bach supervise" + ) + ) + with patch(_INSTALL_PATCH, mock_install): + result = runner.invoke(app, ["supervise", "install"]) + + assert result.exit_code != 0, ( + f"Expected non-zero exit on failure, got 0. Output: {result.output!r}" + ) + + +# --------------------------------------------------------------------------- +# `bach supervise uninstall` +# --------------------------------------------------------------------------- + + +def test_supervise_uninstall_calls_service() -> None: + """supervise uninstall must call uninstall_sweep_agent once.""" + mock_uninstall = MagicMock(return_value=None) + with patch(_UNINSTALL_PATCH, mock_uninstall): + result = runner.invoke(app, ["supervise", "uninstall"]) + + assert mock_uninstall.call_count == 1 + assert result.exit_code == 0, result.output + + +def test_supervise_uninstall_prints_confirmation() -> None: + """supervise uninstall must print a message confirming the action.""" + mock_uninstall = MagicMock(return_value=None) + with patch(_UNINSTALL_PATCH, mock_uninstall): + result = runner.invoke(app, ["supervise", "uninstall"]) + + assert any( + word in result.output.lower() + for word in ("uninstalled", "removed", "uninstall") + ), f"Expected uninstall message, got: {result.output!r}" + + +def test_supervise_uninstall_is_idempotent_from_cli() -> None: + """Calling supervise uninstall twice must succeed both times (exit 0).""" + mock_uninstall = MagicMock(return_value=None) + with patch(_UNINSTALL_PATCH, mock_uninstall): + r1 = runner.invoke(app, ["supervise", "uninstall"]) + r2 = runner.invoke(app, ["supervise", "uninstall"]) + + assert r1.exit_code == 0, r1.output + assert r2.exit_code == 0, r2.output diff --git a/tests/unit/test_supervise_sweeps_config.py b/tests/unit/test_supervise_sweeps_config.py new file mode 100644 index 0000000..bf3de2c --- /dev/null +++ b/tests/unit/test_supervise_sweeps_config.py @@ -0,0 +1,141 @@ +"""Tests for the supervise_sweeps_per_day config knob (issue #19). + +Verifies: + - Default value of 5. + - Loaded correctly from config.yaml (positive int). + - Invalid values (0, negative, bool, non-int) fall back to default. + - set_observer_key / OBSERVER_SETTABLE_KEYS supports 'supervise-sweeps-per-day'. + - _OBSERVER_KEY_TO_FIELD maps 'supervise-sweeps-per-day' -> 'supervise_sweeps_per_day'. +""" + +from pathlib import Path + +import pytest # noqa: F401 — used for pytest.raises + +from bach.config.settings import ( + _OBSERVER_KEY_TO_FIELD, + OBSERVER_SETTABLE_KEYS, + BachConfig, + load_config, + set_observer_key, +) + +# --------------------------------------------------------------------------- +# Default value +# --------------------------------------------------------------------------- + + +def test_supervise_sweeps_per_day_default() -> None: + """BachConfig default for supervise_sweeps_per_day must be 5.""" + cfg = BachConfig() + assert cfg.supervise_sweeps_per_day == 5 + + +# --------------------------------------------------------------------------- +# load_config — valid value +# --------------------------------------------------------------------------- + + +def test_load_config_reads_supervise_sweeps_per_day(tmp_path: Path) -> None: + """A well-formed config file with supervise_sweeps_per_day is respected.""" + config_path = tmp_path / "config.yaml" + config_path.write_text("supervise_sweeps_per_day: 3\n") + cfg = load_config(path=config_path) + assert cfg.supervise_sweeps_per_day == 3 + + +def test_load_config_supervise_sweeps_per_day_max_value(tmp_path: Path) -> None: + """Positive int is accepted.""" + config_path = tmp_path / "config.yaml" + config_path.write_text("supervise_sweeps_per_day: 10\n") + cfg = load_config(path=config_path) + assert cfg.supervise_sweeps_per_day == 10 + + +# --------------------------------------------------------------------------- +# load_config — invalid values fall back to 5 +# --------------------------------------------------------------------------- + + +def test_load_config_zero_sweeps_per_day_falls_back(tmp_path: Path) -> None: + """supervise_sweeps_per_day=0 must fall back to 5.""" + config_path = tmp_path / "config.yaml" + config_path.write_text("supervise_sweeps_per_day: 0\n") + cfg = load_config(path=config_path) + assert cfg.supervise_sweeps_per_day == 5 + + +def test_load_config_negative_sweeps_per_day_falls_back(tmp_path: Path) -> None: + """supervise_sweeps_per_day=-1 must fall back to 5.""" + config_path = tmp_path / "config.yaml" + config_path.write_text("supervise_sweeps_per_day: -1\n") + cfg = load_config(path=config_path) + assert cfg.supervise_sweeps_per_day == 5 + + +def test_load_config_bool_sweeps_per_day_falls_back(tmp_path: Path) -> None: + """supervise_sweeps_per_day: true must fall back to 5 (bool is not int here).""" + config_path = tmp_path / "config.yaml" + config_path.write_text("supervise_sweeps_per_day: true\n") + cfg = load_config(path=config_path) + assert cfg.supervise_sweeps_per_day == 5 + + +def test_load_config_string_sweeps_per_day_falls_back(tmp_path: Path) -> None: + """supervise_sweeps_per_day: 'five' must fall back to 5.""" + config_path = tmp_path / "config.yaml" + config_path.write_text("supervise_sweeps_per_day: 'five'\n") + cfg = load_config(path=config_path) + assert cfg.supervise_sweeps_per_day == 5 + + +# --------------------------------------------------------------------------- +# OBSERVER_SETTABLE_KEYS — the new knob is allowlisted +# --------------------------------------------------------------------------- + + +def test_supervise_sweeps_per_day_in_observer_settable_keys() -> None: + """'supervise-sweeps-per-day' must be a key in OBSERVER_SETTABLE_KEYS.""" + assert "supervise-sweeps-per-day" in OBSERVER_SETTABLE_KEYS + + +def test_observer_key_to_field_maps_correctly() -> None: + """'supervise-sweeps-per-day' must map to 'supervise_sweeps_per_day'.""" + assert _OBSERVER_KEY_TO_FIELD.get("supervise-sweeps-per-day") == "supervise_sweeps_per_day" + + +# --------------------------------------------------------------------------- +# set_observer_key — valid and invalid values +# --------------------------------------------------------------------------- + + +def test_set_observer_key_supervise_sweeps_per_day_valid(tmp_path: Path) -> None: + """set_observer_key with a positive int must not raise and persist the value.""" + # Patch the default config path to write into tmp_path. + import bach.config.settings as settings_mod + orig = settings_mod._default_config_path + settings_mod._default_config_path = lambda: tmp_path / "config.yaml" + try: + set_observer_key("supervise-sweeps-per-day", "4") + cfg = load_config(path=tmp_path / "config.yaml") + assert cfg.supervise_sweeps_per_day == 4 + finally: + settings_mod._default_config_path = orig + + +def test_set_observer_key_supervise_sweeps_per_day_zero_raises() -> None: + """set_observer_key with 0 must raise ValueError.""" + with pytest.raises(ValueError): + set_observer_key("supervise-sweeps-per-day", "0") + + +def test_set_observer_key_supervise_sweeps_per_day_negative_raises() -> None: + """set_observer_key with negative value must raise ValueError.""" + with pytest.raises(ValueError): + set_observer_key("supervise-sweeps-per-day", "-1") + + +def test_set_observer_key_supervise_sweeps_per_day_non_int_raises() -> None: + """set_observer_key with 'abc' must raise ValueError.""" + with pytest.raises(ValueError): + set_observer_key("supervise-sweeps-per-day", "abc") diff --git a/tests/unit/test_sweep_launchd_service.py b/tests/unit/test_sweep_launchd_service.py new file mode 100644 index 0000000..1cd2084 --- /dev/null +++ b/tests/unit/test_sweep_launchd_service.py @@ -0,0 +1,636 @@ +"""Tests for the sweep agent (calendar-scheduled launchd) in launchd_service.py. + +Tests issue #19: `render_sweep_agent`, `install_sweep_agent`, `uninstall_sweep_agent`, +and `status_sweep_agent`. + +Strategy (mirrors test_launchd_service.py): + - `render_sweep_agent` is a pure function — assert plist content directly. + - `install_sweep_agent` / `uninstall_sweep_agent` accept an injected `runner` + and `launch_agents_dir` so tests NEVER call real launchctl or touch + ~/Library/LaunchAgents. + - Plist-file writes are directed at tmp_path via an injected `launch_agents_dir`. + - Idempotency: install/uninstall twice must not produce errors or duplicate state. + - Fallback: when the launchctl runner returns failure, install must surface the + manual `bach supervise` fallback command (returned in the result). + - The sweep agent label (com.bach.supervise) is DISTINCT from the daemon's + (com.bach.daemon) — installing one must never touch the other. +""" + +import plistlib +import sys +from pathlib import Path +from typing import Any + +from bach.services.launchd_service import ( + LAUNCH_AGENT_LABEL, + SUPERVISE_AGENT_LABEL, + LaunchdInstallResult, + install_sweep_agent, + render_sweep_agent, + status_sweep_agent, + uninstall_sweep_agent, +) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# ProgramArguments tail: must invoke `bach supervise` via -m bach supervise. +_EXPECTED_ARGS_TAIL = ["-m", "bach", "supervise"] + +# A fake runner that always succeeds (exit-code 0, empty output). +_OK_RUNNER = lambda argv: (0, "") # noqa: E731 + +# A fake runner that always fails (exit-code 1). +_FAIL_RUNNER = lambda argv: (1, "service not found") # noqa: E731 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _parse_plist(xml: str) -> dict[str, Any]: + """Parse plist XML string into a Python dict.""" + result: dict[str, Any] = plistlib.loads(xml.encode()) + return result + + +# --------------------------------------------------------------------------- +# SUPERVISE_AGENT_LABEL constant — distinct from daemon +# --------------------------------------------------------------------------- + + +def test_supervise_label_is_distinct_from_daemon_label() -> None: + """The sweep agent label must differ from the daemon's label.""" + assert SUPERVISE_AGENT_LABEL != LAUNCH_AGENT_LABEL, ( + "SUPERVISE_AGENT_LABEL must be distinct from LAUNCH_AGENT_LABEL" + ) + + +def test_supervise_label_is_reverse_dns() -> None: + """The sweep agent label must be a reverse-DNS string starting with 'com.'.""" + assert SUPERVISE_AGENT_LABEL.startswith("com."), ( + f"SUPERVISE_AGENT_LABEL {SUPERVISE_AGENT_LABEL!r} must start with 'com.'" + ) + + +def test_supervise_label_value() -> None: + """The sweep agent label must be 'com.bach.supervise'.""" + assert SUPERVISE_AGENT_LABEL == "com.bach.supervise" + + +# --------------------------------------------------------------------------- +# render_sweep_agent — basic plist validity +# --------------------------------------------------------------------------- + + +def test_render_sweep_agent_returns_string() -> None: + """render_sweep_agent must return a non-empty string.""" + xml = render_sweep_agent( + sweeps_per_day=4, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + assert isinstance(xml, str) + assert len(xml) > 0 + + +def test_render_sweep_agent_is_valid_plist_xml() -> None: + """The rendered output must be valid plist XML parseable by plistlib.""" + xml = render_sweep_agent( + sweeps_per_day=4, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + assert isinstance(data, dict) + + +def test_render_sweep_agent_has_supervise_label() -> None: + """The plist Label must equal SUPERVISE_AGENT_LABEL.""" + xml = render_sweep_agent( + sweeps_per_day=2, + window_start_hour=9, + window_end_hour=17, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + assert data["Label"] == SUPERVISE_AGENT_LABEL + + +# --------------------------------------------------------------------------- +# render_sweep_agent — ProgramArguments invoke `bach supervise` +# --------------------------------------------------------------------------- + + +def test_render_sweep_agent_program_arguments_starts_with_executable() -> None: + """ProgramArguments must start with sys.executable.""" + xml = render_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + args = data["ProgramArguments"] + assert isinstance(args, list) + assert args[0] == sys.executable, ( + f"First arg must be sys.executable, got: {args[0]!r}" + ) + + +def test_render_sweep_agent_program_arguments_tail_is_bach_supervise() -> None: + """ProgramArguments tail must be ['-m', 'bach', 'supervise'].""" + xml = render_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + args = data["ProgramArguments"] + assert args[-3:] == _EXPECTED_ARGS_TAIL, ( + f"Expected tail {_EXPECTED_ARGS_TAIL!r}, got: {args!r}" + ) + + +# --------------------------------------------------------------------------- +# render_sweep_agent — no KeepAlive / RunAtLoad (calendar agent, not daemon) +# --------------------------------------------------------------------------- + + +def test_render_sweep_agent_has_no_keep_alive() -> None: + """Calendar agents must NOT have KeepAlive (only the daemon uses KeepAlive).""" + xml = render_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + # KeepAlive must NOT be set or must be False for a calendar agent. + assert not data.get("KeepAlive", False), ( + "Sweep agent must NOT have KeepAlive=True (only daemon uses KeepAlive)" + ) + + +# --------------------------------------------------------------------------- +# render_sweep_agent — StartCalendarInterval entry count and hours +# --------------------------------------------------------------------------- + + +def test_render_sweep_agent_calendar_interval_is_a_list() -> None: + """StartCalendarInterval must be a list (array of dicts).""" + xml = render_sweep_agent( + sweeps_per_day=4, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + intervals = data.get("StartCalendarInterval") + assert isinstance(intervals, list), ( + f"StartCalendarInterval must be a list, got {type(intervals)}" + ) + + +def test_render_sweep_agent_calendar_interval_count_matches_sweeps_per_day() -> None: + """StartCalendarInterval must have exactly sweeps_per_day entries.""" + for n in (1, 3, 5): + xml = render_sweep_agent( + sweeps_per_day=n, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + intervals = data["StartCalendarInterval"] + assert len(intervals) == n, ( + f"sweeps_per_day={n}: expected {n} intervals, got {len(intervals)}" + ) + + +def test_render_sweep_agent_n1_gives_one_entry_at_window_start() -> None: + """N=1 sweep → single entry at window_start_hour, Minute=0.""" + xml = render_sweep_agent( + sweeps_per_day=1, + window_start_hour=9, + window_end_hour=17, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + intervals = data["StartCalendarInterval"] + assert len(intervals) == 1 + assert intervals[0]["Hour"] == 9 + assert intervals[0]["Minute"] == 0 + + +def test_render_sweep_agent_n2_window_8_22_correct_hours() -> None: + """N=2 in window [8,22): entries at hours 8 and 15 (step = 14/2 = 7). + + Window span = 22 - 8 = 14 hours. + Step = 14 // 2 = 7. Entries: 8, 8+7=15. + """ + xml = render_sweep_agent( + sweeps_per_day=2, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + intervals = data["StartCalendarInterval"] + assert len(intervals) == 2 + hours = [e["Hour"] for e in intervals] + assert hours == [8, 15], f"Expected [8, 15], got {hours}" + + +def test_render_sweep_agent_n3_window_9_21_correct_hours() -> None: + """N=3 in window [9,21): entries at hours 9, 13, 17 (step = 12//3 = 4). + + Window span = 21 - 9 = 12 hours. + Step = 12 // 3 = 4. Entries: 9, 9+4=13, 9+8=17. + """ + xml = render_sweep_agent( + sweeps_per_day=3, + window_start_hour=9, + window_end_hour=21, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + intervals = data["StartCalendarInterval"] + assert len(intervals) == 3 + hours = [e["Hour"] for e in intervals] + assert hours == [9, 13, 17], f"Expected [9, 13, 17], got {hours}" + + +def test_render_sweep_agent_entries_are_hour_minute_dicts() -> None: + """Each StartCalendarInterval entry must be a dict with 'Hour' and 'Minute' keys.""" + xml = render_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + for entry in data["StartCalendarInterval"]: + assert "Hour" in entry, f"Entry missing 'Hour': {entry}" + assert "Minute" in entry, f"Entry missing 'Minute': {entry}" + assert entry["Minute"] == 0, f"Expected Minute=0, got {entry['Minute']}" + assert 0 <= entry["Hour"] <= 23, f"Hour {entry['Hour']} out of [0,23]" + + +def test_render_sweep_agent_all_entries_within_window() -> None: + """All entry hours must satisfy window_start_hour <= hour < window_end_hour.""" + xml = render_sweep_agent( + sweeps_per_day=5, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + for entry in data["StartCalendarInterval"]: + h = entry["Hour"] + assert 8 <= h < 22, f"Hour {h} outside window [8,22)" + + +def test_render_sweep_agent_n5_window_8_22_well_spread_hours() -> None: + """N=5 in window [8,22): entries must span the FULL window, not cluster early. + + Corrected uniform-distribution formula: + entry[i] = round(8 + i * 14 / 5) + i=0: round(8.0) = 8 + i=1: round(10.8) = 11 + i=2: round(13.6) = 14 + i=3: round(16.4) = 16 + i=4: round(19.2) = 19 + + The old floor-division formula (step=14//5=2) produced [8,10,12,14,16] + leaving 16:00–22:00 with no coverage. The corrected formula reaches 19:00. + """ + xml = render_sweep_agent( + sweeps_per_day=5, + window_start_hour=8, + window_end_hour=22, + log_dir=Path("/tmp/logs"), + ) + data = _parse_plist(xml) + intervals = data["StartCalendarInterval"] + assert len(intervals) == 5 + hours = [e["Hour"] for e in intervals] + assert hours == [8, 11, 14, 16, 19], ( + f"Expected well-spread [8, 11, 14, 16, 19], got {hours}. " + "Hours must span the full [8,22) window, not cluster early." + ) + + +# --------------------------------------------------------------------------- +# render_sweep_agent — log paths +# --------------------------------------------------------------------------- + + +def test_render_sweep_agent_has_stdout_path() -> None: + """StandardOutPath must point under the given log_dir.""" + log_dir = Path("/tmp/bach/logs") + xml = render_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + log_dir=log_dir, + ) + data = _parse_plist(xml) + stdout = data.get("StandardOutPath", "") + assert str(log_dir) in stdout + + +def test_render_sweep_agent_has_stderr_path() -> None: + """StandardErrorPath must point under the given log_dir.""" + log_dir = Path("/tmp/bach/logs") + xml = render_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + log_dir=log_dir, + ) + data = _parse_plist(xml) + stderr = data.get("StandardErrorPath", "") + assert str(log_dir) in stderr + + +# --------------------------------------------------------------------------- +# install_sweep_agent — happy path +# --------------------------------------------------------------------------- + + +def test_install_sweep_agent_creates_plist_file(tmp_path: Path) -> None: + """install must write a plist at launch_agents_dir/com.bach.supervise.plist.""" + result = install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + plist_path = tmp_path / f"{SUPERVISE_AGENT_LABEL}.plist" + assert plist_path.exists(), "Plist file was not created" + assert result.installed is True + + +def test_install_sweep_agent_plist_content_is_valid(tmp_path: Path) -> None: + """The plist written must be valid and contain the sweep agent label.""" + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + plist_path = tmp_path / f"{SUPERVISE_AGENT_LABEL}.plist" + data = plistlib.loads(plist_path.read_bytes()) + assert data["Label"] == SUPERVISE_AGENT_LABEL + assert isinstance(data.get("StartCalendarInterval"), list) + assert len(data["StartCalendarInterval"]) == 3 + + +def test_install_sweep_agent_calls_launchctl(tmp_path: Path) -> None: + """install must invoke the runner (launchctl load).""" + calls: list[list[str]] = [] + + def capturing_runner(argv: list[str]) -> tuple[int, str]: + calls.append(argv) + return 0, "" + + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=capturing_runner, + ) + assert len(calls) >= 1, "launchctl was never called" + plist_path = str(tmp_path / f"{SUPERVISE_AGENT_LABEL}.plist") + assert any(plist_path in " ".join(call) for call in calls), ( + f"No launchctl call referenced the plist path. Calls: {calls}" + ) + + +def test_install_sweep_agent_no_fallback_on_success(tmp_path: Path) -> None: + """On success, fallback_command must be None.""" + result = install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + assert result.fallback_command is None + + +# --------------------------------------------------------------------------- +# install_sweep_agent — idempotency +# --------------------------------------------------------------------------- + + +def test_install_sweep_agent_is_idempotent(tmp_path: Path) -> None: + """Calling install twice must succeed without errors.""" + # Call with explicit args (avoids **dict[str, object] mypy inference issue). + def _do_install() -> LaunchdInstallResult: + return install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + + result1 = _do_install() + result2 = _do_install() + assert result1.installed is True + assert result2.installed is True + plist_path = tmp_path / f"{SUPERVISE_AGENT_LABEL}.plist" + assert plist_path.exists() + + +# --------------------------------------------------------------------------- +# install_sweep_agent — launchctl failure / fallback +# --------------------------------------------------------------------------- + + +def test_install_sweep_agent_returns_fallback_on_launchctl_failure(tmp_path: Path) -> None: + """When launchctl fails, result.fallback_command must be set.""" + result = install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_FAIL_RUNNER, + ) + assert result.fallback_command is not None, ( + "fallback_command must be set when launchctl fails" + ) + + +def test_install_sweep_agent_fallback_contains_bach_supervise(tmp_path: Path) -> None: + """The fallback command must contain 'bach supervise'.""" + result = install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_FAIL_RUNNER, + ) + assert result.fallback_command is not None + assert "bach supervise" in result.fallback_command, ( + f"fallback_command {result.fallback_command!r} must contain 'bach supervise'" + ) + + +def test_install_sweep_agent_does_not_raise_on_launchctl_failure(tmp_path: Path) -> None: + """install must NOT raise even when launchctl fails — return result instead.""" + result = install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_FAIL_RUNNER, + ) + assert isinstance(result, LaunchdInstallResult) + + +# --------------------------------------------------------------------------- +# install_sweep_agent — does NOT touch daemon plist +# --------------------------------------------------------------------------- + + +def test_install_sweep_agent_does_not_create_daemon_plist(tmp_path: Path) -> None: + """Installing the sweep agent must NOT write the daemon plist.""" + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + daemon_plist = tmp_path / f"{LAUNCH_AGENT_LABEL}.plist" + assert not daemon_plist.exists(), ( + "install_sweep_agent must NOT create the daemon plist" + ) + + +# --------------------------------------------------------------------------- +# uninstall_sweep_agent — happy path +# --------------------------------------------------------------------------- + + +def test_uninstall_sweep_agent_removes_plist_file(tmp_path: Path) -> None: + """uninstall must delete the plist file if it exists.""" + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + plist_path = tmp_path / f"{SUPERVISE_AGENT_LABEL}.plist" + assert plist_path.exists() + + uninstall_sweep_agent(launch_agents_dir=tmp_path, runner=_OK_RUNNER) + assert not plist_path.exists(), "Plist file was not removed by uninstall" + + +def test_uninstall_sweep_agent_calls_launchctl(tmp_path: Path) -> None: + """uninstall must invoke the runner (launchctl unload).""" + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + calls: list[list[str]] = [] + + def capturing_runner(argv: list[str]) -> tuple[int, str]: + calls.append(argv) + return 0, "" + + uninstall_sweep_agent(launch_agents_dir=tmp_path, runner=capturing_runner) + assert len(calls) >= 1, "launchctl was never called during uninstall" + + +def test_uninstall_sweep_agent_idempotent_when_absent(tmp_path: Path) -> None: + """Calling uninstall when no plist exists must not raise (idempotent).""" + uninstall_sweep_agent(launch_agents_dir=tmp_path, runner=_OK_RUNNER) + uninstall_sweep_agent(launch_agents_dir=tmp_path, runner=_OK_RUNNER) + + +def test_uninstall_sweep_agent_idempotent_after_install_uninstall(tmp_path: Path) -> None: + """install → uninstall → uninstall must not raise on the second uninstall.""" + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + uninstall_sweep_agent(launch_agents_dir=tmp_path, runner=_OK_RUNNER) + uninstall_sweep_agent(launch_agents_dir=tmp_path, runner=_OK_RUNNER) + + +# --------------------------------------------------------------------------- +# status_sweep_agent +# --------------------------------------------------------------------------- + + +def test_status_sweep_agent_returns_dict(tmp_path: Path) -> None: + """status_sweep_agent must return a dict.""" + result = status_sweep_agent(launch_agents_dir=tmp_path) + assert isinstance(result, dict) + + +def test_status_sweep_agent_not_installed_when_no_plist(tmp_path: Path) -> None: + """Before install, installed must be False.""" + result = status_sweep_agent(launch_agents_dir=tmp_path) + assert result.get("installed") is False + + +def test_status_sweep_agent_installed_after_install(tmp_path: Path) -> None: + """After install, installed must be True.""" + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + result = status_sweep_agent(launch_agents_dir=tmp_path) + assert result.get("installed") is True + + +def test_status_sweep_agent_not_installed_after_uninstall(tmp_path: Path) -> None: + """After uninstall, installed must go back to False.""" + install_sweep_agent( + sweeps_per_day=3, + window_start_hour=8, + window_end_hour=22, + launch_agents_dir=tmp_path, + log_dir=tmp_path / "logs", + runner=_OK_RUNNER, + ) + uninstall_sweep_agent(launch_agents_dir=tmp_path, runner=_OK_RUNNER) + result = status_sweep_agent(launch_agents_dir=tmp_path) + assert result.get("installed") is False