From 7ed302f1ce421d847b3aae7c591c2b265b2835c9 Mon Sep 17 00:00:00 2001 From: igor-ctrl Date: Sat, 16 May 2026 20:03:40 -0500 Subject: [PATCH 1/2] feat(envelope): mutation result envelope via --result-out/--result-fd (AIP Phase 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an opt-in JSON result envelope an agent runtime can consume on a side channel — never on stdout. Five mutating commands now accept `--result-out PATH` (atomic write) or `--result-fd N` (write+close): bcli post / patch / delete / attach upload / batch run Envelope shape matches contract doc §Phase 2: version, invocation_id, tool_version, profile, environment, company, method, endpoint, resolved_url, record_id, dry_run, status, exit_code, bc_correlation_id, telemetry_event_id, audit_log_offset, started_at, duration_ms. Implementation: - `bcli.result_envelope` — frozen dataclass + atomic writer (tmp file + os.replace; fsync; mkdir parents). - `bcli.exit_codes` — taxonomy seeded for Phase 4 (Worker A) to extend. Phase 2 only wires 0/1/6/7 today. - `bcli_cli._envelope_wrap` — sibling to `_audit_wrap`, owns the context-manager lifecycle so the five commands share one implementation (validate flags, capture start_at/uuid, build+write the envelope on success or failure). Behaviour: - Stdout output is untouched; `--format` still drives the existing dump. - Envelope writes on BOTH success and failure paths (HTTP 4xx → exit 6, 5xx → exit 7, other → exit 1; bc_correlation_id captured when present). - `--dry-run` produces an envelope with `dry_run: true`, `status: "succeeded"`, no HTTP call. - `record_id` extracted from response body: `systemId` → `id` → first `*Id` field; null when none available. - `--result-out` and `--result-fd` are mutually exclusive (typer.BadParameter on conflict). - Both flags are strictly opt-in — when unset, commands behave exactly as before (additive change). Tests: 36 new (`tests/test_envelope/`). 667 total pass; ruff clean. Known limitations / follow-ups: - `telemetry_event_id` and `audit_log_offset` are always null; wiring needs a protocol extension on the telemetry + audit sinks. Deferred to a follow-up to keep this PR additive. - Policy violations (`disable_writes=true`, non-interactive, no `--yes`) exit before the capture block — no envelope is emitted. Phase 4 will define exit 8 and surface this in the envelope. - Existing direct-call tests (e.g. `test_batch_safety.py`) pass typer `OptionInfo` defaults to the new params; the wrapper treats those as "not provided" so we don't have to update every call site. --- src/bcli/exit_codes.py | 47 +++ src/bcli/result_envelope.py | 110 ++++++ src/bcli_cli/_envelope_wrap.py | 338 ++++++++++++++++++ src/bcli_cli/commands/attach_cmd.py | 110 +++--- src/bcli_cli/commands/batch_cmd.py | 192 ++++++---- src/bcli_cli/commands/delete_cmd.py | 67 +++- src/bcli_cli/commands/patch_cmd.py | 66 +++- src/bcli_cli/commands/post_cmd.py | 62 +++- tests/test_envelope/__init__.py | 0 tests/test_envelope/conftest.py | 92 +++++ .../test_envelope_other_verbs.py | 241 +++++++++++++ tests/test_envelope/test_envelope_post.py | 289 +++++++++++++++ tests/test_envelope/test_result_envelope.py | 165 +++++++++ 13 files changed, 1615 insertions(+), 164 deletions(-) create mode 100644 src/bcli/exit_codes.py create mode 100644 src/bcli/result_envelope.py create mode 100644 src/bcli_cli/_envelope_wrap.py create mode 100644 tests/test_envelope/__init__.py create mode 100644 tests/test_envelope/conftest.py create mode 100644 tests/test_envelope/test_envelope_other_verbs.py create mode 100644 tests/test_envelope/test_envelope_post.py create mode 100644 tests/test_envelope/test_result_envelope.py diff --git a/src/bcli/exit_codes.py b/src/bcli/exit_codes.py new file mode 100644 index 0000000..be01ac6 --- /dev/null +++ b/src/bcli/exit_codes.py @@ -0,0 +1,47 @@ +"""Canonical bcli exit codes — AIP §Phase 4 taxonomy, seeded in Phase 2. + +Phase 2 needs ``EXIT_REMOTE_4XX`` and ``EXIT_REMOTE_5XX`` for the result +envelope's ``exit_code`` field on failure. Phase 4 (Worker A) will wire +the rest into the CLI so ``bcli`` exits with the documented status. + +Codes match the contract doc: + +* ``0`` — success +* ``1`` — uncategorised failure (default for exceptions we can't map) +* ``2`` — usage error (wrong flag combination, invalid argument) +* ``3`` — auth failure +* ``4`` — not found +* ``5`` — input validation (client-side) +* ``6`` — remote 4xx +* ``7`` — remote 5xx +* ``8`` — policy violation (e.g. ``disable_writes`` triggered without ``--yes``) + +Importing the constants keeps test assertions and CLI plumbing in lock-step. +""" + +from __future__ import annotations + +EXIT_OK = 0 +EXIT_GENERIC_ERROR = 1 +EXIT_USAGE = 2 +EXIT_AUTH = 3 +EXIT_NOT_FOUND = 4 +EXIT_VALIDATION = 5 +EXIT_REMOTE_4XX = 6 +EXIT_REMOTE_5XX = 7 +EXIT_POLICY = 8 + + +def exit_code_for_status(status_code: int | None) -> int: + """Map an HTTP status to a CLI exit code. + + Falls back to ``EXIT_GENERIC_ERROR`` when ``status_code`` is ``None`` + or doesn't fall in the 4xx/5xx range. + """ + if status_code is None: + return EXIT_GENERIC_ERROR + if 400 <= status_code < 500: + return EXIT_REMOTE_4XX + if 500 <= status_code < 600: + return EXIT_REMOTE_5XX + return EXIT_GENERIC_ERROR diff --git a/src/bcli/result_envelope.py b/src/bcli/result_envelope.py new file mode 100644 index 0000000..07d5bfe --- /dev/null +++ b/src/bcli/result_envelope.py @@ -0,0 +1,110 @@ +"""Mutation result envelope (AIP v0.1 §Phase 2). + +The envelope is a single JSON object an agent runtime can consume on a +side channel — never on stdout. Every mutating CLI verb (`post`, `patch`, +`delete`, `attach upload`, `batch run`) emits one envelope per invocation +when the user passes ``--result-out PATH`` or ``--result-fd N``. + +Path mode writes atomically (tmp + ``os.replace``) so a SIGKILL between +write and rename never leaves a half-written file on the documented +output path. Fd mode writes the JSON object then closes the descriptor +so a pipe reader can see EOF. + +Stdout output is untouched: the existing ``--format`` flag still drives +whatever the human/CSV/JSON dump looks like. The envelope is the +*attestation* (profile, target, correlation id, outcome), not the +response body. +""" + +from __future__ import annotations + +import json +import os +import tempfile +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Optional + +ENVELOPE_VERSION = "0.1" + + +@dataclass(frozen=True) +class ResultEnvelope: + """One mutation, attested. + + Fields mirror the contract doc table in §Phase 2. ``record_id`` is + extracted from the response body on success (``systemId``/``id``/first + ``*Id``) when available, ``None`` otherwise. ``telemetry_event_id`` and + ``audit_log_offset`` are currently always ``None`` — wiring them needs + a protocol extension on the telemetry + audit sinks, deferred to a + follow-up so this PR stays additive. + """ + + version: str + invocation_id: str + tool_version: str + profile: str | None + environment: str | None + company: str | None + method: str + endpoint: str + resolved_url: str | None + record_id: str | None + dry_run: bool + status: str # "succeeded" | "failed" + exit_code: int + bc_correlation_id: str | None + telemetry_event_id: str | None # TODO: wire via TelemetrySink follow-up + audit_log_offset: int | None # TODO: wire via AuditSink follow-up + started_at: str # ISO 8601 UTC + duration_ms: int + + +def write_envelope( + envelope: ResultEnvelope, + *, + path: Optional[Path] = None, + fd: Optional[int] = None, +) -> None: + """Serialize ``envelope`` to ``path`` (atomic) or ``fd`` (write+close). + + Exactly one of ``path`` / ``fd`` must be provided. Path mode creates + parent directories if missing and uses ``os.replace`` for atomicity. + Fd mode writes the JSON and closes the descriptor so a downstream pipe + reader sees EOF. + """ + if path is not None and fd is not None: + raise ValueError("write_envelope: pass either path or fd, not both") + if path is None and fd is None: + raise ValueError("write_envelope: must pass either path or fd") + + payload = json.dumps(asdict(envelope), default=str, indent=2) + + if path is not None: + target = Path(path) + target.parent.mkdir(parents=True, exist_ok=True) + # NamedTemporaryFile in the same dir so os.replace is on one FS. + with tempfile.NamedTemporaryFile( + mode="w", + encoding="utf-8", + dir=str(target.parent), + prefix=target.name + ".", + suffix=".tmp", + delete=False, + ) as tmp: + tmp.write(payload) + tmp.flush() + os.fsync(tmp.fileno()) + tmp_name = tmp.name + os.replace(tmp_name, str(target)) + return + + # fd path + assert fd is not None + try: + os.write(fd, payload.encode("utf-8")) + finally: + os.close(fd) + + +__all__ = ["ENVELOPE_VERSION", "ResultEnvelope", "write_envelope"] diff --git a/src/bcli_cli/_envelope_wrap.py b/src/bcli_cli/_envelope_wrap.py new file mode 100644 index 0000000..8a200fa --- /dev/null +++ b/src/bcli_cli/_envelope_wrap.py @@ -0,0 +1,338 @@ +"""CLI-side result-envelope wrapper for mutating commands. + +Sibling to :mod:`bcli_cli._audit_wrap`. Where audit emits one entry to a +JSONL sink (for compliance/forensics), the envelope writes a single JSON +object to a side channel an agent runtime parses (``--result-out PATH`` +or ``--result-fd N``). + +The two concerns intentionally stay separate: + +* Audit logs are an admin opt-in, profile-scoped, multi-call append-only. +* Envelopes are per-invocation, requested explicitly per command call, + and consumed by a programmatic caller (MCP, CI, agent runtime). + +Five commands need the same wiring (``post``, ``patch``, ``delete``, +``attach upload``, ``batch run``) so this module owns the lifecycle: + +1. :func:`validate_flags` — raise ``BadParameter`` if both flags are set. +2. :func:`Capture` context manager — start_at + uuid. +3. ``Capture.emit_*`` helpers — build the envelope and write it. + +The wrapper never raises if the user didn't pass ``--result-out`` / +``--result-fd``: that keeps the flag strictly opt-in (additive only). +""" + +from __future__ import annotations + +import uuid +from contextlib import contextmanager +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterator + +import typer + +from bcli._version import __version__ +from bcli.exit_codes import EXIT_GENERIC_ERROR, EXIT_OK, exit_code_for_status +from bcli.result_envelope import ( + ENVELOPE_VERSION, + ResultEnvelope, + write_envelope, +) +from bcli_cli._state import state + + +def _is_real_value(v: object) -> bool: + """True if ``v`` is a value the user actually provided. + + ``typer.Option`` instances leak through when a command function is + called directly (not via Typer's CLI parsing) without explicit + keyword arguments — that's the pattern existing tests use to invoke + ``post_command`` / ``run_batch`` etc. Treat those as "not provided" + so a test that doesn't care about the envelope flags doesn't trip + the mutual-exclusion guard. + """ + if v is None: + return False + # Be tolerant: Typer wraps defaults in click's OptionInfo / ParameterInfo. + cls_name = type(v).__name__ + if cls_name in {"OptionInfo", "ArgumentInfo", "ParameterInfo"}: + return False + return True + + +def validate_flags( + result_out: Path | None, + result_fd: int | None, +) -> None: + """Raise ``typer.BadParameter`` if the user passed both flags. + + Mutual exclusion is checked here (not in Typer's ``Option``) so the + error path is identical across the five commands and the spec test + can pin a single failure mode. + """ + if _is_real_value(result_out) and _is_real_value(result_fd): + raise typer.BadParameter( + "--result-out and --result-fd are mutually exclusive; pass one or the other.", + ) + + +def _now_iso_utc() -> str: + return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + +def _record_id_from(body: Any) -> str | None: + """Pull a record id out of a response body for the envelope. + + Order: + 1. ``systemId`` — BC's canonical immutable id, our preferred shape. + 2. ``id`` — fallback for endpoints that don't surface ``systemId``. + 3. First key that ends with ``Id`` (case-insensitive). + """ + if not isinstance(body, dict): + return None + for key in ("systemId", "id"): + if key in body and body[key] is not None: + return str(body[key]) + for key, value in body.items(): + if value is None: + continue + if key.lower().endswith("id"): + return str(value) + return None + + +@dataclass +class _CaptureState: + """Mutable bag used by the context manager.""" + + started_at: str + started_monotonic_ns: int + invocation_id: str + method: str + endpoint: str + result_out: Path | None + result_fd: int | None + resolved_url: str | None = None + record_id: str | None = None + dry_run: bool = False + written: bool = False # guard against double-emit + + +def _build_envelope( + cap: _CaptureState, + *, + status: str, + exit_code: int, + bc_correlation_id: str | None, +) -> ResultEnvelope: + import time as _time + + duration_ns = _time.monotonic_ns() - cap.started_monotonic_ns + duration_ms = max(0, int(duration_ns / 1_000_000)) + + profile = None + environment = None + company = None + profile_name = state.active_profile_name + try: + prof = state.profile + environment = prof.environment + company = prof.company_id + profile = profile_name + except Exception: + # Defensive: building the profile shouldn't kill the envelope write. + profile = profile_name + + return ResultEnvelope( + version=ENVELOPE_VERSION, + invocation_id=cap.invocation_id, + tool_version=__version__, + profile=profile, + environment=environment, + company=company, + method=cap.method.upper(), + endpoint=cap.endpoint, + resolved_url=cap.resolved_url, + record_id=cap.record_id, + dry_run=cap.dry_run, + status=status, + exit_code=exit_code, + bc_correlation_id=bc_correlation_id, + telemetry_event_id=None, + audit_log_offset=None, + started_at=cap.started_at, + duration_ms=duration_ms, + ) + + +def _write_if_active(cap: _CaptureState, env: ResultEnvelope) -> None: + if cap.result_out is None and cap.result_fd is None: + return + if cap.written: + return + write_envelope(env, path=cap.result_out, fd=cap.result_fd) + cap.written = True + + +class Capture: + """Active capture handle returned by :func:`capture`. + + The command wires these calls: + + * :meth:`set_resolved_url` — once the URL is known (pre-flight). + * :meth:`set_record_id` — after a successful response. + * :meth:`mark_dry_run` — when the command short-circuits. + * :meth:`emit_success` — happy path. + * :meth:`emit_failure` — exception path. + + The :func:`capture` context manager emits an envelope on its way out + if the command didn't already, so a forgotten ``emit_*`` call still + produces output (failure path). + """ + + def __init__(self, cap: _CaptureState) -> None: + self._cap = cap + + @property + def is_active(self) -> bool: + return self._cap.result_out is not None or self._cap.result_fd is not None + + def set_resolved_url(self, url: str | None) -> None: + self._cap.resolved_url = url + + def set_record_id(self, value: str | None) -> None: + self._cap.record_id = value + + def extract_record_id_from(self, response: Any) -> None: + """Pull the record id out of a response and stash it.""" + rid = _record_id_from(response) + if rid is not None: + self._cap.record_id = rid + + def mark_dry_run(self) -> None: + self._cap.dry_run = True + + def emit_success(self, *, bc_correlation_id: str | None = None) -> None: + if not self.is_active: + return + env = _build_envelope( + self._cap, + status="succeeded", + exit_code=EXIT_OK, + bc_correlation_id=bc_correlation_id, + ) + _write_if_active(self._cap, env) + + def emit_failure(self, exc: BaseException) -> None: + if not self.is_active: + return + status_code = getattr(exc, "status_code", None) + correlation_id = getattr(exc, "correlation_id", None) + exit_code = exit_code_for_status(status_code) + if exit_code == EXIT_GENERIC_ERROR and status_code is None: + exit_code = EXIT_GENERIC_ERROR + env = _build_envelope( + self._cap, + status="failed", + exit_code=exit_code, + bc_correlation_id=correlation_id, + ) + _write_if_active(self._cap, env) + + +@contextmanager +def capture( + *, + method: str, + endpoint: str, + result_out: Path | None, + result_fd: int | None, +) -> Iterator[Capture]: + """Context manager that captures invocation metadata and writes the envelope. + + Usage: + + with capture(method="POST", endpoint=endpoint, + result_out=result_out, result_fd=result_fd) as cap: + cap.set_resolved_url(try_resolve_url(endpoint, ...)) + try: + result = await ... + cap.extract_record_id_from(result) + cap.emit_success() + except Exception as exc: + cap.emit_failure(exc) + raise + + When the inner block raises without calling ``emit_failure`` (or + ``typer.Exit`` slipping past), the context manager emits a generic + failure envelope on the way out so the side channel always has a + record when the flag was passed. + """ + import time as _time + + real_out = result_out if _is_real_value(result_out) else None + real_fd = result_fd if _is_real_value(result_fd) else None + cap_state = _CaptureState( + started_at=_now_iso_utc(), + started_monotonic_ns=_time.monotonic_ns(), + invocation_id=uuid.uuid4().hex, + method=method, + endpoint=endpoint, + result_out=Path(real_out) if real_out is not None else None, + result_fd=real_fd, + ) + handle = Capture(cap_state) + try: + yield handle + except BaseException as exc: # noqa: BLE001 + # If the command body raised without telling us, salvage an envelope. + # typer.Exit with exit_code=0 still means the command short-circuited + # (dry-run flow) — leave it alone if dry-run was marked + success emitted. + if isinstance(exc, typer.Exit): + # On clean Exit (code 0/None) treat as success if not already + # written; non-zero Exit means a failure path the caller flagged. + code = exc.exit_code or 0 + if not cap_state.written: + if code == 0: + env = _build_envelope( + cap_state, + status="succeeded", + exit_code=EXIT_OK, + bc_correlation_id=None, + ) + else: + env = _build_envelope( + cap_state, + status="failed", + exit_code=int(code) if isinstance(code, int) else EXIT_GENERIC_ERROR, + bc_correlation_id=None, + ) + _write_if_active(cap_state, env) + raise + # Non-Exit exception we didn't see — record it and re-raise. + if not cap_state.written: + handle.emit_failure(exc) + raise + else: + # Clean exit path — caller already invoked emit_success on the happy + # path. If they didn't, write a default success envelope. + if not cap_state.written and ( + cap_state.result_out is not None or cap_state.result_fd is not None + ): + env = _build_envelope( + cap_state, + status="succeeded", + exit_code=EXIT_OK, + bc_correlation_id=None, + ) + _write_if_active(cap_state, env) + + +__all__ = ["Capture", "capture", "validate_flags"] + + +# Re-export for unit tests / direct use: +def record_id_from(body: Any) -> str | None: # pragma: no cover - thin shim + return _record_id_from(body) diff --git a/src/bcli_cli/commands/attach_cmd.py b/src/bcli_cli/commands/attach_cmd.py index 8d0b372..3fbaad5 100644 --- a/src/bcli_cli/commands/attach_cmd.py +++ b/src/bcli_cli/commands/attach_cmd.py @@ -26,6 +26,7 @@ import typer from rich.console import Console +from bcli_cli._envelope_wrap import capture, validate_flags from bcli_cli._state import state from bcli_cli.output import format_output, print_context_banner @@ -47,6 +48,16 @@ def upload_command( standard: bool = typer.Option(False, "--standard", "--no-registry", help="Bypass the custom registry and force Microsoft's standard /api/v2.0/documentAttachments route. Use when a custom page isn't persisting (zero-GUID ids)."), format: Optional[str] = typer.Option(None, "--format", "-f", help="Output format: table, json, csv, ndjson, raw"), yes: bool = typer.Option(False, "--yes", "-y", help="Skip the read-only-profile warning prompt"), + result_out: Optional[Path] = typer.Option( + None, + "--result-out", + help="Write a JSON result envelope to this path (atomic). See AIP §Phase 2.", + ), + result_fd: Optional[int] = typer.Option( + None, + "--result-fd", + help="Write the JSON result envelope to this file descriptor and close it.", + ), ) -> None: """Upload a file as a documentAttachment linked to an existing parent record. @@ -60,6 +71,8 @@ def upload_command( Routing follows the registry — custom entries for ``documentAttachments`` take priority. Force a specific route with ``--publisher/--group/--version``. """ + validate_flags(result_out, result_fd) + output_format = format or state.format state.format = output_format # propagate subcommand -f to dry-run + audit if output_format in ("json", "csv", "ndjson", "raw"): @@ -70,23 +83,12 @@ def upload_command( from bcli_cli._safety import confirm_write_or_exit confirm_write_or_exit("UPLOAD", "documentAttachments", yes=yes) - if state.dry_run: - from bcli_cli._dry_run import render_dry_run - render_dry_run( - "UPLOAD", "documentAttachments", - publisher=publisher, group=group, version=version, - force_standard=standard, - extra={ - "file_path": str(file_path), - "byte_size": file_path.stat().st_size, - "parent_type": parent_type, - "parent_id": parent_id, - "file_name": file_name or file_path.name, - "force_standard": standard, - }, - ) - - try: + with capture( + method="UPLOAD", + endpoint="documentAttachments", + result_out=result_out, + result_fd=result_fd, + ) as cap: from bcli_cli._audit_wrap import audited_write from bcli_cli._url_resolve import try_resolve_url resolved_url = try_resolve_url( @@ -94,32 +96,56 @@ def upload_command( publisher=publisher, group=group, version=version, force_standard=standard, ) - result = asyncio.run(audited_write( - _execute_attach( - file_path=file_path, - parent_type=parent_type, - parent_id=parent_id, - file_name=file_name, - content_type=content_type, - publisher=publisher, - group=group, - version=version, + cap.set_resolved_url(resolved_url) + + if state.dry_run: + from bcli_cli._dry_run import render_dry_run + cap.mark_dry_run() + cap.emit_success() + render_dry_run( + "UPLOAD", "documentAttachments", + publisher=publisher, group=group, version=version, force_standard=standard, - ), - method="UPLOAD", - endpoint="documentAttachments", - body={ - "parent_type": parent_type, - "parent_id": parent_id, - "file_name": file_name or file_path.name, - "byte_size": file_path.stat().st_size, - }, - resolved_url=resolved_url, - )) - format_output([result] if result else [], output_format) - except Exception as e: - console.print(f"[red]Error:[/red] {e}") - raise typer.Exit(1) + extra={ + "file_path": str(file_path), + "byte_size": file_path.stat().st_size, + "parent_type": parent_type, + "parent_id": parent_id, + "file_name": file_name or file_path.name, + "force_standard": standard, + }, + ) + + try: + result = asyncio.run(audited_write( + _execute_attach( + file_path=file_path, + parent_type=parent_type, + parent_id=parent_id, + file_name=file_name, + content_type=content_type, + publisher=publisher, + group=group, + version=version, + force_standard=standard, + ), + method="UPLOAD", + endpoint="documentAttachments", + body={ + "parent_type": parent_type, + "parent_id": parent_id, + "file_name": file_name or file_path.name, + "byte_size": file_path.stat().st_size, + }, + resolved_url=resolved_url, + )) + cap.extract_record_id_from(result) + cap.emit_success() + format_output([result] if result else [], output_format) + except Exception as e: + cap.emit_failure(e) + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) @app.command("test") diff --git a/src/bcli_cli/commands/batch_cmd.py b/src/bcli_cli/commands/batch_cmd.py index 77ad3e9..c01fff4 100644 --- a/src/bcli_cli/commands/batch_cmd.py +++ b/src/bcli_cli/commands/batch_cmd.py @@ -10,6 +10,7 @@ import typer from rich.console import Console +from bcli_cli._envelope_wrap import capture, validate_flags from bcli_cli._state import state from bcli_cli.output import format_output, print_context_banner @@ -137,6 +138,16 @@ def run_batch( set_params: list[str] | None = typer.Option(None, "--set", help="Set parameter: key=value (repeatable)"), params_file: Path | None = typer.Option(None, "--params", help="YAML file with parameter values"), yes: bool = typer.Option(False, "--yes", "-y", help="Skip the read-only-profile prompt for mutating batch steps"), + result_out: Path | None = typer.Option( + None, + "--result-out", + help="Write a JSON result envelope to this path (atomic). See AIP §Phase 2.", + ), + result_fd: int | None = typer.Option( + None, + "--result-fd", + help="Write the JSON result envelope to this file descriptor and close it.", + ), ) -> None: """Execute a YAML batch file (sequence of API calls). @@ -149,6 +160,8 @@ def run_batch( bcli batch run workflow.yaml --params values.yaml bcli batch run workflow.yaml --set vendor_no=V00011 --dry-run """ + validate_flags(result_out, result_fd) + print_context_banner() if not file.is_file(): @@ -165,85 +178,110 @@ def run_batch( from bcli.errors import WorkflowError from bcli.workflow import load_workflow_yaml - try: - raw = load_workflow_yaml(file) - except WorkflowError as e: - console.print(f"[red]Invalid workflow YAML:[/red] {e}") - raise typer.Exit(1) - batch_name = raw.get("name", file.stem) - steps = raw.get("steps", []) - - if not steps: - console.print("[yellow]No steps found in batch file.[/yellow]") - raise typer.Exit() - - # Detect workflow mode: ${{ }} references OR top-level params OR --set/--params flags - is_workflow = ( - "params" in raw - or set_params - or params_file is not None - or _has_references(steps) - ) - - context = None - if is_workflow: - from bcli.workflow._models import WorkflowContext - - params = _build_workflow_params(raw, set_params, params_file) - context = WorkflowContext(params=params) - _validate_step_names(steps) - - console.print(f"[bold]Batch:[/bold] {batch_name}") - if is_workflow and context and context.params: - console.print(f"[dim]Params: {context.params}[/dim]") - console.print(f"[dim]{len(steps)} step(s)[/dim]\n") - - if dry_run or state.dry_run: - _print_dry_run(steps, context) - return - - # Apply the same disable_writes gate that direct post/patch/delete commands - # use. Without this, a profile marked read-only would still execute - # mutating batch steps in non-interactive automation — see vuln-0002. - # We inspect raw steps here (workflow `${{ }}` references are resolved - # later inside `_execute_batch`); any step that statically declares a - # mutating action triggers a single batch-level confirmation. - mutating_actions = {"post", "patch", "delete"} - mutating_steps = [ - step for step in steps - if (step.get("action") or "get").lower() in mutating_actions - ] - if mutating_steps: - from bcli_cli._safety import confirm_write_or_exit - - preview = ", ".join( - f"{(step.get('action') or 'get').upper()} {step.get('endpoint', '?')}" - for step in mutating_steps[:3] + with capture( + method="BATCH_RUN", + endpoint="batch", + result_out=result_out, + result_fd=result_fd, + ) as cap: + try: + raw = load_workflow_yaml(file) + except WorkflowError as e: + console.print(f"[red]Invalid workflow YAML:[/red] {e}") + cap.emit_failure(e) + raise typer.Exit(1) + batch_name = raw.get("name", file.stem) + steps = raw.get("steps", []) + + if not steps: + console.print("[yellow]No steps found in batch file.[/yellow]") + cap.emit_success() + raise typer.Exit() + + # Detect workflow mode: ${{ }} references OR top-level params OR --set/--params flags + is_workflow = ( + "params" in raw + or set_params + or params_file is not None + or _has_references(steps) ) - if len(mutating_steps) > 3: - preview += f", +{len(mutating_steps) - 3} more" - confirm_write_or_exit("BATCH WRITE", preview, yes=yes) - output_format = format - - try: - results = asyncio.run(_execute_batch(steps, context=context, output_format=output_format)) - - succeeded = sum(1 for r in results if r.get("status") == "ok") - console.print(f"\n[green]✓[/green] Batch complete: {succeeded}/{len(steps)} steps succeeded") - - if output: - output.parent.mkdir(parents=True, exist_ok=True) - output_data = { - "batch": batch_name, - "steps": results, - } - output.write_text(json.dumps(output_data, indent=2, default=str)) - console.print(f"[dim]Results saved to {output}[/dim]") - - except Exception as e: - console.print(f"[red]Batch failed:[/red] {e}") - raise typer.Exit(1) + context = None + if is_workflow: + from bcli.workflow._models import WorkflowContext + + params = _build_workflow_params(raw, set_params, params_file) + context = WorkflowContext(params=params) + _validate_step_names(steps) + + console.print(f"[bold]Batch:[/bold] {batch_name}") + if is_workflow and context and context.params: + console.print(f"[dim]Params: {context.params}[/dim]") + console.print(f"[dim]{len(steps)} step(s)[/dim]\n") + + if dry_run or state.dry_run: + _print_dry_run(steps, context) + cap.mark_dry_run() + cap.emit_success() + return + + # Apply the same disable_writes gate that direct post/patch/delete commands + # use. Without this, a profile marked read-only would still execute + # mutating batch steps in non-interactive automation — see vuln-0002. + # We inspect raw steps here (workflow `${{ }}` references are resolved + # later inside `_execute_batch`); any step that statically declares a + # mutating action triggers a single batch-level confirmation. + mutating_actions = {"post", "patch", "delete"} + mutating_steps = [ + step for step in steps + if (step.get("action") or "get").lower() in mutating_actions + ] + if mutating_steps: + from bcli_cli._safety import confirm_write_or_exit + + preview = ", ".join( + f"{(step.get('action') or 'get').upper()} {step.get('endpoint', '?')}" + for step in mutating_steps[:3] + ) + if len(mutating_steps) > 3: + preview += f", +{len(mutating_steps) - 3} more" + confirm_write_or_exit("BATCH WRITE", preview, yes=yes) + + output_format = format + + try: + results = asyncio.run(_execute_batch(steps, context=context, output_format=output_format)) + + succeeded = sum(1 for r in results if r.get("status") == "ok") + console.print(f"\n[green]✓[/green] Batch complete: {succeeded}/{len(steps)} steps succeeded") + + if output: + output.parent.mkdir(parents=True, exist_ok=True) + output_data = { + "batch": batch_name, + "steps": results, + } + output.write_text(json.dumps(output_data, indent=2, default=str)) + console.print(f"[dim]Results saved to {output}[/dim]") + + # Any step that didn't return status="ok" means the batch is not + # fully succeeded — surface that on the envelope so the caller + # knows to consult the (future) ledger or step logs. + if succeeded < len(results): + # Synthetic exception to carry the failure into emit_failure. + err = RuntimeError( + f"{len(results) - succeeded} of {len(results)} steps failed", + ) + cap.emit_failure(err) + raise typer.Exit(1) + cap.emit_success() + + except typer.Exit: + raise + except Exception as e: + console.print(f"[red]Batch failed:[/red] {e}") + cap.emit_failure(e) + raise typer.Exit(1) # ─── Dry run ───────────────────────────────────────────────────────── diff --git a/src/bcli_cli/commands/delete_cmd.py b/src/bcli_cli/commands/delete_cmd.py index 1997fa3..96d2e94 100644 --- a/src/bcli_cli/commands/delete_cmd.py +++ b/src/bcli_cli/commands/delete_cmd.py @@ -3,11 +3,13 @@ from __future__ import annotations import asyncio +from pathlib import Path from typing import Optional import typer from rich.console import Console +from bcli_cli._envelope_wrap import capture, validate_flags from bcli_cli._state import state from bcli_cli.output import print_context_banner @@ -23,8 +25,20 @@ def delete_command( group: Optional[str] = typer.Option(None, "--group", hidden=True), version: Optional[str] = typer.Option(None, "--version", hidden=True), yes: bool = typer.Option(False, "--yes", "-y", help="Skip the read-only-profile warning prompt"), + result_out: Optional[Path] = typer.Option( + None, + "--result-out", + help="Write a JSON result envelope to this path (atomic). See AIP §Phase 2.", + ), + result_fd: Optional[int] = typer.Option( + None, + "--result-fd", + help="Write the JSON result envelope to this file descriptor and close it.", + ), ) -> None: """DELETE a record.""" + validate_flags(result_out, result_fd) + if format: state.format = format # propagate subcommand -f to dry-run + audit if format in ("json", "csv", "ndjson", "raw"): @@ -35,23 +49,44 @@ def delete_command( from bcli_cli._safety import confirm_write_or_exit confirm_write_or_exit("DELETE", endpoint, yes=yes) - if state.dry_run: - from bcli_cli._dry_run import render_dry_run - render_dry_run( - "DELETE", endpoint, record_id=record_id, - publisher=publisher, group=group, version=version, - extra={"etag": etag}, - ) - - try: - asyncio.run(_audited_delete( - endpoint, record_id, - etag=etag, publisher=publisher, group=group, version=version, + with capture( + method="DELETE", + endpoint=endpoint, + result_out=result_out, + result_fd=result_fd, + ) as cap: + from bcli_cli._url_resolve import try_resolve_url + + cap.set_resolved_url(try_resolve_url( + endpoint, + record_id=record_id, + publisher=publisher, + group=group, + version=version, )) - console.print(f"[green]✓[/green] Deleted {endpoint}({record_id})") - except Exception as e: - console.print(f"[red]Error:[/red] {e}") - raise typer.Exit(1) + cap.set_record_id(record_id) + + if state.dry_run: + from bcli_cli._dry_run import render_dry_run + cap.mark_dry_run() + cap.emit_success() + render_dry_run( + "DELETE", endpoint, record_id=record_id, + publisher=publisher, group=group, version=version, + extra={"etag": etag}, + ) + + try: + asyncio.run(_audited_delete( + endpoint, record_id, + etag=etag, publisher=publisher, group=group, version=version, + )) + cap.emit_success() + console.print(f"[green]✓[/green] Deleted {endpoint}({record_id})") + except Exception as e: + cap.emit_failure(e) + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) async def _audited_delete(endpoint, record_id, **kwargs): diff --git a/src/bcli_cli/commands/patch_cmd.py b/src/bcli_cli/commands/patch_cmd.py index fa0bb6d..1b79376 100644 --- a/src/bcli_cli/commands/patch_cmd.py +++ b/src/bcli_cli/commands/patch_cmd.py @@ -10,6 +10,7 @@ import typer from rich.console import Console +from bcli_cli._envelope_wrap import capture, validate_flags from bcli_cli._state import state from bcli_cli.output import format_output, print_context_banner @@ -26,8 +27,20 @@ def patch_command( group: Optional[str] = typer.Option(None, "--group", hidden=True), version: Optional[str] = typer.Option(None, "--version", hidden=True), yes: bool = typer.Option(False, "--yes", "-y", help="Skip the read-only-profile warning prompt"), + result_out: Optional[Path] = typer.Option( + None, + "--result-out", + help="Write a JSON result envelope to this path (atomic). See AIP §Phase 2.", + ), + result_fd: Optional[int] = typer.Option( + None, + "--result-fd", + help="Write the JSON result envelope to this file descriptor and close it.", + ), ) -> None: """PATCH (update) an existing record.""" + validate_flags(result_out, result_fd) + output_format = format or state.format state.format = output_format # propagate subcommand -f to dry-run + audit if output_format in ("json", "csv", "ndjson", "raw"): @@ -40,23 +53,44 @@ def patch_command( body = _parse_data(data) - if state.dry_run: - from bcli_cli._dry_run import render_dry_run - render_dry_run( - "PATCH", endpoint, body=body, record_id=record_id, - publisher=publisher, group=group, version=version, - extra={"etag": etag}, - ) - - try: - result = asyncio.run(_audited_patch( - endpoint, record_id, body, - etag=etag, publisher=publisher, group=group, version=version, + with capture( + method="PATCH", + endpoint=endpoint, + result_out=result_out, + result_fd=result_fd, + ) as cap: + from bcli_cli._url_resolve import try_resolve_url + + cap.set_resolved_url(try_resolve_url( + endpoint, + record_id=record_id, + publisher=publisher, + group=group, + version=version, )) - format_output([result] if result else [], output_format) - except Exception as e: - console.print(f"[red]Error:[/red] {e}") - raise typer.Exit(1) + cap.set_record_id(record_id) + + if state.dry_run: + from bcli_cli._dry_run import render_dry_run + cap.mark_dry_run() + cap.emit_success() + render_dry_run( + "PATCH", endpoint, body=body, record_id=record_id, + publisher=publisher, group=group, version=version, + extra={"etag": etag}, + ) + + try: + result = asyncio.run(_audited_patch( + endpoint, record_id, body, + etag=etag, publisher=publisher, group=group, version=version, + )) + cap.emit_success() + format_output([result] if result else [], output_format) + except Exception as e: + cap.emit_failure(e) + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) async def _audited_patch(endpoint, record_id, body, **kwargs): diff --git a/src/bcli_cli/commands/post_cmd.py b/src/bcli_cli/commands/post_cmd.py index 01881dd..28e8da5 100644 --- a/src/bcli_cli/commands/post_cmd.py +++ b/src/bcli_cli/commands/post_cmd.py @@ -10,6 +10,7 @@ import typer from rich.console import Console +from bcli_cli._envelope_wrap import capture, validate_flags from bcli_cli._state import state from bcli_cli.output import format_output, print_context_banner @@ -24,8 +25,20 @@ def post_command( group: Optional[str] = typer.Option(None, "--group", hidden=True), version: Optional[str] = typer.Option(None, "--version", hidden=True), yes: bool = typer.Option(False, "--yes", "-y", help="Skip the read-only-profile warning prompt"), + result_out: Optional[Path] = typer.Option( + None, + "--result-out", + help="Write a JSON result envelope to this path (atomic). See AIP §Phase 2.", + ), + result_fd: Optional[int] = typer.Option( + None, + "--result-fd", + help="Write the JSON result envelope to this file descriptor and close it.", + ), ) -> None: """POST (create) a new record.""" + validate_flags(result_out, result_fd) + output_format = format or state.format state.format = output_format # propagate subcommand -f to dry-run + audit if output_format in ("json", "csv", "ndjson", "raw"): @@ -38,19 +51,42 @@ def post_command( body = _parse_data(data) - if state.dry_run: - from bcli_cli._dry_run import render_dry_run - render_dry_run( - "POST", endpoint, body=body, - publisher=publisher, group=group, version=version, - ) - - try: - result = asyncio.run(_audited_post(endpoint, body, publisher=publisher, group=group, version=version)) - format_output([result] if result else [], output_format) - except Exception as e: - console.print(f"[red]Error:[/red] {e}") - raise typer.Exit(1) + with capture( + method="POST", + endpoint=endpoint, + result_out=result_out, + result_fd=result_fd, + ) as cap: + from bcli_cli._url_resolve import try_resolve_url + + cap.set_resolved_url(try_resolve_url( + endpoint, + publisher=publisher, + group=group, + version=version, + )) + + if state.dry_run: + from bcli_cli._dry_run import render_dry_run + cap.mark_dry_run() + cap.emit_success() + render_dry_run( + "POST", endpoint, body=body, + publisher=publisher, group=group, version=version, + ) + + try: + result = asyncio.run(_audited_post( + endpoint, body, + publisher=publisher, group=group, version=version, + )) + cap.extract_record_id_from(result) + cap.emit_success() + format_output([result] if result else [], output_format) + except Exception as e: + cap.emit_failure(e) + console.print(f"[red]Error:[/red] {e}") + raise typer.Exit(1) async def _audited_post(endpoint, body, **kwargs): diff --git a/tests/test_envelope/__init__.py b/tests/test_envelope/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_envelope/conftest.py b/tests/test_envelope/conftest.py new file mode 100644 index 0000000..1765121 --- /dev/null +++ b/tests/test_envelope/conftest.py @@ -0,0 +1,92 @@ +"""Shared envelope-tests fixtures. + +Sets up a writable, sandboxed profile and stubs ``state.make_async_client`` +so the mutation commands can run end-to-end against a fake transport. +""" + +from __future__ import annotations + +from pathlib import Path +from unittest.mock import AsyncMock + +import pytest + +from bcli.config._model import BCConfig, BCDefaults, BCProfile +from bcli_cli._state import state + + +@pytest.fixture +def cli_state(monkeypatch): + """Active CLI state pointing at a writable Sandbox profile.""" + cfg = BCConfig( + defaults=BCDefaults(profile="dev"), + profiles={ + "dev": BCProfile( + tenant_id="t1", + environment="Sandbox", + company_id="c-123", + disable_writes=False, + ), + }, + ) + state._config = cfg + state._registry = None + state._telemetry = None + state.profile_name = None + state.env_override = None + state.company_override = None + state.format = "table" + state.dry_run = False + state.quiet = False + yield state + state._config = None + state._registry = None + state._telemetry = None + state.profile_name = None + state.dry_run = False + + +@pytest.fixture +def fake_client(): + """Async client with mockable post/patch/delete/upload_attachment. + + Default success returns include a ``systemId`` so the envelope can + pick a record_id; tests can override per-test. + """ + c = AsyncMock() + c.__aenter__ = AsyncMock(return_value=c) + c.__aexit__ = AsyncMock(return_value=False) + c.post = AsyncMock(return_value={"systemId": "vnd-9", "displayName": "Acme"}) + c.patch = AsyncMock(return_value={"systemId": "vnd-9", "displayName": "Acme2"}) + c.delete = AsyncMock(return_value=None) + c.upload_attachment = AsyncMock( + return_value={"id": "att-9", "fileName": "x.pdf", "byteSize": 12} + ) + c._resolve_url = lambda entity, **kw: f"https://example.test/{entity}" + return c + + +@pytest.fixture +def stub_client(cli_state, fake_client, monkeypatch): + """Patch state.make_async_client to return the fake client.""" + monkeypatch.setattr(state, "make_async_client", lambda **_: fake_client) + return fake_client + + +@pytest.fixture +def stub_resolve_url(monkeypatch): + """Stub ``try_resolve_url`` so we don't need a fully-wired client to + capture the URL for dry-run / failure tests.""" + def _fake(endpoint, **kw): + rid = kw.get("record_id") + suffix = f"({rid})" if rid else "" + return f"https://example.test/{endpoint}{suffix}" + monkeypatch.setattr("bcli_cli._url_resolve.try_resolve_url", _fake) + yield _fake + + +def write_yaml_file(tmp_path: Path, name: str, content: str) -> Path: + import textwrap + f = tmp_path / name + f.write_text(textwrap.dedent(content).strip(), encoding="utf-8") + return f diff --git a/tests/test_envelope/test_envelope_other_verbs.py b/tests/test_envelope/test_envelope_other_verbs.py new file mode 100644 index 0000000..5caf122 --- /dev/null +++ b/tests/test_envelope/test_envelope_other_verbs.py @@ -0,0 +1,241 @@ +"""Envelope coverage for patch, delete, attach upload, and batch run. + +The detailed shape/atomicity contract is pinned in ``test_envelope_post.py``. +Here we only verify the flags exist and the envelope reflects the right +method/endpoint/status for the other verbs. +""" + +from __future__ import annotations + +import json +from pathlib import Path + +import pytest +import typer + +from bcli.errors import ValidationError +from bcli.exit_codes import EXIT_REMOTE_4XX +from bcli_cli.commands import attach_cmd, batch_cmd, delete_cmd, patch_cmd + + +@pytest.fixture(autouse=True) +def force_non_interactive(monkeypatch): + import sys + monkeypatch.setattr(sys.stdin, "isatty", lambda: False) + + +# ── PATCH ────────────────────────────────────────────────────────────── + + +def _run_patch(**kwargs): + defaults = dict( + endpoint="vendors", + record_id="vnd-1", + data='{"displayName": "Renamed"}', + etag="*", + format=None, + publisher=None, + group=None, + version=None, + yes=False, + result_out=None, + result_fd=None, + ) + defaults.update(kwargs) + return patch_cmd.patch_command(**defaults) + + +class TestPatchEnvelope: + def test_patch_writes_envelope(self, stub_client, tmp_path: Path): + out = tmp_path / "out.json" + _run_patch(result_out=out) + env = json.loads(out.read_text()) + assert env["method"] == "PATCH" + assert env["endpoint"] == "vendors" + assert env["record_id"] == "vnd-1" + assert env["status"] == "succeeded" + assert env["exit_code"] == 0 + + def test_patch_envelope_on_failure(self, stub_client, tmp_path: Path): + stub_client.patch.side_effect = ValidationError( + "etag mismatch", status_code=412, correlation_id="corr-patch", + ) + out = tmp_path / "out.json" + with pytest.raises(typer.Exit): + _run_patch(result_out=out) + env = json.loads(out.read_text()) + assert env["status"] == "failed" + assert env["exit_code"] == EXIT_REMOTE_4XX + assert env["bc_correlation_id"] == "corr-patch" + + +# ── DELETE ───────────────────────────────────────────────────────────── + + +def _run_delete(**kwargs): + defaults = dict( + endpoint="vendors", + record_id="vnd-1", + etag="*", + format=None, + publisher=None, + group=None, + version=None, + yes=False, + result_out=None, + result_fd=None, + ) + defaults.update(kwargs) + return delete_cmd.delete_command(**defaults) + + +class TestDeleteEnvelope: + def test_delete_writes_envelope(self, stub_client, tmp_path: Path): + out = tmp_path / "out.json" + _run_delete(result_out=out) + env = json.loads(out.read_text()) + assert env["method"] == "DELETE" + assert env["endpoint"] == "vendors" + assert env["record_id"] == "vnd-1" + assert env["status"] == "succeeded" + + def test_delete_envelope_on_failure(self, stub_client, tmp_path: Path): + stub_client.delete.side_effect = ValidationError( + "not found", status_code=404, correlation_id="corr-del", + ) + out = tmp_path / "out.json" + with pytest.raises(typer.Exit): + _run_delete(result_out=out) + env = json.loads(out.read_text()) + assert env["status"] == "failed" + assert env["exit_code"] == EXIT_REMOTE_4XX + assert env["bc_correlation_id"] == "corr-del" + + +# ── ATTACH UPLOAD ────────────────────────────────────────────────────── + + +def _run_upload(file_path: Path, **kwargs): + defaults = dict( + file_path=file_path, + parent_id="inv-1", + parent_type="Purchase Invoice", + file_name=None, + content_type=None, + publisher=None, + group=None, + version=None, + standard=False, + format=None, + yes=False, + result_out=None, + result_fd=None, + ) + defaults.update(kwargs) + return attach_cmd.upload_command(**defaults) + + +class TestAttachUploadEnvelope: + def test_attach_upload_writes_envelope(self, stub_client, tmp_path: Path): + pdf = tmp_path / "x.pdf" + pdf.write_bytes(b"%PDF-fake") + out = tmp_path / "out.json" + _run_upload(file_path=pdf, result_out=out) + env = json.loads(out.read_text()) + assert env["method"] == "UPLOAD" + assert env["endpoint"] == "documentAttachments" + assert env["status"] == "succeeded" + # record_id should reflect the attachment id ('id' from the response) + assert env["record_id"] == "att-9" + + def test_attach_upload_envelope_on_failure( + self, stub_client, tmp_path: Path, + ): + stub_client.upload_attachment.side_effect = ValidationError( + "parent not found", + status_code=404, + correlation_id="corr-att", + ) + pdf = tmp_path / "x.pdf" + pdf.write_bytes(b"%PDF-fake") + out = tmp_path / "out.json" + with pytest.raises(typer.Exit): + _run_upload(file_path=pdf, result_out=out) + env = json.loads(out.read_text()) + assert env["status"] == "failed" + assert env["exit_code"] == EXIT_REMOTE_4XX + assert env["bc_correlation_id"] == "corr-att" + + +# ── BATCH RUN ────────────────────────────────────────────────────────── + + +def _write_batch_yaml(tmp_path: Path, body: str) -> Path: + import textwrap + f = tmp_path / "batch.yaml" + f.write_text(textwrap.dedent(body).strip(), encoding="utf-8") + return f + + +def _run_batch(file: Path, **kwargs): + defaults = dict( + file=file, + dry_run=False, + output=None, + format=None, + set_params=None, + params_file=None, + yes=True, # bypass the confirmation prompt in tests + result_out=None, + result_fd=None, + ) + defaults.update(kwargs) + return batch_cmd.run_batch(**defaults) + + +class TestBatchRunEnvelope: + def test_batch_run_writes_envelope_all_ok( + self, stub_client, tmp_path: Path, + ): + yaml_file = _write_batch_yaml(tmp_path, """ + name: ok + steps: + - name: create_vendor + action: post + endpoint: vendors + data: {displayName: Acme} + """) + out = tmp_path / "out.json" + _run_batch(yaml_file, result_out=out) + env = json.loads(out.read_text()) + assert env["method"] == "BATCH_RUN" + # Batch envelope identifies the manifest stem, not a single endpoint + assert env["endpoint"] == "batch" + assert env["status"] == "succeeded" + assert env["exit_code"] == 0 + # Aggregate fields not applicable for batch envelope + assert env["record_id"] is None + assert env["resolved_url"] is None + + def test_batch_run_envelope_failed_when_any_step_fails( + self, stub_client, tmp_path: Path, + ): + stub_client.post.side_effect = ValidationError( + "boom", status_code=400, correlation_id="corr-batch", + ) + yaml_file = _write_batch_yaml(tmp_path, """ + name: oops + steps: + - name: create_vendor + action: post + endpoint: vendors + data: {displayName: Acme} + """) + out = tmp_path / "out.json" + with pytest.raises(typer.Exit): + _run_batch(yaml_file, result_out=out) + env = json.loads(out.read_text()) + assert env["status"] == "failed" + # batch failure is a generic exit 1 (Phase 3 will surface step-level + # info via the ledger; we don't reach for 6/7 at batch level here) + assert env["exit_code"] == 1 diff --git a/tests/test_envelope/test_envelope_post.py b/tests/test_envelope/test_envelope_post.py new file mode 100644 index 0000000..4cfafba --- /dev/null +++ b/tests/test_envelope/test_envelope_post.py @@ -0,0 +1,289 @@ +"""Envelope behaviour for ``bcli post``. + +The post command is the canonical example. Tests here pin: + +* envelope written to ``--result-out PATH`` (success path). +* envelope schema (all spec fields present). +* atomic write contract (uses ``os.replace``). +* stdout output unaffected by the envelope flag. +* envelope written via ``--result-fd N``. +* envelope on HTTP failure (status="failed", correct exit_code). +* envelope on dry_run (no HTTP call, status="succeeded"). +* mutual exclusion of ``--result-out`` and ``--result-fd``. +* ``record_id`` extracted from response body. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from unittest.mock import AsyncMock + +import pytest +import typer + +from bcli.errors import ServerError, ValidationError +from bcli.exit_codes import EXIT_REMOTE_4XX, EXIT_REMOTE_5XX +from bcli.result_envelope import ENVELOPE_VERSION +from bcli_cli._state import state +from bcli_cli.commands import post_cmd + + +@pytest.fixture(autouse=True) +def force_non_interactive(monkeypatch): + """Stdin isn't a TTY in CI — match that in tests too.""" + import sys + monkeypatch.setattr(sys.stdin, "isatty", lambda: False) + + +def _run_post(endpoint="vendors", data='{"displayName": "Acme"}', **kwargs): + kwargs.setdefault("format", None) + kwargs.setdefault("publisher", None) + kwargs.setdefault("group", None) + kwargs.setdefault("version", None) + kwargs.setdefault("yes", False) + kwargs.setdefault("result_out", None) + kwargs.setdefault("result_fd", None) + return post_cmd.post_command( + endpoint=endpoint, + data=data, + **kwargs, + ) + + +class TestEnvelopeOnSuccess: + def test_writes_envelope_to_path(self, stub_client, tmp_path: Path): + out = tmp_path / "out.json" + _run_post(result_out=out) + assert out.is_file() + env = json.loads(out.read_text()) + assert env["version"] == ENVELOPE_VERSION + assert env["method"] == "POST" + assert env["endpoint"] == "vendors" + assert env["status"] == "succeeded" + assert env["exit_code"] == 0 + + def test_envelope_has_all_spec_fields(self, stub_client, tmp_path: Path): + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + for field in ( + "version", "invocation_id", "tool_version", "profile", + "environment", "company", "method", "endpoint", "resolved_url", + "record_id", "dry_run", "status", "exit_code", "bc_correlation_id", + "telemetry_event_id", "audit_log_offset", "started_at", "duration_ms", + ): + assert field in env, f"Missing field: {field}" + # invocation_id should look like a uuid hex + assert isinstance(env["invocation_id"], str) + assert len(env["invocation_id"]) >= 16 + + def test_envelope_captures_profile_environment_company( + self, stub_client, tmp_path: Path, + ): + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["profile"] == "dev" + assert env["environment"] == "Sandbox" + assert env["company"] == "c-123" + + def test_record_id_extracted_from_systemid( + self, stub_client, tmp_path: Path, + ): + stub_client.post.return_value = {"systemId": "vnd-12345", "x": 1} + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["record_id"] == "vnd-12345" + + def test_record_id_falls_back_to_id_field( + self, stub_client, tmp_path: Path, + ): + stub_client.post.return_value = {"id": "vnd-fallback"} + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["record_id"] == "vnd-fallback" + + def test_record_id_null_when_response_has_no_id_field( + self, stub_client, tmp_path: Path, + ): + stub_client.post.return_value = {"opaqueResult": True} + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["record_id"] is None + + def test_resolved_url_populated_on_success( + self, stub_client, tmp_path: Path, + ): + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + # try_resolve_url -> AsyncBCClient._resolve_url stub returned this + assert env["resolved_url"] == "https://example.test/vendors" + + def test_started_at_and_duration_present( + self, stub_client, tmp_path: Path, + ): + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["started_at"] is not None + assert isinstance(env["duration_ms"], int) + assert env["duration_ms"] >= 0 + + def test_dry_run_field_false_on_real_write( + self, stub_client, tmp_path: Path, + ): + out = tmp_path / "out.json" + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["dry_run"] is False + + +class TestAtomicity: + def test_uses_os_replace(self, stub_client, tmp_path: Path, monkeypatch): + out = tmp_path / "out.json" + real_replace = os.replace + calls: list[tuple[str, str]] = [] + + def spy(src, dst): + calls.append((str(src), str(dst))) + return real_replace(src, dst) + + monkeypatch.setattr("bcli.result_envelope.os.replace", spy) + _run_post(result_out=out) + assert calls, "Expected at least one os.replace call" + _src, dst = calls[-1] + assert dst == str(out) + + +class TestStdoutUnaffected: + def test_stdout_does_not_contain_envelope( + self, stub_client, tmp_path: Path, capsys, + ): + """The envelope writes to its own channel — stdout still follows + ``--format`` (here: default table render). Critically, none of the + envelope JSON keys leak into stdout.""" + out = tmp_path / "out.json" + _run_post(result_out=out, format="json") + captured = capsys.readouterr() + # Envelope-only keys must not appear in stdout + assert "invocation_id" not in captured.out + assert "tool_version" not in captured.out + assert "duration_ms" not in captured.out + + +class TestResultFdMode: + def test_writes_envelope_to_fd(self, stub_client, tmp_path: Path): + r, w = os.pipe() + try: + _run_post(result_fd=w) + buf = b"" + while True: + chunk = os.read(r, 65536) + if not chunk: + break + buf += chunk + env = json.loads(buf) + assert env["method"] == "POST" + assert env["status"] == "succeeded" + finally: + os.close(r) + + +class TestMutualExclusion: + def test_result_out_and_result_fd_mutually_exclusive( + self, stub_client, tmp_path: Path, + ): + with pytest.raises(typer.BadParameter): + _run_post(result_out=tmp_path / "x.json", result_fd=2) + + +class TestEnvelopeOnFailure: + def test_envelope_on_http_4xx(self, stub_client, tmp_path: Path): + stub_client.post.side_effect = ValidationError( + "field 'displayName' missing", + status_code=400, + correlation_id="corr-400", + ) + out = tmp_path / "out.json" + with pytest.raises(typer.Exit): + _run_post(result_out=out) + assert out.is_file() + env = json.loads(out.read_text()) + assert env["status"] == "failed" + assert env["exit_code"] == EXIT_REMOTE_4XX + assert env["bc_correlation_id"] == "corr-400" + + def test_envelope_on_http_5xx(self, stub_client, tmp_path: Path): + stub_client.post.side_effect = ServerError( + "boom", + status_code=502, + correlation_id="corr-5xx", + ) + out = tmp_path / "out.json" + with pytest.raises(typer.Exit): + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["status"] == "failed" + assert env["exit_code"] == EXIT_REMOTE_5XX + assert env["bc_correlation_id"] == "corr-5xx" + + def test_envelope_on_generic_exception(self, stub_client, tmp_path: Path): + stub_client.post.side_effect = RuntimeError("oops") + out = tmp_path / "out.json" + with pytest.raises(typer.Exit): + _run_post(result_out=out) + env = json.loads(out.read_text()) + assert env["status"] == "failed" + # Generic errors default to exit_code=1 + assert env["exit_code"] == 1 + + +class TestEnvelopeOnDryRun: + def test_dry_run_writes_envelope_no_http( + self, cli_state, stub_resolve_url, tmp_path: Path, monkeypatch, + ): + """``--dry-run`` short-circuits HTTP but still drops an envelope so + an agent driving the CLI can verify the request shape was correct + without parsing stdout prose.""" + called = {"post": False} + + async def _no_call(*a, **kw): + called["post"] = True + return {} + + fake = AsyncMock() + fake.__aenter__ = AsyncMock(return_value=fake) + fake.__aexit__ = AsyncMock(return_value=False) + fake.post = _no_call + fake._resolve_url = lambda entity, **kw: f"https://example.test/{entity}" + monkeypatch.setattr(state, "make_async_client", lambda **_: fake) + + state.dry_run = True + out = tmp_path / "out.json" + with pytest.raises(typer.Exit) as exc: + _run_post(result_out=out) + # render_dry_run uses typer.Exit(); exit_code attr will be 0/None + assert (exc.value.exit_code or 0) == 0 + assert not called["post"], "Dry-run must NOT issue the HTTP call" + env = json.loads(out.read_text()) + assert env["dry_run"] is True + assert env["status"] == "succeeded" + assert env["exit_code"] == 0 + + +class TestAddedFlagIsOptional: + def test_envelope_not_written_when_neither_flag_set( + self, stub_client, tmp_path: Path, + ): + """Backward-compat — calling post_command without the envelope flags + leaves no envelope file anywhere. The flag is strictly opt-in.""" + _run_post() + # No files should be created in tmp_path + files = list(tmp_path.glob("**/*")) + assert files == [], f"Expected no envelope output, found {files}" diff --git a/tests/test_envelope/test_result_envelope.py b/tests/test_envelope/test_result_envelope.py new file mode 100644 index 0000000..2b3d805 --- /dev/null +++ b/tests/test_envelope/test_result_envelope.py @@ -0,0 +1,165 @@ +"""Tests for the core ``bcli.result_envelope`` module. + +Covers the data class shape, atomic write to ``--result-out PATH``, and +write+close semantics for ``--result-fd N``. CLI integration tests live +in ``test_envelope_post.py`` and friends. +""" + +from __future__ import annotations + +import json +import os +from dataclasses import asdict, fields +from pathlib import Path + +import pytest + +from bcli.result_envelope import ( + ENVELOPE_VERSION, + ResultEnvelope, + write_envelope, +) + + +SPEC_FIELDS = { + "version", + "invocation_id", + "tool_version", + "profile", + "environment", + "company", + "method", + "endpoint", + "resolved_url", + "record_id", + "dry_run", + "status", + "exit_code", + "bc_correlation_id", + "telemetry_event_id", + "audit_log_offset", + "started_at", + "duration_ms", +} + + +def _make_envelope(**overrides) -> ResultEnvelope: + base = dict( + version=ENVELOPE_VERSION, + invocation_id="inv-1", + tool_version="0.3.0", + profile="dev", + environment="Sandbox", + company="c-1", + method="POST", + endpoint="vendors", + resolved_url="https://example.test/vendors", + record_id="vnd-1", + dry_run=False, + status="succeeded", + exit_code=0, + bc_correlation_id=None, + telemetry_event_id=None, + audit_log_offset=None, + started_at="2026-05-16T00:00:00Z", + duration_ms=42, + ) + base.update(overrides) + return ResultEnvelope(**base) + + +class TestEnvelopeShape: + def test_has_all_spec_fields(self): + names = {f.name for f in fields(ResultEnvelope)} + assert SPEC_FIELDS.issubset(names), SPEC_FIELDS - names + + def test_envelope_is_frozen_dataclass(self): + env = _make_envelope() + with pytest.raises(Exception): + env.method = "PATCH" # type: ignore[misc] + + +class TestAtomicWriteToPath: + def test_writes_json_file_to_path(self, tmp_path: Path): + env = _make_envelope() + out = tmp_path / "out.json" + write_envelope(env, path=out) + assert out.is_file() + loaded = json.loads(out.read_text()) + assert loaded == asdict(env) + + def test_creates_parent_directories(self, tmp_path: Path): + env = _make_envelope() + out = tmp_path / "nested" / "dir" / "out.json" + write_envelope(env, path=out) + assert out.is_file() + assert json.loads(out.read_text())["method"] == "POST" + + def test_uses_tmp_plus_rename_atomic(self, tmp_path: Path, monkeypatch): + """The envelope writer must use ``os.replace`` so a SIGKILL between + ``write`` and ``rename`` never leaves a half-written file on the + documented output path. + """ + env = _make_envelope() + out = tmp_path / "out.json" + + calls: list[tuple[str, str]] = [] + real_replace = os.replace + + def spy_replace(src, dst): + calls.append((str(src), str(dst))) + return real_replace(src, dst) + + monkeypatch.setattr("bcli.result_envelope.os.replace", spy_replace) + write_envelope(env, path=out) + + assert len(calls) == 1 + src, dst = calls[0] + assert dst == str(out) + assert src != str(out) # tmp file != final path + assert src.startswith(str(out)) # tmp file colocated with target + + def test_overwrites_existing_envelope(self, tmp_path: Path): + out = tmp_path / "out.json" + out.write_text('{"stale": true}') + write_envelope(_make_envelope(method="PATCH"), path=out) + loaded = json.loads(out.read_text()) + assert loaded["method"] == "PATCH" + + +class TestWriteToFd: + def test_writes_envelope_to_file_descriptor(self): + r, w = os.pipe() + try: + write_envelope(_make_envelope(record_id="vnd-9"), fd=w) + # write_envelope closes the fd it was handed + data = os.read(r, 65536) + loaded = json.loads(data) + assert loaded["record_id"] == "vnd-9" + finally: + os.close(r) + # fd ``w`` was closed by write_envelope; closing again is fine + try: + os.close(w) + except OSError: + pass + + def test_fd_is_closed_after_write(self): + r, w = os.pipe() + try: + write_envelope(_make_envelope(), fd=w) + # subsequent write should fail because fd is closed + with pytest.raises(OSError): + os.write(w, b"x") + finally: + os.close(r) + + +class TestArgValidation: + def test_rejects_both_path_and_fd(self, tmp_path: Path): + with pytest.raises(ValueError): + write_envelope(_make_envelope(), path=tmp_path / "x.json", fd=2) + + def test_rejects_neither_path_nor_fd(self): + with pytest.raises(ValueError): + write_envelope(_make_envelope()) From 113200e9a617d534528ccec9311a5aea59d29675 Mon Sep 17 00:00:00 2001 From: igor-ctrl Date: Sat, 16 May 2026 20:17:58 -0500 Subject: [PATCH 2/2] fix(envelope): emit envelope on policy-violation exit (PR #15 review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lead's review caught: a read-only profile (`disable_writes=true`) + non-interactive + no `--yes` runs `confirm_write_or_exit` BEFORE the `capture(...)` block, so the command exits 1 with no envelope file. An agent runtime can't distinguish that from a true crash — it should see `status="failed"` like any other refused write. Fix: move `confirm_write_or_exit` *inside* the `with capture(...)` block on `post`, `patch`, `delete`, and `attach upload`. The salvage path in `_envelope_wrap.capture` already turns a `typer.Exit(1)` into a `status="failed"` envelope, so the gate just had to move into the captured region. `batch run` already had it correct — its `confirm_write_or_exit` was already inside the capture block. Tests: 4 new under `tests/test_envelope/test_envelope_policy_violation.py` (one per verb). Each pins: - the command exits with code 1 - the envelope file exists on disk - `status="failed"`, `exit_code=1`, profile/environment captured - the underlying client write method was NOT awaited `exit_code` stays at 1 for now per lead's note — Phase 4 (Worker A) owns the taxonomy rename to `EXIT_POLICY` (8). The envelope shape remains stable; only the int constant changes later. Totals: 671 passed (was 667; +4 new), 5 skipped. Ruff clean. --- src/bcli_cli/commands/attach_cmd.py | 8 +- src/bcli_cli/commands/delete_cmd.py | 8 +- src/bcli_cli/commands/patch_cmd.py | 8 +- src/bcli_cli/commands/post_cmd.py | 10 +- tests/test_envelope/conftest.py | 36 ++++ .../test_envelope_policy_violation.py | 162 ++++++++++++++++++ 6 files changed, 220 insertions(+), 12 deletions(-) create mode 100644 tests/test_envelope/test_envelope_policy_violation.py diff --git a/src/bcli_cli/commands/attach_cmd.py b/src/bcli_cli/commands/attach_cmd.py index 3fbaad5..14d8ebd 100644 --- a/src/bcli_cli/commands/attach_cmd.py +++ b/src/bcli_cli/commands/attach_cmd.py @@ -80,9 +80,6 @@ def upload_command( print_context_banner() - from bcli_cli._safety import confirm_write_or_exit - confirm_write_or_exit("UPLOAD", "documentAttachments", yes=yes) - with capture( method="UPLOAD", endpoint="documentAttachments", @@ -90,6 +87,7 @@ def upload_command( result_fd=result_fd, ) as cap: from bcli_cli._audit_wrap import audited_write + from bcli_cli._safety import confirm_write_or_exit from bcli_cli._url_resolve import try_resolve_url resolved_url = try_resolve_url( "documentAttachments", @@ -98,6 +96,10 @@ def upload_command( ) cap.set_resolved_url(resolved_url) + # Policy gate runs *inside* the capture block so a refused write + # still emits a failed envelope. See PR #15 review. + confirm_write_or_exit("UPLOAD", "documentAttachments", yes=yes) + if state.dry_run: from bcli_cli._dry_run import render_dry_run cap.mark_dry_run() diff --git a/src/bcli_cli/commands/delete_cmd.py b/src/bcli_cli/commands/delete_cmd.py index 96d2e94..3f17693 100644 --- a/src/bcli_cli/commands/delete_cmd.py +++ b/src/bcli_cli/commands/delete_cmd.py @@ -46,15 +46,13 @@ def delete_command( print_context_banner() - from bcli_cli._safety import confirm_write_or_exit - confirm_write_or_exit("DELETE", endpoint, yes=yes) - with capture( method="DELETE", endpoint=endpoint, result_out=result_out, result_fd=result_fd, ) as cap: + from bcli_cli._safety import confirm_write_or_exit from bcli_cli._url_resolve import try_resolve_url cap.set_resolved_url(try_resolve_url( @@ -66,6 +64,10 @@ def delete_command( )) cap.set_record_id(record_id) + # Policy gate runs *inside* the capture block so a refused write + # still emits a failed envelope. See PR #15 review. + confirm_write_or_exit("DELETE", endpoint, yes=yes) + if state.dry_run: from bcli_cli._dry_run import render_dry_run cap.mark_dry_run() diff --git a/src/bcli_cli/commands/patch_cmd.py b/src/bcli_cli/commands/patch_cmd.py index 1b79376..be8a914 100644 --- a/src/bcli_cli/commands/patch_cmd.py +++ b/src/bcli_cli/commands/patch_cmd.py @@ -48,9 +48,6 @@ def patch_command( print_context_banner() - from bcli_cli._safety import confirm_write_or_exit - confirm_write_or_exit("PATCH", endpoint, yes=yes) - body = _parse_data(data) with capture( @@ -59,6 +56,7 @@ def patch_command( result_out=result_out, result_fd=result_fd, ) as cap: + from bcli_cli._safety import confirm_write_or_exit from bcli_cli._url_resolve import try_resolve_url cap.set_resolved_url(try_resolve_url( @@ -70,6 +68,10 @@ def patch_command( )) cap.set_record_id(record_id) + # Policy gate runs *inside* the capture block so a refused write + # still emits a failed envelope. See PR #15 review. + confirm_write_or_exit("PATCH", endpoint, yes=yes) + if state.dry_run: from bcli_cli._dry_run import render_dry_run cap.mark_dry_run() diff --git a/src/bcli_cli/commands/post_cmd.py b/src/bcli_cli/commands/post_cmd.py index 28e8da5..0a054b6 100644 --- a/src/bcli_cli/commands/post_cmd.py +++ b/src/bcli_cli/commands/post_cmd.py @@ -46,9 +46,6 @@ def post_command( print_context_banner() - from bcli_cli._safety import confirm_write_or_exit - confirm_write_or_exit("POST", endpoint, yes=yes) - body = _parse_data(data) with capture( @@ -57,6 +54,7 @@ def post_command( result_out=result_out, result_fd=result_fd, ) as cap: + from bcli_cli._safety import confirm_write_or_exit from bcli_cli._url_resolve import try_resolve_url cap.set_resolved_url(try_resolve_url( @@ -66,6 +64,12 @@ def post_command( version=version, )) + # Policy gate runs *inside* the capture block so a refused write + # still emits a failed envelope. The ``capture()`` salvage path + # turns the ``typer.Exit(1)`` into a status="failed" envelope. + # See PR #15 review. + confirm_write_or_exit("POST", endpoint, yes=yes) + if state.dry_run: from bcli_cli._dry_run import render_dry_run cap.mark_dry_run() diff --git a/tests/test_envelope/conftest.py b/tests/test_envelope/conftest.py index 1765121..fb834ec 100644 --- a/tests/test_envelope/conftest.py +++ b/tests/test_envelope/conftest.py @@ -46,6 +46,42 @@ def cli_state(monkeypatch): state.dry_run = False +@pytest.fixture +def readonly_state(monkeypatch): + """Active CLI state pointing at a read-only Production profile. + + Drives the policy-violation envelope tests: ``disable_writes=true`` + + non-interactive + no ``--yes`` must produce a failed envelope, not a + silent exit. + """ + cfg = BCConfig( + defaults=BCDefaults(profile="prod"), + profiles={ + "prod": BCProfile( + tenant_id="t1", + environment="Production", + company_id="c-999", + disable_writes=True, + ), + }, + ) + state._config = cfg + state._registry = None + state._telemetry = None + state.profile_name = None + state.env_override = None + state.company_override = None + state.format = "table" + state.dry_run = False + state.quiet = True + yield state + state._config = None + state._registry = None + state._telemetry = None + state.profile_name = None + state.dry_run = False + + @pytest.fixture def fake_client(): """Async client with mockable post/patch/delete/upload_attachment. diff --git a/tests/test_envelope/test_envelope_policy_violation.py b/tests/test_envelope/test_envelope_policy_violation.py new file mode 100644 index 0000000..ca88e45 --- /dev/null +++ b/tests/test_envelope/test_envelope_policy_violation.py @@ -0,0 +1,162 @@ +"""Policy-violation envelope tests — PR #15 review fix. + +A refused write IS an outcome and must produce a ``status="failed"`` +envelope. Before the fix, ``confirm_write_or_exit`` ran *before* the +``capture(...)`` block, so a read-only profile + non-interactive + no +``--yes`` exited with code 1 and **no envelope file**, which an agent +runtime can't distinguish from "the command crashed before writing +anything." + +The fix moves the gate inside ``capture()``; the salvage path in +``_envelope_wrap.capture`` catches the resulting ``typer.Exit(1)`` and +emits the failed envelope. + +Per the lead's review note: ``exit_code`` stays at ``1`` for now (don't +rename to ``EXIT_POLICY`` / 8 — Phase 4 owns the taxonomy rename). +""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path + +import pytest +import typer + +from bcli_cli._state import state +from bcli_cli.commands import attach_cmd, delete_cmd, patch_cmd, post_cmd + + +@pytest.fixture(autouse=True) +def force_non_interactive(monkeypatch): + """``confirm_write_or_exit`` exits 1 only when stdin isn't a TTY + + no ``--yes`` was passed. Pin both here so every test in this module + exercises the policy-violation path.""" + monkeypatch.setattr(sys.stdin, "isatty", lambda: False) + + +def _assert_policy_failure_envelope(envelope_path: Path, *, method: str, endpoint: str) -> None: + """Shared assertion: a refused write left a failed envelope on disk.""" + assert envelope_path.is_file(), ( + f"Expected envelope at {envelope_path} after policy violation; " + "the command exited without writing one." + ) + env = json.loads(envelope_path.read_text()) + assert env["status"] == "failed", env + assert env["exit_code"] == 1, env # Phase 4 will rename to EXIT_POLICY (8) + assert env["method"] == method + assert env["endpoint"] == endpoint + # Profile context is captured even on a refused write — that's the + # whole point of the envelope. + assert env["profile"] == "prod" + assert env["environment"] == "Production" + + +class TestPostEnvelopeOnPolicyViolation: + def test_post_emits_failed_envelope_when_disable_writes_blocks( + self, readonly_state, fake_client, monkeypatch, tmp_path: Path, + ): + monkeypatch.setattr(state, "make_async_client", lambda **_: fake_client) + out = tmp_path / "post-refused.json" + with pytest.raises(typer.Exit) as excinfo: + post_cmd.post_command( + endpoint="vendors", + data='{"displayName": "Acme"}', + format=None, + publisher=None, + group=None, + version=None, + yes=False, + result_out=out, + result_fd=None, + ) + assert excinfo.value.exit_code == 1 + _assert_policy_failure_envelope(out, method="POST", endpoint="vendors") + fake_client.post.assert_not_awaited() + + +class TestPatchEnvelopeOnPolicyViolation: + def test_patch_emits_failed_envelope_when_disable_writes_blocks( + self, readonly_state, fake_client, monkeypatch, tmp_path: Path, + ): + monkeypatch.setattr(state, "make_async_client", lambda **_: fake_client) + out = tmp_path / "patch-refused.json" + with pytest.raises(typer.Exit) as excinfo: + patch_cmd.patch_command( + endpoint="vendors", + record_id="vnd-1", + data='{"displayName": "Renamed"}', + etag="*", + format=None, + publisher=None, + group=None, + version=None, + yes=False, + result_out=out, + result_fd=None, + ) + assert excinfo.value.exit_code == 1 + _assert_policy_failure_envelope(out, method="PATCH", endpoint="vendors") + # record_id was captured before the gate refused — confirms the gate + # is now inside the capture block. + env = json.loads(out.read_text()) + assert env["record_id"] == "vnd-1" + fake_client.patch.assert_not_awaited() + + +class TestDeleteEnvelopeOnPolicyViolation: + def test_delete_emits_failed_envelope_when_disable_writes_blocks( + self, readonly_state, fake_client, monkeypatch, tmp_path: Path, + ): + monkeypatch.setattr(state, "make_async_client", lambda **_: fake_client) + out = tmp_path / "delete-refused.json" + with pytest.raises(typer.Exit) as excinfo: + delete_cmd.delete_command( + endpoint="vendors", + record_id="vnd-1", + etag="*", + format=None, + publisher=None, + group=None, + version=None, + yes=False, + result_out=out, + result_fd=None, + ) + assert excinfo.value.exit_code == 1 + _assert_policy_failure_envelope(out, method="DELETE", endpoint="vendors") + env = json.loads(out.read_text()) + assert env["record_id"] == "vnd-1" + fake_client.delete.assert_not_awaited() + + +class TestAttachUploadEnvelopeOnPolicyViolation: + def test_attach_upload_emits_failed_envelope_when_disable_writes_blocks( + self, readonly_state, fake_client, monkeypatch, tmp_path: Path, + ): + monkeypatch.setattr(state, "make_async_client", lambda **_: fake_client) + pdf = tmp_path / "x.pdf" + pdf.write_bytes(b"%PDF-fake") + out = tmp_path / "upload-refused.json" + with pytest.raises(typer.Exit) as excinfo: + attach_cmd.upload_command( + file_path=pdf, + parent_id="inv-1", + parent_type="Purchase Invoice", + file_name=None, + content_type=None, + publisher=None, + group=None, + version=None, + standard=False, + format=None, + yes=False, + result_out=out, + result_fd=None, + ) + assert excinfo.value.exit_code == 1 + _assert_policy_failure_envelope( + out, method="UPLOAD", endpoint="documentAttachments", + ) + fake_client.upload_attachment.assert_not_awaited()