diff --git a/src/bcli/batch/__init__.py b/src/bcli/batch/__init__.py new file mode 100644 index 0000000..11084aa --- /dev/null +++ b/src/bcli/batch/__init__.py @@ -0,0 +1,9 @@ +"""Batch operation ledger (Phase 3 of AIP v0.1). + +A durable SQLite ledger that records every batch run's intent + outcome +per step. Survives ``SIGKILL`` because the intent row is written +*before* the HTTP call, and ``PRAGMA synchronous=NORMAL`` keeps WAL +honest at commit time. +""" + +from bcli.batch.ledger import Ledger, RunLedgerExistsError # noqa: F401 diff --git a/src/bcli/batch/ledger.py b/src/bcli/batch/ledger.py new file mode 100644 index 0000000..4975a9b --- /dev/null +++ b/src/bcli/batch/ledger.py @@ -0,0 +1,503 @@ +"""SQLite-backed operation ledger for ``bcli batch run``. + +Why this exists +--------------- +Stdout-only observation cannot distinguish "POST committed, stdout died" +from "nothing started." We need durable, per-step state the runtime can +query after the process dies — and a coherent answer to "what's the +overall state of this run?" derived from the steps. + +Durability contract +------------------- +- One SQLite file per run: ``/.db``. +- ``PRAGMA journal_mode=WAL`` + ``synchronous=NORMAL`` so an intent row + hits disk before the HTTP call is dispatched. +- Autocommit mode (``isolation_level=None``). Each ``INSERT`` / + ``UPDATE`` is its own transaction, so a SIGKILL between two + statements never leaves a half-written row. +- The connection is kept open for the life of the ``Ledger`` instance to + avoid pathological lock churn on Windows-style filesystems. + +State derivation +---------------- +``run.state`` is *stamped* on lifecycle calls (``start_run`` → +``"running"``; ``finish_run`` → whatever the caller declared). But if +the process is SIGKILLed before ``finish_run`` runs, the stamp lies. 
+``compute_run_state(run_id)`` is the authoritative read-side derivation +from the step table: + + all committed → "completed" + any committed + any without outcome → "partially_committed" + any committed + any failed → "partially_committed" + only failed → "failed" + only intent (no outcomes) → "running" (process still going, + or died before any + outcome — caller + must reconcile with + finished_at) + +``list_runs`` and ``batch state`` both call this on read so the operator +always sees the truthful state, not a stale stamp. + +Schema (version 1) +------------------ +:: + + CREATE TABLE run ( + run_id TEXT PRIMARY KEY, + manifest_path TEXT, + manifest_hash TEXT, + profile TEXT, environment TEXT, company TEXT, + state TEXT CHECK (state IN ( + 'planned','running','partially_committed','completed', + 'failed','cancelled','rolled_back' + )), + started_at TEXT, finished_at TEXT + ); + CREATE TABLE step ( + step_id INTEGER PRIMARY KEY, + run_id TEXT REFERENCES run(run_id), + seq INTEGER, + intent_ts TEXT, method TEXT, url TEXT, body_hash TEXT, + outcome_ts TEXT, + status TEXT, -- "committed" | "failed" | "rollback_skipped" | "rolled_back" | "unknown" + bc_correlation_id TEXT, + error_message TEXT, + rollback_url TEXT + ); + CREATE TABLE schema_version (version INTEGER); + +The step ``status`` column is intentionally not constrained — rollback +introduces transient states (e.g. ``rollback_skipped``) we don't want to +keep adding to a CHECK enum. +""" + +from __future__ import annotations + +import datetime as _dt +import sqlite3 +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +SCHEMA_VERSION = 1 + +# Allowed values for ``run.state``. Mirrors the contract doc §Phase 3 +# enum. Step-level statuses are deliberately *not* enforced — they +# evolve faster than the run-level enum and have a wider vocabulary +# (``committed``, ``failed``, ``unknown``, ``rollback_skipped``, +# ``rolled_back`` …). 
def _default_base_dir() -> Path:
    """Return the default directory that holds the per-run ledger files.

    The location is computed on every call instead of being frozen into a
    module-level constant, so a test that patches ``Path.home`` has its
    replacement honoured.
    """
    home = Path.home()
    return home.joinpath(".config", "bcli", "batch")
