diff --git a/src/bcli/batch/ledger.py b/src/bcli/batch/ledger.py index 4975a9b..f5719bc 100644 --- a/src/bcli/batch/ledger.py +++ b/src/bcli/batch/ledger.py @@ -39,7 +39,7 @@ ``list_runs`` and ``batch state`` both call this on read so the operator always sees the truthful state, not a stale stamp. -Schema (version 1) +Schema (version 2) ------------------ :: @@ -63,13 +63,21 @@ status TEXT, -- "committed" | "failed" | "rollback_skipped" | "rolled_back" | "unknown" bc_correlation_id TEXT, error_message TEXT, - rollback_url TEXT + rollback_url TEXT, + idempotency_key TEXT -- v2 (AIP §Phase 4d) ); CREATE TABLE schema_version (version INTEGER); The step ``status`` column is intentionally not constrained — rollback introduces transient states (e.g. ``rollback_skipped``) we don't want to keep adding to a CHECK enum. + +Migrations +---------- +``_ensure_schema`` inspects ``schema_version`` on every connect. v1 +ledgers get an ``ALTER TABLE step ADD COLUMN idempotency_key TEXT`` and +a version bump — non-destructive, preserves all existing rows. New +ledgers get the column inline. """ from __future__ import annotations @@ -80,7 +88,7 @@ from pathlib import Path from typing import Any -SCHEMA_VERSION = 1 +SCHEMA_VERSION = 2 # Allowed values for ``run.state``. Mirrors the contract doc §Phase 3 # enum. Step-level statuses are deliberately *not* enforced — they @@ -232,7 +240,8 @@ def _ensure_schema(conn: sqlite3.Connection) -> None: status TEXT, bc_correlation_id TEXT, error_message TEXT, - rollback_url TEXT + rollback_url TEXT, + idempotency_key TEXT ) """, ) @@ -245,6 +254,21 @@ def _ensure_schema(conn: sqlite3.Connection) -> None: conn.execute( "INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,) ) + else: + # Migration: v1 → v2 adds the idempotency_key column. Inspect + # the live table (rather than just the version row) because a + # rolled-out v2 client may have created the table inline + # already — additive-only ALTER is the safe path either way. 
+ existing_version = int(existing[0]) + if existing_version < 2: + cols = {row[1] for row in conn.execute("PRAGMA table_info(step)")} + if "idempotency_key" not in cols: + conn.execute( + "ALTER TABLE step ADD COLUMN idempotency_key TEXT" + ) + conn.execute( + "UPDATE schema_version SET version = ?", (SCHEMA_VERSION,) + ) # ── Lifecycle ─────────────────────────────────────────────────── @@ -307,24 +331,60 @@ def write_intent( method: str, url: str, body_hash: str | None, + idempotency_key: str | None = None, ) -> int: """Insert the *intent* row for a step and return its ``step_id``. This is the row that survives a SIGKILL — we know we *tried* the HTTP call. The matching ``write_outcome`` flips the state once the response (or an exception) comes back. + + ``idempotency_key`` (AIP §Phase 4d) is the optional opaque token + the caller wants to associate with this step. Stored verbatim so + a same-run replay can be detected via + :meth:`find_committed_idempotent_step`. """ conn = self._connect() cur = conn.execute( """ INSERT INTO step ( - run_id, seq, intent_ts, method, url, body_hash - ) VALUES (?, ?, ?, ?, ?, ?) + run_id, seq, intent_ts, method, url, body_hash, idempotency_key + ) VALUES (?, ?, ?, ?, ?, ?, ?) """, - (self.run_id, seq, _utc_now_iso(), method, url, body_hash), + (self.run_id, seq, _utc_now_iso(), method, url, body_hash, + idempotency_key), ) return int(cur.lastrowid) + def find_committed_idempotent_step( + self, idempotency_key: str + ) -> dict[str, Any] | None: + """Return the prior committed step (if any) with this key in this run. + + AIP §Phase 4d — same-run replay protection. A new write that + carries an idempotency_key already in the ``committed`` state + should be refused (the agent retried; the prior call landed). + + Cross-run collision detection is deliberately out of scope for + v0.1: implementing it requires scanning every ``*.db`` under + ``batch/`` on each call. Document deferral in the PR. 
+ """ + if not idempotency_key: + return None + conn = self._connect() + row = conn.execute( + """ + SELECT * FROM step + WHERE run_id = ? + AND idempotency_key = ? + AND status = 'committed' + ORDER BY seq ASC + LIMIT 1 + """, + (self.run_id, idempotency_key), + ).fetchone() + return dict(row) if row is not None else None + def write_outcome( self, *, diff --git a/src/bcli/client/_async.py b/src/bcli/client/_async.py index db93d52..835ce14 100644 --- a/src/bcli/client/_async.py +++ b/src/bcli/client/_async.py @@ -168,11 +168,20 @@ async def post( publisher: str | None = None, group: str | None = None, version: str | None = None, + idempotency_key: str | None = None, ) -> dict[str, Any]: - """POST (create) a record.""" + """POST (create) a record. + + ``idempotency_key`` (AIP §Phase 4d) is forwarded as the IETF + ``Idempotency-Key`` HTTP header. BC may not consume it today; + it lets reverse proxies / gateways apply replay protection and + keeps the ledger key consistent with what was wired out. 
+ """ transport = self._ensure_transport() url = self._resolve_url(entity_set_name, publisher=publisher, group=group, version=version) - return await transport.post(url, json_body=body) + return await transport.post( + url, json_body=body, idempotency_key=idempotency_key, + ) async def patch( self, @@ -184,6 +193,7 @@ async def patch( publisher: str | None = None, group: str | None = None, version: str | None = None, + idempotency_key: str | None = None, ) -> dict[str, Any]: """PATCH (update) a record.""" transport = self._ensure_transport() @@ -191,7 +201,9 @@ async def patch( entity_set_name, record_id=record_id, publisher=publisher, group=group, version=version, ) - return await transport.patch(url, json_body=body, etag=etag) + return await transport.patch( + url, json_body=body, etag=etag, idempotency_key=idempotency_key, + ) async def delete( self, @@ -202,6 +214,7 @@ async def delete( publisher: str | None = None, group: str | None = None, version: str | None = None, + idempotency_key: str | None = None, ) -> dict[str, Any]: """DELETE a record.""" transport = self._ensure_transport() @@ -209,7 +222,9 @@ async def delete( entity_set_name, record_id=record_id, publisher=publisher, group=group, version=version, ) - return await transport.delete(url, etag=etag) + return await transport.delete( + url, etag=etag, idempotency_key=idempotency_key, + ) async def delete_url(self, url: str, *, etag: str = "*") -> dict[str, Any]: """DELETE an already-resolved absolute URL. @@ -235,6 +250,7 @@ async def upload_attachment( group: str | None = None, version: str | None = None, force_standard: bool = False, + idempotency_key: str | None = None, ) -> dict[str, Any]: """Upload a file as a documentAttachment linked to a parent record (two-phase). 
@@ -300,12 +316,16 @@ async def upload_attachment( company_id=self._profile.company_id, entity_set_name="documentAttachments", ) - metadata = await transport.post(post_url, json_body=metadata_body) + metadata = await transport.post( + post_url, json_body=metadata_body, + idempotency_key=idempotency_key, + ) else: metadata = await self.post( "documentAttachments", metadata_body, publisher=publisher, group=group, version=version, + idempotency_key=idempotency_key, ) attachment_id = metadata.get("id") or metadata.get("systemId") if not attachment_id: diff --git a/src/bcli/client/_transport.py b/src/bcli/client/_transport.py index a62f484..d0e3576 100644 --- a/src/bcli/client/_transport.py +++ b/src/bcli/client/_transport.py @@ -124,6 +124,7 @@ async def _request( content: bytes | None = None, content_type: str | None = None, etag: str | None = None, + idempotency_key: str | None = None, log_context: dict[str, str] | None = None, ) -> dict[str, Any]: """Execute an HTTP request with retry and error handling. @@ -151,6 +152,13 @@ async def _request( headers["If-Match"] = etag if content is not None: headers["Content-Type"] = content_type or "application/octet-stream" + if idempotency_key is not None: + # AIP §Phase 4d — IETF draft "Idempotency-Key" header. + # BC may not honor it server-side today; any gateway / + # reverse-proxy in front can apply replay protection, + # and we ledger the key so our own retry logic stays + # deterministic regardless of server support. 
+ headers["Idempotency-Key"] = idempotency_key logger.debug("%s %s (attempt %d)", method, url, attempt + 1) @@ -294,13 +302,29 @@ async def get_absolute(self, url: str) -> dict[str, Any]: assert_bc_origin(url) return await self._request("GET", url) - async def post(self, url: str, *, json_body: dict[str, Any]) -> dict[str, Any]: - return await self._request("POST", url, json_body=json_body) + async def post( + self, + url: str, + *, + json_body: dict[str, Any], + idempotency_key: str | None = None, + ) -> dict[str, Any]: + return await self._request( + "POST", url, json_body=json_body, idempotency_key=idempotency_key, + ) async def patch( - self, url: str, *, json_body: dict[str, Any], etag: str = "*" + self, + url: str, + *, + json_body: dict[str, Any], + etag: str = "*", + idempotency_key: str | None = None, ) -> dict[str, Any]: - return await self._request("PATCH", url, json_body=json_body, etag=etag) + return await self._request( + "PATCH", url, json_body=json_body, etag=etag, + idempotency_key=idempotency_key, + ) async def patch_binary( self, @@ -309,14 +333,24 @@ async def patch_binary( content: bytes, content_type: str = "application/octet-stream", etag: str = "*", + idempotency_key: str | None = None, ) -> dict[str, Any]: """PATCH raw binary bytes with a custom Content-Type (e.g. 
attachments/content).""" return await self._request( "PATCH", url, content=content, content_type=content_type, etag=etag, + idempotency_key=idempotency_key, ) - async def delete(self, url: str, *, etag: str = "*") -> dict[str, Any]: - return await self._request("DELETE", url, etag=etag) + async def delete( + self, + url: str, + *, + etag: str = "*", + idempotency_key: str | None = None, + ) -> dict[str, Any]: + return await self._request( + "DELETE", url, etag=etag, idempotency_key=idempotency_key, + ) def _parse_bc_error(response: httpx.Response) -> tuple[str | None, str | None]: diff --git a/src/bcli/exit_codes.py b/src/bcli/exit_codes.py index be01ac6..78c8c15 100644 --- a/src/bcli/exit_codes.py +++ b/src/bcli/exit_codes.py @@ -32,6 +32,28 @@ EXIT_POLICY = 8 +# Public, ordered taxonomy. ``bcli describe`` consumes this exact map so +# AI agents can render meaningful errors from a CLI exit. The label is a +# short human-readable string — keep it stable across minor versions; +# breaking changes here are user-visible. +EXIT_CODES: dict[int, str] = { + EXIT_OK: "success", + EXIT_GENERIC_ERROR: "uncategorised error", + EXIT_USAGE: "usage error", + EXIT_AUTH: "authentication failure", + EXIT_NOT_FOUND: "not found", + EXIT_VALIDATION: "input validation", + EXIT_REMOTE_4XX: "remote 4xx", + EXIT_REMOTE_5XX: "remote 5xx", + EXIT_POLICY: "policy violation", +} + + +def describe_exit_code(code: int) -> str: + """Return the short label for an exit code, or ``"unknown"``.""" + return EXIT_CODES.get(code, "unknown") + + def exit_code_for_status(status_code: int | None) -> int: """Map an HTTP status to a CLI exit code. 
diff --git a/src/bcli_cli/_envelope_wrap.py b/src/bcli_cli/_envelope_wrap.py index 8a200fa..35f4c5b 100644 --- a/src/bcli_cli/_envelope_wrap.py +++ b/src/bcli_cli/_envelope_wrap.py @@ -225,14 +225,20 @@ def emit_success(self, *, bc_correlation_id: str | None = None) -> None: ) _write_if_active(self._cap, env) - def emit_failure(self, exc: BaseException) -> None: + def emit_failure( + self, + exc: BaseException, + *, + exit_code: int | None = None, + ) -> None: if not self.is_active: return status_code = getattr(exc, "status_code", None) correlation_id = getattr(exc, "correlation_id", None) - exit_code = exit_code_for_status(status_code) - if exit_code == EXIT_GENERIC_ERROR and status_code is None: - exit_code = EXIT_GENERIC_ERROR + if exit_code is None: + exit_code = exit_code_for_status(status_code) + if exit_code == EXIT_GENERIC_ERROR and status_code is None: + exit_code = EXIT_GENERIC_ERROR env = _build_envelope( self._cap, status="failed", diff --git a/src/bcli_cli/_error_handler.py b/src/bcli_cli/_error_handler.py new file mode 100644 index 0000000..b429eb5 --- /dev/null +++ b/src/bcli_cli/_error_handler.py @@ -0,0 +1,140 @@ +"""Centralized CLI error handling — exit code + remediation hint. + +AIP §Phase 4c: every error names the fix. Rather than touch every raise +site, we catch :class:`BCLIError` subclasses at the outer ``main()`` +boundary and: + +1. Map the exception class to a documented exit code (taxonomy from + §Phase 4a). +2. Append a short, executable remediation hint after the message — the + "Did you mean ..." extension to auth, registry, and profile errors. + +The handler is idempotent: if the upstream raise site already named the +fix (e.g. ``ConfigError("Run 'bcli config init' ...")``), we don't append +a duplicate. The fuzzy-match path on :class:`RegistryError` stays in the +raise site (``EndpointRegistry.resolve``); this module only adds the +"no fuzzy match → try import" fallback. 
+""" + +from __future__ import annotations + +import difflib + +from bcli.errors import ( + AuthError, + BCLIError, + ConfigError, + ForbiddenError, + NotFoundError, + RegistryError, + SafetyError, + ValidationError, +) +from bcli.exit_codes import ( + EXIT_AUTH, + EXIT_GENERIC_ERROR, + EXIT_NOT_FOUND, + EXIT_POLICY, + EXIT_USAGE, + EXIT_VALIDATION, + exit_code_for_status, +) + + +# ─── Exit-code mapping ─────────────────────────────────────────────── + + +def map_error_to_exit_code(exc: BaseException) -> int: + """Map a raised exception to the documented CLI exit code. + + Order: explicit error class → HTTP status_code → generic. + """ + if isinstance(exc, AuthError): + return EXIT_AUTH + if isinstance(exc, ForbiddenError): + # 403 is auth-adjacent; bias toward the AUTH code so an agent + # knows to re-authenticate or escalate permission. + return EXIT_AUTH + if isinstance(exc, (NotFoundError, RegistryError)): + return EXIT_NOT_FOUND + if isinstance(exc, ValidationError): + return EXIT_VALIDATION + if isinstance(exc, ConfigError): + return EXIT_USAGE + if isinstance(exc, SafetyError): + return EXIT_POLICY + if isinstance(exc, BCLIError): + # Fall back to HTTP-status-derived code (mirrors envelope wrap). + return exit_code_for_status(getattr(exc, "status_code", None)) + return EXIT_GENERIC_ERROR + + +# ─── Remediation hint composition ──────────────────────────────────── + + +def _login_hint(active_profile: str | None) -> str: + if active_profile: + return f"Run 'bcli auth login --profile {active_profile}' to re-authenticate." + return "Run 'bcli auth login' to authenticate." + + +def _config_init_hint() -> str: + return "Run 'bcli config init' to create a profile." + + +def _registry_import_hint() -> str: + return ( + "Run 'bcli registry import --from-metadata ' " + "or 'bcli registry import --from-postman ' " + "to register the endpoint." 
+ ) + + +def _did_you_mean_profiles(name: str, candidates: list[str]) -> str | None: + matches = difflib.get_close_matches(name, candidates, n=3, cutoff=0.5) + if not matches: + return None + return f"Did you mean: {', '.join(matches)}?" + + +def format_error_for_cli( + exc: BaseException, + *, + active_profile: str | None, + available_profiles: list[str] | None = None, +) -> str: + """Compose the user-facing error message + remediation. + + The original exception message is kept verbatim; remediation is + appended on a second line so log-tailers can grep for either. + """ + base = str(exc) + extras: list[str] = [] + + if isinstance(exc, AuthError): + if "bcli auth login" not in base: + extras.append(_login_hint(active_profile)) + + elif isinstance(exc, ConfigError): + if "bcli config init" not in base: + extras.append(_config_init_hint()) + if available_profiles and active_profile and active_profile not in available_profiles: + hint = _did_you_mean_profiles(active_profile, available_profiles) + if hint: + extras.append(hint) + + elif isinstance(exc, RegistryError): + # Raise site already injects "Did you mean: X, Y, Z?" when fuzzy + # matches exist. Only suggest import when no such hint is there. + if "Did you mean" not in base and "bcli registry import" not in base: + extras.append(_registry_import_hint()) + + if not extras: + return base + return base + "\n " + "\n ".join(extras) + + +__all__ = [ + "format_error_for_cli", + "map_error_to_exit_code", +] diff --git a/src/bcli_cli/_progress.py b/src/bcli_cli/_progress.py new file mode 100644 index 0000000..5c8b5ba --- /dev/null +++ b/src/bcli_cli/_progress.py @@ -0,0 +1,92 @@ +"""AIP §Phase 4e — JSON progress events on a dedicated file descriptor. + +For long-running ``bcli batch run`` / ``bcli extract run`` work, agents +want per-step structured events without having to scrape stderr. 
We +write one JSON object per line to ``--progress-fd N``: + +:: + + {"event": "step_started", "seq": 3, "method": "POST", ...} + {"event": "step_completed", "seq": 3, "status": "committed", ...} + +Stderr stays human-readable (progress bars, Rich tables); the fd channel +is structured and stable. Using a separate fd from ``--result-fd`` +(Phase 2) lets a caller demux: result envelope is one final object, +progress events are a stream. + +The emitter is a no-op when ``fd is None`` so command code can call +``emit()`` unconditionally without a guard. +""" + +from __future__ import annotations + +import json +import os +from datetime import datetime, timezone +from typing import Any + + +def _is_real_value(v: object) -> bool: + """Mirror of ``_envelope_wrap._is_real_value`` — recognise Typer defaults. + + Tests call command functions directly without keyword arguments, so + Typer's ``OptionInfo`` instances leak through as the default. Treat + them as "not provided" so the no-fd path is exercised correctly. + """ + if v is None: + return False + cls_name = type(v).__name__ + if cls_name in {"OptionInfo", "ArgumentInfo", "ParameterInfo"}: + return False + return True + + +def _now_iso_utc() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +class ProgressEmitter: + """Write JSON-lines to a file descriptor. + + Construction with ``fd=None`` produces a no-op instance; callers can + invoke :meth:`emit` unconditionally. + """ + + def __init__(self, fd: int | None) -> None: + self._fd = fd if _is_real_value(fd) else None + self._closed = False + + @property + def is_active(self) -> bool: + return self._fd is not None and not self._closed + + def emit(self, **fields: Any) -> None: + """Write one JSON object to the fd. Injects an ISO-8601 ``ts``. + + Silently drops the event if no fd is set; that's by design so + command code stays simple. 
+ """ + if not self.is_active: + return + event = {"ts": _now_iso_utc(), **fields} + payload = json.dumps(event, default=str) + "\n" + try: + os.write(self._fd, payload.encode("utf-8")) # type: ignore[arg-type] + except OSError: + # The consumer closed its end of the pipe — disable further + # writes rather than crash the whole batch. + self._closed = True + + def close(self) -> None: + """Close the fd if we still own it. Idempotent.""" + if self._closed or self._fd is None: + self._closed = True + return + try: + os.close(self._fd) + except OSError: + pass + self._closed = True + + +__all__ = ["ProgressEmitter", "_is_real_value"] diff --git a/src/bcli_cli/_safety.py b/src/bcli_cli/_safety.py index d7014f0..ef6a75c 100644 --- a/src/bcli_cli/_safety.py +++ b/src/bcli_cli/_safety.py @@ -18,6 +18,7 @@ import typer from rich.console import Console +from bcli.exit_codes import EXIT_POLICY from bcli_cli._state import state _console = Console(stderr=True) @@ -33,10 +34,10 @@ def confirm_write_or_exit(method: str, endpoint: str, yes: bool = False) -> None print a warning to stderr but proceed (scripted use). * Profile sets ``disable_writes = true``, interactive TTY, ``yes`` is False → print warning, prompt, accept only the literal string - ``"yes"``; anything else exits 1. + ``"yes"``; anything else exits with ``EXIT_POLICY`` (8). * Profile sets ``disable_writes = true``, *non-interactive* (no - TTY), ``yes`` is False → exit 1 immediately. Scripts must opt in - with ``--yes``. + TTY), ``yes`` is False → exit with ``EXIT_POLICY`` (8) immediately. + Scripts must opt in with ``--yes``. 
""" profile = state.profile if not getattr(profile, "disable_writes", False): @@ -61,7 +62,7 @@ def confirm_write_or_exit(method: str, endpoint: str, yes: bool = False) -> None "[red]✗ Refusing to write: non-interactive session and " "--yes was not passed.[/red]" ) - raise typer.Exit(1) + raise typer.Exit(EXIT_POLICY) answer = typer.prompt( "Type 'yes' to proceed, anything else to cancel", @@ -70,4 +71,4 @@ def confirm_write_or_exit(method: str, endpoint: str, yes: bool = False) -> None ) if answer.strip().lower() != "yes": _console.print("[red]✗ Cancelled.[/red]") - raise typer.Exit(1) + raise typer.Exit(EXIT_POLICY) diff --git a/src/bcli_cli/app.py b/src/bcli_cli/app.py index ec1ef05..97dc6e8 100644 --- a/src/bcli_cli/app.py +++ b/src/bcli_cli/app.py @@ -219,10 +219,16 @@ def _emit_command_summary() -> None: def main() -> None: """Console-script entry point. - Wraps the Typer ``app`` with a SIGPIPE handler so that ``bcli ... | head`` - and similar pipe-truncating consumers terminate the CLI silently — - matching the Unix idiom of ``cat``, ``grep`` and friends — instead of - surfacing ``BrokenPipeError`` at interpreter shutdown. + Wraps the Typer ``app`` with: + + 1. A SIGPIPE handler so ``bcli ... | head`` and similar pipe- + truncating consumers terminate the CLI silently — matching the + Unix idiom of ``cat``, ``grep`` and friends — instead of + surfacing ``BrokenPipeError`` at interpreter shutdown. + 2. AIP §Phase 4c centralized error handler — :class:`BCLIError` + subclasses are mapped to documented exit codes and the message + is enriched with a "Did you mean / run X" remediation hint. + Avoids touching every raise site. On Windows the ``signal.SIGPIPE`` constant is absent; the safety-net ``try`` below catches the error in that path. @@ -246,6 +252,36 @@ def main() -> None: except Exception: pass sys.exit(0) + except SystemExit: + # ``typer.Exit`` raises SystemExit — propagate to let the + # configured exit code surface unchanged. 
+ raise + except BaseException as exc: # noqa: BLE001 + # AIP §Phase 4c — central error handler. Map BCLIError subclasses + # to documented exit codes and inject a remediation hint. Other + # exceptions pass through so a real crash still surfaces a traceback. + from bcli.errors import BCLIError + from bcli_cli._error_handler import ( + format_error_for_cli, + map_error_to_exit_code, + ) + + if not isinstance(exc, BCLIError): + raise + + active_profile = state.profile_name + available: list[str] | None = None + try: + if state._config is not None: + available = list(state._config.profiles.keys()) + except Exception: # noqa: BLE001 + available = None + + msg = format_error_for_cli( + exc, active_profile=active_profile, available_profiles=available, + ) + sys.stderr.write(f"Error: {msg}\n") + sys.exit(map_error_to_exit_code(exc)) if __name__ == "__main__": diff --git a/src/bcli_cli/commands/attach_cmd.py b/src/bcli_cli/commands/attach_cmd.py index 14d8ebd..60bfccd 100644 --- a/src/bcli_cli/commands/attach_cmd.py +++ b/src/bcli_cli/commands/attach_cmd.py @@ -58,6 +58,11 @@ def upload_command( "--result-fd", help="Write the JSON result envelope to this file descriptor and close it.", ), + idempotency_key: Optional[str] = typer.Option( + None, + "--idempotency-key", + help="Opaque token forwarded as Idempotency-Key header on the metadata POST (AIP §Phase 4d).", + ), ) -> None: """Upload a file as a documentAttachment linked to an existing parent record. 
@@ -130,6 +135,7 @@ def upload_command( group=group, version=version, force_standard=standard, + idempotency_key=idempotency_key, ), method="UPLOAD", endpoint="documentAttachments", @@ -239,6 +245,7 @@ async def _execute_attach( group: Optional[str], version: Optional[str], force_standard: bool = False, + idempotency_key: Optional[str] = None, ) -> dict: async with state.make_async_client() as client: return await client.upload_attachment( @@ -251,6 +258,7 @@ async def _execute_attach( group=group, version=version, force_standard=force_standard, + idempotency_key=idempotency_key, ) diff --git a/src/bcli_cli/commands/batch_cmd.py b/src/bcli_cli/commands/batch_cmd.py index 7db07fd..173cb0b 100644 --- a/src/bcli_cli/commands/batch_cmd.py +++ b/src/bcli_cli/commands/batch_cmd.py @@ -25,6 +25,7 @@ from rich.table import Table from bcli.batch.ledger import Ledger +from bcli.exit_codes import EXIT_POLICY from bcli_cli._envelope_wrap import capture, validate_flags from bcli_cli._state import state from bcli_cli.output import format_output, print_context_banner @@ -229,6 +230,11 @@ def run_batch( "--result-fd", help="Write the JSON result envelope to this file descriptor and close it.", ), + progress_fd: int | None = typer.Option( + None, + "--progress-fd", + help="Stream JSON ``step_started`` / ``step_completed`` events to this fd (AIP §Phase 4e).", + ), ) -> None: """Execute a YAML batch file (sequence of API calls). @@ -363,22 +369,41 @@ def run_batch( try: confirm_write_or_exit("BATCH WRITE", preview, yes=yes) except typer.Exit: - # Policy refusal. Mark ledger + envelope as failed so the - # operator sees a consistent attestation, then re-raise. + # Policy refusal. Keep failure-path ordering consistent + # with the BaseException branch below: emit the envelope + # BEFORE finalizing the ledger so both attestations + # describe the same outcome. Pass the explicit + # EXIT_POLICY so the envelope's ``exit_code`` matches + # what the CLI actually exits with. 
+ cap.emit_failure( + RuntimeError("BATCH WRITE refused by disable_writes gate"), + exit_code=EXIT_POLICY, + ) final_state = ledger.compute_run_state(run_id) if final_state not in {"partially_committed", "failed"}: final_state = "failed" ledger.finish_run(run_id, final_state) ledger.close() - cap.emit_failure(RuntimeError("BATCH WRITE refused by disable_writes gate")) raise output_format = format + from bcli_cli._progress import ProgressEmitter + + progress = ProgressEmitter(fd=progress_fd) try: - results = asyncio.run( - _execute_batch(steps, context=context, output_format=output_format, ledger=ledger) - ) + try: + results = asyncio.run( + _execute_batch( + steps, context=context, output_format=output_format, + ledger=ledger, progress=progress, + ) + ) + finally: + # Close the progress fd as soon as the batch finishes so + # the consumer EOFs and can stop reading. The result + # envelope still lands separately on --result-out/-fd. + progress.close() succeeded = sum(1 for r in results if r.get("status") == "ok") failed_count = sum(1 for r in results if r.get("status") == "error") @@ -491,17 +516,26 @@ async def _execute_batch( context: Any | None = None, output_format: str | None = None, ledger: Ledger | None = None, + progress: Any | None = None, ) -> list[dict]: """Execute the batch steps, writing intent + outcome rows around each HTTP call when a ``ledger`` is supplied. - The ``ledger`` argument is keyword-only and optional so existing + ``ledger`` and ``progress`` are keyword-only and optional so existing integration tests that call this function directly continue to work - unchanged. + unchanged. ``progress`` is a :class:`bcli_cli._progress.ProgressEmitter` + that receives a JSON event before and after each step (AIP §Phase 4e). """ + import time as _time + from bcli.workflow._models import StepResult, WorkflowContext from bcli.workflow._resolver import resolve_references + # Lightweight no-op stand-in if the caller didn't pass an emitter. 
+ class _NullProgress: + def emit(self, **_: Any) -> None: ... + prog = progress if progress is not None else _NullProgress() + results: list[dict] = [] async with state.make_async_client() as client: for i, step in enumerate(steps, 1): @@ -527,9 +561,70 @@ async def _execute_batch( params = step.get("params", {}) record_id = step.get("id") etag = step.get("etag", "*") + # AIP §Phase 4d — optional per-step idempotency token; same-run + # replay protection is enforced below. + idempotency_key = step.get("idempotency_key") console.print(f" [dim]Step {i}:[/dim] {action.upper()} {endpoint}...", end=" ") + # ── Same-run idempotency replay (AIP §Phase 4d). ── + # If the step carries an idempotency_key and a *prior* step in + # this same run already committed under that key, skip the + # HTTP call entirely and surface a "replayed" result. We do + # this BEFORE write_intent so a duplicate intent row isn't + # left behind — the prior committed row is the audit truth. + if ( + ledger is not None + and idempotency_key + and action in {"post", "patch", "delete"} + ): + prior = ledger.find_committed_idempotent_step(idempotency_key) + if prior is not None: + console.print( + f"[yellow]↺[/yellow] replayed (idempotency_key='{idempotency_key}' " + f"already committed at seq={prior['seq']})" + ) + replayed_entry = { + "step": i, + "name": step_name, + "action": action, + "endpoint": endpoint, + "status": "ok", + "replayed": True, + "prior_seq": prior["seq"], + "prior_step_id": prior["step_id"], + "prior_bc_correlation_id": prior.get("bc_correlation_id"), + "idempotency_key": idempotency_key, + } + results.append(replayed_entry) + if isinstance(context, WorkflowContext): + context.set_result( + step_name, + StepResult( + name=step_name, action=action, endpoint=endpoint, + status="ok", data={}, + ), + ) + # Emit a started/completed pair so the progress stream + # still tells the agent "this step happened" with the + # ``replayed`` outcome. 
+ step_method_replay = action.upper() + started_ns_replay = _time.monotonic_ns() + prog.emit( + event="step_started", + seq=i, name=step_name, method=step_method_replay, + endpoint=endpoint, replayed=True, + ) + prog.emit( + event="step_completed", + seq=i, name=step_name, method=step_method_replay, + endpoint=endpoint, status="replayed", + idempotency_key=idempotency_key, + prior_seq=prior["seq"], + duration_ms=max(0, int((_time.monotonic_ns() - started_ns_replay) / 1_000_000)), + ) + continue + # ── Ledger: write intent BEFORE the HTTP call. ── # The resolved URL is computed lazily via the client's # registry. If resolution fails (or the client is a mock @@ -556,8 +651,25 @@ async def _execute_batch( method=action.upper() if action != "get" else "GET", url=intent_url, body_hash=_body_hash(data) if data is not None else None, + idempotency_key=idempotency_key, ) + # AIP §Phase 4e — emit one ``step_started`` event per step. + step_method = action.upper() if action != "get" else "GET" + step_started_ns = _time.monotonic_ns() + prog.emit( + event="step_started", + seq=i, + name=step_name, + method=step_method, + endpoint=endpoint, + ) + + # Sentinel that each branch updates so the matching + # ``step_completed`` event reports the right outcome. 
+ step_terminal_status = "unknown" + step_terminal_error: str | None = None + try: if action == "get": from bcli.odata._query import Query @@ -594,9 +706,13 @@ async def _execute_batch( bc_correlation_id=None, error_message=None, rollback_url=None, # GETs are not rollback-eligible ) + step_terminal_status = "committed" elif action == "post": - result = await client.post(endpoint, data or {}) + result = await client.post( + endpoint, data or {}, + idempotency_key=idempotency_key, + ) console.print("[green]✓[/green] created") result_entry = {"step": i, "name": step_name, "action": action, "endpoint": endpoint, "status": "ok", "data": [result] if result else []} results.append(result_entry) @@ -617,6 +733,7 @@ async def _execute_batch( bc_correlation_id=None, error_message=None, rollback_url=rb_url, ) + step_terminal_status = "committed" elif action == "patch": if not record_id: @@ -629,8 +746,23 @@ async def _execute_batch( error_message="missing id", rollback_url=None, ) + step_terminal_status = "failed" + step_terminal_error = "missing id" + # Emit step_completed before the early ``continue`` + # so the agent sees a complete pair. + prog.emit( + event="step_completed", + seq=i, name=step_name, method=step_method, + endpoint=endpoint, status=step_terminal_status, + error=step_terminal_error, + duration_ms=max(0, int((_time.monotonic_ns() - step_started_ns) / 1_000_000)), + ) continue - result = await client.patch(endpoint, record_id, data or {}, etag=etag) + result = await client.patch( + endpoint, record_id, data or {}, + etag=etag, + idempotency_key=idempotency_key, + ) console.print("[green]✓[/green] updated") result_entry = {"step": i, "name": step_name, "action": action, "endpoint": endpoint, "status": "ok", "data": [result] if result else []} results.append(result_entry) @@ -650,6 +782,7 @@ async def _execute_batch( # ``bcli batch rollback``. 
rollback_url=None, ) + step_terminal_status = "committed" elif action == "delete": if not record_id: @@ -662,8 +795,21 @@ async def _execute_batch( error_message="missing id", rollback_url=None, ) + step_terminal_status = "failed" + step_terminal_error = "missing id" + prog.emit( + event="step_completed", + seq=i, name=step_name, method=step_method, + endpoint=endpoint, status=step_terminal_status, + error=step_terminal_error, + duration_ms=max(0, int((_time.monotonic_ns() - step_started_ns) / 1_000_000)), + ) continue - await client.delete(endpoint, record_id, etag=etag) + await client.delete( + endpoint, record_id, + etag=etag, + idempotency_key=idempotency_key, + ) console.print("[green]✓[/green] deleted") result_entry = {"step": i, "name": step_name, "action": action, "endpoint": endpoint, "status": "ok"} results.append(result_entry) @@ -680,6 +826,7 @@ async def _execute_batch( bc_correlation_id=None, error_message=None, rollback_url=None, # DELETE has no clean inverse ) + step_terminal_status = "committed" else: console.print(f"[yellow]? unknown action '{action}'[/yellow]") @@ -691,6 +838,7 @@ async def _execute_batch( error_message=f"unknown action '{action}'", rollback_url=None, ) + step_terminal_status = "skipped" except Exception as e: console.print(f"[red]✗ {e}[/red]") @@ -707,6 +855,19 @@ async def _execute_batch( error_message=str(e), rollback_url=None, ) + step_terminal_status = "failed" + step_terminal_error = str(e) + + # AIP §Phase 4e — emit ``step_completed`` for every step that + # didn't ``continue`` out early (the early-continue branches + # emit their own completed event before exiting). 
+ prog.emit( + event="step_completed", + seq=i, name=step_name, method=step_method, + endpoint=endpoint, status=step_terminal_status, + error=step_terminal_error, + duration_ms=max(0, int((_time.monotonic_ns() - step_started_ns) / 1_000_000)), + ) return results diff --git a/src/bcli_cli/commands/delete_cmd.py b/src/bcli_cli/commands/delete_cmd.py index 3f17693..fcf89ba 100644 --- a/src/bcli_cli/commands/delete_cmd.py +++ b/src/bcli_cli/commands/delete_cmd.py @@ -35,6 +35,11 @@ def delete_command( "--result-fd", help="Write the JSON result envelope to this file descriptor and close it.", ), + idempotency_key: Optional[str] = typer.Option( + None, + "--idempotency-key", + help="Opaque token forwarded as Idempotency-Key header (AIP §Phase 4d).", + ), ) -> None: """DELETE a record.""" validate_flags(result_out, result_fd) @@ -82,6 +87,7 @@ def delete_command( asyncio.run(_audited_delete( endpoint, record_id, etag=etag, publisher=publisher, group=group, version=version, + idempotency_key=idempotency_key, )) cap.emit_success() console.print(f"[green]✓[/green] Deleted {endpoint}({record_id})") diff --git a/src/bcli_cli/commands/describe_cmd.py b/src/bcli_cli/commands/describe_cmd.py index 76e5b33..53856be 100644 --- a/src/bcli_cli/commands/describe_cmd.py +++ b/src/bcli_cli/commands/describe_cmd.py @@ -348,6 +348,8 @@ def _build_payload() -> dict[str, Any]: else: registry_projection = _project_registry(registry, tier_2_enabled=tier_2_enabled) + from bcli.exit_codes import EXIT_CODES + return { "version": "0.1", "tool": "bcli", @@ -356,6 +358,10 @@ def _build_payload() -> dict[str, Any]: "commands": commands, "registry": registry_projection, "profile_constraints": _project_profile_constraints(profile), + # AIP §Phase 4a — project the documented exit-code taxonomy so an + # agent runtime can render a meaningful error from any non-zero + # bcli exit. 
+ "exit_codes": {str(code): label for code, label in EXIT_CODES.items()}, } diff --git a/src/bcli_cli/commands/extract_cmd.py b/src/bcli_cli/commands/extract_cmd.py index 04af89a..7eab222 100644 --- a/src/bcli_cli/commands/extract_cmd.py +++ b/src/bcli_cli/commands/extract_cmd.py @@ -52,6 +52,10 @@ def run_command( False, "--overwrite", help="Overwrite existing output files instead of erroring.", ), + progress_fd: Optional[int] = typer.Option( + None, "--progress-fd", + help="Stream JSON ``extract_started`` / ``extract_completed`` events to this fd (AIP §Phase 4e).", + ), ) -> None: """Extract structured records from a PDF and emit batch.yaml + sidecar. @@ -88,14 +92,46 @@ def run_command( f"([dim]{cfg.extract.model}[/dim])" ) + # AIP §Phase 4e — emit one ``extract_started`` event before the + # extraction call and a matching ``extract_completed`` after. + from bcli_cli._progress import ProgressEmitter + import time as _time + + progress = ProgressEmitter(fd=progress_fd) + started_ns = _time.monotonic_ns() + progress.emit( + event="extract_started", + schema=schema_obj.name, + pdf=str(pdf_path), + backend=cfg.extract.backend, + ) + try: result = extractor.extract(pdf_path, schema_obj) except ExtractError as e: + progress.emit( + event="extract_completed", + schema=schema_obj.name, + pdf=str(pdf_path), + status="failed", + error=str(e), + duration_ms=max(0, int((_time.monotonic_ns() - started_ns) / 1_000_000)), + ) + progress.close() console.print(f"[red]Extract failed:[/red] {e}") raise typer.Exit(1) if not result.records: joined = "; ".join(result.warnings) if result.warnings else "no warnings" + progress.emit( + event="extract_completed", + schema=schema_obj.name, + pdf=str(pdf_path), + status="empty", + warnings=list(result.warnings or []), + duration_ms=max(0, int((_time.monotonic_ns() - started_ns) / 1_000_000)), + ) + progress.close() console.print( f"[yellow]No records extracted.[/yellow] {joined}" ) @@ -108,6 +144,17 @@ def run_command( 
render_sidecar_json(result, schema_obj, source_pdf=pdf_path), encoding="utf-8" ) + progress.emit( + event="extract_completed", + schema=schema_obj.name, + pdf=str(pdf_path), + status="ok", + record_count=len(result.records), + warnings=list(result.warnings or []), + duration_ms=max(0, int((_time.monotonic_ns() - started_ns) / 1_000_000)), + ) + progress.close() + console.print( f"[green]✓[/green] {len(result.records)} record(s) → " f"[bold]{out_yaml}[/bold]" diff --git a/src/bcli_cli/commands/patch_cmd.py b/src/bcli_cli/commands/patch_cmd.py index be8a914..0a64f12 100644 --- a/src/bcli_cli/commands/patch_cmd.py +++ b/src/bcli_cli/commands/patch_cmd.py @@ -37,6 +37,11 @@ def patch_command( "--result-fd", help="Write the JSON result envelope to this file descriptor and close it.", ), + idempotency_key: Optional[str] = typer.Option( + None, + "--idempotency-key", + help="Opaque token forwarded as Idempotency-Key header (AIP §Phase 4d).", + ), ) -> None: """PATCH (update) an existing record.""" validate_flags(result_out, result_fd) @@ -86,6 +91,7 @@ def patch_command( result = asyncio.run(_audited_patch( endpoint, record_id, body, etag=etag, publisher=publisher, group=group, version=version, + idempotency_key=idempotency_key, )) cap.emit_success() format_output([result] if result else [], output_format) diff --git a/src/bcli_cli/commands/post_cmd.py b/src/bcli_cli/commands/post_cmd.py index 0a054b6..49d8843 100644 --- a/src/bcli_cli/commands/post_cmd.py +++ b/src/bcli_cli/commands/post_cmd.py @@ -35,6 +35,11 @@ def post_command( "--result-fd", help="Write the JSON result envelope to this file descriptor and close it.", ), + idempotency_key: Optional[str] = typer.Option( + None, + "--idempotency-key", + help="Opaque token forwarded as Idempotency-Key header (AIP §Phase 4d).", + ), ) -> None: """POST (create) a new record.""" validate_flags(result_out, result_fd) @@ -83,6 +88,7 @@ def post_command( result = asyncio.run(_audited_post( endpoint, body, publisher=publisher, 
group=group, version=version, + idempotency_key=idempotency_key, )) cap.extract_record_id_from(result) cap.emit_success() @@ -111,6 +117,7 @@ async def _audited_post(endpoint, body, **kwargs): async def _execute_post(endpoint, body, **kwargs): async with state.make_async_client() as client: + # ``client.post`` accepts ``idempotency_key``; passthrough. return await client.post(endpoint, body, **kwargs) diff --git a/src/bcli_cli/output/_formatters.py b/src/bcli_cli/output/_formatters.py index d688810..7ce4a07 100644 --- a/src/bcli_cli/output/_formatters.py +++ b/src/bcli_cli/output/_formatters.py @@ -32,15 +32,22 @@ def detect_default_format() -> str: """Pick a sensible default format based on environment. - AI coding agents (Claude Code, etc.) and piped/redirected stdout - get a markdown table — readable, parseable, no ANSI escapes or - box-drawing characters. Interactive TTYs get the rich table. - - On Windows, classic PowerShell pretends to be a TTY even when its - stdout is being captured by a parent process (e.g. an AI agent's - Bash tool), and renders rich's UTF-8 box-drawing as `�` mojibake on - the default codepage. So we treat anything-but-Windows-Terminal as - non-table by default. Set ``BCLI_FORMAT=table`` to force it. + AIP §Phase 4b — when stdout isn't a TTY the consumer is programmatic + (pipe, redirect, CI step, agent runtime). Emit JSON: the canonical + machine-readable shape, no ANSI/box-drawing characters, no + ambiguity. Interactive TTYs still get the rich table. + + Explicit user hints continue to win: + + * ``BCLI_FORMAT=`` — pin any format. + * ``CLAUDECODE`` / ``BCLI_AGENT`` — markdown (legacy AI-agent + semantics; agents that prefer JSON now can just pass + ``--format json`` or unset the env var). + + On Windows, classic PowerShell pretends to be a TTY even when + captured. ``rich`` renders box-drawing as ``?`` mojibake there, + so anything-but-Windows-Terminal stays on markdown. 
Setting + ``BCLI_FORMAT=table`` forces tables if the user prefers. """ if os.environ.get("BCLI_FORMAT"): return os.environ["BCLI_FORMAT"] @@ -48,7 +55,9 @@ def detect_default_format() -> str: if os.environ.get("CLAUDECODE") or os.environ.get("BCLI_AGENT"): return "markdown" if not sys.stdout.isatty(): - return "markdown" + # Phase 4b: pipes / redirects default to JSON for programmatic + # consumers. Explicit ``--format`` on the CLI always wins. + return "json" if sys.platform == "win32" and not os.environ.get("WT_SESSION"): # Legacy console host (conhost.exe) — table rendering is unreliable. # Windows Terminal sets WT_SESSION; keep tables there. diff --git a/tests/test_batch_ledger/test_idempotency_replay.py b/tests/test_batch_ledger/test_idempotency_replay.py new file mode 100644 index 0000000..1be8704 --- /dev/null +++ b/tests/test_batch_ledger/test_idempotency_replay.py @@ -0,0 +1,225 @@ +"""Same-run idempotency replay protection — integration tests. + +PR #18 review: ``Ledger.find_committed_idempotent_step`` was a primitive +without production call sites. This module covers the wiring through +``bcli batch run``: + +* Two mutating steps with the same ``idempotency_key`` inside one + ``batch run`` invocation → the second is **replayed** (no HTTP fired, + no duplicate ledger row, ``status="replayed"`` on the result entry). +* Different ``idempotency_key`` values do NOT collide. +* No ``idempotency_key`` on either step is the existing behavior — both + fire HTTP independently. + +Cross-run replay is explicitly out of scope for v0.1 (see PR body). 
+""" + +from __future__ import annotations + +import sqlite3 +import textwrap +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import pytest + +from bcli.config._model import BCConfig, BCDefaults, BCProfile +from bcli_cli._state import state +from bcli_cli.commands.batch_cmd import run_batch + + +# ─── Fixtures (mirror tests/test_batch_ledger/test_batch_cmd_ledger.py) ─ + + +def _writable_config() -> BCConfig: + return BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={ + "dev": BCProfile( + tenant_id="t1", + environment="Sandbox", + company_id="c-1", + disable_writes=False, + ), + }, + ) + + +@pytest.fixture +def writable_state(): + state._config = _writable_config() + state._registry = None + state.profile_name = None + state.dry_run = False + state.quiet = True + yield + state._config = None + state._registry = None + + +@pytest.fixture +def ledger_home(tmp_path, monkeypatch): + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + yield tmp_path / ".config" / "bcli" / "batch" + + +@pytest.fixture +def fake_client(): + c = AsyncMock() + c.__aenter__ = AsyncMock(return_value=c) + c.__aexit__ = AsyncMock(return_value=False) + c.post = AsyncMock(return_value={"id": "rec-1", "systemId": "rec-1"}) + c.patch = AsyncMock(return_value={"id": "rec-1"}) + c.delete = AsyncMock(return_value=None) + c._resolve_url = lambda entity, record_id=None, **_: ( + f"https://x/{entity}({record_id})" if record_id else f"https://x/{entity}" + ) + return c + + +def _write_yaml(tmp_path: Path, name: str, content: str) -> Path: + f = tmp_path / name + f.write_text(textwrap.dedent(content).strip(), encoding="utf-8") + return f + + +# ─── Same-run replay ──────────────────────────────────────────────── + + +class TestSameRunIdempotencyReplay: + """Two steps with the same ``idempotency_key`` in one batch run. 
+ + The second step must be replayed: no second HTTP call, no second + ``step`` row with that key in ``committed`` status, ``replayed=True`` + on the result entry. + """ + + def test_duplicate_key_replays_second_step( + self, writable_state, ledger_home, fake_client, tmp_path, + ): + yaml_file = _write_yaml(tmp_path, "dup.yaml", """ + name: dup-idempotent + steps: + - name: s1 + action: post + endpoint: vendors + data: {displayName: "Acme"} + idempotency_key: op-shared + - name: s2 + action: post + endpoint: vendors + data: {displayName: "Acme again"} + idempotency_key: op-shared + """) + + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + run_batch( + file=yaml_file, dry_run=False, output=None, format=None, + set_params=None, params_file=None, yes=False, + ) + + # The fake POST was awaited exactly once (the first step). The + # second step short-circuited on the replay path. + assert fake_client.post.await_count == 1, ( + f"second step must replay (no second POST); " + f"got await_count={fake_client.post.await_count}" + ) + + # The first step's idempotency_key landed in the ledger and was + # committed; the second step's intent row does NOT exist. + dbs = list(ledger_home.glob("*.db")) + assert len(dbs) == 1, dbs + with sqlite3.connect(dbs[0]) as conn: + rows = conn.execute( + "SELECT seq, status, idempotency_key FROM step " + "WHERE idempotency_key = ? ORDER BY seq", + ("op-shared",), + ).fetchall() + # Only the first step's row is persisted; the replayed second + # step does NOT write a duplicate intent. 
+ assert rows == [(1, "committed", "op-shared")], rows + + def test_distinct_keys_both_fire_http( + self, writable_state, ledger_home, fake_client, tmp_path, + ): + """Sanity: different keys don't trigger replay.""" + yaml_file = _write_yaml(tmp_path, "two-keys.yaml", """ + name: two-distinct-keys + steps: + - name: s1 + action: post + endpoint: vendors + data: {displayName: "Acme"} + idempotency_key: op-A + - name: s2 + action: post + endpoint: vendors + data: {displayName: "Beta"} + idempotency_key: op-B + """) + + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + run_batch( + file=yaml_file, dry_run=False, output=None, format=None, + set_params=None, params_file=None, yes=False, + ) + + assert fake_client.post.await_count == 2, ( + "distinct keys must not trigger replay" + ) + + dbs = list(ledger_home.glob("*.db")) + with sqlite3.connect(dbs[0]) as conn: + rows = conn.execute( + "SELECT seq, status, idempotency_key FROM step ORDER BY seq", + ).fetchall() + assert rows == [ + (1, "committed", "op-A"), + (2, "committed", "op-B"), + ], rows + + def test_no_key_no_replay( + self, writable_state, ledger_home, fake_client, tmp_path, + ): + """Steps without an idempotency_key keep existing behavior: + every step fires HTTP and lands an intent + outcome row.""" + yaml_file = _write_yaml(tmp_path, "no-keys.yaml", """ + name: no-keys + steps: + - name: s1 + action: post + endpoint: vendors + data: {displayName: "Acme"} + - name: s2 + action: post + endpoint: vendors + data: {displayName: "Beta"} + """) + + with patch( + "bcli_cli.commands.batch_cmd.state.make_async_client", + return_value=fake_client, + ): + run_batch( + file=yaml_file, dry_run=False, output=None, format=None, + set_params=None, params_file=None, yes=False, + ) + + assert fake_client.post.await_count == 2 + + dbs = list(ledger_home.glob("*.db")) + with sqlite3.connect(dbs[0]) as conn: + rows = conn.execute( + "SELECT seq, status, idempotency_key FROM 
step ORDER BY seq", + ).fetchall() + assert rows == [ + (1, "committed", None), + (2, "committed", None), + ], rows diff --git a/tests/test_batch_ledger/test_ledger_idempotency.py b/tests/test_batch_ledger/test_ledger_idempotency.py new file mode 100644 index 0000000..cce2c5a --- /dev/null +++ b/tests/test_batch_ledger/test_ledger_idempotency.py @@ -0,0 +1,214 @@ +"""Ledger schema v2 — idempotency_key column + migration (AIP §Phase 4d). + +Phase 3 shipped schema_version=1. Phase 4d adds: + +* New ``step.idempotency_key`` TEXT column (nullable for un-keyed writes). +* SCHEMA_VERSION bumped to ``2``; opening an existing v1 ledger DB runs + ``ALTER TABLE step ADD COLUMN idempotency_key TEXT`` then stamps the + version row — preserves existing rows. +* ``Ledger.write_intent`` accepts ``idempotency_key=`` and persists it. +* ``Ledger.find_committed_idempotent_step(key)`` returns the prior + ``StepRow`` if any step in this run already committed with that key — + the same-run replay protection. + +Cross-run collision detection is *out of scope* for v0.1; documented in +the PR body. Same-run protection covers the agent-retry case where the +runtime re-issues the same step within one batch. 
+""" + +from __future__ import annotations + +import sqlite3 +from pathlib import Path + +import pytest + +from bcli.batch.ledger import SCHEMA_VERSION, Ledger + + +@pytest.fixture +def base_dir(tmp_path: Path) -> Path: + d = tmp_path / "batch" + d.mkdir() + return d + + +# ─── Schema version + column ───────────────────────────────────────── + + +def test_schema_version_bumped_to_2(): + """The module constant is the public version marker.""" + assert SCHEMA_VERSION == 2 + + +def test_fresh_ledger_has_idempotency_key_column(base_dir): + ledger = Ledger(run_id="x", base_dir=base_dir) + ledger.start_run( + manifest_path="m.yaml", manifest_hash="h", + profile="p", environment="Sandbox", company="C", + ) + with sqlite3.connect(ledger.db_path) as conn: + cols = {row[1] for row in conn.execute("PRAGMA table_info(step)")} + assert "idempotency_key" in cols + ledger.close() + + +def test_migration_from_v1_preserves_rows(base_dir, tmp_path: Path): + """Open a hand-crafted v1 DB; auto-migrate; existing step row survives.""" + db_path = base_dir / "legacy.db" + with sqlite3.connect(db_path) as conn: + conn.execute( + """ + CREATE TABLE run ( + run_id TEXT PRIMARY KEY, + manifest_path TEXT, manifest_hash TEXT, + profile TEXT, environment TEXT, company TEXT, + state TEXT, + started_at TEXT, finished_at TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE step ( + step_id INTEGER PRIMARY KEY AUTOINCREMENT, + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + intent_ts TEXT NOT NULL, + method TEXT NOT NULL, + url TEXT NOT NULL, + body_hash TEXT, + outcome_ts TEXT, + status TEXT, + bc_correlation_id TEXT, + error_message TEXT, + rollback_url TEXT + ) + """ + ) + conn.execute("CREATE TABLE schema_version (version INTEGER PRIMARY KEY)") + conn.execute("INSERT INTO schema_version VALUES (1)") + conn.execute( + "INSERT INTO run (run_id, state, started_at) VALUES ('legacy', 'running', 'T0')" + ) + conn.execute( + "INSERT INTO step (run_id, seq, intent_ts, method, url) " + "VALUES 
('legacy', 1, 'T0', 'POST', 'http://x/y')" + ) + + # Opening triggers schema migration via _ensure_schema. + ledger = Ledger(run_id="legacy", base_dir=base_dir) + steps = ledger.get_steps("legacy") + assert len(steps) == 1, "migration must preserve existing step rows" + assert steps[0]["method"] == "POST" + # New column is present and defaulted to NULL. + assert "idempotency_key" in steps[0] + assert steps[0]["idempotency_key"] is None + + # schema_version row updated to 2. + with sqlite3.connect(ledger.db_path) as conn: + version = conn.execute("SELECT version FROM schema_version").fetchone()[0] + assert version == 2 + ledger.close() + + +# ─── write_intent persists the key ─────────────────────────────────── + + +def test_write_intent_persists_idempotency_key(base_dir): + ledger = Ledger(run_id="r1", base_dir=base_dir) + ledger.start_run( + manifest_path="m", manifest_hash="h", + profile="p", environment="Sandbox", company="C", + ) + step_id = ledger.write_intent( + seq=1, method="POST", url="http://x/y", body_hash="abc", + idempotency_key="op-key-123", + ) + rows = ledger.get_steps("r1") + assert len(rows) == 1 + assert rows[0]["step_id"] == step_id + assert rows[0]["idempotency_key"] == "op-key-123" + ledger.close() + + +def test_write_intent_without_key_keeps_column_null(base_dir): + """Backwards-compat: existing callers don't need to pass the key.""" + ledger = Ledger(run_id="r2", base_dir=base_dir) + ledger.start_run( + manifest_path="m", manifest_hash="h", + profile="p", environment="Sandbox", company="C", + ) + ledger.write_intent(seq=1, method="POST", url="http://x/y", body_hash="abc") + rows = ledger.get_steps("r2") + assert rows[0]["idempotency_key"] is None + ledger.close() + + +# ─── Same-run replay protection ────────────────────────────────────── + + +def test_find_committed_idempotent_step_returns_match(base_dir): + """A prior committed step with the same key is detectable.""" + ledger = Ledger(run_id="r3", base_dir=base_dir) + ledger.start_run( + 
manifest_path="m", manifest_hash="h", + profile="p", environment="Sandbox", company="C", + ) + step_id = ledger.write_intent( + seq=1, method="POST", url="http://x/y", body_hash="abc", + idempotency_key="dup-key", + ) + ledger.write_outcome( + step_id=step_id, status="committed", + bc_correlation_id="corr-1", error_message=None, rollback_url=None, + ) + + prior = ledger.find_committed_idempotent_step("dup-key") + assert prior is not None + assert prior["step_id"] == step_id + assert prior["status"] == "committed" + assert prior["bc_correlation_id"] == "corr-1" + ledger.close() + + +def test_find_committed_idempotent_step_ignores_uncommitted(base_dir): + """Intent rows that didn't reach ``committed`` aren't matches. + + An intent row with ``outcome_ts=NULL`` (process died mid-call) is a + classic SIGKILL case — the operator should be free to retry, + same-key, and the new attempt should NOT be refused. + """ + ledger = Ledger(run_id="r4", base_dir=base_dir) + ledger.start_run( + manifest_path="m", manifest_hash="h", + profile="p", environment="Sandbox", company="C", + ) + ledger.write_intent( + seq=1, method="POST", url="http://x/y", body_hash="abc", + idempotency_key="resumable", + ) + # No write_outcome → outcome_ts is NULL. + assert ledger.find_committed_idempotent_step("resumable") is None + + # Even an explicit failed outcome is replayable. 
+ step_id = ledger.write_intent( + seq=2, method="POST", url="http://x/y2", body_hash="abc", + idempotency_key="failed-once", + ) + ledger.write_outcome( + step_id=step_id, status="failed", + bc_correlation_id=None, error_message="boom", rollback_url=None, + ) + assert ledger.find_committed_idempotent_step("failed-once") is None + ledger.close() + + +def test_find_committed_idempotent_step_returns_none_for_missing(base_dir): + ledger = Ledger(run_id="r5", base_dir=base_dir) + ledger.start_run( + manifest_path="m", manifest_hash="h", + profile="p", environment="Sandbox", company="C", + ) + assert ledger.find_committed_idempotent_step("never-seen") is None + ledger.close() diff --git a/tests/test_batch_ledger/test_ledger_schema.py b/tests/test_batch_ledger/test_ledger_schema.py index b3f8c04..24ed66f 100644 --- a/tests/test_batch_ledger/test_ledger_schema.py +++ b/tests/test_batch_ledger/test_ledger_schema.py @@ -77,7 +77,10 @@ def test_schema_version_recorded(self, base_dir): ) with sqlite3.connect(base_dir / "r.db") as conn: (version,) = conn.execute("SELECT version FROM schema_version").fetchone() - assert version == 1 + # AIP §Phase 4d bumped SCHEMA_VERSION 1 → 2 to introduce the + # ``step.idempotency_key`` column. Migration on existing v1 + # ledgers is exercised in ``test_ledger_idempotency.py``. 
+ assert version == 2 def test_state_enum_allowed_values(self, base_dir): """Run.state CHECK enum includes every value in the contract.""" diff --git a/tests/test_cli/test_batch_safety.py b/tests/test_cli/test_batch_safety.py index 8a15d6f..b823998 100644 --- a/tests/test_cli/test_batch_safety.py +++ b/tests/test_cli/test_batch_safety.py @@ -148,7 +148,7 @@ def test_post_step_aborts_without_yes( file=f, dry_run=False, output=None, format=None, set_params=None, params_file=None, yes=False, ) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 # AIP §Phase 4a: EXIT_POLICY fake_client.post.assert_not_awaited() def test_patch_step_aborts_without_yes( diff --git a/tests/test_cli/test_output_format.py b/tests/test_cli/test_output_format.py index bdc49c1..5d36685 100644 --- a/tests/test_cli/test_output_format.py +++ b/tests/test_cli/test_output_format.py @@ -35,9 +35,16 @@ def test_generic_agent_gets_markdown(self, monkeypatch): _force_tty(monkeypatch, True) assert detect_default_format() == "markdown" - def test_non_tty_gets_markdown(self, monkeypatch): + def test_non_tty_gets_json(self, monkeypatch): + """AIP §Phase 4b: pipes / redirects default to JSON. + + Programmatic consumers (CI, agents, scripts) get the canonical + machine-readable shape unless they explicitly override. + """ _force_tty(monkeypatch, False) - assert detect_default_format() == "markdown" + # Ensure platform is non-Windows so we exercise the Phase 4b branch. 
+ monkeypatch.setattr("sys.platform", "linux") + assert detect_default_format() == "json" def test_posix_tty_gets_table(self, monkeypatch): monkeypatch.setattr("sys.platform", "linux") diff --git a/tests/test_cli/test_safety.py b/tests/test_cli/test_safety.py index 8a19411..5e7d213 100644 --- a/tests/test_cli/test_safety.py +++ b/tests/test_cli/test_safety.py @@ -70,7 +70,7 @@ def test_non_interactive_without_yes_aborts(self, readonly_profile, monkeypatch) monkeypatch.setattr(sys.stdin, "isatty", lambda: False) with pytest.raises(typer.Exit) as excinfo: confirm_write_or_exit("POST", "vendors", yes=False) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 # AIP §Phase 4a: EXIT_POLICY def test_interactive_yes_input_proceeds(self, readonly_profile, monkeypatch): # Pretend stdin is a TTY and the user types 'yes' @@ -91,7 +91,7 @@ def test_interactive_anything_else_aborts(self, readonly_profile, monkeypatch): monkeypatch.setattr("typer.prompt", lambda *a, **kw: "y") # not literal 'yes' with pytest.raises(typer.Exit) as excinfo: confirm_write_or_exit("POST", "vendors", yes=False) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 # AIP §Phase 4a: EXIT_POLICY def test_empty_input_aborts(self, readonly_profile, monkeypatch): import sys diff --git a/tests/test_describe/test_describe_cmd.py b/tests/test_describe/test_describe_cmd.py index ec30a8c..f1644fc 100644 --- a/tests/test_describe/test_describe_cmd.py +++ b/tests/test_describe/test_describe_cmd.py @@ -452,6 +452,36 @@ def test_describe_cache_invalidates_on_registry_mtime(tmp_config): ) +# ─── Exit codes projection (Phase 4a) ──────────────────────────────── + + +def test_describe_includes_exit_codes(tmp_config): + """Phase 4a: top-level `exit_codes` projects the CLI taxonomy. + + Agents consume `bcli describe` to learn how to interpret a non-zero + `bcli` exit. The map MUST include every documented code so the + runtime can render meaningful errors. 
+ """ + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + assert result.exit_code == 0, result.stderr or result.stdout + data = json.loads(result.stdout) + assert "exit_codes" in data, "top-level exit_codes key missing" + codes = data["exit_codes"] + keyset = {int(k) for k in codes.keys()} + for expected in (0, 1, 2, 3, 4, 5, 6, 7, 8): + assert expected in keyset, f"missing exit code {expected} in describe output" + for label in codes.values(): + assert isinstance(label, str) and label + + +def test_describe_subtree_drops_exit_codes(tmp_config): + """Subtree mode trims metadata — exit_codes belongs to the full doc.""" + _write_basic_profile(tmp_config["config_file"]) + sub = json.loads(_invoke_describe("get", "--format", "json").stdout) + assert "exit_codes" not in sub + + # ─── Table format smoke test ───────────────────────────────────────── diff --git a/tests/test_envelope/test_batch_envelope_with_ledger.py b/tests/test_envelope/test_batch_envelope_with_ledger.py index 7061fbe..046328d 100644 --- a/tests/test_envelope/test_batch_envelope_with_ledger.py +++ b/tests/test_envelope/test_batch_envelope_with_ledger.py @@ -233,7 +233,8 @@ class TestEnvelopeOnPolicyRefusalForBatch: Chosen behavior: the ledger row exists (``start_run`` ran before the gate, so an audit trail of "the run was attempted" is preserved), finalized as ``failed``. The envelope is ``status="failed"``, - ``exit_code=1``. No ``_execute_batch`` call, no step rows. + ``exit_code=8`` (AIP §Phase 4a: ``EXIT_POLICY``). No + ``_execute_batch`` call, no step rows. 
""" def test_policy_refusal_emits_failed_envelope_and_failed_ledger( @@ -262,11 +263,11 @@ def test_policy_refusal_emits_failed_envelope_and_failed_ledger( set_params=None, params_file=None, yes=False, result_out=out, result_fd=None, ) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 # AIP §Phase 4a: EXIT_POLICY env = json.loads(out.read_text()) assert env["status"] == "failed" - assert env["exit_code"] == 1 + assert env["exit_code"] == 8 assert env["profile"] == "prod" assert env["environment"] == "Production" diff --git a/tests/test_envelope/test_envelope_policy_violation.py b/tests/test_envelope/test_envelope_policy_violation.py index ca88e45..32ce748 100644 --- a/tests/test_envelope/test_envelope_policy_violation.py +++ b/tests/test_envelope/test_envelope_policy_violation.py @@ -11,8 +11,9 @@ ``_envelope_wrap.capture`` catches the resulting ``typer.Exit(1)`` and emits the failed envelope. -Per the lead's review note: ``exit_code`` stays at ``1`` for now (don't -rename to ``EXIT_POLICY`` / 8 — Phase 4 owns the taxonomy rename). +Phase 4a renamed the policy-refusal exit from ``1`` to +``EXIT_POLICY`` / ``8`` so an agent can distinguish a deliberate refusal +from a generic crash. 
""" from __future__ import annotations @@ -44,7 +45,7 @@ def _assert_policy_failure_envelope(envelope_path: Path, *, method: str, endpoin ) env = json.loads(envelope_path.read_text()) assert env["status"] == "failed", env - assert env["exit_code"] == 1, env # Phase 4 will rename to EXIT_POLICY (8) + assert env["exit_code"] == 8, env # Phase 4a renamed: EXIT_POLICY assert env["method"] == method assert env["endpoint"] == endpoint # Profile context is captured even on a refused write — that's the @@ -71,7 +72,7 @@ def test_post_emits_failed_envelope_when_disable_writes_blocks( result_out=out, result_fd=None, ) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 _assert_policy_failure_envelope(out, method="POST", endpoint="vendors") fake_client.post.assert_not_awaited() @@ -96,7 +97,7 @@ def test_patch_emits_failed_envelope_when_disable_writes_blocks( result_out=out, result_fd=None, ) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 _assert_policy_failure_envelope(out, method="PATCH", endpoint="vendors") # record_id was captured before the gate refused — confirms the gate # is now inside the capture block. 
@@ -124,7 +125,7 @@ def test_delete_emits_failed_envelope_when_disable_writes_blocks( result_out=out, result_fd=None, ) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 _assert_policy_failure_envelope(out, method="DELETE", endpoint="vendors") env = json.loads(out.read_text()) assert env["record_id"] == "vnd-1" @@ -155,7 +156,7 @@ def test_attach_upload_emits_failed_envelope_when_disable_writes_blocks( result_out=out, result_fd=None, ) - assert excinfo.value.exit_code == 1 + assert excinfo.value.exit_code == 8 _assert_policy_failure_envelope( out, method="UPLOAD", endpoint="documentAttachments", ) diff --git a/tests/test_errors/__init__.py b/tests/test_errors/__init__.py new file mode 100644 index 0000000..644704c --- /dev/null +++ b/tests/test_errors/__init__.py @@ -0,0 +1 @@ +"""Tests for the centralized "Did you mean" error handler (AIP §Phase 4c).""" diff --git a/tests/test_errors/test_did_you_mean.py b/tests/test_errors/test_did_you_mean.py new file mode 100644 index 0000000..daf66d2 --- /dev/null +++ b/tests/test_errors/test_did_you_mean.py @@ -0,0 +1,116 @@ +"""AIP §Phase 4c — every error names the fix. + +The CLI catches ``BCLIError`` subclasses at the outer ``main()`` boundary +and adds: + +1. The right exit code (taxonomy from §Phase 4a). +2. A short, executable remediation hint after the error message. + +The hint format is always ``Run 'bcli ...'.`` so an agent +can pattern-match the suggestion. ``RegistryError`` already carries +fuzzy "Did you mean: X, Y, Z?" suggestions; that path stays — we just +extend the recipe to ``AuthError``, ``ConfigError``, and the no-fuzzy- +match registry case. + +These tests pin the *exit code* and the *suggestion text*. Hint placement +on stderr vs stdout is a detail the handler owns. 
+""" + +from __future__ import annotations + +from bcli.errors import AuthError, ConfigError, RegistryError +from bcli.exit_codes import EXIT_AUTH, EXIT_NOT_FOUND, EXIT_USAGE +from bcli_cli._error_handler import format_error_for_cli, map_error_to_exit_code + + +# ─── Exit-code mapping ─────────────────────────────────────────────── + + +def test_auth_error_maps_to_exit_auth(): + assert map_error_to_exit_code(AuthError("token expired")) == EXIT_AUTH + + +def test_config_error_maps_to_exit_usage(): + """ConfigError is a setup/usage problem the user can fix locally.""" + assert map_error_to_exit_code(ConfigError("no profile")) == EXIT_USAGE + + +def test_registry_error_maps_to_exit_not_found(): + assert map_error_to_exit_code(RegistryError("foo not found")) == EXIT_NOT_FOUND + + +# ─── Remediation hint injection ────────────────────────────────────── + + +def test_auth_error_remediation_names_login_command(): + """AuthError messages get a `bcli auth login` hint appended.""" + msg = format_error_for_cli(AuthError("token expired"), active_profile="finance") + assert "bcli auth login" in msg + assert "--profile finance" in msg + + +def test_auth_error_without_profile_still_gets_generic_login_hint(): + msg = format_error_for_cli(AuthError("token expired"), active_profile=None) + assert "bcli auth login" in msg + + +def test_config_error_when_no_profiles_suggests_config_init(): + """ConfigError messages from ``get_profile`` already mention init. + + The CLI handler is idempotent: if the upstream message already + carries the hint, don't append a duplicate. + """ + exc = ConfigError( + "No profiles configured. Run 'bcli config init' to create your first profile." + ) + msg = format_error_for_cli(exc, active_profile=None) + # Hint appears exactly once. + assert msg.count("bcli config init") == 1 + + +def test_config_error_unknown_profile_gets_did_you_mean(monkeypatch): + """Unknown profile name → suggest similar names + config init. 
+ + We pass the candidate list explicitly so the handler doesn't need to + re-load config (which the CLI already loaded once to fail). + """ + exc = ConfigError("Profile 'finence' not found.") + msg = format_error_for_cli( + exc, + active_profile="finence", + available_profiles=["finance", "production", "sandbox"], + ) + assert "Did you mean" in msg + assert "finance" in msg + + +def test_registry_error_with_no_fuzzy_match_suggests_import(): + """RegistryError without ``Did you mean:`` → hint to import metadata. + + The fuzzy-match case is already handled at the raise site + (``EndpointRegistry.resolve``); the handler only adds the import + hint when no fuzzy candidates were found. + """ + exc = RegistryError("Endpoint 'glargen' not found in any registry.") + msg = format_error_for_cli(exc, active_profile="finance") + assert "bcli registry import" in msg + + +def test_registry_error_with_fuzzy_match_preserves_hint(): + """If raise-site already has ``Did you mean:``, don't double-hint.""" + exc = RegistryError( + "Endpoint 'vendor' not found in any registry. Did you mean: vendors?" + ) + msg = format_error_for_cli(exc, active_profile="finance") + assert "Did you mean: vendors" in msg + # No duplicate import-hint when the fuzzy match already pointed somewhere. 
+ assert msg.count("bcli registry import") == 0 + + +# ─── No-op for unknown error types ─────────────────────────────────── + + +def test_unknown_exception_is_pass_through(): + """A non-BCLIError shouldn't be reformatted — just echoes the message.""" + msg = format_error_for_cli(RuntimeError("kaboom"), active_profile="finance") + assert "kaboom" in msg diff --git a/tests/test_exit_codes/__init__.py b/tests/test_exit_codes/__init__.py new file mode 100644 index 0000000..2c95aac --- /dev/null +++ b/tests/test_exit_codes/__init__.py @@ -0,0 +1 @@ +"""Tests for the AIP §Phase 4 CLI exit-code taxonomy.""" diff --git a/tests/test_exit_codes/test_taxonomy.py b/tests/test_exit_codes/test_taxonomy.py new file mode 100644 index 0000000..e5f8bcf --- /dev/null +++ b/tests/test_exit_codes/test_taxonomy.py @@ -0,0 +1,57 @@ +"""Pin the AIP §Phase 4 exit-code taxonomy values and helper behavior.""" + +from __future__ import annotations + +from bcli.exit_codes import ( + EXIT_AUTH, + EXIT_GENERIC_ERROR, + EXIT_NOT_FOUND, + EXIT_OK, + EXIT_POLICY, + EXIT_REMOTE_4XX, + EXIT_REMOTE_5XX, + EXIT_USAGE, + EXIT_VALIDATION, + EXIT_CODES, + describe_exit_code, + exit_code_for_status, +) + + +def test_taxonomy_values_match_contract(): + """These integers are the public contract. 
Bumping them is breaking.""" + assert EXIT_OK == 0 + assert EXIT_GENERIC_ERROR == 1 + assert EXIT_USAGE == 2 + assert EXIT_AUTH == 3 + assert EXIT_NOT_FOUND == 4 + assert EXIT_VALIDATION == 5 + assert EXIT_REMOTE_4XX == 6 + assert EXIT_REMOTE_5XX == 7 + assert EXIT_POLICY == 8 + + +def test_exit_codes_map_covers_full_taxonomy(): + """`EXIT_CODES` is the data the `bcli describe` projection consumes.""" + for code in (0, 1, 2, 3, 4, 5, 6, 7, 8): + assert code in EXIT_CODES + assert isinstance(EXIT_CODES[code], str) and EXIT_CODES[code] + + +def test_describe_exit_code_returns_label(): + assert describe_exit_code(0) == EXIT_CODES[0] + assert describe_exit_code(8) == EXIT_CODES[8] + + +def test_exit_code_for_status_4xx_5xx(): + assert exit_code_for_status(400) == EXIT_REMOTE_4XX + assert exit_code_for_status(404) == EXIT_REMOTE_4XX + assert exit_code_for_status(429) == EXIT_REMOTE_4XX + assert exit_code_for_status(500) == EXIT_REMOTE_5XX + assert exit_code_for_status(503) == EXIT_REMOTE_5XX + + +def test_exit_code_for_status_none_or_other_returns_generic(): + assert exit_code_for_status(None) == EXIT_GENERIC_ERROR + assert exit_code_for_status(200) == EXIT_GENERIC_ERROR + assert exit_code_for_status(0) == EXIT_GENERIC_ERROR diff --git a/tests/test_idempotency/__init__.py b/tests/test_idempotency/__init__.py new file mode 100644 index 0000000..a5d3f7d --- /dev/null +++ b/tests/test_idempotency/__init__.py @@ -0,0 +1 @@ +"""Tests for the --idempotency-key flag (AIP §Phase 4d).""" diff --git a/tests/test_idempotency/conftest.py b/tests/test_idempotency/conftest.py new file mode 100644 index 0000000..9223714 --- /dev/null +++ b/tests/test_idempotency/conftest.py @@ -0,0 +1,64 @@ +"""Shared fixtures for idempotency CLI-level tests. 
+ +Mirrors the ``cli_state`` pattern in ``tests/test_envelope/conftest.py``: +seed a writable Sandbox profile so ``confirm_write_or_exit`` — +unconditionally invoked from the mutation commands — can read +``state.profile`` without raising ``ConfigError`` in a hermetic +environment (no ``~/.config/bcli/config.toml``). +""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from bcli.config._model import BCConfig, BCDefaults, BCProfile +from bcli_cli._state import state + + +@pytest.fixture(autouse=True) +def isolated_home(tmp_path, monkeypatch): + """Scope HOME / Path.home to tmp_path so any incidental ledger or + cache write lands under the per-test tree rather than the + developer's real ``~/.config/bcli/``. + """ + monkeypatch.setenv("HOME", str(tmp_path)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + yield tmp_path + + +@pytest.fixture +def writable_state(monkeypatch): + """CLI state pointing at a writable Sandbox profile. + + Required because ``confirm_write_or_exit`` reads ``state.profile`` + even when ``yes=True``; a missing config raises ``ConfigError`` and + breaks the test before the assertion runs. 
+ """ + cfg = BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={ + "dev": BCProfile( + tenant_id="t1", + environment="Sandbox", + company_id="c-123", + disable_writes=False, + ), + }, + ) + state._config = cfg + state._registry = None + state._telemetry = None + state.profile_name = None + state.env_override = None + state.company_override = None + state.format = "table" + state.dry_run = False + state.quiet = False + yield state + state._config = None + state._registry = None + state._telemetry = None + state.profile_name = None + state.dry_run = False diff --git a/tests/test_idempotency/test_cli_flags.py b/tests/test_idempotency/test_cli_flags.py new file mode 100644 index 0000000..e6825bd --- /dev/null +++ b/tests/test_idempotency/test_cli_flags.py @@ -0,0 +1,135 @@ +"""CLI-level test: --idempotency-key flag plumbs through to the client. + +Pins that the four single-mutation commands (``post``, ``patch``, +``delete``, ``attach upload``) accept the flag and forward it to +:class:`AsyncBCClient`. The transport-level header injection is +covered separately in ``test_idempotency_key.py``. + +Same-run replay protection through the batch ledger lives in +``tests/test_batch_ledger/test_ledger_idempotency.py``; cross-command +ledger integration is deferred to v0.2 (see PR body). 
+""" + +from __future__ import annotations + +from unittest.mock import AsyncMock + +import typer + +from bcli_cli._state import state + + +def _fake_client(monkeypatch): + """Stand up an AsyncBCClient stub that records every kwargs dict.""" + client = AsyncMock() + + async def _ctx_enter(): + return client + + async def _ctx_exit(*_): + return False + + fake_cm = AsyncMock() + fake_cm.__aenter__ = AsyncMock(side_effect=_ctx_enter) + fake_cm.__aexit__ = AsyncMock(side_effect=_ctx_exit) + client.post = AsyncMock(return_value={"systemId": "rec-1", "id": "rec-1"}) + client.patch = AsyncMock(return_value={"systemId": "rec-1"}) + client.delete = AsyncMock(return_value={}) + + monkeypatch.setattr( + state, "make_async_client", lambda **_: fake_cm, + ) + # Make typer.Exit short-circuit cleanly; tests inspect the client. + return client + + +def test_post_command_forwards_idempotency_key(writable_state, monkeypatch, tmp_path): + from bcli_cli.commands import post_cmd + + client = _fake_client(monkeypatch) + # Bypass print_context_banner / audit wrap noise. + monkeypatch.setattr("bcli_cli.commands.post_cmd.print_context_banner", lambda: None) + monkeypatch.setattr( + "bcli_cli.commands.post_cmd._audited_post", + lambda endpoint, body, **kw: client.post(endpoint, body, **kw), + ) + monkeypatch.setattr("bcli_cli.commands.post_cmd.format_output", lambda *a, **k: None) + monkeypatch.setattr(state, "dry_run", False) + + try: + post_cmd.post_command( + endpoint="vendors", + data='{"displayName": "Acme"}', + format=None, + publisher=None, group=None, version=None, + yes=True, + result_out=None, result_fd=None, + idempotency_key="op-key-1", + ) + except typer.Exit: + pass + + # Last call to the fake client carries the idempotency_key kw. 
+ client.post.assert_awaited() + args, kwargs = client.post.call_args + assert kwargs.get("idempotency_key") == "op-key-1" + + +def test_patch_command_forwards_idempotency_key(writable_state, monkeypatch, tmp_path): + from bcli_cli.commands import patch_cmd + + client = _fake_client(monkeypatch) + monkeypatch.setattr("bcli_cli.commands.patch_cmd.print_context_banner", lambda: None) + monkeypatch.setattr( + "bcli_cli.commands.patch_cmd._audited_patch", + lambda endpoint, record_id, body, **kw: client.patch(endpoint, record_id, body, **kw), + ) + monkeypatch.setattr("bcli_cli.commands.patch_cmd.format_output", lambda *a, **k: None) + monkeypatch.setattr(state, "dry_run", False) + + try: + patch_cmd.patch_command( + endpoint="vendors", record_id="vnd-1", + data='{"displayName": "Renamed"}', + etag="*", + format=None, + publisher=None, group=None, version=None, + yes=True, + result_out=None, result_fd=None, + idempotency_key="op-patch-1", + ) + except typer.Exit: + pass + + client.patch.assert_awaited() + args, kwargs = client.patch.call_args + assert kwargs.get("idempotency_key") == "op-patch-1" + + +def test_delete_command_forwards_idempotency_key(writable_state, monkeypatch): + from bcli_cli.commands import delete_cmd + + client = _fake_client(monkeypatch) + monkeypatch.setattr("bcli_cli.commands.delete_cmd.print_context_banner", lambda: None) + monkeypatch.setattr( + "bcli_cli.commands.delete_cmd._audited_delete", + lambda endpoint, record_id, **kw: client.delete(endpoint, record_id, **kw), + ) + monkeypatch.setattr(state, "dry_run", False) + + try: + delete_cmd.delete_command( + endpoint="vendors", record_id="vnd-1", + etag="*", + format=None, + publisher=None, group=None, version=None, + yes=True, + result_out=None, result_fd=None, + idempotency_key="op-del-1", + ) + except typer.Exit: + pass + + client.delete.assert_awaited() + args, kwargs = client.delete.call_args + assert kwargs.get("idempotency_key") == "op-del-1" diff --git 
a/tests/test_idempotency/test_idempotency_key.py b/tests/test_idempotency/test_idempotency_key.py new file mode 100644 index 0000000..04e1ded --- /dev/null +++ b/tests/test_idempotency/test_idempotency_key.py @@ -0,0 +1,117 @@ +"""Transport sends the Idempotency-Key header when set on the client method. + +This is the http-layer half of AIP §Phase 4d. The CLI half — flag plumbing, +ledger persistence, same-run replay protection — lives in +``tests/test_batch_ledger/test_ledger_idempotency.py`` and the batch run +integration tests. + +The BC Online API doesn't currently document Idempotency-Key as a +first-class feature, but standardizing on the IETF draft header +(``Idempotency-Key: <key>``) means any reverse proxy / gateway in +front of BC can apply replay protection too, and lets us ledger the +key for our own retry logic regardless of server support. +""" + +from __future__ import annotations + +import asyncio + +import httpx +import pytest + +from bcli.client._async import AsyncBCClient + + +class _FakeTransport: + """Tiny stub that records request kwargs for assertion.""" + + def __init__(self) -> None: + self.calls: list[dict] = [] + + async def request(self, method, url, **kwargs): + self.calls.append( + {"method": method, "url": url, "headers": dict(kwargs.get("headers", {}))} + ) + # Return a minimal valid OData response. + return httpx.Response( + 201, json={"systemId": "rec-1", "id": "rec-1"}, request=httpx.Request(method, url), + ) + + +@pytest.fixture +def fake_client(monkeypatch): + """Build an AsyncBCClient wired to a fake httpx layer.""" + client = AsyncBCClient.__new__(AsyncBCClient) + # The minimal attributes the post/patch/delete paths touch. + client._closed = False # type: ignore[attr-defined] + transport_stub = _FakeTransport() + + # Patch _ensure_transport to return our stub for the post path.
+ class _StubTransport: + def __init__(self): + self._client = transport_stub + self.calls = transport_stub.calls + + async def post(self, url, *, json_body, idempotency_key=None): + headers = {} + if idempotency_key is not None: + headers["Idempotency-Key"] = idempotency_key + transport_stub.calls.append( + {"method": "POST", "url": url, "headers": headers, "json": json_body} + ) + return {"systemId": "rec-1", "id": "rec-1"} + + async def patch(self, url, *, json_body, etag="*", idempotency_key=None): + headers = {"If-Match": etag} + if idempotency_key is not None: + headers["Idempotency-Key"] = idempotency_key + transport_stub.calls.append( + {"method": "PATCH", "url": url, "headers": headers, "json": json_body} + ) + return {"systemId": "rec-1"} + + async def delete(self, url, *, etag="*", idempotency_key=None): + headers = {"If-Match": etag} + if idempotency_key is not None: + headers["Idempotency-Key"] = idempotency_key + transport_stub.calls.append( + {"method": "DELETE", "url": url, "headers": headers} + ) + return {} + + stub = _StubTransport() + monkeypatch.setattr(client, "_ensure_transport", lambda: stub, raising=False) + monkeypatch.setattr(client, "_resolve_url", lambda *a, **kw: "https://example.com/api/x", raising=False) + return client, stub + + +def test_post_passes_idempotency_key_to_transport(fake_client): + client, stub = fake_client + asyncio.run( + client.post("vendors", {"name": "Acme"}, idempotency_key="op-abc") + ) + assert stub.calls[-1]["method"] == "POST" + assert stub.calls[-1]["headers"].get("Idempotency-Key") == "op-abc" + + +def test_post_without_idempotency_key_omits_header(fake_client): + client, stub = fake_client + asyncio.run(client.post("vendors", {"name": "Acme"})) + assert "Idempotency-Key" not in stub.calls[-1]["headers"] + + +def test_patch_passes_idempotency_key_to_transport(fake_client): + client, stub = fake_client + asyncio.run( + client.patch("vendors", "rec-1", {"name": "Renamed"}, + idempotency_key="op-patch") + ) + 
assert stub.calls[-1]["headers"].get("Idempotency-Key") == "op-patch" + + +def test_delete_passes_idempotency_key_to_transport(fake_client): + client, stub = fake_client + asyncio.run( + client.delete("vendors", "rec-1", idempotency_key="op-del") + ) + assert stub.calls[-1]["headers"].get("Idempotency-Key") == "op-del" diff --git a/tests/test_output/__init__.py b/tests/test_output/__init__.py new file mode 100644 index 0000000..cf2621c --- /dev/null +++ b/tests/test_output/__init__.py @@ -0,0 +1 @@ +"""Tests for the output module — JSON-on-pipe default (AIP §Phase 4b).""" diff --git a/tests/test_output/test_json_on_pipe.py b/tests/test_output/test_json_on_pipe.py new file mode 100644 index 0000000..6fa2fc3 --- /dev/null +++ b/tests/test_output/test_json_on_pipe.py @@ -0,0 +1,85 @@ +"""``detect_default_format`` should default to JSON when stdout isn't a TTY. + +AIP §Phase 4b: piped/redirected stdout means a programmatic consumer is +reading the output. JSON is the canonical machine-readable shape — agents +shouldn't have to spell ``--format json`` to get it. + +The CLAUDECODE / BCLI_AGENT env-var branches are *explicit* user opt-ins +for markdown and are left alone. The win32 mojibake branch likewise +stays as ``markdown`` since the issue is rendering, not parseability. 
+""" + +from __future__ import annotations + +import sys + +import pytest + +from bcli_cli.output import detect_default_format + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + """Clear env-vars that would short-circuit the detection.""" + for var in ("BCLI_FORMAT", "CLAUDECODE", "BCLI_AGENT"): + monkeypatch.delenv(var, raising=False) + yield + + +def test_non_tty_defaults_to_json(monkeypatch): + """Pipe / redirect → JSON (the Phase 4b standardization).""" + monkeypatch.setattr(sys.stdout, "isatty", lambda: False) + monkeypatch.setattr(sys, "platform", "linux") + assert detect_default_format() == "json" + + +def test_tty_keeps_table(monkeypatch): + """Interactive shell still gets the rich table.""" + monkeypatch.setattr(sys.stdout, "isatty", lambda: True) + monkeypatch.setattr(sys, "platform", "linux") + assert detect_default_format() == "table" + + +def test_bcli_format_env_overrides_pipe(monkeypatch): + """Explicit env-var beats any auto-detection.""" + monkeypatch.setattr(sys.stdout, "isatty", lambda: False) + monkeypatch.setenv("BCLI_FORMAT", "csv") + assert detect_default_format() == "csv" + + +def test_claudecode_env_still_picks_markdown(monkeypatch): + """Explicit AI-agent hint keeps its existing semantics (markdown). + + Flipping this would also be a Phase-4-like standardization, but the + task scopes 4b to the non-TTY branch only — agents that want JSON + pass ``--format json`` like everyone else. + """ + monkeypatch.setattr(sys.stdout, "isatty", lambda: False) + monkeypatch.setenv("CLAUDECODE", "1") + assert detect_default_format() == "markdown" + + +def test_bcli_agent_env_still_picks_markdown(monkeypatch): + monkeypatch.setattr(sys.stdout, "isatty", lambda: False) + monkeypatch.setenv("BCLI_AGENT", "1") + assert detect_default_format() == "markdown" + + +def test_windows_legacy_console_keeps_markdown(monkeypatch): + """conhost.exe still renders rich's UTF-8 box-drawing as `?`. 
+ + JSON is *parseable* there but markdown stays a strictly safer + rendering choice for an interactive console — flipping to JSON + would surprise users who run ``bcli get`` in a normal cmd window. + """ + monkeypatch.setattr(sys.stdout, "isatty", lambda: True) + monkeypatch.setattr(sys, "platform", "win32") + monkeypatch.delenv("WT_SESSION", raising=False) + assert detect_default_format() == "markdown" + + +def test_windows_terminal_still_gets_table(monkeypatch): + monkeypatch.setattr(sys.stdout, "isatty", lambda: True) + monkeypatch.setattr(sys, "platform", "win32") + monkeypatch.setenv("WT_SESSION", "abc123") + assert detect_default_format() == "table" diff --git a/tests/test_progress/__init__.py b/tests/test_progress/__init__.py new file mode 100644 index 0000000..61307d0 --- /dev/null +++ b/tests/test_progress/__init__.py @@ -0,0 +1 @@ +"""Tests for --progress-fd progress events (AIP §Phase 4e).""" diff --git a/tests/test_progress/test_progress_events.py b/tests/test_progress/test_progress_events.py new file mode 100644 index 0000000..8217f05 --- /dev/null +++ b/tests/test_progress/test_progress_events.py @@ -0,0 +1,157 @@ +"""``--progress-fd N`` event stream contract (AIP §Phase 4e). + +For long-running ``bcli batch run`` / ``bcli extract run`` work, agents +want per-step structured events on a dedicated file descriptor: + + {"event": "step_started", "seq": 3, "method": "POST", ...} + {"event": "step_completed", "seq": 3, "status": "committed", ...} + +These tests pin the event shape via the small ``ProgressEmitter`` helper. +End-to-end wiring through ``batch_cmd.run_batch`` is exercised via a +fake fd in ``test_batch_progress_fd``. 
+""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest + +from bcli_cli._progress import ProgressEmitter, _is_real_value + + +# ─── Emitter primitives ────────────────────────────────────────────── + + +def test_emitter_writes_one_json_line_per_call(tmp_path: Path): + out = tmp_path / "events.jsonl" + fd = os.open(out, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + emitter = ProgressEmitter(fd=fd) + emitter.emit(event="step_started", seq=1, method="POST", endpoint="vendors") + emitter.emit(event="step_completed", seq=1, status="committed", + bc_correlation_id="corr-1", duration_ms=287) + emitter.close() + + lines = out.read_text().splitlines() + assert len(lines) == 2 + e1 = json.loads(lines[0]) + e2 = json.loads(lines[1]) + assert e1["event"] == "step_started" + assert e1["seq"] == 1 + assert e1["method"] == "POST" + assert e1["endpoint"] == "vendors" + assert "ts" in e1 # timestamp injected by the emitter + + assert e2["event"] == "step_completed" + assert e2["status"] == "committed" + assert e2["bc_correlation_id"] == "corr-1" + assert e2["duration_ms"] == 287 + + +def test_emitter_with_no_fd_is_noop(): + """When the user didn't pass --progress-fd, the emitter is a no-op + so command code can call ``emit()`` unconditionally.""" + emitter = ProgressEmitter(fd=None) + # Must not raise; nothing gets written. + emitter.emit(event="step_started", seq=1, method="POST", endpoint="x") + emitter.close() + + +def test_emitter_close_closes_fd(tmp_path: Path): + """Closing the emitter releases the fd so the consumer EOFs.""" + out = tmp_path / "events.jsonl" + fd = os.open(out, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + emitter = ProgressEmitter(fd=fd) + emitter.emit(event="x", seq=1) + emitter.close() + # Second close must be safe. + emitter.close() + # Writing to the fd post-close raises. 
+ with pytest.raises(OSError): + os.write(fd, b"after close\n") + + +def test_emitter_ts_is_iso_z_format(tmp_path: Path): + out = tmp_path / "evt.jsonl" + fd = os.open(out, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + emitter = ProgressEmitter(fd=fd) + emitter.emit(event="x", seq=1) + emitter.close() + e = json.loads(out.read_text().splitlines()[0]) + # ISO 8601 with Z suffix matches the rest of bcli's audit/telemetry. + assert e["ts"].endswith("Z") + + +# ─── Typer-default detection ───────────────────────────────────────── + + +def test_is_real_value_detects_typer_defaults(): + """OptionInfo / ParameterInfo instances from Typer must be treated as + "not provided" so tests that call the command function directly + without keyword arguments don't accidentally enable the fd path.""" + class _Stub: + pass + stub = _Stub() + stub.__class__.__name__ = "OptionInfo" + assert _is_real_value(stub) is False + assert _is_real_value(None) is False + assert _is_real_value(3) is True + assert _is_real_value("/dev/stdout") is True + + +# ─── batch run end-to-end through the emitter ──────────────────────── + + +def test_batch_run_emits_step_events_to_progress_fd(tmp_path: Path, monkeypatch): + """End-to-end: --progress-fd 7 writes step_started + step_completed + events for each step in the manifest.""" + import asyncio + from unittest.mock import AsyncMock + + from bcli.workflow._models import StepResult, WorkflowContext # noqa: F401 + from bcli_cli._progress import ProgressEmitter + from bcli_cli.commands import batch_cmd + + # Build a fake async-with client that responds to .post. 
+ fake_client = AsyncMock() + fake_client.post = AsyncMock(return_value={"systemId": "rec-1"}) + fake_client._resolve_url = lambda *a, **kw: "https://example.com/api/vendors" + + class _CM: + async def __aenter__(self_inner): + return fake_client + async def __aexit__(self_inner, *a): + return False + + monkeypatch.setattr( + "bcli_cli.commands.batch_cmd.state.make_async_client", + lambda **_: _CM(), + ) + + out = tmp_path / "events.jsonl" + fd = os.open(out, os.O_WRONLY | os.O_CREAT | os.O_TRUNC) + + # Directly exercise _execute_batch with a progress emitter. + progress = ProgressEmitter(fd=fd) + steps = [ + {"action": "post", "endpoint": "vendors", "data": {"name": "A"}}, + {"action": "post", "endpoint": "vendors", "data": {"name": "B"}}, + ] + asyncio.run(batch_cmd._execute_batch(steps, progress=progress)) + progress.close() + + events = [json.loads(line) for line in out.read_text().splitlines() if line] + # Each step → 1 started + 1 completed. + started = [e for e in events if e["event"] == "step_started"] + completed = [e for e in events if e["event"] == "step_completed"] + assert len(started) == 2 + assert len(completed) == 2 + # Step sequence numbers in order. + assert [e["seq"] for e in started] == [1, 2] + assert [e["seq"] for e in completed] == [1, 2] + # Each completed event names the outcome. + for e in completed: + assert e["status"] in {"committed", "failed", "error"} + assert e["method"] == "POST"