From 53ea96c8c8d64fadfb23cfa6a6cac2b7ad41dd0d Mon Sep 17 00:00:00 2001 From: igor-ctrl Date: Sat, 16 May 2026 20:03:37 -0500 Subject: [PATCH] feat(describe): bcli describe --format json (AIP v0.1 Phase 1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the canonical agent-facing surface introspection per agent-cli-contract-v0.1.md §Phase 1. - New `bcli describe [--format json|table] [...]` command - Projects live Typer app + EndpointRegistry + active BCProfile as one JSON document — the single source of truth that MCP, completions, and docs will consume (no parallel schemas). - Caches at ~/.config/bcli/describe/..json with registry/profile mtime invalidation; writes are atomic. - Subtree mode (`bcli describe get`, `bcli describe batch run`) trims the payload to one command for token-constrained agents — drops registry + profile_constraints, keeps version/tool/tool_version/profile for provenance. - Forward-compat declarations: * `emits_result_envelope: true` on mutating commands (post/patch/ delete/attach upload/batch run) — Phase 2 attaches to this. * `emits_operation_state: true` only on `batch run` — Phase 3. * `requires_confirmation: "production"` on mutating commands. - Tolerates broken/missing config: when no profile is configured the command still emits a populated commands list with `profile: null`, matching the self-rescue posture of `bcli doctor`. - Additive only — zero behavior change for existing commands. - Stdlib only, no new runtime deps. --- src/bcli_cli/app.py | 5 + src/bcli_cli/commands/describe_cmd.py | 491 +++++++++++++++++++++++ tests/test_describe/__init__.py | 1 + tests/test_describe/test_describe_cmd.py | 463 +++++++++++++++++++++ 4 files changed, 960 insertions(+) create mode 100644 src/bcli_cli/commands/describe_cmd.py create mode 100644 tests/test_describe/__init__.py create mode 100644 tests/test_describe/test_describe_cmd.py diff --git a/src/bcli_cli/app.py b/src/bcli_cli/app.py index 761d2f7..ec1ef05 100644 --- a/src/bcli_cli/app.py +++ b/src/bcli_cli/app.py @@ -165,6 +165,7 @@ def _emit_command_summary() -> None: config_cmd, context_cmd, delete_cmd, + describe_cmd, doctor_cmd, endpoint_cmd, env_cmd, @@ -192,6 +193,10 @@ def _emit_command_summary() -> None: app.command(name="q", help="Run a saved query (no OData required)")(query_cmd.query_command) app.command(name="ai-context")(context_cmd.ai_context_command) app.command(name="doctor", help="Diagnose your bcli install (self-rescue for team users)")(doctor_cmd.doctor_command) +app.command( + name="describe", + help="AIP v0.1 — project Typer surface + registry + profile as JSON for agents", +)(describe_cmd.describe_command) # ETL command — optional, only available when dlt is installed try: diff --git a/src/bcli_cli/commands/describe_cmd.py b/src/bcli_cli/commands/describe_cmd.py new file mode 100644 index 0000000..76e5b33 --- /dev/null +++ b/src/bcli_cli/commands/describe_cmd.py @@ -0,0 +1,491 @@ +"""``bcli describe`` — Agent Interface Profile (AIP) v0.1 surface introspection. + +Projects the live Typer app + ``EndpointRegistry`` + active ``BCProfile`` as a +single JSON document. This is the canonical artifact consumed by: + +* ``bcli_mcp`` (generates tool list dynamically from describe — no hand-written + schemas drifting from the CLI), +* shell completions and docs, +* any external AI agent that needs to know "what commands and entities does + this install actually expose for *me*." + +The command is designed to tolerate a broken install (no profile configured, +missing registry, etc.) — it's the *first* command an agent runs to figure +out what's even reachable, so it must produce output even when the user +hasn't completed ``bcli config init`` yet. Same self-rescue posture as +``bcli doctor``. + +Schema and forward-compat declarations match +`agent-cli-contract-v0.1.md` §Phase 1. +""" + +from __future__ import annotations + +import hashlib +import inspect +import json +import os +import tempfile +from pathlib import Path +from typing import Any + +import typer +from rich.console import Console +from rich.table import Table + +from bcli._version import __version__ +from bcli_cli._state import state + +console = Console() + + +# Effects mapping — derived from command path. Read commands are the +# default; mutating + other are explicit. +MUTATING_PATHS: set[tuple[str, ...]] = { + ("post",), + ("patch",), + ("delete",), + ("attach", "upload"), + ("batch", "run"), +} + +# Single command in this set spans multiple steps with durable state — +# Phase 3 (batch ledger) attaches to this declaration. +OPERATION_STATE_PATHS: set[tuple[str, ...]] = {("batch", "run")} + +# Read commands by path. Anything not here and not in MUTATING_PATHS +# is treated as "other" (config writes, auth login, registry import, etc.). +READ_PATHS: set[tuple[str, ...]] = { + ("get",), + ("q",), + ("ai-context",), + ("doctor",), + ("describe",), + ("endpoint", "search"), + ("endpoint", "info"), + ("endpoint", "fields"), + ("endpoint", "list"), + ("endpoint", "show"), + ("auth", "status"), + ("auth", "whoami"), + ("config", "show"), + ("config", "path"), + ("config", "list"), + ("registry", "list"), + ("registry", "show"), + ("test",), + ("test", "endpoint"), + ("env", "list"), + ("env", "show"), + ("company", "list"), + ("company", "show"), + ("batch", "list-templates"), + ("batch", "dry-run"), + ("attach", "test"), + ("extract", "list-schemas"), +} + + +def _describe_cache_path(profile_name: str | None, registry_path: Path | None, + profile_path: Path | None) -> Path: + """Resolve the cache file path for the active profile. + + Read CONFIG_DIR lazily here (not at module load) so tests that + monkeypatch ``bcli.config._defaults.CONFIG_DIR`` see the override. + """ + from bcli.config._defaults import CONFIG_DIR + + safe_name = profile_name or "_no_profile_" + parts: list[str] = [str(__version__)] + for p in (registry_path, profile_path): + if p is not None and p.exists(): + parts.append(f"{p}:{p.stat().st_mtime_ns}") + else: + parts.append(f"{p}:missing") + digest = hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()[:16] + return CONFIG_DIR / "describe" / f"{safe_name}.{digest}.json" + + +def _atomic_write(path: Path, data: str) -> None: + """Write ``data`` to ``path`` atomically via tempfile + os.replace. + + Cache files must never be observed half-written by a concurrent reader. + """ + path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_name = tempfile.mkstemp(prefix=path.name + ".", dir=str(path.parent)) + try: + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(data) + os.replace(tmp_name, path) + except Exception: + try: + os.unlink(tmp_name) + except OSError: + pass + raise + + +# ─── Walker: turn the live Typer app into a list of command entries ── + + +def _classify_effects(path: tuple[str, ...]) -> list[str]: + if path in MUTATING_PATHS: + return ["mutating"] + if path in READ_PATHS: + return ["read"] + return ["other"] + + +def _supported_formats_from_signature(sig: inspect.Signature) -> list[str]: + """Best-effort: parse ``--format`` help text to extract format names. + + Most ``--format`` options carry a help string of the form + ``"Output format: table, json, csv, ndjson, raw"``. We split on the + colon and treat the rest as a comma-separated list. Fallback: ["json"]. + """ + for name, param in sig.parameters.items(): + info = param.default + param_decls = getattr(info, "param_decls", None) or () + if "--format" in param_decls or "-f" in param_decls: + help_text = getattr(info, "help", "") or "" + if ":" in help_text: + tail = help_text.split(":", 1)[1] + fmts = [t.strip().split()[0] for t in tail.split(",") if t.strip()] + # Strip parenthetical notes like "(default)". + fmts = [f.split("(")[0].strip() for f in fmts if f] + fmts = [f for f in fmts if f.isalnum()] + if fmts: + return fmts + return ["json"] + return ["json"] + + +def _options_from_signature(sig: inspect.Signature) -> list[dict[str, Any]]: + options: list[dict[str, Any]] = [] + for name, param in sig.parameters.items(): + info = param.default + param_decls = getattr(info, "param_decls", None) + if not param_decls: + # Positional argument — describe lists options only. + continue + # Pick the longest decl (typically the ``--foo`` form). + long_name = sorted(param_decls, key=lambda d: (-len(d), d))[0] + type_name = _annotation_to_name(param.annotation) + opt: dict[str, Any] = {"name": long_name, "type": type_name} + # Crude validator hint — the spec example shows + # ``validates: "odata-filter"`` for ``--filter``. + if long_name == "--filter": + opt["validates"] = "odata-filter" + options.append(opt) + return options + + +def _annotation_to_name(ann: Any) -> str: + """Render a type annotation as a JSON-friendly string. + + ``Optional[int]`` → ``"int"``, ``bool`` → ``"bool"``, etc. + """ + if ann is inspect.Parameter.empty: + return "string" + # Strings via from __future__ import annotations. + if isinstance(ann, str): + s = ann + else: + s = getattr(ann, "__name__", None) or str(ann) + s = s.replace("Optional[", "").rstrip("]") + s = s.split(".")[-1] + return s.lower() or "string" + + +def _command_name(cmd_info) -> str: + """Resolve a Typer CommandInfo's user-facing name. + + Typer falls back to ``callback.__name__`` (underscores → hyphens) when + no explicit name is given. + """ + if cmd_info.name: + return cmd_info.name + cb = cmd_info.callback + if cb is None: + return "" + return cb.__name__.replace("_", "-") + + +def _walk_typer(typer_app, parent_path: tuple[str, ...] = ()) -> list[dict[str, Any]]: + """Recursively walk a Typer instance, yielding command entries.""" + out: list[dict[str, Any]] = [] + + for cmd_info in typer_app.registered_commands: + name = _command_name(cmd_info) + path = parent_path + (name,) + callback = cmd_info.callback + sig = inspect.signature(callback) if callback else inspect.Signature() + entry: dict[str, Any] = { + "path": list(path), + "summary": _summary_from_callback(callback), + "options": _options_from_signature(sig), + "effects": _classify_effects(path), + "supported_formats": _supported_formats_from_signature(sig), + } + # Forward-compat declarations for mutating commands only. + if entry["effects"] == ["mutating"]: + entry["emits_result_envelope"] = True + entry["requires_confirmation"] = "production" + if path in OPERATION_STATE_PATHS: + entry["emits_operation_state"] = True + out.append(entry) + + for group_info in typer_app.registered_groups: + group_name = group_info.name or ( + group_info.typer_instance.info.name if group_info.typer_instance else "?" + ) + out.extend(_walk_typer(group_info.typer_instance, parent_path + (group_name,))) + + return out + + +def _summary_from_callback(callback) -> str: + if callback is None or not callback.__doc__: + return "" + doc = inspect.cleandoc(callback.__doc__) + # First paragraph only (one-line summary in the JSON projection). + return doc.split("\n\n", 1)[0].strip().splitlines()[0] if doc else "" + + +# ─── Registry & profile-constraint projections ──────────────────────── + + +def _project_registry(registry, *, tier_2_enabled: bool) -> dict[str, Any]: + endpoints: list[dict[str, Any]] = [] + seen: set[tuple[str, str]] = set() + for meta in registry.list_all(): + tier = "custom" if meta.is_custom else "standard" + key = (meta.entity_set_name, tier) + if key in seen: + continue + seen.add(key) + endpoints.append({ + "entity": meta.entity_set_name, + "tier": tier, + "domain": getattr(meta, "domain", "standard"), + "ops": list(meta.supports), + "fields_known": len(meta.field_names) if meta.field_names else 0, + "fields_discovered_at": getattr(meta, "fields_discovered_at", None), + }) + endpoints.sort(key=lambda e: (e["tier"], e["entity"])) + return { + "tier_1_custom_count": registry.custom_count, + "tier_2_standard_enabled": tier_2_enabled, + "endpoints": endpoints, + } + + +def _project_profile_constraints(profile) -> dict[str, Any]: + if profile is None: + return { + "disable_writes": None, + "disable_standard_api": None, + "allowed_categories": None, + } + return { + "disable_writes": bool(profile.disable_writes), + "disable_standard_api": bool(profile.disable_standard_api), + "allowed_categories": list(profile.allowed_categories) if profile.allowed_categories else None, + } + + +# ─── Build the full describe payload ────────────────────────────────── + + +def _resolve_paths(profile_name: str | None) -> tuple[Path | None, Path | None]: + """Return (registry_path, config_path) for cache-key + invalidation. + + Both may be ``None`` if the install is incomplete. + """ + from bcli.config._defaults import CONFIG_FILE, REGISTRIES_DIR + + registry_path = (REGISTRIES_DIR / f"{profile_name}.json") if profile_name else None + config_path = CONFIG_FILE + return registry_path, config_path + + +def _build_payload() -> dict[str, Any]: + """Project the live app + registry + profile into the JSON shape.""" + # Lazy: avoid circular import at module load. + from bcli_cli.app import app + + profile_name: str | None = None + profile = None + registry = None + tier_2_enabled = True + + try: + profile_name = state.active_profile_name + profile = state.profile + registry = state.registry + tier_2_enabled = not bool(profile.disable_standard_api) + except Exception: # noqa: BLE001 + # Broken/missing config — agent-friendly: keep going with stubs. + from bcli.registry._registry import EndpointRegistry + + profile_name = None + profile = None + try: + registry = EndpointRegistry() + except Exception: # noqa: BLE001 + registry = None + tier_2_enabled = True + + commands = _walk_typer(app) + commands.sort(key=lambda c: tuple(c["path"])) + + if registry is None: + registry_projection: dict[str, Any] = { + "tier_1_custom_count": 0, + "tier_2_standard_enabled": tier_2_enabled, + "endpoints": [], + } + else: + registry_projection = _project_registry(registry, tier_2_enabled=tier_2_enabled) + + return { + "version": "0.1", + "tool": "bcli", + "tool_version": __version__, + "profile": profile_name, + "commands": commands, + "registry": registry_projection, + "profile_constraints": _project_profile_constraints(profile), + } + + +# ─── Cached payload helpers ─────────────────────────────────────────── + + +def _load_or_build_payload() -> dict[str, Any]: + """Read from cache if fresh, otherwise regenerate + write. + + Cache key is sha256 of registry mtime + profile mtime + tool version. + A new key means a new file; the old file is left behind harmlessly + (a future ``bcli describe clean`` could prune them). + """ + profile_name = state.profile_name or ( + state._config.defaults.profile if state._config is not None else None + ) + if profile_name is None: + try: + profile_name = state.active_profile_name + except Exception: # noqa: BLE001 + profile_name = None + + registry_path, profile_path = _resolve_paths(profile_name) + cache_path = _describe_cache_path(profile_name, registry_path, profile_path) + if cache_path.exists(): + try: + return json.loads(cache_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + pass # fall through to regenerate + + payload = _build_payload() + try: + _atomic_write(cache_path, json.dumps(payload, indent=2, sort_keys=False)) + except OSError: + # Cache failure must not crash describe; the payload is still valid. + pass + return payload + + +# ─── Subtree slicing ────────────────────────────────────────────────── + + +def _slice_subtree(payload: dict[str, Any], path: list[str]) -> dict[str, Any]: + """Return the entry whose ``path`` matches, wrapped in a small envelope. + + Drops ``registry`` and ``profile_constraints`` — agents calling + ``bcli describe get`` want minimum tokens, not the full surface. + """ + target = tuple(path) + matches = [c for c in payload["commands"] if tuple(c["path"]) == target] + if not matches: + # Also allow a prefix match for groups (e.g. `bcli describe attach`). + matches = [c for c in payload["commands"] if tuple(c["path"])[: len(target)] == target] + if not matches: + # Surface a clear error and exit non-zero so an agent can react. + console.print(f"[red]No command matches path: {' '.join(path)!r}[/red]") + raise typer.Exit(4) + + return { + "version": payload["version"], + "tool": payload["tool"], + "tool_version": payload["tool_version"], + "profile": payload["profile"], + "commands": matches, + } + + +# ─── Output rendering ───────────────────────────────────────────────── + + +def _emit_json(payload: dict[str, Any]) -> None: + typer.echo(json.dumps(payload, indent=2, sort_keys=False)) + + +def _emit_table(payload: dict[str, Any]) -> None: + table = Table(title=f"bcli describe (profile={payload['profile']!r})") + table.add_column("Command") + table.add_column("Effects") + table.add_column("Formats") + for cmd in payload["commands"]: + table.add_row( + " ".join(cmd["path"]), + ",".join(cmd["effects"]), + ",".join(cmd["supported_formats"]), + ) + console.print(table) + if "registry" in payload: + reg = payload["registry"] + console.print( + f"Registry: tier_1_custom_count={reg['tier_1_custom_count']}" + f" tier_2_standard_enabled={reg['tier_2_standard_enabled']}" + f" endpoints={len(reg['endpoints'])}" + ) + + +# ─── Typer entry point ──────────────────────────────────────────────── + + +def describe_command( + command_path: list[str] = typer.Argument( + None, + help="Optional command path (e.g. 'get' or 'batch run') for a narrow subtree", + ), + format: str = typer.Option( + "json", + "--format", + "-f", + help="Output format: json (default, agent-friendly), table (human summary)", + ), +) -> None: + """Project the live CLI surface + registry + profile as JSON. + + Designed for AI agents: one canonical artifact that MCP, completions, + and docs all consume. Tolerates broken installs (no profile, missing + registry) so it works as the *first* command an agent runs. + + \b + Examples: + bcli describe full surface, JSON + bcli describe --format table compact human summary + bcli describe get subtree for the `get` command + bcli describe batch run subtree for `batch run` + """ + payload = _load_or_build_payload() + + if command_path: + payload = _slice_subtree(payload, command_path) + + fmt = (format or "json").lower() + if fmt == "table": + _emit_table(payload) + else: + _emit_json(payload) diff --git a/tests/test_describe/__init__.py b/tests/test_describe/__init__.py new file mode 100644 index 0000000..97b0de0 --- /dev/null +++ b/tests/test_describe/__init__.py @@ -0,0 +1 @@ +"""Tests for `bcli describe` — AIP v0.1 Phase 1.""" diff --git a/tests/test_describe/test_describe_cmd.py b/tests/test_describe/test_describe_cmd.py new file mode 100644 index 0000000..ec30a8c --- /dev/null +++ b/tests/test_describe/test_describe_cmd.py @@ -0,0 +1,463 @@ +"""End-to-end tests for `bcli describe` (AIP v0.1 Phase 1). + +The command projects the live Typer app + EndpointRegistry + active +BCProfile as a single JSON document. These tests assert the schema +shape, forward-compat flags, cache behavior, and subtree mode. +""" + +from __future__ import annotations + +import json +import time +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from bcli._version import __version__ +from bcli_cli._state import state +from bcli_cli.app import app + +runner = CliRunner() + + +# ─── Fixtures ───────────────────────────────────────────────────────── + + +@pytest.fixture +def tmp_config(monkeypatch, tmp_path): + """Redirect global config + registries + describe cache to a tmp tree. + + Mirrors the pattern in test_config_cmd.py. Resets the singleton state + so cached config/registry from previous tests doesn't bleed in. + """ + config_dir = tmp_path / "bcli" + config_dir.mkdir() + registries_dir = config_dir / "registries" + registries_dir.mkdir() + describe_dir = config_dir / "describe" # describe cache lives under here + config_file = config_dir / "config.toml" + + monkeypatch.setattr("bcli.config._loader.CONFIG_FILE", config_file) + monkeypatch.setattr("bcli.config._loader.CONFIG_DIR", config_dir) + monkeypatch.setattr("bcli.config._defaults.CONFIG_DIR", config_dir) + monkeypatch.setattr("bcli.config._defaults.CONFIG_FILE", config_file) + monkeypatch.setattr("bcli.config._defaults.REGISTRIES_DIR", registries_dir) + monkeypatch.setattr("bcli.registry._registry.REGISTRIES_DIR", registries_dir) + monkeypatch.setattr("bcli.config._loader._find_project_config", lambda: None) + for env_var in ("BCLI_PROFILE", "BCLI_FORMAT", "BCLI_TIMEOUT"): + monkeypatch.delenv(env_var, raising=False) + + # Reset state singleton (lazy-loaded config/registry). + state._config = None + state._registry = None + state._telemetry = None + state.profile_name = None + state.env_override = None + state.company_override = None + state.format_explicit = False + state.format = "table" + yield { + "config_dir": config_dir, + "config_file": config_file, + "registries_dir": registries_dir, + "describe_dir": describe_dir, + } + state._config = None + state._registry = None + state._telemetry = None + state.profile_name = None + state.env_override = None + state.company_override = None + + +def _write_basic_profile(config_file: Path, *, disable_writes: bool = False, + disable_standard_api: bool = False, + allowed_categories: list[str] | None = None) -> None: + lines = [ + '[defaults]\nprofile = "test"\n', + '[profiles.test]\n', + 'tenant_id = "t1"\n', + 'environment = "Sandbox"\n', + ] + if disable_writes: + lines.append("disable_writes = true\n") + if disable_standard_api: + lines.append("disable_standard_api = true\n") + if allowed_categories is not None: + cats = ", ".join(f'"{c}"' for c in allowed_categories) + lines.append(f"allowed_categories = [{cats}]\n") + config_file.write_text("".join(lines)) + + +def _write_custom_registry(registries_dir: Path, profile: str = "test") -> Path: + """Drop a tiny custom registry with two endpoints for projection tests.""" + registry_file = registries_dir / f"{profile}.json" + payload = { + "endpoints": [ + { + "entity_set_name": "vendors", + "entity_name": "Vendor", + "description": "Vendor master records", + "supports": ["GET", "POST", "PATCH"], + "key_field": "id", + "category": "finance", + "api_publisher": "beautech", + "api_group": "finance", + "api_version": "v1.0", + "domain": "finance", + "field_names": ["id", "number", "name"], + }, + { + "entity_set_name": "engineLogbook", + "entity_name": "EngineLogbook", + "description": "Aviation engine logbook", + "supports": ["GET"], + "key_field": "id", + "category": "aviation", + "api_publisher": "beautech", + "api_group": "aviation", + "api_version": "v1.0", + "domain": "technical", + "field_names": [], + }, + ] + } + registry_file.write_text(json.dumps(payload)) + return registry_file + + +def _invoke_describe(*extra_args: str): + """Run `bcli describe ...` and decode JSON from stdout when applicable.""" + return runner.invoke(app, ["describe", *extra_args]) + + +# ─── Schema-level tests ────────────────────────────────────────────── + + +def test_describe_json_top_level_keys(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + assert result.exit_code == 0, result.stderr or result.stdout + data = json.loads(result.stdout) + for key in ("version", "tool", "tool_version", "profile", + "commands", "registry", "profile_constraints"): + assert key in data, f"missing top-level key: {key!r}" + assert data["version"] == "0.1" + assert data["tool"] == "bcli" + assert data["tool_version"] == __version__ + assert data["profile"] == "test" + + +def test_describe_commands_list_shape(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + assert result.exit_code == 0, result.stderr + data = json.loads(result.stdout) + assert isinstance(data["commands"], list) and data["commands"] + for cmd in data["commands"]: + assert "path" in cmd and isinstance(cmd["path"], list) + assert all(isinstance(p, str) for p in cmd["path"]) + assert "summary" in cmd + assert "options" in cmd and isinstance(cmd["options"], list) + for opt in cmd["options"]: + assert "name" in opt and isinstance(opt["name"], str) + assert "type" in opt + assert "effects" in cmd and isinstance(cmd["effects"], list) + assert cmd["effects"], "every command must declare at least one effect" + for e in cmd["effects"]: + assert e in ("read", "mutating", "other"), f"unknown effect: {e!r}" + assert "supported_formats" in cmd + assert isinstance(cmd["supported_formats"], list) + + +def test_describe_lists_describe_command_itself(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + paths = [tuple(c["path"]) for c in data["commands"]] + assert ("describe",) in paths, f"describe missing from commands; got {paths!r}" + describe_cmd = next(c for c in data["commands"] if c["path"] == ["describe"]) + assert describe_cmd["effects"] == ["read"] + + +def test_describe_commands_sorted_deterministically(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + paths = [tuple(c["path"]) for c in data["commands"]] + assert paths == sorted(paths), "commands must be sorted alphabetically by path" + + +def test_describe_mutating_commands_flagged(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + cmds_by_path = {tuple(c["path"]): c for c in data["commands"]} + + mutating_paths = [ + ("post",), + ("patch",), + ("delete",), + ("attach", "upload"), + ("batch", "run"), + ] + for p in mutating_paths: + assert p in cmds_by_path, f"missing mutating command: {p}" + cmd = cmds_by_path[p] + assert cmd["effects"] == ["mutating"], f"{p} not flagged mutating: {cmd['effects']}" + # Forward-compat declaration for Phase 2. + assert cmd.get("emits_result_envelope") is True, ( + f"{p} must declare emits_result_envelope=True for AIP forward-compat" + ) + assert cmd.get("requires_confirmation") == "production", ( + f"{p} must declare requires_confirmation='production'" + ) + + +def test_describe_read_commands_do_not_carry_envelope_flag(tmp_config): + """Forward-compat flags must NOT leak onto read commands.""" + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + cmds_by_path = {tuple(c["path"]): c for c in data["commands"]} + get_cmd = cmds_by_path[("get",)] + assert get_cmd["effects"] == ["read"] + assert "emits_result_envelope" not in get_cmd + assert "emits_operation_state" not in get_cmd + assert "requires_confirmation" not in get_cmd + + +def test_describe_batch_run_emits_operation_state(tmp_config): + """Only `batch run` is multi-step durable — declares operation_state for Phase 3.""" + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + cmds_by_path = {tuple(c["path"]): c for c in data["commands"]} + + batch_run = cmds_by_path[("batch", "run")] + assert batch_run.get("emits_operation_state") is True, ( + "batch run must declare emits_operation_state=True" + ) + + # And single-mutation commands do NOT get the operation_state flag. + for p in [("post",), ("patch",), ("delete",), ("attach", "upload")]: + assert "emits_operation_state" not in cmds_by_path[p], ( + f"{p} must not declare emits_operation_state — that's batch-only" + ) + + +# ─── Registry projection ───────────────────────────────────────────── + + +def test_describe_registry_projection_with_custom_endpoints(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + _write_custom_registry(tmp_config["registries_dir"], profile="test") + result = _invoke_describe("--format", "json") + assert result.exit_code == 0, result.stderr + data = json.loads(result.stdout) + reg = data["registry"] + assert reg["tier_1_custom_count"] == 2 + assert reg["tier_2_standard_enabled"] is True + endpoints = reg["endpoints"] + assert isinstance(endpoints, list) and len(endpoints) >= 2 + # Sorted alphabetically by entity for determinism. + custom_entities = [e["entity"] for e in endpoints if e["tier"] == "custom"] + assert custom_entities == sorted(custom_entities) + + vendors = next(e for e in endpoints if e["entity"] == "vendors") + assert vendors["tier"] == "custom" + assert vendors["domain"] == "finance" + assert vendors["ops"] == ["GET", "POST", "PATCH"] + assert vendors["fields_known"] == 3 + assert "fields_discovered_at" in vendors # may be null + + +def test_describe_registry_reflects_disable_standard_api(tmp_config): + _write_basic_profile(tmp_config["config_file"], disable_standard_api=True) + _write_custom_registry(tmp_config["registries_dir"], profile="test") + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + assert data["registry"]["tier_2_standard_enabled"] is False + + +# ─── Profile constraints ───────────────────────────────────────────── + + +def test_describe_profile_constraints_projection(tmp_config): + _write_basic_profile( + tmp_config["config_file"], + disable_writes=True, + disable_standard_api=True, + allowed_categories=["finance", "aviation"], + ) + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + constraints = data["profile_constraints"] + assert constraints["disable_writes"] is True + assert constraints["disable_standard_api"] is True + assert constraints["allowed_categories"] == ["finance", "aviation"] + + +def test_describe_profile_constraints_defaults_to_unset(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("--format", "json") + data = json.loads(result.stdout) + constraints = data["profile_constraints"] + # disable_writes / disable_standard_api default to False on the model. + assert constraints["disable_writes"] is False + assert constraints["disable_standard_api"] is False + # No allowed_categories set on the profile → null in the projection. + assert constraints["allowed_categories"] is None or constraints["allowed_categories"] == [] + + +# ─── No-profile / broken-config tolerance ──────────────────────────── + + +def test_describe_tolerates_no_profile_configured(tmp_config): + """Agent-first: describe must work even when no profile is set up. + + `doctor` tolerates a broken install; describe is the *first* command + an agent will run — it has to produce output even when the user + hasn't run `bcli config init` yet. + """ + # No config file written. + result = _invoke_describe("--format", "json") + assert result.exit_code == 0, result.stderr or result.stdout + data = json.loads(result.stdout) + assert data["profile"] is None + # commands list still populated from the live Typer app. + assert data["commands"] + # Registry projection should not crash. + assert "registry" in data + + +# ─── Subtree mode ──────────────────────────────────────────────────── + + +def test_describe_subtree_returns_only_matching_command(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + full = json.loads(_invoke_describe("--format", "json").stdout) + sub = json.loads(_invoke_describe("get", "--format", "json").stdout) + # Subtree must keep provenance keys. + assert sub["version"] == "0.1" + assert sub["tool"] == "bcli" + assert sub["tool_version"] == full["tool_version"] + assert sub["profile"] == "test" + # Subtree carries exactly one command — the `get`. + assert isinstance(sub["commands"], list) + assert len(sub["commands"]) == 1 + assert sub["commands"][0]["path"] == ["get"] + # Drops registry + profile_constraints to save tokens. + assert "registry" not in sub + assert "profile_constraints" not in sub + # Smaller payload — fundamental purpose of the subtree mode. + assert len(json.dumps(sub)) < len(json.dumps(full)) + + +def test_describe_subtree_supports_multipart_path(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + sub = json.loads(_invoke_describe("batch", "run", "--format", "json").stdout) + assert len(sub["commands"]) == 1 + assert sub["commands"][0]["path"] == ["batch", "run"] + assert sub["commands"][0]["effects"] == ["mutating"] + + +def test_describe_subtree_unknown_path_errors(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = _invoke_describe("not-a-real-command", "--format", "json") + assert result.exit_code != 0 + + +# ─── Cache behavior ─────────────────────────────────────────────────── + + +def test_describe_writes_cache_on_first_call(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + describe_dir = tmp_config["describe_dir"] + assert not describe_dir.exists() + result = _invoke_describe("--format", "json") + assert result.exit_code == 0, result.stderr or result.stdout + # Cache directory created. + assert describe_dir.exists() + cached_files = list(describe_dir.glob("test.*.json")) + assert len(cached_files) == 1, f"expected 1 cache file, got {cached_files}" + + +def test_describe_cache_hit_returns_cached_payload(tmp_config): + """Second call reads from cache rather than regenerating. + + Verified by writing a sentinel into the cache file and seeing it + return verbatim on the second call — bypassing fresh generation. + """ + _write_basic_profile(tmp_config["config_file"]) + first = _invoke_describe("--format", "json") + assert first.exit_code == 0 + + # Mutate the cache file to a known sentinel value; second call must + # return that sentinel (proving cache read path). + describe_dir = tmp_config["describe_dir"] + cached_files = list(describe_dir.glob("test.*.json")) + assert len(cached_files) == 1 + sentinel = { + "version": "0.1", + "tool": "bcli", + "tool_version": "sentinel-cache-version", + "profile": "test", + "commands": [], + "registry": {"tier_1_custom_count": 0, "tier_2_standard_enabled": True, "endpoints": []}, + "profile_constraints": {"disable_writes": False, "disable_standard_api": False, + "allowed_categories": None}, + } + cached_files[0].write_text(json.dumps(sentinel)) + + second = _invoke_describe("--format", "json") + assert second.exit_code == 0 + data = json.loads(second.stdout) + assert data["tool_version"] == "sentinel-cache-version", "cache was not consulted" + + +def test_describe_cache_invalidates_on_registry_mtime(tmp_config): + """Touching the registry file forces a regenerate on the next call.""" + _write_basic_profile(tmp_config["config_file"]) + registry_file = _write_custom_registry(tmp_config["registries_dir"], profile="test") + + first = _invoke_describe("--format", "json") + assert first.exit_code == 0 + describe_dir = tmp_config["describe_dir"] + cached_files = list(describe_dir.glob("test.*.json")) + assert len(cached_files) == 1 + first_hash_name = cached_files[0].name + + # Sleep just enough for filesystem mtime granularity (1s on most + # filesystems is plenty); then touch the registry file so its mtime + # advances, forcing cache invalidation. + time.sleep(1.1) + registry_file.write_text(registry_file.read_text()) # rewrite => new mtime + + second = _invoke_describe("--format", "json") + assert second.exit_code == 0 + cached_files_after = list(describe_dir.glob("test.*.json")) + new_names = {f.name for f in cached_files_after} + # Either a brand-new cache file appeared (different hash, ideal case) + # OR the existing file's mtime advanced — both prove regeneration. + appeared_new = bool(new_names - {first_hash_name}) + same_file_advanced = any( + f.stat().st_mtime > cached_files[0].stat().st_mtime + for f in cached_files_after if f.name == first_hash_name + ) + assert appeared_new or same_file_advanced, ( + "cache must regenerate when registry mtime advances; " + f"before={first_hash_name} after={new_names}" + ) + + +# ─── Table format smoke test ───────────────────────────────────────── + + +def test_describe_table_format_smoke(tmp_config): + _write_basic_profile(tmp_config["config_file"]) + result = runner.invoke(app, ["describe", "--format", "table"]) + assert result.exit_code == 0, result.stderr + # Some human-readable rendering of the data; "get" command should appear. + assert "get" in result.stdout