+ """ + + def __init__(self, run_id: str, *, base_dir: Path | None = None) -> None: + self.run_id = run_id + self.base_dir = Path(base_dir) if base_dir is not None else _default_base_dir() + self._conn: sqlite3.Connection | None = None + + # ── Connection lifecycle ──────────────────────────────────────── + + @property + def db_path(self) -> Path: + return self.base_dir / f"{self.run_id}.db" + + def _connect(self) -> sqlite3.Connection: + """Open (or reuse) the SQLite connection with durability pragmas.""" + if self._conn is not None: + return self._conn + + self.base_dir.mkdir(parents=True, exist_ok=True) + # isolation_level=None ⇒ autocommit. Each INSERT/UPDATE is its + # own transaction; combined with WAL + synchronous=NORMAL this + # gives us "the intent row landed on disk before HTTP fires." + conn = sqlite3.connect(self.db_path, isolation_level=None) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + conn.execute("PRAGMA foreign_keys=ON") + self._ensure_schema(conn) + self._conn = conn + return conn + + def close(self) -> None: + if self._conn is not None: + self._conn.close() + self._conn = None + + def __enter__(self) -> "Ledger": + self._connect() + return self + + def __exit__(self, *_exc) -> None: + self.close() + + # ── Schema setup ──────────────────────────────────────────────── + + @staticmethod + def _ensure_schema(conn: sqlite3.Connection) -> None: + # IMPORTANT: the CHECK enum below is the source of truth for + # ``run.state`` values. Adding a new value here is a schema + # change; bump ``SCHEMA_VERSION`` and add a migration. 
+ conn.execute( + f""" + CREATE TABLE IF NOT EXISTS run ( + run_id TEXT PRIMARY KEY, + manifest_path TEXT, + manifest_hash TEXT, + profile TEXT, + environment TEXT, + company TEXT, + state TEXT CHECK (state IN ( + {",".join(f"'{s}'" for s in _RUN_STATE_ENUM)} + )), + started_at TEXT, + finished_at TEXT + ) + """, + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS step ( + step_id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id TEXT NOT NULL REFERENCES run(run_id), + seq INTEGER NOT NULL, + intent_ts TEXT NOT NULL, + method TEXT NOT NULL, + url TEXT NOT NULL, + body_hash TEXT, + outcome_ts TEXT, + status TEXT, + bc_correlation_id TEXT, + error_message TEXT, + rollback_url TEXT + ) + """, + ) + conn.execute( + "CREATE TABLE IF NOT EXISTS schema_version (version INTEGER PRIMARY KEY)" + ) + # Idempotent: only insert if empty. + existing = conn.execute("SELECT version FROM schema_version").fetchone() + if existing is None: + conn.execute( + "INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,) + ) + + # ── Lifecycle ─────────────────────────────────────────────────── + + def start_run( + self, + *, + manifest_path: str, + manifest_hash: str, + profile: str, + environment: str, + company: str, + ) -> None: + """Insert the ``run`` row in state ``"running"``. + + Raises ``RunLedgerExistsError`` if a row already exists for this + ``run_id`` — see class docstring. + """ + conn = self._connect() + existing = conn.execute( + "SELECT 1 FROM run WHERE run_id = ?", (self.run_id,) + ).fetchone() + if existing is not None: + raise RunLedgerExistsError( + f"Ledger row for run_id={self.run_id!r} already exists. " + "Use a fresh run-id." + ) + conn.execute( + """ + INSERT INTO run ( + run_id, manifest_path, manifest_hash, + profile, environment, company, + state, started_at + ) VALUES (?, ?, ?, ?, ?, ?, 'running', ?) 
+ """, + ( + self.run_id, manifest_path, manifest_hash, + profile, environment, company, _utc_now_iso(), + ), + ) + + def finish_run(self, run_id: str, final_state: str) -> None: + """Stamp the final state + ``finished_at`` for this run. + + ``final_state`` must be a member of the CHECK enum; SQLite will + reject anything else with an ``IntegrityError`` which we let + propagate (callers are the runtime, not end users). + """ + conn = self._connect() + conn.execute( + "UPDATE run SET state = ?, finished_at = ? WHERE run_id = ?", + (final_state, _utc_now_iso(), run_id), + ) + + # ── Step writes ───────────────────────────────────────────────── + + def write_intent( + self, + *, + seq: int, + method: str, + url: str, + body_hash: str | None, + ) -> int: + """Insert the *intent* row for a step and return its ``step_id``. + + This is the row that survives a SIGKILL — we know we *tried* the + HTTP call. The matching ``write_outcome`` flips the state once + the response (or an exception) comes back. + """ + conn = self._connect() + cur = conn.execute( + """ + INSERT INTO step ( + run_id, seq, intent_ts, method, url, body_hash + ) VALUES (?, ?, ?, ?, ?, ?) + """, + (self.run_id, seq, _utc_now_iso(), method, url, body_hash), + ) + return int(cur.lastrowid) + + def write_outcome( + self, + *, + step_id: int, + status: str, + bc_correlation_id: str | None, + error_message: str | None, + rollback_url: str | None, + ) -> None: + """Stamp the outcome row for a step. + + ``status`` is the step-level status (free text — see module + docstring for the vocabulary). ``rollback_url`` is the + precomposed inverse-op target for the rollback command; + callers responsible for building it on POST success. + """ + conn = self._connect() + conn.execute( + """ + UPDATE step + SET outcome_ts = ?, + status = ?, + bc_correlation_id = ?, + error_message = ?, + rollback_url = ? + WHERE step_id = ? 
+ """, + ( + _utc_now_iso(), status, bc_correlation_id, + error_message, rollback_url, step_id, + ), + ) + + def update_step_status(self, step_id: int, status: str) -> None: + """Used by rollback to flip a step to ``rolled_back`` / + ``rollback_skipped`` after the inverse op fires.""" + conn = self._connect() + conn.execute( + "UPDATE step SET status = ? WHERE step_id = ?", + (status, step_id), + ) + + # ── Reads ─────────────────────────────────────────────────────── + + def get_run(self, run_id: str) -> dict[str, Any]: + conn = self._connect() + row = conn.execute( + "SELECT * FROM run WHERE run_id = ?", (run_id,) + ).fetchone() + if row is None: + raise LookupError(f"No run row for run_id={run_id!r}") + return dict(row) + + def get_steps(self, run_id: str) -> list[dict[str, Any]]: + conn = self._connect() + rows = conn.execute( + "SELECT * FROM step WHERE run_id = ? ORDER BY seq", (run_id,), + ).fetchall() + return [dict(r) for r in rows] + + # ── Derived state ─────────────────────────────────────────────── + + def compute_run_state(self, run_id: str) -> str: + """Derive the *real* state of a run from its step rows. + + Stamped ``run.state`` is just a hint — it goes stale the moment + the process dies between ``write_outcome`` and ``finish_run``. + This method is the source of truth for the read side. + """ + conn = self._connect() + rows = conn.execute( + "SELECT status, outcome_ts FROM step WHERE run_id = ?", (run_id,), + ).fetchall() + run_row = conn.execute( + "SELECT state, finished_at FROM run WHERE run_id = ?", (run_id,), + ).fetchone() + stamped = run_row["state"] if run_row is not None else None + is_finished = bool(run_row and run_row["finished_at"]) + + # Once ``finish_run`` has stamped a terminal state, that stamp + # is authoritative. The derivation path below is reserved for + # *unfinished* runs (the SIGKILL recovery case): a process that + # died never called ``finish_run`` so ``finished_at`` is NULL. 
+ if is_finished and stamped is not None: + return stamped + + if not rows: + # No steps yet, no finish stamp — process is still ramping + # up (or died before any step). + return "running" + + committed = [r for r in rows if r["status"] == "committed"] + failed = [r for r in rows if r["status"] == "failed"] + unknown = [r for r in rows if r["outcome_ts"] is None] + rolled = [r for r in rows if r["status"] in {"rolled_back", "rollback_skipped"}] + + if rolled and not committed and not failed and not unknown: + return "rolled_back" + + if committed and (unknown or failed): + return "partially_committed" + + if committed and not unknown and not failed: + return "completed" + + if failed and not committed and not unknown: + return "failed" + + # Only intent-only rows (the process is mid-run or died before any + # response came back). Treat as "running" — operator can decide. + return "running" + + # ── Class-level scanner ───────────────────────────────────────── + + @classmethod + def list_runs( + cls, + *, + base_dir: Path | None = None, + state: str | None = None, + limit: int = 50, + ) -> list[dict[str, Any]]: + """Scan all ``*.db`` files under ``base_dir`` and return a + summary row per run, most-recent-first. + + The returned ``state`` is the *derived* state — not the stamp — + so a SIGKILLed run shows up as ``partially_committed`` instead + of the stale ``running``. 
+ """ + d = Path(base_dir) if base_dir is not None else _default_base_dir() + if not d.exists(): + return [] + + rows: list[dict[str, Any]] = [] + for db_file in d.glob("*.db"): + try: + ledger = cls(run_id=db_file.stem, base_dir=d) + run = ledger.get_run(db_file.stem) + step_count = len(ledger.get_steps(db_file.stem)) + derived = ledger.compute_run_state(db_file.stem) + rows.append( + { + "run_id": run["run_id"], + "started_at": run["started_at"], + "finished_at": run["finished_at"], + "profile": run["profile"], + "environment": run["environment"], + "company": run["company"], + "manifest_path": run["manifest_path"], + "state": derived, + "step_count": step_count, + } + ) + ledger.close() + except (sqlite3.DatabaseError, LookupError): + # Skip malformed / unrelated .db files in the dir. + continue + + if state is not None: + rows = [r for r in rows if r["state"] == state] + rows.sort(key=lambda r: r["started_at"] or "", reverse=True) + return rows[:limit] + + +# ─── Helpers ───────────────────────────────────────────────────────── + + +def _utc_now_iso() -> str: + """UTC timestamp formatted as ISO 8601 with second precision. + + Microsecond precision in tests is fine but creates noisy diffs when + a human reads the ledger; we keep the historic ``isoformat()`` + output but strip the trailing ``+00:00`` and add ``Z`` to match the + rest of bcli's audit/telemetry formats. + """ + return _dt.datetime.now(_dt.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") diff --git a/src/bcli/client/_async.py b/src/bcli/client/_async.py index 46c0178..db93d52 100644 --- a/src/bcli/client/_async.py +++ b/src/bcli/client/_async.py @@ -211,6 +211,18 @@ async def delete( ) return await transport.delete(url, etag=etag) + async def delete_url(self, url: str, *, etag: str = "*") -> dict[str, Any]: + """DELETE an already-resolved absolute URL. + + Used by the batch ledger's rollback path, where the inverse-op + URL was composed at POST-capture time and stored verbatim. 
The + registry would not be re-consulted at rollback (the original + endpoint may have moved between runs); we trust the captured + URL. + """ + transport = self._ensure_transport() + return await transport.delete(url, etag=etag) + async def upload_attachment( self, parent_type: str, diff --git a/src/bcli_cli/commands/batch_cmd.py b/src/bcli_cli/commands/batch_cmd.py index 77ad3e9..6c196b4 100644 --- a/src/bcli_cli/commands/batch_cmd.py +++ b/src/bcli_cli/commands/batch_cmd.py @@ -1,20 +1,39 @@ -"""bcli batch — execute YAML batch files with optional workflow features.""" +"""bcli batch — execute YAML batch files with optional workflow features. + +This module also hosts the durable operation ledger introduced in AIP +v0.1 Phase 3. Every batch run writes: + + * a ``run`` row at start_run time, with manifest hash + profile/env/company; + * an *intent* ``step`` row before each HTTP call (survives SIGKILL); + * an *outcome* ``step`` row after the call returns (or raises). + +``bcli batch state ``, ``bcli batch list``, and ``bcli batch +rollback `` read that ledger. +""" from __future__ import annotations import asyncio +import hashlib import json +import uuid from pathlib import Path from typing import Any import typer from rich.console import Console +from rich.table import Table +from bcli.batch.ledger import Ledger from bcli_cli._state import state from bcli_cli.output import format_output, print_context_banner app = typer.Typer(no_args_is_help=True) console = Console() +# Stderr console for AIP metadata (ledger path, run id) — keeps stdout +# clean for downstream parsers / pipelines as required by the +# "behavior unchanged by default" constraint. 
def _compose_rollback_url(client: Any, entity: str, post_result: Any) -> str | None:
    """Build the record URL that a rollback DELETE should target.

    The new record's key is read from the POST response (``id`` first,
    ``systemId`` as the fallback). The entity's base URL comes from the
    client's own resolver, and the key is appended in parentheses.

    Returns ``None`` — meaning "this step cannot be rolled back from the
    ledger alone" — when the response is not a dict, carries no usable
    key, the resolver raises, or the resolver hands back something that
    is not a string (in which case a stray coroutine is closed first).
    """
    record_id = None
    if isinstance(post_result, dict):
        record_id = post_result.get("id") or post_result.get("systemId")
    if not record_id:
        return None

    try:
        resolved = client._resolve_url(entity)  # noqa: SLF001
    except Exception:
        return None

    if isinstance(resolved, str):
        # Record URL convention: the entity URL followed by "(key)".
        return f"{resolved}({record_id})"

    _close_if_coroutine(resolved)
    return None
+ run_id = uuid.uuid4().hex + ledger = Ledger(run_id=run_id) + ledger.start_run( + manifest_path=str(file.resolve()), + manifest_hash=_manifest_hash(file), + profile=state.active_profile_name, + environment=state.profile.environment, + company=state.profile.company_id or "", + ) + _stderr.print(f"[dim]Ledger: {ledger.db_path}[/dim]") + + final_state = "completed" try: - results = asyncio.run(_execute_batch(steps, context=context, output_format=output_format)) + results = asyncio.run( + _execute_batch(steps, context=context, output_format=output_format, ledger=ledger) + ) succeeded = sum(1 for r in results if r.get("status") == "ok") + failed_count = sum(1 for r in results if r.get("status") == "error") + if failed_count and succeeded: + final_state = "partially_committed" + elif failed_count: + final_state = "failed" + else: + final_state = "completed" console.print(f"\n[green]✓[/green] Batch complete: {succeeded}/{len(steps)} steps succeeded") + _stderr.print(f"[dim]Run id: {run_id}[/dim]") if output: output.parent.mkdir(parents=True, exist_ok=True) output_data = { "batch": batch_name, + "run_id": run_id, "steps": results, } output.write_text(json.dumps(output_data, indent=2, default=str)) console.print(f"[dim]Results saved to {output}[/dim]") - except Exception as e: - console.print(f"[red]Batch failed:[/red] {e}") - raise typer.Exit(1) + except BaseException as e: + # BaseException covers SystemExit / KeyboardInterrupt — anything + # short of SIGKILL passes through this branch, so the ledger + # always gets a derived final state. + if isinstance(e, Exception) and not isinstance(e, (KeyboardInterrupt, SystemExit)): + console.print(f"[red]Batch failed:[/red] {e}") + # Derive the truthful state from steps that landed before the + # crash. ``compute_run_state`` looks at intent_ts vs outcome_ts + # — exactly what we need for "POST committed, then we died." 
+ final_state = ledger.compute_run_state(run_id) + ledger.finish_run(run_id, final_state) + ledger.close() + if isinstance(e, Exception) and not isinstance(e, (KeyboardInterrupt, SystemExit)): + raise typer.Exit(1) + raise + + ledger.finish_run(run_id, final_state) + ledger.close() # ─── Dry run ───────────────────────────────────────────────────────── @@ -291,11 +417,19 @@ async def _execute_batch( *, context: Any | None = None, output_format: str | None = None, + ledger: Ledger | None = None, ) -> list[dict]: + """Execute the batch steps, writing intent + outcome rows around each + HTTP call when a ``ledger`` is supplied. + + The ``ledger`` argument is keyword-only and optional so existing + integration tests that call this function directly continue to work + unchanged. + """ from bcli.workflow._models import StepResult, WorkflowContext from bcli.workflow._resolver import resolve_references - results = [] + results: list[dict] = [] async with state.make_async_client() as client: for i, step in enumerate(steps, 1): # Resolve workflow references if in workflow mode @@ -323,6 +457,34 @@ async def _execute_batch( console.print(f" [dim]Step {i}:[/dim] {action.upper()} {endpoint}...", end=" ") + # ── Ledger: write intent BEFORE the HTTP call. ── + # The resolved URL is computed lazily via the client's + # registry. If resolution fails (or the client is a mock + # whose ``_resolve_url`` returns something non-string), fall + # back to the entity name so the operator has *something* to + # look at and the SQLite TEXT column accepts the value. + ledger_step_id: int | None = None + if ledger is not None: + intent_url = endpoint + if endpoint: + try: + candidate = client._resolve_url(endpoint, record_id=record_id) # noqa: SLF001 + if isinstance(candidate, str): + intent_url = candidate + else: + # AsyncMock returns a coroutine when treated + # as sync — close it to avoid the resource + # warning at GC time. 
+ _close_if_coroutine(candidate) + except Exception: + intent_url = endpoint + ledger_step_id = ledger.write_intent( + seq=i, + method=action.upper() if action != "get" else "GET", + url=intent_url, + body_hash=_body_hash(data) if data is not None else None, + ) + try: if action == "get": from bcli.odata._query import Query @@ -353,6 +515,13 @@ async def _execute_batch( format_output(records, output_format) console.print() + if ledger is not None and ledger_step_id is not None: + ledger.write_outcome( + step_id=ledger_step_id, status="committed", + bc_correlation_id=None, error_message=None, + rollback_url=None, # GETs are not rollback-eligible + ) + elif action == "post": result = await client.post(endpoint, data or {}) console.print("[green]✓[/green] created") @@ -368,10 +537,25 @@ async def _execute_batch( if output_format and result: format_output([result], output_format) + if ledger is not None and ledger_step_id is not None: + rb_url = _compose_rollback_url(client, endpoint, result) + ledger.write_outcome( + step_id=ledger_step_id, status="committed", + bc_correlation_id=None, error_message=None, + rollback_url=rb_url, + ) + elif action == "patch": if not record_id: console.print("[red]✗ missing 'id' field[/red]") results.append({"step": i, "name": step_name, "action": action, "endpoint": endpoint, "status": "error", "error": "missing id"}) + if ledger is not None and ledger_step_id is not None: + ledger.write_outcome( + step_id=ledger_step_id, status="failed", + bc_correlation_id=None, + error_message="missing id", + rollback_url=None, + ) continue result = await client.patch(endpoint, record_id, data or {}, etag=etag) console.print("[green]✓[/green] updated") @@ -384,10 +568,27 @@ async def _execute_batch( StepResult(name=step_name, action=action, endpoint=endpoint, status="ok", data=result or {}), ) + if ledger is not None and ledger_step_id is not None: + ledger.write_outcome( + step_id=ledger_step_id, status="committed", + bc_correlation_id=None, 
error_message=None, + # PATCH rollback would need a pre-image snapshot + # we don't keep in v0.1 — see help text on + # ``bcli batch rollback``. + rollback_url=None, + ) + elif action == "delete": if not record_id: console.print("[red]✗ missing 'id' field[/red]") results.append({"step": i, "name": step_name, "action": action, "endpoint": endpoint, "status": "error", "error": "missing id"}) + if ledger is not None and ledger_step_id is not None: + ledger.write_outcome( + step_id=ledger_step_id, status="failed", + bc_correlation_id=None, + error_message="missing id", + rollback_url=None, + ) continue await client.delete(endpoint, record_id, etag=etag) console.print("[green]✓[/green] deleted") @@ -400,9 +601,23 @@ async def _execute_batch( StepResult(name=step_name, action=action, endpoint=endpoint, status="ok", data={}), ) + if ledger is not None and ledger_step_id is not None: + ledger.write_outcome( + step_id=ledger_step_id, status="committed", + bc_correlation_id=None, error_message=None, + rollback_url=None, # DELETE has no clean inverse + ) + else: console.print(f"[yellow]? 
unknown action '{action}'[/yellow]") results.append({"step": i, "name": step_name, "status": "skipped"}) + if ledger is not None and ledger_step_id is not None: + ledger.write_outcome( + step_id=ledger_step_id, status="skipped", + bc_correlation_id=None, + error_message=f"unknown action '{action}'", + rollback_url=None, + ) except Exception as e: console.print(f"[red]✗ {e}[/red]") @@ -412,5 +627,229 @@ async def _execute_batch( step_name, StepResult(name=step_name, action=action, endpoint=endpoint, status="error", error=str(e)), ) + if ledger is not None and ledger_step_id is not None: + ledger.write_outcome( + step_id=ledger_step_id, status="failed", + bc_correlation_id=None, + error_message=str(e), + rollback_url=None, + ) return results + + +# ─── Command: batch state ──────────────────────────────────────────── + + +@app.command("state") +def state_cmd( + run_id: str = typer.Argument(help="Run id (from `bcli batch list` or the run output)"), + format: str = typer.Option("table", "--format", "-f", help="Output format: table | json"), +) -> None: + """Show the ledger state for a single batch run.""" + ledger = Ledger(run_id=run_id) + if not ledger.db_path.exists(): + console.print(f"[red]No ledger for run-id '{run_id}'.[/red]") + raise typer.Exit(1) + + run = ledger.get_run(run_id) + run["state"] = ledger.compute_run_state(run_id) # truthful state + steps = ledger.get_steps(run_id) + ledger.close() + + if format == "json": + out = {"run": run, "steps": steps} + typer.echo(json.dumps(out, indent=2, default=str)) + return + + # Table layout + console.print(f"[bold]Run:[/bold] {run['run_id']}") + console.print( + f" [dim]Manifest:[/dim] {run['manifest_path']}\n" + f" [dim]Profile:[/dim] {run['profile']} ({run['environment']} / {run['company']})\n" + f" [dim]Started:[/dim] {run['started_at']}\n" + f" [dim]Finished:[/dim] {run.get('finished_at') or '(running)'}\n" + f" [dim]State:[/dim] {run['state']}\n" + ) + table = Table(title=f"Steps ({len(steps)})") + 
@app.command(
    "rollback",
    help=(
        "Roll back a batch run.\n\n"
        "v0.1 limitation: only committed POSTs are reversed (DELETE on the "
        "captured record URL). PATCH and DELETE require manual cleanup — "
        "there is no clean inverse without a pre-image snapshot. Those "
        "steps are flagged 'rollback_skipped'.\n\n"
        "Refuses to run on profiles with disable_writes=true. The --yes "
        "flag does NOT bypass that gate."
    ),
)
def rollback_cmd(
    run_id: str = typer.Argument(help="Run id to roll back"),
    dry_run: bool = typer.Option(
        False, "--dry-run", help="Show which steps would be reversed; touch nothing.",
    ),
    yes: bool = typer.Option(
        False, "--yes", "-y", help="Skip the interactive confirmation prompt.",
    ),
) -> None:
    """Undo the committed steps of a batch run recorded in the ledger.

    Committed POST steps with a captured ``rollback_url`` are reversed
    with a DELETE; committed PATCH / DELETE steps and POSTs without a
    captured URL are only flagged ``rollback_skipped``. Steps are handled
    in descending ``seq`` order so the most recent work is undone first.

    The run is stamped ``rolled_back`` only when no step ended up as
    ``rollback_failed``. Otherwise it is stamped ``partially_committed``
    and the command exits non-zero, because records created by the run
    still exist remotely.

    Raises:
        typer.Exit: with code 1 when the ledger file is missing, the
            active profile has ``disable_writes`` set, confirmation is
            refused or impossible, or at least one inverse operation
            failed.
    """
    ledger = Ledger(run_id=run_id)
    if not ledger.db_path.exists():
        console.print(f"[red]No ledger for run-id '{run_id}'.[/red]")
        raise typer.Exit(1)

    # disable_writes is a HARD refusal here. The operator opted into
    # read-only mode — silently undoing committed work would defeat the
    # safety. --yes does NOT bypass this gate (unlike `batch run`).
    if getattr(state.profile, "disable_writes", False):
        console.print(
            f"[red]✗ Profile '{state.active_profile_name}' has "
            "disable_writes=true.[/red]"
        )
        console.print(
            "[dim]Rollback issues inverse mutations and is refused on "
            "read-only profiles. Switch to a writable profile to undo "
            "this run.[/dim]"
        )
        raise typer.Exit(1)

    # Everything below may open the SQLite connection; the ``finally``
    # guarantees it is released on every exit path (early return,
    # ``typer.Exit`` or an unexpected error during the rollback itself).
    try:
        steps = ledger.get_steps(run_id)
        # Reverse seq so we undo most recent first.
        steps_reverse = sorted(steps, key=lambda s: s["seq"], reverse=True)

        plan: list[tuple[dict, str]] = []  # (step_row, action)
        for s in steps_reverse:
            status = s["status"]
            method = (s["method"] or "").upper()
            if status != "committed":
                # Intent-only or already-failed/rolled steps are not eligible.
                continue
            if method == "POST" and s["rollback_url"]:
                plan.append((s, "delete"))
            elif method in {"PATCH", "DELETE"}:
                plan.append((s, "skip"))
            elif method == "POST" and not s["rollback_url"]:
                # POST that returned no id — we can't compose a DELETE URL.
                plan.append((s, "skip"))

        if not plan:
            console.print("[yellow]Nothing to roll back.[/yellow]")
            return

        # Summary preview before any mutation.
        console.print(
            f"[bold]Rollback plan for run {run_id}:[/bold] "
            f"{sum(1 for _, a in plan if a == 'delete')} delete(s), "
            f"{sum(1 for _, a in plan if a == 'skip')} skipped"
        )
        for step_row, plan_action in plan:
            verb = "DELETE" if plan_action == "delete" else "SKIP  "
            url = step_row["rollback_url"] or step_row["url"]
            console.print(f"  {verb} {url}")

        if dry_run:
            return

        # Confirmation prompt. --yes skips.
        if not yes:
            import sys

            if not sys.stdin.isatty():
                console.print(
                    "[red]✗ Refusing to roll back: non-interactive session and "
                    "--yes was not passed.[/red]"
                )
                raise typer.Exit(1)
            answer = typer.prompt(
                "Type 'yes' to apply the rollback, anything else to cancel",
                default="",
                show_default=False,
            )
            if answer.strip().lower() != "yes":
                console.print("[red]✗ Cancelled.[/red]")
                raise typer.Exit(1)

        asyncio.run(_apply_rollback(plan, ledger=ledger))

        # Re-read the steps instead of assuming success: a step left in
        # ``rollback_failed`` means its record was not removed, so
        # stamping ``rolled_back`` would make the ledger lie.
        failures = [
            s for s in ledger.get_steps(run_id) if s["status"] == "rollback_failed"
        ]
        if failures:
            ledger.finish_run(run_id, "partially_committed")
            console.print(
                f"[red]✗[/red] Rollback incomplete for run {run_id}: "
                f"{len(failures)} inverse operation(s) failed; "
                "manual cleanup required."
            )
            raise typer.Exit(1)

        ledger.finish_run(run_id, "rolled_back")
        console.print(f"[green]✓[/green] Rollback applied for run {run_id}.")
    finally:
        ledger.close()
mode 100644 index 0000000..e69de29 diff --git a/tests/test_batch_ledger/test_batch_cmd_ledger.py b/tests/test_batch_ledger/test_batch_cmd_ledger.py new file mode 100644 index 0000000..4929a50 --- /dev/null +++ b/tests/test_batch_ledger/test_batch_cmd_ledger.py @@ -0,0 +1,265 @@ +"""``bcli batch run`` writes an intent row before each HTTP call and an +outcome row after, surviving SIGKILL in the gap. + +These are integration tests against ``run_batch`` — they patch HOME so the +ledger lands in tmp_path, patch ``make_async_client`` to inject a fake +async client, and assert on the resulting SQLite file. +""" + +from __future__ import annotations + +import sqlite3 +import textwrap +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +from bcli.config._model import BCConfig, BCDefaults, BCProfile +from bcli_cli._state import state +from bcli_cli.commands.batch_cmd import run_batch + + +# ─── Fixtures ──────────────────────────────────────────────────────── + + +def _writable_config() -> BCConfig: + return BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={ + "dev": BCProfile( + tenant_id="t1", + environment="Sandbox", + company_id="c-1", + disable_writes=False, + ), + }, + ) + + +@pytest.fixture +def writable_state(): + state._config = _writable_config() + state._registry = None + state.profile_name = None + state.dry_run = False + state.quiet = True + yield + state._config = None + state._registry = None + + +@pytest.fixture +def ledger_home(tmp_path, monkeypatch): + """Force the ledger to live under tmp_path/.config/bcli/batch.""" + monkeypatch.setenv("HOME", str(tmp_path)) + # Some platforms also look up Path.home() via pwd; patch directly. 
+ monkeypatch.setattr(Path, "home", lambda: tmp_path) + ledger_dir = tmp_path / ".config" / "bcli" / "batch" + yield ledger_dir + + +@pytest.fixture +def fake_client(): + c = AsyncMock() + c.__aenter__ = AsyncMock(return_value=c) + c.__aexit__ = AsyncMock(return_value=False) + c.post = AsyncMock(return_value={"id": "rec-1", "@odata.context": "x"}) + c.patch = AsyncMock(return_value={"id": "rec-1"}) + c.delete = AsyncMock(return_value=None) + c.get = AsyncMock( + return_value=type("R", (), {"value": [], "raw": None})() + ) + # Used so the ledger can compose a rollback URL for POSTs. + c._resolve_url = lambda entity, record_id=None, **_: ( + f"https://x/{entity}({record_id})" if record_id else f"https://x/{entity}" + ) + return c + + +def _write_yaml(tmp_path: Path, name: str, content: str) -> Path: + f = tmp_path / name + f.write_text(textwrap.dedent(content).strip(), encoding="utf-8") + return f + + +# ─── Intent before HTTP ────────────────────────────────────────────── + + +class TestIntentBeforeHttp: + def test_intent_written_before_post( + self, writable_state, ledger_home, fake_client, tmp_path, + ): + """When the fake client's POST is invoked, an intent row must + already exist in the ledger DB. The test enforces ordering via a + side_effect that reads the DB at the moment of the call. + """ + captured = {} + + async def post_side_effect(entity, body, **kwargs): + # Scan the batch ledger dir for any *.db that contains an + # intent row for this POST. Done at call time so we catch a + # write-order regression. 
+ db_files = list(ledger_home.glob("*.db")) + assert db_files, "ledger DB should exist before HTTP fires" + rows: list = [] + for db in db_files: + with sqlite3.connect(db) as conn: + conn.row_factory = sqlite3.Row + rows.extend( + dict(r) for r in conn.execute( + "SELECT * FROM step WHERE method = 'POST'" + ) + ) + assert any(r["intent_ts"] for r in rows), ( + "intent row must be written before HTTP" + ) + assert all(r["outcome_ts"] is None for r in rows), ( + "outcome must NOT be written before HTTP returns" + ) + captured["body"] = body + return {"id": "created-1"} + + fake_client.post.side_effect = post_side_effect + + f = _write_yaml(tmp_path, "p.yaml", """ + steps: + - name: create + action: post + endpoint: items + data: + displayName: "ok" + """) + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + run_batch( + file=f, dry_run=False, output=None, format=None, + set_params=None, params_file=None, yes=True, + ) + + # Post-run: outcome row populated. + db_files = list(ledger_home.glob("*.db")) + assert len(db_files) == 1 + with sqlite3.connect(db_files[0]) as conn: + conn.row_factory = sqlite3.Row + rows = [ + dict(r) for r in conn.execute( + "SELECT * FROM step ORDER BY seq" + ) + ] + assert len(rows) == 1 + assert rows[0]["status"] == "committed" + assert rows[0]["outcome_ts"] is not None + assert rows[0]["method"] == "POST" + assert "items" in rows[0]["url"] + + def test_partially_committed_after_simulated_crash( + self, writable_state, ledger_home, fake_client, tmp_path, + ): + """Step 1 succeeds; step 2 'crashes' (raises mid-flight) — the run + ledger surfaces ``partially_committed`` on read. + """ + call_count = {"n": 0} + + async def post_side_effect(entity, body, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 2: + # Simulate SIGKILL between intent + outcome by raising a + # BaseException that batch_cmd cannot catch with `except + # Exception:`. 
+ raise BaseException("simulated SIGKILL") + return {"id": f"rec-{call_count['n']}"} + + fake_client.post.side_effect = post_side_effect + + f = _write_yaml(tmp_path, "two.yaml", """ + steps: + - name: s1 + action: post + endpoint: items + data: {x: 1} + - name: s2 + action: post + endpoint: items + data: {x: 2} + """) + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + with pytest.raises(BaseException, match="simulated SIGKILL"): + run_batch( + file=f, dry_run=False, output=None, format=None, + set_params=None, params_file=None, yes=True, + ) + + # Read the ledger after the crash. + from bcli.batch.ledger import Ledger + + rows = Ledger.list_runs(base_dir=ledger_home) + assert len(rows) == 1 + # Either persisted via SIGKILL-safe handler OR derived on read — + # the contract is "the operator sees partially_committed". + assert rows[0]["state"] == "partially_committed" + + def test_normal_completion_marks_completed( + self, writable_state, ledger_home, fake_client, tmp_path, + ): + f = _write_yaml(tmp_path, "ok.yaml", """ + steps: + - name: s1 + action: post + endpoint: items + data: {x: 1} + """) + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + run_batch( + file=f, dry_run=False, output=None, format=None, + set_params=None, params_file=None, yes=True, + ) + + from bcli.batch.ledger import Ledger + rows = Ledger.list_runs(base_dir=ledger_home) + assert len(rows) == 1 + assert rows[0]["state"] == "completed" + + +# ─── Rollback URL capture ──────────────────────────────────────────── + + +class TestRollbackUrlCapture: + def test_post_step_stores_rollback_url( + self, writable_state, ledger_home, fake_client, tmp_path, + ): + """POST that returns an ``id`` → rollback_url = entity_url(id).""" + fake_client.post.return_value = {"id": "VND-1"} + f = _write_yaml(tmp_path, "p.yaml", """ + steps: + - name: create_vendor + action: post + endpoint: vendors + data: 
{name: "AAR"} + """) + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + run_batch( + file=f, dry_run=False, output=None, format=None, + set_params=None, params_file=None, yes=True, + ) + + db_files = list(ledger_home.glob("*.db")) + with sqlite3.connect(db_files[0]) as conn: + conn.row_factory = sqlite3.Row + row = dict( + conn.execute("SELECT * FROM step LIMIT 1").fetchone() + ) + assert row["rollback_url"] + assert "vendors" in row["rollback_url"] + assert "VND-1" in row["rollback_url"] diff --git a/tests/test_batch_ledger/test_ledger_schema.py b/tests/test_batch_ledger/test_ledger_schema.py new file mode 100644 index 0000000..b3f8c04 --- /dev/null +++ b/tests/test_batch_ledger/test_ledger_schema.py @@ -0,0 +1,337 @@ +"""Unit tests for the SQLite Ledger — schema, write paths, derivations. + +These tests target ``bcli.batch.ledger.Ledger`` directly. CLI-level behavior +(``bcli batch run`` writing intent before HTTP, the ``state/list/rollback`` +commands) is covered in ``test_batch_cmd_ledger.py`` and +``test_rollback_cmd.py``. 
+""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +from bcli.batch.ledger import Ledger, RunLedgerExistsError + + +@pytest.fixture +def base_dir(tmp_path: Path) -> Path: + """Per-test ledger root.""" + d = tmp_path / "batch" + d.mkdir() + return d + + +# ─── Schema + creation ─────────────────────────────────────────────── + + +class TestLedgerSchema: + def test_ledger_created_at_run_start(self, base_dir): + """Opening a Ledger + start_run creates .db with `run` + `step` tables.""" + ledger = Ledger(run_id="abc", base_dir=base_dir) + ledger.start_run( + manifest_path="/x/y.yaml", + manifest_hash="deadbeef", + profile="finance_sandbox", + environment="Sandbox", + company="BTUSALLC", + ) + + db_path = base_dir / "abc.db" + assert db_path.exists() + + with sqlite3.connect(db_path) as conn: + tables = { + row[0] + for row in conn.execute( + "SELECT name FROM sqlite_master WHERE type='table'" + ) + } + assert {"run", "step", "schema_version"}.issubset(tables) + + def test_ledger_run_metadata(self, base_dir): + ledger = Ledger(run_id="run-meta", base_dir=base_dir) + ledger.start_run( + manifest_path="/path/to/wf.yaml", + manifest_hash="cafe", + profile="prod", + environment="Production", + company="BTALI", + ) + + run = ledger.get_run("run-meta") + assert run["run_id"] == "run-meta" + assert run["manifest_path"] == "/path/to/wf.yaml" + assert run["manifest_hash"] == "cafe" + assert run["profile"] == "prod" + assert run["environment"] == "Production" + assert run["company"] == "BTALI" + assert run["state"] == "running" + assert run["started_at"] # ISO timestamp populated + + def test_schema_version_recorded(self, base_dir): + Ledger(run_id="r", base_dir=base_dir).start_run( + manifest_path="", manifest_hash="", profile="p", environment="e", company="c", + ) + with sqlite3.connect(base_dir / "r.db") as conn: + (version,) = conn.execute("SELECT version FROM schema_version").fetchone() + assert version == 1 + + def 
test_state_enum_allowed_values(self, base_dir): + """Run.state CHECK enum includes every value in the contract.""" + ledger = Ledger(run_id="enum", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", profile="p", environment="e", company="c", + ) + allowed = ( + "planned", "running", "partially_committed", "completed", + "failed", "cancelled", "rolled_back", + ) + for state_value in allowed: + ledger.finish_run("enum", state_value) + assert ledger.get_run("enum")["state"] == state_value + + # Bad value rejected by the CHECK constraint. + with pytest.raises(sqlite3.IntegrityError): + ledger.finish_run("enum", "not_a_real_state") + + def test_rerun_with_same_run_id_errors(self, base_dir): + """Defensive: start_run twice on the same run_id raises.""" + ledger = Ledger(run_id="dupe", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", profile="p", environment="e", company="c", + ) + + ledger2 = Ledger(run_id="dupe", base_dir=base_dir) + with pytest.raises(RunLedgerExistsError): + ledger2.start_run( + manifest_path="", manifest_hash="", + profile="p", environment="e", company="c", + ) + + +# ─── Intent + outcome rows ─────────────────────────────────────────── + + +class TestIntentAndOutcome: + def test_write_intent_returns_step_id(self, base_dir): + ledger = Ledger(run_id="r", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", profile="p", environment="e", company="c", + ) + + sid = ledger.write_intent( + seq=1, method="POST", url="https://x/api/v2.0/vendors", + body_hash="bh-1", + ) + assert isinstance(sid, int) + assert sid >= 1 + + rows = ledger.get_steps("r") + assert len(rows) == 1 + row = rows[0] + assert row["seq"] == 1 + assert row["method"] == "POST" + assert row["url"] == "https://x/api/v2.0/vendors" + assert row["body_hash"] == "bh-1" + assert row["intent_ts"] + # Outcome fields empty until write_outcome is called. 
+ assert row["outcome_ts"] is None + assert row["status"] is None + + def test_write_outcome_marks_committed(self, base_dir): + ledger = Ledger(run_id="r", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", profile="p", environment="e", company="c", + ) + sid = ledger.write_intent( + seq=1, method="POST", url="https://x/vendors", body_hash="h", + ) + ledger.write_outcome( + step_id=sid, status="committed", + bc_correlation_id="corr-7", error_message=None, + rollback_url="https://x/vendors(VND-1)", + ) + rows = ledger.get_steps("r") + assert rows[0]["status"] == "committed" + assert rows[0]["bc_correlation_id"] == "corr-7" + assert rows[0]["rollback_url"] == "https://x/vendors(VND-1)" + assert rows[0]["error_message"] is None + assert rows[0]["outcome_ts"] + + def test_write_outcome_failed(self, base_dir): + ledger = Ledger(run_id="r", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", profile="p", environment="e", company="c", + ) + sid = ledger.write_intent( + seq=1, method="POST", url="https://x/vendors", body_hash="h", + ) + ledger.write_outcome( + step_id=sid, status="failed", + bc_correlation_id="corr-bad", error_message="500 server error", + rollback_url=None, + ) + row = ledger.get_steps("r")[0] + assert row["status"] == "failed" + assert row["error_message"] == "500 server error" + + +# ─── compute_run_state ─────────────────────────────────────────────── + + +class TestComputeRunState: + def _setup(self, base_dir, run_id="r"): + ledger = Ledger(run_id=run_id, base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", profile="p", environment="e", company="c", + ) + return ledger + + def test_no_steps_yet_is_running(self, base_dir): + ledger = self._setup(base_dir) + assert ledger.compute_run_state("r") == "running" + + def test_all_committed_is_completed(self, base_dir): + ledger = self._setup(base_dir) + for i in range(1, 4): + sid = ledger.write_intent(seq=i, method="POST", 
url=f"u{i}", body_hash="h") + ledger.write_outcome( + step_id=sid, status="committed", + bc_correlation_id=None, error_message=None, rollback_url=None, + ) + assert ledger.compute_run_state("r") == "completed" + + def test_intent_without_outcome_is_partially_committed(self, base_dir): + """Step 1 committed, step 2 intent only → simulates SIGKILL between steps.""" + ledger = self._setup(base_dir) + sid1 = ledger.write_intent(seq=1, method="POST", url="u1", body_hash="h") + ledger.write_outcome( + step_id=sid1, status="committed", + bc_correlation_id=None, error_message=None, rollback_url=None, + ) + ledger.write_intent(seq=2, method="POST", url="u2", body_hash="h") + # No outcome for step 2 — process died here. + assert ledger.compute_run_state("r") == "partially_committed" + + def test_failed_after_committed_is_partially_committed(self, base_dir): + ledger = self._setup(base_dir) + sid1 = ledger.write_intent(seq=1, method="POST", url="u1", body_hash="h") + ledger.write_outcome( + step_id=sid1, status="committed", + bc_correlation_id=None, error_message=None, rollback_url=None, + ) + sid2 = ledger.write_intent(seq=2, method="POST", url="u2", body_hash="h") + ledger.write_outcome( + step_id=sid2, status="failed", + bc_correlation_id=None, error_message="boom", rollback_url=None, + ) + assert ledger.compute_run_state("r") == "partially_committed" + + def test_only_failures_no_commits_is_failed(self, base_dir): + ledger = self._setup(base_dir) + sid = ledger.write_intent(seq=1, method="POST", url="u1", body_hash="h") + ledger.write_outcome( + step_id=sid, status="failed", + bc_correlation_id=None, error_message="boom", rollback_url=None, + ) + assert ledger.compute_run_state("r") == "failed" + + +# ─── list_runs ─────────────────────────────────────────────────────── + + +class TestListRuns: + def test_list_recent_sorted_by_started_at_desc(self, base_dir): + import time + + for i in range(3): + ledger = Ledger(run_id=f"run-{i}", base_dir=base_dir) + ledger.start_run( + 
manifest_path=f"/p/{i}", manifest_hash=f"h{i}", + profile="p", environment="e", company="c", + ) + ledger.finish_run(f"run-{i}", "completed") + time.sleep(0.01) # ensure distinct started_at + + # Scan happens with a class method so we don't keep a stale connection. + rows = Ledger.list_runs(base_dir=base_dir) + ids = [r["run_id"] for r in rows] + # Most recent first + assert ids == ["run-2", "run-1", "run-0"] + for r in rows: + assert r["state"] == "completed" + assert r["step_count"] == 0 + + def test_list_filter_by_state(self, base_dir): + runs = [ + ("a", "completed"), + ("b", "failed"), + ("c", "completed"), + ] + for rid, st in runs: + ledger = Ledger(run_id=rid, base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", + profile="p", environment="e", company="c", + ) + ledger.finish_run(rid, st) + + rows = Ledger.list_runs(base_dir=base_dir, state="failed") + assert [r["run_id"] for r in rows] == ["b"] + + def test_list_uses_computed_state_for_unfinished_runs(self, base_dir): + """A SIGKILLed run never had finish_run called. Its DB still says + 'running' — but ``list_runs`` should surface the derived + ``partially_committed`` so an operator sees reality. + """ + ledger = Ledger(run_id="ghost", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", + profile="p", environment="e", company="c", + ) + sid = ledger.write_intent(seq=1, method="POST", url="u1", body_hash="h") + ledger.write_outcome( + step_id=sid, status="committed", + bc_correlation_id=None, error_message=None, rollback_url=None, + ) + ledger.write_intent(seq=2, method="POST", url="u2", body_hash="h") + # Process dies — no finish_run, no outcome on step 2. 
+ + rows = Ledger.list_runs(base_dir=base_dir) + ghost = [r for r in rows if r["run_id"] == "ghost"][0] + assert ghost["state"] == "partially_committed" + + def test_list_step_count(self, base_dir): + ledger = Ledger(run_id="r", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", + profile="p", environment="e", company="c", + ) + for i in range(1, 4): + ledger.write_intent(seq=i, method="GET", url=f"u{i}", body_hash=None) + rows = Ledger.list_runs(base_dir=base_dir) + assert rows[0]["step_count"] == 3 + + +# ─── Durability ────────────────────────────────────────────────────── + + +class TestDurability: + def test_wal_mode_enabled(self, base_dir): + """WAL journal mode + a sync mode that fsyncs commits — required so + an intent row durably lands before HTTP is dispatched. + """ + ledger = Ledger(run_id="r", base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", + profile="p", environment="e", company="c", + ) + with sqlite3.connect(base_dir / "r.db") as conn: + (mode,) = conn.execute("PRAGMA journal_mode").fetchone() + (synchronous,) = conn.execute("PRAGMA synchronous").fetchone() + assert mode.lower() == "wal" + # 1 = NORMAL, 2 = FULL — both flush at commit. 0 (OFF) is unsafe. 
+ assert synchronous in (1, 2) diff --git a/tests/test_batch_ledger/test_rollback_cmd.py b/tests/test_batch_ledger/test_rollback_cmd.py new file mode 100644 index 0000000..755dc3b --- /dev/null +++ b/tests/test_batch_ledger/test_rollback_cmd.py @@ -0,0 +1,299 @@ +"""``bcli batch rollback `` issues inverse operations for committed +POST steps and refuses to touch PATCH/DELETE without manual cleanup.""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest +from typer.testing import CliRunner + +from bcli.batch.ledger import Ledger +from bcli.config._model import BCConfig, BCDefaults, BCProfile +from bcli_cli._state import state +from bcli_cli.app import app + + +# ─── Fixtures ──────────────────────────────────────────────────────── + + +@pytest.fixture +def ledger_home(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + yield tmp_path / ".config" / "bcli" / "batch" + + +def _writable_config() -> BCConfig: + return BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={ + "dev": BCProfile( + tenant_id="t1", + environment="Sandbox", + company_id="c-1", + disable_writes=False, + ), + }, + ) + + +def _readonly_config() -> BCConfig: + return BCConfig( + defaults=BCDefaults(profile="ro"), + profiles={ + "ro": BCProfile( + tenant_id="t1", + environment="Production", + company_id="c-1", + disable_writes=True, + ), + }, + ) + + +@pytest.fixture +def writable_state(): + state._config = _writable_config() + state._registry = None + state.profile_name = None + state.dry_run = False + state.quiet = True + yield + state._config = None + state._registry = None + + +@pytest.fixture +def readonly_state(): + state._config = _readonly_config() + state._registry = None + state.profile_name = None + state.dry_run = False + state.quiet = True + yield + state._config = None + state._registry = None + + +@pytest.fixture +def 
fake_client(): + c = AsyncMock() + c.__aenter__ = AsyncMock(return_value=c) + c.__aexit__ = AsyncMock(return_value=False) + c.delete = AsyncMock(return_value=None) + c.delete_url = AsyncMock(return_value=None) + return c + + +@pytest.fixture +def runner() -> CliRunner: + return CliRunner() + + +def _seed_committed_post(base_dir: Path, run_id: str, *, rollback_url="https://x/items(rec-1)"): + """Seed a ledger with one committed POST whose rollback_url is known.""" + ledger = Ledger(run_id=run_id, base_dir=base_dir) + ledger.start_run( + manifest_path=f"/wf/{run_id}.yaml", manifest_hash="hh", + profile="dev", environment="Sandbox", company="c-1", + ) + sid = ledger.write_intent(seq=1, method="POST", url="https://x/items", body_hash="bh") + ledger.write_outcome( + step_id=sid, status="committed", + bc_correlation_id="corr-1", error_message=None, + rollback_url=rollback_url, + ) + ledger.finish_run(run_id, "completed") + ledger.close() # avoid holding the WAL while another Ledger opens it + return ledger + + +# ─── POST rollback ─────────────────────────────────────────────────── + + +class TestPostRollback: + def test_committed_post_issues_delete( + self, writable_state, ledger_home, fake_client, runner, + ): + _seed_committed_post(ledger_home, "rb-1") + + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + result = runner.invoke(app, ["batch", "rollback", "rb-1", "--yes"]) + + assert result.exit_code == 0, result.output + # The fake client's delete_url (or delete) was invoked with the + # stored rollback_url. + called = ( + fake_client.delete_url.await_count + fake_client.delete.await_count + ) + assert called == 1 + + # Step state updated to rolled_back; run state too. 
+ from bcli.batch.ledger import Ledger as L + + rows = L.list_runs(base_dir=ledger_home) + assert rows[0]["state"] == "rolled_back" + + def test_rollback_dry_run_does_not_call_delete( + self, writable_state, ledger_home, fake_client, runner, + ): + _seed_committed_post(ledger_home, "rb-dry") + + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + result = runner.invoke( + app, ["batch", "rollback", "rb-dry", "--dry-run", "--yes"], + ) + assert result.exit_code == 0, result.output + fake_client.delete_url.assert_not_awaited() + fake_client.delete.assert_not_awaited() + + +# ─── PATCH / DELETE skip ───────────────────────────────────────────── + + +class TestPatchDeleteSkipped: + def _seed_committed_patch(self, base_dir, run_id): + ledger = Ledger(run_id=run_id, base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", + profile="dev", environment="Sandbox", company="c-1", + ) + sid = ledger.write_intent( + seq=1, method="PATCH", url="https://x/items(rec-1)", body_hash="bh", + ) + ledger.write_outcome( + step_id=sid, status="committed", + bc_correlation_id="corr-1", error_message=None, + rollback_url=None, + ) + ledger.finish_run(run_id, "completed") + ledger.close() + + def _seed_committed_delete(self, base_dir, run_id): + ledger = Ledger(run_id=run_id, base_dir=base_dir) + ledger.start_run( + manifest_path="", manifest_hash="", + profile="dev", environment="Sandbox", company="c-1", + ) + sid = ledger.write_intent( + seq=1, method="DELETE", url="https://x/items(rec-1)", body_hash="bh", + ) + ledger.write_outcome( + step_id=sid, status="committed", + bc_correlation_id="corr-1", error_message=None, + rollback_url=None, + ) + ledger.finish_run(run_id, "completed") + ledger.close() + + def test_patch_step_is_rollback_skipped( + self, writable_state, ledger_home, fake_client, runner, + ): + self._seed_committed_patch(ledger_home, "rb-patch") + with patch( + 
"bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + result = runner.invoke(app, ["batch", "rollback", "rb-patch", "--yes"]) + assert result.exit_code == 0, result.output + fake_client.delete_url.assert_not_awaited() + fake_client.delete.assert_not_awaited() + + # Step status flipped to rollback_skipped. + db_files = list(ledger_home.glob("*.db")) + with sqlite3.connect(db_files[0]) as conn: + conn.row_factory = sqlite3.Row + (status,) = conn.execute( + "SELECT status FROM step LIMIT 1" + ).fetchone() + assert status == "rollback_skipped" + + def test_delete_step_is_rollback_skipped( + self, writable_state, ledger_home, fake_client, runner, + ): + self._seed_committed_delete(ledger_home, "rb-del") + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + result = runner.invoke(app, ["batch", "rollback", "rb-del", "--yes"]) + assert result.exit_code == 0 + fake_client.delete_url.assert_not_awaited() + fake_client.delete.assert_not_awaited() + + +# ─── disable_writes refusal ────────────────────────────────────────── + + +class TestDisableWritesBlocksRollback: + def test_readonly_profile_aborts_before_any_http( + self, readonly_state, ledger_home, fake_client, runner, + ): + _seed_committed_post(ledger_home, "rb-ro") + + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + # --yes does NOT bypass disable_writes on rollback; refusal + # is unconditional. We invoke without --yes and assert + # non-zero exit. 
+ result = runner.invoke(app, ["batch", "rollback", "rb-ro"]) + + assert result.exit_code != 0 + fake_client.delete_url.assert_not_awaited() + fake_client.delete.assert_not_awaited() + + +# ─── Unknown (intent-only) steps skipped ───────────────────────────── + + +class TestUnknownStepsSkipped: + def test_intent_only_step_is_not_rolled_back( + self, writable_state, ledger_home, fake_client, runner, + ): + """An intent row with no outcome (e.g. the process died between + intent and outcome) must not be 'rolled back' — we don't know + whether the HTTP succeeded server-side. + """ + ledger = Ledger(run_id="rb-unk", base_dir=ledger_home) + ledger.start_run( + manifest_path="", manifest_hash="", + profile="dev", environment="Sandbox", company="c-1", + ) + sid_committed = ledger.write_intent( + seq=1, method="POST", url="https://x/items", body_hash="bh", + ) + ledger.write_outcome( + step_id=sid_committed, status="committed", + bc_correlation_id="c1", error_message=None, + rollback_url="https://x/items(rec-1)", + ) + # Step 2: intent only. + ledger.write_intent( + seq=2, method="POST", url="https://x/items", body_hash="bh2", + ) + ledger.close() + + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + result = runner.invoke(app, ["batch", "rollback", "rb-unk", "--yes"]) + + assert result.exit_code == 0, result.output + # Exactly one rollback DELETE, for the committed step. The + # intent-only step is left alone. + called = ( + fake_client.delete_url.await_count + fake_client.delete.await_count + ) + assert called == 1 diff --git a/tests/test_batch_ledger/test_state_list_cmds.py b/tests/test_batch_ledger/test_state_list_cmds.py new file mode 100644 index 0000000..0177868 --- /dev/null +++ b/tests/test_batch_ledger/test_state_list_cmds.py @@ -0,0 +1,124 @@ +"""``bcli batch state`` and ``bcli batch list`` commands. + +These talk to the same ledger files written by ``batch run``. 
We seed the +ledger by hand (rather than running a whole batch) so the tests focus on +the read-side commands. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from bcli.batch.ledger import Ledger +from bcli_cli.app import app +from bcli_cli._state import state + + +@pytest.fixture +def ledger_home(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + yield tmp_path / ".config" / "bcli" / "batch" + + +@pytest.fixture(autouse=True) +def reset_state(monkeypatch): + state._config = None + state._registry = None + state.profile_name = None + state.dry_run = False + state.quiet = True + yield + + +@pytest.fixture +def runner() -> CliRunner: + return CliRunner() + + +def _seed_run(base_dir: Path, run_id: str, *, with_step=True, finish="completed"): + """Helper: create a ledger file with a single committed step.""" + ledger = Ledger(run_id=run_id, base_dir=base_dir) + ledger.start_run( + manifest_path=f"/wf/{run_id}.yaml", manifest_hash="hh", + profile="dev", environment="Sandbox", company="c-1", + ) + if with_step: + sid = ledger.write_intent(seq=1, method="POST", url="https://x/items", body_hash="bh") + ledger.write_outcome( + step_id=sid, status="committed", + bc_correlation_id="corr-1", error_message=None, + rollback_url="https://x/items(rec-1)", + ) + if finish: + ledger.finish_run(run_id, finish) + ledger.close() + return ledger + + +# ─── bcli batch state ──────────────────────────────────────────────── + + +class TestBatchState: + def test_state_json_returns_full_ledger(self, ledger_home, runner): + _seed_run(ledger_home, "alpha") + result = runner.invoke(app, ["batch", "state", "alpha", "--format", "json"]) + assert result.exit_code == 0, result.output + + # The JSON payload is written to stdout. The state command sets + # state.quiet = True for json output, so no banner pollution. 
+ payload = json.loads(result.stdout) + assert payload["run"]["run_id"] == "alpha" + assert payload["run"]["state"] == "completed" + assert len(payload["steps"]) == 1 + assert payload["steps"][0]["status"] == "committed" + assert payload["steps"][0]["method"] == "POST" + + def test_state_unknown_run_id_errors(self, ledger_home, runner): + result = runner.invoke(app, ["batch", "state", "nope", "--format", "json"]) + assert result.exit_code != 0 + + def test_state_table_renders_a_summary(self, ledger_home, runner): + _seed_run(ledger_home, "table-run") + result = runner.invoke(app, ["batch", "state", "table-run", "--format", "table"]) + assert result.exit_code == 0, result.output + # The Rich table includes the run_id and a step row. + assert "table-run" in result.stdout + assert "POST" in result.stdout + + +# ─── bcli batch list ───────────────────────────────────────────────── + + +class TestBatchList: + def test_list_returns_recent_runs(self, ledger_home, runner): + _seed_run(ledger_home, "r1", finish="completed") + _seed_run(ledger_home, "r2", finish="failed") + + result = runner.invoke(app, ["batch", "list", "--format", "json"]) + assert result.exit_code == 0, result.output + + rows = json.loads(result.stdout) + ids = {row["run_id"] for row in rows} + assert ids == {"r1", "r2"} + + def test_list_filter_by_state(self, ledger_home, runner): + _seed_run(ledger_home, "r-ok", finish="completed") + _seed_run(ledger_home, "r-bad", finish="failed") + + result = runner.invoke( + app, ["batch", "list", "--state", "failed", "--format", "json"], + ) + assert result.exit_code == 0, result.output + rows = json.loads(result.stdout) + assert [r["run_id"] for r in rows] == ["r-bad"] + + def test_list_empty(self, ledger_home, runner): + ledger_home.mkdir(parents=True, exist_ok=True) + result = runner.invoke(app, ["batch", "list", "--format", "json"]) + assert result.exit_code == 0 + assert json.loads(result.stdout) == [] diff --git a/tests/test_cli/test_batch_safety.py 
b/tests/test_cli/test_batch_safety.py index 51ecce7..8a15d6f 100644 --- a/tests/test_cli/test_batch_safety.py +++ b/tests/test_cli/test_batch_safety.py @@ -111,6 +111,18 @@ def non_interactive(monkeypatch): yield +@pytest.fixture(autouse=True) +def _isolate_ledger_home(tmp_path, monkeypatch): + """Phase 3 ledger writes ~/.config/bcli/batch/<run_id>.db on every + ``run_batch`` invocation. These safety tests pre-date the ledger and + don't care about its output — but they MUST NOT pollute the + developer's real home dir. Redirect Path.home() to tmp_path so the + ledger files land in a tear-down-friendly location. + """ + monkeypatch.setattr(Path, "home", lambda: tmp_path) + yield + + # ── Read-only profile blocks mutating batches ────────────────────────────