diff --git a/docs/src/content/docs/reference/cli/approve.md b/docs/src/content/docs/reference/cli/approve.md new file mode 100644 index 000000000..024385a04 --- /dev/null +++ b/docs/src/content/docs/reference/cli/approve.md @@ -0,0 +1,123 @@ +--- +title: apm approve / apm deny +description: Manage the executable approval gate for dependency packages. +sidebar: + order: 25 +--- + +## Synopsis + +```bash +apm approve [PACKAGE_REF...] [OPTIONS] +apm deny [PACKAGE_REF...] [OPTIONS] +``` + +## Description + +APM blocks executable primitives (hooks, bin/ executables) from +dependency packages by default. The `allowExecutables` block in +`apm.yml` records which packages have been explicitly approved to +deploy executables. + +`apm approve` adds a package to the allowlist. `apm deny` removes it. + +### How the gate works + +When `apm install` encounters a dependency that ships hooks or bin/ +executables: + +1. If `allowExecutables` is **absent** from `apm.yml`, everything is + approved (backward-compatible, no gate). +2. If `allowExecutables` is **present** (even empty `{}`), only listed + packages may deploy executables. +3. In interactive mode, `apm install` prompts for each unapproved + package. In CI (non-interactive), unapproved executables cause a + hard error. + +Local project content (the root `.apm/` directory) is always trusted. + +### What is gated + +| Type | Gated | Notes | +|------|-------|-------| +| Hooks (`.apm/hooks/`, `hooks/`) | Yes | Auto-fire in IDE on lifecycle events | +| Bin executables (`bin/`) | Yes | Deployed to agent PATH via symlinks | +| MCP servers | No | Enforcement deferred to a future release | +| Text primitives (skills, agents, instructions) | No | No code execution risk | + +## Options + +### `apm approve` + +| Flag | Description | +|------|-------------| +| `PACKAGE_REF` | One or more packages to approve (e.g. `ci-hooks@acme`). | +| `--pending` | List all packages with unapproved executables. | +| `--all` | Approve all currently blocked packages. | + +### `apm deny` + +| Flag | Description | +|------|-------------| +| `PACKAGE_REF` | One or more packages to deny (removes from allowlist). | + +## Manifest format + +Approvals are stored in `apm.yml` under `allowExecutables`, keyed by +`name#version` with per-type boolean flags: + +```yaml +allowExecutables: + "ci-hooks@acme#1.2.0": + hooks: true + bin: true + "dev-tools@org#0.5.0": + hooks: true +``` + +Version pinning means approval must be renewed when a package updates. + +## Examples + +Approve a specific package: + +```bash +apm approve ci-hooks@acme +``` + +Show all blocked packages: + +```bash +apm approve --pending +``` + +Approve everything (migration helper): + +```bash +apm approve --all +``` + +Revoke approval: + +```bash +apm deny ci-hooks@acme +``` + +## Non-interactive / CI usage + +In CI environments (`CI=true`, `APM_NON_INTERACTIVE=1`, or when stdin +is not a TTY), `apm install` fails with exit code 1 if any dependency +has unapproved executables. Pre-approve packages in `apm.yml` before +CI runs: + +```bash +# One-time setup: approve all current dependencies +apm approve --all +git add apm.yml +git commit -m "Approve executable dependencies" +``` + +## See also + +- [`apm install`](../install/) -- the install command that enforces the gate +- [`apm audit`](../audit/) -- audit installed packages diff --git a/packages/apm-guide/.apm/skills/apm-usage/commands.md b/packages/apm-guide/.apm/skills/apm-usage/commands.md index e6ea615d9..1f53d7312 100644 --- a/packages/apm-guide/.apm/skills/apm-usage/commands.md +++ b/packages/apm-guide/.apm/skills/apm-usage/commands.md @@ -220,3 +220,5 @@ Experimental flags MUST NOT gate security-critical behaviour (content scanning, `apm config set external..llm true|false` and `apm config set external..args -- ""` persist per-scanner external-scanner defaults to `~/.apm/config.json` (JSON section `external_scanners..{llm,args}`), behind `apm experimental enable external-scanners`. `` is validated against the supported scanners (e.g. `skillspector`). `.args` is shlex-split and stored as a list; use the `--` separator so Click does not parse a leading `--flag` as an option. `apm config get external..{llm,args}`, `apm config unset external..{llm,args}`, and `apm config unset external.` (removes both) round out the surface. These keys are reachable only when the flag is enabled; bare `apm config get` lists set external keys when the flag is on. CLI flags (`--external-llm`, `--external-args`) override these values for a single run. `apm config set mcp-registry-url https://mcp.internal.example.com` persists a private MCP registry URL so users do not need to export `MCP_REGISTRY_URL` every session. Accepts `http://` or `https://` URLs; all other schemes are rejected. Resolution order: `--registry ` flag on `apm mcp install` / `apm install --mcp` > `MCP_REGISTRY_URL` env var > `mcp-registry-url` in `~/.apm/config.json` > built-in public default. When the config layer is active, `apm mcp search` prints a `Registry (config): ` diagnostic. `apm config unset mcp-registry-url` removes the persisted URL. + +`apm approve [PACKAGE_REF...]` adds packages to the `allowExecutables` block in `apm.yml`, permitting their hooks and bin/ executables to deploy during `apm install`. `apm approve --pending` lists all packages with unapproved executables. `apm approve --all` approves every currently blocked package. `apm deny [PACKAGE_REF...]` removes packages from the allowlist, blocking their executables on the next install. Approvals are version-pinned (`name#version`); updating a package requires re-approval. In non-interactive environments (CI), unapproved executables cause `apm install` to exit 1. diff --git a/src/apm_cli/cli.py b/src/apm_cli/cli.py index 8706f4c1a..9c7b3b350 100644 --- a/src/apm_cli/cli.py +++ b/src/apm_cli/cli.py @@ -18,6 +18,7 @@ _check_and_notify_updates, print_version, ) +from apm_cli.commands.approve import approve_cmd, deny_cmd from apm_cli.commands.audit import audit from apm_cli.commands.cache import cache from apm_cli.commands.compile import compile as compile_cmd @@ -147,8 +148,10 @@ def cli(ctx, verbose: bool) -> None: # Register command groups +cli.add_command(approve_cmd, name="approve") cli.add_command(audit) cli.add_command(cache) +cli.add_command(deny_cmd, name="deny") cli.add_command(deps) cli.add_command(view_cmd) # Hidden backward-compatible alias: ``apm info`` → ``apm view`` diff --git a/src/apm_cli/commands/approve.py b/src/apm_cli/commands/approve.py new file mode 100644 index 000000000..662d0e798 --- /dev/null +++ b/src/apm_cli/commands/approve.py @@ -0,0 +1,265 @@ +"""``apm approve`` and ``apm deny`` -- manage executable primitive approvals. + +These commands mirror npm v12's ``npm approve-scripts`` / ``npm deny-scripts``. +They read and write the ``allowExecutables`` block in the project's ``apm.yml``. +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import click + +from ..utils.console import _rich_echo, _rich_error, _rich_info, _rich_success, _rich_warning + + +def _find_manifest() -> Path: + """Return the project's ``apm.yml`` path or exit.""" + manifest = Path.cwd() / "apm.yml" + if not manifest.is_file(): + _rich_error("No apm.yml found in the current directory.") + sys.exit(1) + return manifest + + +def _load_allow_executables(manifest: Path) -> dict[str, dict[str, bool]] | None: + """Load the ``allowExecutables`` block from ``apm.yml``. + + Returns ``None`` when the project has not declared the block (gate + disabled -- backward-compatible) vs ``{}`` when the block is present + but empty (gate enabled, deny-all). + """ + from ..security.executables import parse_allow_executables + from ..utils.yaml_io import load_yaml + + data = load_yaml(manifest) + if not isinstance(data, dict): + return None + return parse_allow_executables(data) + + +@click.command("approve") +@click.argument("packages", nargs=-1) +@click.option( + "--pending", + is_flag=True, + help="List all packages with unapproved executables.", +) +@click.option( + "--all", + "approve_all", + is_flag=True, + help="Approve all packages with executables.", +) +def approve_cmd(packages: tuple[str, ...], pending: bool, approve_all: bool) -> None: + """Approve executable primitives for installed packages. + + Adds entries to the ``allowExecutables`` block in ``apm.yml`` so that + hooks, MCP servers, and bin/ executables from the specified packages + are deployed during ``apm install``. + + Examples: + + apm approve owner/repo + + apm approve --pending + + apm approve --all + """ + manifest = _find_manifest() + allow_exec = _load_allow_executables(manifest) + + if pending: + _show_pending(manifest, allow_exec or {}) + return + + # Approving a package implies opting into the gate; initialise + # the block when absent so approvals are persisted correctly. + if allow_exec is None: + allow_exec = {} + + if approve_all: + _approve_all_pending(manifest, allow_exec) + return + + if not packages: + _rich_error("Specify at least one package, or use --pending / --all.") + sys.exit(1) + + _approve_packages(manifest, allow_exec, packages) + + +@click.command("deny") +@click.argument("packages", nargs=-1, required=True) +def deny_cmd(packages: tuple[str, ...]) -> None: + """Revoke executable approval for packages. + + Removes entries from the ``allowExecutables`` block in ``apm.yml``. + + Example: + + apm deny owner/repo + """ + manifest = _find_manifest() + allow_exec = _load_allow_executables(manifest) or {} + + from ..security.executables import write_allow_executables + + removed = 0 + for pkg in packages: + # Try exact match first, then prefix match + matched_key = _find_matching_key(allow_exec, pkg) + if matched_key: + del allow_exec[matched_key] + _rich_success(f"Revoked approval for {matched_key}") + removed += 1 + else: + _rich_warning(f"{pkg}: not found in allowExecutables") + + if removed > 0: + write_allow_executables(manifest, allow_exec) + _rich_info(f"Updated allowExecutables in apm.yml ({removed} removed).", symbol="info") + + +def _find_matching_key(allow_exec: dict[str, dict[str, bool]], pkg: str) -> str | None: + """Find a key in allow_exec that matches *pkg* (exact or prefix).""" + # Exact match + if pkg in allow_exec: + return pkg + # Prefix match: "owner/repo" matches "owner/repo#v1.0" + for key in allow_exec: + if key.startswith(pkg + "#"): + return key + return None + + +def _show_pending(manifest: Path, allow_exec: dict[str, dict[str, bool]]) -> None: + """List all installed packages with unapproved executables.""" + declarations = _scan_installed_packages(manifest) + pending = [d for d in declarations if d.has_executables and not _is_approved(allow_exec, d)] + + if not pending: + _rich_success("All packages with executables are approved.") + return + + _rich_warning(f"{len(pending)} package(s) with unapproved executables:") + _rich_echo("") + for decl in pending: + _rich_echo(f" {decl.package_key}: {decl.summary_line()}") + _rich_echo("") + _rich_info( + "Run 'apm approve ' to approve individual packages, " + "or 'apm approve --all' to approve everything.", + symbol="info", + ) + + +def _approve_all_pending(manifest: Path, allow_exec: dict[str, dict[str, bool]]) -> None: + """Approve all installed packages with unapproved executables.""" + from ..security.executables import write_allow_executables + + declarations = _scan_installed_packages(manifest) + count = 0 + for decl in declarations: + if decl.has_executables and not _is_approved(allow_exec, decl): + allow_exec[decl.package_key] = {t: True for t in decl.exec_types} + _rich_success(f"Approved {decl.package_key}: {decl.summary_line()}") + count += 1 + + if count == 0: + _rich_success("All packages with executables are already approved.") + return + + write_allow_executables(manifest, allow_exec) + _rich_info(f"Updated allowExecutables in apm.yml ({count} approved).", symbol="info") + + +def _approve_packages( + manifest: Path, + allow_exec: dict[str, dict[str, bool]], + packages: tuple[str, ...], +) -> None: + """Approve specific packages by name.""" + from ..security.executables import write_allow_executables + + declarations = _scan_installed_packages(manifest) + decl_map = {d.package_name: d for d in declarations} + # Also index by package_key for exact matches + decl_key_map = {d.package_key: d for d in declarations} + + count = 0 + for pkg in packages: + decl = decl_key_map.get(pkg) or decl_map.get(pkg) + if decl is None: + # Try prefix match on keys + for d in declarations: + if d.package_key.startswith(pkg + "#") or d.package_name.startswith(pkg): + decl = d + break + + if decl is None: + _rich_warning(f"{pkg}: not found in installed packages") + continue + + if not decl.has_executables: + _rich_info(f"{pkg}: no executable primitives to approve.", symbol="info") + continue + + allow_exec[decl.package_key] = {t: True for t in decl.exec_types} + _rich_success(f"Approved {decl.package_key}: {decl.summary_line()}") + count += 1 + + if count > 0: + write_allow_executables(manifest, allow_exec) + _rich_info(f"Updated allowExecutables in apm.yml ({count} approved).", symbol="info") + + +def _scan_installed_packages(manifest: Path) -> list: + """Scan all installed packages under apm_modules/ for executables.""" + from ..security.executables import ExecutableDeclaration, scan_package_executables + + apm_modules = manifest.parent / "apm_modules" + results: list[ExecutableDeclaration] = [] + + if not apm_modules.is_dir(): + return results + + def _scan_dir(base: Path) -> None: + for pkg_dir in sorted(base.iterdir()): + if not pkg_dir.is_dir() or pkg_dir.name.startswith("."): + continue + # Recurse into _local/ (local path dependencies) + if pkg_dir.name == "_local": + _scan_dir(pkg_dir) + continue + pkg_yml = pkg_dir / "apm.yml" + name = pkg_dir.name + version = "" + if pkg_yml.is_file(): + try: + from ..utils.yaml_io import load_yaml + + data = load_yaml(pkg_yml) + if isinstance(data, dict): + name = data.get("name", name) + version = str(data.get("version", "")) + except Exception: + pass + + decl = scan_package_executables(pkg_dir, name, version) + if decl.has_executables: + results.append(decl) + + _scan_dir(apm_modules) + return results + + +def _is_approved( + allow_exec: dict[str, dict[str, bool]], + decl, +) -> bool: + """Check if a declaration is fully approved.""" + from ..security.executables import _is_fully_approved + + return _is_fully_approved(allow_exec, decl) diff --git a/src/apm_cli/install/context.py b/src/apm_cli/install/context.py index df1d4496a..309550fa0 100644 --- a/src/apm_cli/install/context.py +++ b/src/apm_cli/install/context.py @@ -145,6 +145,7 @@ class InstallContext: total_hooks_integrated: int = 0 # integrate total_links_resolved: int = 0 # integrate direct_dep_failed: bool = False # integrate -- set when any direct dep fails + blocked_executables: list[Any] = field(default_factory=list) # integrate # ------------------------------------------------------------------ # policy_gate diff --git a/src/apm_cli/install/exec_gate.py b/src/apm_cli/install/exec_gate.py new file mode 100644 index 000000000..28772f24c --- /dev/null +++ b/src/apm_cli/install/exec_gate.py @@ -0,0 +1,139 @@ +"""Executable approval gate helpers for the install pipeline. + +Extracted from ``services.py`` to stay within the LOC budget. +These helpers are used by ``integrate_package_primitives`` to enforce +the npm v12-style ``allowExecutables`` default-deny policy. +""" + +from __future__ import annotations + +import builtins +from pathlib import Path +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from apm_cli.install.context import InstallContext + + +def check_executable_approval( + package_name: str, + package_info: Any, + allow_executables: builtins.dict[str, builtins.dict[str, bool]] | None, + *, + ctx: InstallContext | None = None, +) -> tuple[bool, bool]: + """Return ``(hooks_approved, bin_approved)`` for a package. + + Local project content (``_local``) is always trusted. Dependency + packages are checked against the ``allowExecutables`` block. When + no ``allowExecutables`` block exists (``None``), all executables are + considered approved (opt-in enforcement). + + When *ctx* is provided and a package is blocked, the declaration is + recorded on ``ctx.blocked_executables`` for the post-loop prompt. + """ + is_local = package_name == "_local" + if is_local or allow_executables is None: + return True, True + + from apm_cli.security.executables import ( + EXEC_TYPE_BIN, + EXEC_TYPE_HOOKS, + build_approval_key, + is_package_approved, + ) + + # Build candidate keys: the dep-ref canonical key AND the name#version + # fallback so that approvals stored under either format are honoured. + pkg_key = resolve_package_key(package_info, package_name) + candidate_keys = [pkg_key] + + # Add name#version fallback when it differs from the primary key. + _pkg = getattr(package_info, "package", None) + if _pkg: + _name = getattr(_pkg, "name", package_name) or package_name + _ver = getattr(_pkg, "version", "") or "" + alt_key = build_approval_key(_name, _ver) + if alt_key != pkg_key: + candidate_keys.append(alt_key) + + hooks_ok = any( + is_package_approved(allow_executables, k, EXEC_TYPE_HOOKS) for k in candidate_keys + ) + bin_ok = any(is_package_approved(allow_executables, k, EXEC_TYPE_BIN) for k in candidate_keys) + + # Track blocked packages for the post-loop approval prompt. + if ctx is not None and (not hooks_ok or not bin_ok): + from apm_cli.security.executables import scan_package_executables + + _install = Path(package_info.install_path) + _version = "" + _pkg = getattr(package_info, "package", None) + if _pkg: + _version = getattr(_pkg, "version", "") or "" + _decl = scan_package_executables(_install, package_name, _version) + if _decl.has_executables: + ctx.blocked_executables.append(_decl) + + return hooks_ok, bin_ok + + +def resolve_package_key(package_info: Any, package_name: str) -> str: + """Build the ``allowExecutables`` lookup key for a package. + + Tries ``dependency_ref`` first (canonical dependency string), then + falls back to ``name#version`` from the package's own metadata. + """ + from apm_cli.security.executables import build_approval_key + + # Prefer the dependency reference's canonical string (includes version/ref) + dep_ref = getattr(package_info, "dependency_ref", None) + if dep_ref is not None: + canonical = getattr(dep_ref, "canonical_string", None) + if callable(canonical): + cs = canonical() + if cs: + return cs + # Fall back to str(dep_ref) + s = str(dep_ref) + if s: + return s + + # Fall back to package metadata + pkg = getattr(package_info, "package", None) + if pkg is not None: + name = getattr(pkg, "name", package_name) or package_name + version = getattr(pkg, "version", "") or "" + return build_approval_key(name, version) + + return package_name + + +def log_bin_status( + skill_result: Any, + suffix: str, + package_name: str, + package_info: Any, + log_fn, +) -> None: + """Emit integration-tree lines for bin/ deployment or skip reasons.""" + if skill_result.bin_deployed > 0: + log_fn( + f" |-- {skill_result.bin_deployed} executable(s) deployed to " + f"Claude Code's PATH -> {suffix} (invoked without confirmation)" + ) + log_fn(" |-- run /reload-plugins or restart Claude Code to activate") + elif skill_result.bin_skipped_reason == "project_scope": + log_fn( + " |-- plugin ships executables; re-run with -g (global) to deploy them to Claude Code" + ) + elif skill_result.bin_skipped_reason == "no_claude_target": + log_fn( + " |-- plugin ships executables; no active Claude Code skills target to receive them" + ) + elif skill_result.bin_skipped_reason == "not_approved": + _pkg_label = package_name or getattr(package_info, "name", "unknown") + log_fn( + f" |-- bin/ executables skipped (not approved in allowExecutables). " + f"Run 'apm approve {_pkg_label}' to approve." + ) diff --git a/src/apm_cli/install/phases/integrate.py b/src/apm_cli/install/phases/integrate.py index f29ff9ef1..8b0688f7f 100644 --- a/src/apm_cli/install/phases/integrate.py +++ b/src/apm_cli/install/phases/integrate.py @@ -460,6 +460,49 @@ def _check_cowork_caps(ctx: InstallContext) -> None: ctx.diagnostics.warn(msg, package="cowork") +def _run_executable_approval_prompt(ctx: InstallContext) -> None: + """Prompt for approval of packages whose executables were blocked. + + After the integration loop, any package that had hooks or bin/ + blocked is collected in ``ctx.blocked_executables``. This function + runs the interactive approval flow (or hard-errors in CI) and + persists approved entries to ``apm.yml`` so the next install + deploys them. + """ + if not ctx.blocked_executables: + return + + from apm_cli.security.executables import ( + prompt_executable_approval, + write_allow_executables, + ) + + allow_exec = None + if ctx.apm_package is not None: + allow_exec = getattr(ctx.apm_package, "allow_executables", None) + + updated = prompt_executable_approval( + ctx.blocked_executables, + allow_executables=allow_exec, + ) + + # Persist approvals to apm.yml if user approved anything new. + if updated and updated != (allow_exec or {}): + manifest_path = ctx.source_root or ctx.project_root + apm_yml = manifest_path / "apm.yml" + if apm_yml.is_file(): + write_allow_executables(apm_yml, updated) + # Update the in-memory model so subsequent code sees the change. + if ctx.apm_package is not None: + ctx.apm_package.allow_executables = updated + if ctx.logger: + ctx.logger.info( + "Updated allowExecutables in apm.yml. " + "Run 'apm install' again to deploy approved executables.", + symbol="info", + ) + + # ====================================================================== # Public phase entry point # ====================================================================== @@ -628,3 +671,11 @@ def run(ctx: InstallContext) -> None: # a cowork target with a resolved_deploy_root is active. # ------------------------------------------------------------------ _check_cowork_caps(ctx) + + # ------------------------------------------------------------------ + # Executable approval prompt: if any packages had their hooks or + # bin/ blocked, prompt the user to approve them (interactive) or + # hard-error (CI). Approved packages are persisted to apm.yml so + # the next ``apm install`` deploys them automatically. + # ------------------------------------------------------------------ + _run_executable_approval_prompt(ctx) diff --git a/src/apm_cli/install/services.py b/src/apm_cli/install/services.py index 1a2d49352..0d3003d58 100644 --- a/src/apm_cli/install/services.py +++ b/src/apm_cli/install/services.py @@ -174,6 +174,51 @@ def _log_hook_display_payloads( logger.verbose_detail(f" | {_jline}") +def _check_executable_approval( + package_name: str, + package_info: Any, + allow_executables: builtins.dict[str, builtins.dict[str, bool]] | None, + *, + ctx: InstallContext | None = None, +) -> tuple[bool, bool]: + """Delegate to ``exec_gate.check_executable_approval``.""" + from apm_cli.install.exec_gate import check_executable_approval + + return check_executable_approval(package_name, package_info, allow_executables, ctx=ctx) + + +def _resolve_package_key(package_info: Any, package_name: str) -> str: + """Delegate to ``exec_gate.resolve_package_key``.""" + from apm_cli.install.exec_gate import resolve_package_key + + return resolve_package_key(package_info, package_name) + + +def _log_hooks_skip( + package_name: str, package_info: Any, targets: Any, logger: InstallLogger | None +) -> None: + """Warn about skipped hooks only when the package actually ships them. + + Aligned with :meth:`HookIntegrator.find_hook_files`: checks for + ``*.json`` in ``.apm/hooks/`` and ``hooks/``. + """ + _install = Path(package_info.install_path) + has_hooks = False + for hook_dir in [_install / ".apm" / "hooks", _install / "hooks"]: + if hook_dir.is_dir() and any(hook_dir.glob("*.json")): + has_hooks = True + break + if not has_hooks: + return + _pkg_label = package_name or getattr(package_info, "name", "unknown") + if logger: + logger.warning( + f"{_pkg_label}: hooks skipped (not approved in allowExecutables). " + f"Run 'apm approve {_pkg_label}' to approve.", + symbol="warning", + ) + + def integrate_package_primitives( package_info: Any, project_root: Path, @@ -190,6 +235,7 @@ def integrate_package_primitives( ctx: InstallContext | None = None, scratch_root: Path | None = None, policy: Any = None, + allow_executables: builtins.dict[str, builtins.dict[str, bool]] | None = None, ) -> dict: """Run the full integration pipeline for a single package. @@ -205,6 +251,11 @@ def integrate_package_primitives( (Amendment 6) is emitted once per install run for packages that contain non-skill primitives when the cowork target is active. + When *allow_executables* is provided, executable primitives (hooks, + bin/) are only deployed for packages whose key appears in the dict + with the matching type set to ``True``. Local project content + (``package_name == "_local"``) is always trusted. + Returns a dict with integration counters and the list of deployed file paths. """ from apm_cli.integration.dispatch import get_dispatch_table @@ -246,6 +297,11 @@ def integrate_package_primitives( # OR sit inside it. ensure_path_within(child, parent) raises if not. ensure_path_within(Path(project_root).resolve(), scratch_root) + # Executable approval gate (npm v12-style default-deny). + _hooks_approved, _bin_approved = _check_executable_approval( + package_name, package_info, allow_executables, ctx=ctx + ) + # --- Amendment 6: cowork non-skill primitive warning (once per run) --- _cowork_active = any(t.name == "copilot-cowork" for t in targets) if _cowork_active and ctx is not None and not ctx.cowork_nonsupported_warned: @@ -332,6 +388,10 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ for _prim_name, _entry in _dispatch.items(): if _entry.multi_target: continue # skills handled separately + # Executable approval gate: skip hooks if not approved. + if _prim_name == "hooks" and not _hooks_approved: + _log_hooks_skip(package_name, package_info, targets, logger) + continue _integrator = _INTEGRATOR_KWARGS[_prim_name] _agg_files = 0 _agg_adopted = 0 @@ -477,6 +537,7 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ skill_subset=skill_subset, scope=scope, policy=policy, + skip_bin=not _bin_approved, ) _skill_target_dirs: set = builtins.set() for tp in skill_result.target_paths: @@ -509,20 +570,10 @@ def _format_target_collapse(paths: list[str], verbose: bool) -> tuple[str, list[ _log_integration( f" |-- {skill_result.sub_skills_promoted} skill(s) integrated -> {_skill_suffix}" ) - if skill_result.bin_deployed > 0: - _log_integration( - f" |-- {skill_result.bin_deployed} executable(s) deployed to " - f"Claude Code's PATH -> {_skill_suffix} (invoked without confirmation)" - ) - _log_integration(" |-- run /reload-plugins or restart Claude Code to activate") - elif skill_result.bin_skipped_reason == "project_scope": - _log_integration( - " |-- plugin ships executables; re-run with -g (global) to deploy them to Claude Code" - ) - elif skill_result.bin_skipped_reason == "no_claude_target": - _log_integration( - " |-- plugin ships executables; no active Claude Code skills target to receive them" - ) + if skill_result.bin_deployed > 0 or skill_result.bin_skipped_reason: + from apm_cli.install.exec_gate import log_bin_status + + log_bin_status(skill_result, _skill_suffix, package_name, package_info, _log_integration) for tp in skill_result.target_paths: deployed.append(_deployed_path_entry(tp, project_root, targets)) # #1716: also record the bundle's contained files so per-file diff --git a/src/apm_cli/install/template.py b/src/apm_cli/install/template.py index 82d189399..020c0efa9 100644 --- a/src/apm_cli/install/template.py +++ b/src/apm_cli/install/template.py @@ -100,6 +100,7 @@ def _integrate_materialization( else (tuple(dep_ref.skill_subset) if dep_ref.skill_subset else None) ), ctx=ctx, + allow_executables=getattr(getattr(ctx, "apm_package", None), "allow_executables", None), ) mutation_keys = ( "prompts", diff --git a/src/apm_cli/integration/skill_integrator.py b/src/apm_cli/integration/skill_integrator.py index ed1c7c6b1..f98fb873e 100644 --- a/src/apm_cli/integration/skill_integrator.py +++ b/src/apm_cli/integration/skill_integrator.py @@ -4,12 +4,37 @@ import hashlib import re import shutil +from collections.abc import Callable from dataclasses import dataclass, replace from pathlib import Path from apm_cli.integration.base_integrator import BaseIntegrator +def _build_copy_ignore( + *, + skip_bin: bool = False, +) -> Callable[[str, list[str]], list[str]]: + """Build a ``shutil.copytree`` ignore function. + + When *skip_bin* is True the returned function also excludes ``bin/`` + directories so that unapproved executables are not deployed during + skill promotion. + """ + from apm_cli.security.gate import ignore_non_content + + if not skip_bin: + return ignore_non_content + _bin_filter = shutil.ignore_patterns("bin") + + def _combined(directory: str, contents: list[str]) -> list[str]: + return list( + set(ignore_non_content(directory, contents)) | set(_bin_filter(directory, contents)) + ) + + return _combined + + # DEPRECATED -- use IntegrationResult directly for new code. # Kept for backward compatibility. The fields map as follows: # skill_created -> IntegrationResult.skill_created @@ -581,6 +606,7 @@ def _promote_sub_skills( parent_name: str, *, warn: bool = True, + skip_bin: bool = False, owned_by: dict[str, str] | None = None, diagnostics=None, managed_files=None, @@ -692,9 +718,12 @@ def _promote_sub_skills( pass shutil.rmtree(target) target.mkdir(parents=True, exist_ok=True) - from apm_cli.security.gate import ignore_non_content - - shutil.copytree(sub_skill_path, target, dirs_exist_ok=True, ignore=ignore_non_content) + shutil.copytree( + sub_skill_path, + target, + dirs_exist_ok=True, + ignore=_build_copy_ignore(skip_bin=skip_bin), + ) if link_rewriter is not None: link_rewriter._resolve_markdown_links_in_skill_bundle(sub_skill_path, target) promoted += 1 @@ -761,6 +790,7 @@ def _promote_sub_skills_standalone( logger=None, targets=None, skill_subset=None, + skip_bin: bool = False, ) -> tuple[int, list[Path]]: """Promote sub-skills from a package that is NOT itself a skill. @@ -834,6 +864,7 @@ def _promote_sub_skills_standalone( project_root=project_root, name_filter=name_filter, link_rewriter=self, + skip_bin=skip_bin, ) if is_primary: count = n @@ -851,6 +882,7 @@ def _integrate_native_skill( force: bool = False, logger=None, targets=None, + skip_bin: bool = False, ) -> SkillIntegrationResult: """Copy a native Skill (with existing SKILL.md) to all active targets. @@ -1022,13 +1054,13 @@ def _integrate_native_skill( shutil.rmtree(target_skill_dir) target_skill_dir.parent.mkdir(parents=True, exist_ok=True) - from apm_cli.security.gate import ignore_non_content + _base_ignore = _build_copy_ignore(skip_bin=skip_bin) _apm_filter = shutil.ignore_patterns(".apm") def _ignore_non_content_and_apm(directory, contents): return list( - set(ignore_non_content(directory, contents)) + set(_base_ignore(directory, contents)) # noqa: B023 | set(_apm_filter(directory, contents)) # noqa: B023 ) @@ -1056,6 +1088,7 @@ def _ignore_non_content_and_apm(directory, contents): project_root=project_root, logger=logger if is_primary else None, link_rewriter=self, + skip_bin=skip_bin, ) all_target_paths.extend(sub_deployed) @@ -1092,6 +1125,7 @@ def _integrate_skill_bundle( logger=None, targets=None, skill_subset=None, + skip_bin: bool = False, ) -> SkillIntegrationResult: """Promote every skill in a SKILL_BUNDLE's top-level skills/ directory. @@ -1165,6 +1199,7 @@ def _integrate_skill_bundle( logger=logger if is_primary else None, name_filter=_name_filter, link_rewriter=self, + skip_bin=skip_bin, ) if is_primary: total_promoted = n @@ -1195,6 +1230,7 @@ def integrate_package_skill( skill_subset=None, scope=None, policy=None, + skip_bin: bool = False, ) -> SkillIntegrationResult: """Integrate a package's skill into all active target directories. @@ -1213,6 +1249,10 @@ def integrate_package_skill( project_root: Root directory of the project targets: Optional explicit list of TargetProfile objects. skill_subset: Optional tuple of skill names or paths to install (None = all). + skip_bin: When True, skip bin/ executable deployment even if the + package ships one. Used by the executable approval gate to + block unapproved bin/ executables while still deploying text + primitives (skills, sub-skills). Returns: SkillIntegrationResult: Results of the integration operation @@ -1232,6 +1272,7 @@ def integrate_package_skill( logger=logger, targets=targets, skill_subset=skill_subset, + skip_bin=skip_bin, ) return SkillIntegrationResult( skill_created=False, @@ -1270,15 +1311,18 @@ def integrate_package_skill( from apm_cli.models.apm_package import PackageType as _PackageType if package_info.package_type == _PackageType.MARKETPLACE_PLUGIN: - bin_paths, bin_skip_reason = self._deploy_plugin_bin( - package_info, - project_root, - targets, - scope=scope, - policy=policy, - force=force, - logger=logger, - ) + if skip_bin: + bin_skip_reason = "not_approved" + else: + bin_paths, bin_skip_reason = self._deploy_plugin_bin( + package_info, + project_root, + targets, + scope=scope, + policy=policy, + force=force, + logger=logger, + ) # Check if this is a native Skill (already has SKILL.md at root) source_skill_md = package_path / "SKILL.md" @@ -1300,6 +1344,7 @@ def integrate_package_skill( force=force, logger=logger, targets=targets, + skip_bin=skip_bin, ), bin_paths, bin_skip_reason, @@ -1321,6 +1366,7 @@ def integrate_package_skill( logger=logger, targets=targets, skill_subset=skill_subset, + skip_bin=skip_bin, ), bin_paths, bin_skip_reason, @@ -1337,6 +1383,7 @@ def integrate_package_skill( logger=logger, targets=targets, skill_subset=skill_subset, + skip_bin=skip_bin, ) return self._merge_bin_paths( SkillIntegrationResult( diff --git a/src/apm_cli/models/apm_package.py b/src/apm_cli/models/apm_package.py index cdd00b188..520eb4ed8 100644 --- a/src/apm_cli/models/apm_package.py +++ b/src/apm_cli/models/apm_package.py @@ -248,9 +248,16 @@ class APMPackage: # Top-level ``registries:`` block per docs/proposals/registry-api.md §3.1. # Maps registry name -> base URL. None when no ``registries:`` block is present. registries: dict[str, str] | None = None - # Value of ``registries.default:`` — routes unscoped deps to this registry. + # Value of ``registries.default:`` -- routes unscoped deps to this registry. default_registry: str | None = None + # Top-level ``allowExecutables:`` block -- per-package approval for + # executable primitives (hooks, MCP servers, bin/ executables). + # Mirrors npm v12's ``allowScripts`` in ``package.json``. + # Keys are package handles with pinned version; values map exec type + # to boolean (e.g. ``{"owner/repo#v1.0": {"hooks": true}}``). + allow_executables: dict[str, dict[str, bool]] | None = None + @classmethod def _parse_dependency_dict(cls, raw_deps: dict, label: str = "") -> dict: """Parse a dependencies or devDependencies dict from apm.yml. @@ -412,6 +419,11 @@ def from_apm_yml( for dep_list in _iter_apm_dependency_lists(dependencies, dev_dependencies): _route_unscoped_to_default_registry(dep_list, default_registry) + # Parse allowExecutables block (npm v12-style approval gate). + from ..security.executables import parse_allow_executables + + allow_executables = parse_allow_executables(data) + # Parse package content type pkg_type = None if "type" in data and data["type"] is not None: @@ -481,6 +493,7 @@ def from_apm_yml( includes=includes, registries=registries, default_registry=default_registry, + allow_executables=allow_executables, ) _apm_yml_cache[cache_key] = result return result diff --git a/src/apm_cli/security/__init__.py b/src/apm_cli/security/__init__.py index be7ed5139..8f33ed0d9 100644 --- a/src/apm_cli/security/__init__.py +++ b/src/apm_cli/security/__init__.py @@ -1,6 +1,11 @@ """Security utilities for APM content scanning.""" from apm_cli.security.content_scanner import ContentScanner, ScanFinding +from apm_cli.security.executables import ( + ExecutableDeclaration, + is_package_approved, + scan_package_executables, +) from apm_cli.security.gate import ( BLOCK_POLICY, REPORT_POLICY, @@ -17,10 +22,13 @@ "REPORT_POLICY", "WARN_POLICY", "ContentScanner", + "ExecutableDeclaration", "ScanFinding", "ScanPolicy", "ScanVerdict", "SecurityGate", "ignore_non_content", "ignore_symlinks", + "is_package_approved", + "scan_package_executables", ] diff --git a/src/apm_cli/security/executables.py b/src/apm_cli/security/executables.py new file mode 100644 index 000000000..b39b367f2 --- /dev/null +++ b/src/apm_cli/security/executables.py @@ -0,0 +1,435 @@ +"""Executable primitive approval gate (npm v12-inspired opt-in model). + +APM packages can declare three kinds of executable primitives -- hooks, +MCP servers, and bin/ executables -- that run arbitrary code on the +developer's machine. When the consuming project declares an +``allowExecutables`` block in its ``apm.yml``, this module enforces a +deny-by-default policy: none of these primitives are deployed unless +explicitly approved. Projects that omit the block entirely get +backward-compatible behaviour (all executables deployed). + +The design mirrors npm v12's ``allowScripts`` (shipping July 2026): +version-pinned per-package approval, interactive prompts at install +time, and hard errors in non-interactive (CI) environments. + +See also: ``apm approve`` / ``apm deny`` CLI commands. +""" + +from __future__ import annotations + +import os +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + +# Executable type constants used as keys in the allowExecutables block. +EXEC_TYPE_HOOKS = "hooks" +EXEC_TYPE_MCP = "mcp" # Reserved for future enforcement. +EXEC_TYPE_BIN = "bin" + +# Types with active enforcement in the install gate. MCP is excluded +# because MCPIntegrator does not yet honour the approval state -- +# surfacing it in the UI would create a false-assurance control. +ENFORCED_EXEC_TYPES = (EXEC_TYPE_HOOKS, EXEC_TYPE_BIN) + +# All recognised exec-type keys (for manifest validation). +ALL_EXEC_TYPES = (EXEC_TYPE_HOOKS, EXEC_TYPE_MCP, EXEC_TYPE_BIN) + + +@dataclass(frozen=True) +class ExecutableDeclaration: + """Describes the executable primitives declared by a single package. + + Attributes: + package_key: Approval key for this package (e.g. ``owner/repo#v1.0`` + or ``name@marketplace#1.2.0``). + package_name: Human-readable package name. + is_transitive: Whether this package is a transitive dependency. + parent_name: Name of the direct dependency that pulled this in + (only set when *is_transitive* is True). + hook_count: Number of hook files discovered. + mcp_count: Number of MCP server entries discovered. + bin_count: Number of bin/ executables discovered. + hook_details: Per-hook summaries for ``inspect`` display. + mcp_details: Per-MCP-server summaries. + bin_details: Per-binary summaries. + """ + + package_key: str + package_name: str + is_transitive: bool = False + parent_name: str | None = None + hook_count: int = 0 + mcp_count: int = 0 + bin_count: int = 0 + hook_details: list[str] = field(default_factory=list) + mcp_details: list[str] = field(default_factory=list) + bin_details: list[str] = field(default_factory=list) + + @property + def has_executables(self) -> bool: + """Return True if this package declares enforced executable primitives.""" + return self.hook_count > 0 or self.bin_count > 0 + + @property + def exec_types(self) -> list[str]: + """Return the list of enforced executable types this package declares.""" + types: list[str] = [] + if self.hook_count > 0: + types.append(EXEC_TYPE_HOOKS) + if self.bin_count > 0: + types.append(EXEC_TYPE_BIN) + return types + + def summary_line(self) -> str: + """One-line summary for the interactive prompt (enforced types only).""" + parts: list[str] = [] + if self.hook_count: + parts.append(f"{self.hook_count} hook(s)") + if self.bin_count: + parts.append(f"{self.bin_count} bin executable(s)") + return ", ".join(parts) + + +# ------------------------------------------------------------------- +# Approval checking +# ------------------------------------------------------------------- + + +def is_package_approved( + allow_executables: dict[str, dict[str, bool]] | None, + package_key: str, + exec_type: str, +) -> bool: + """Check whether *package_key* is approved for *exec_type*. + + Args: + allow_executables: The parsed ``allowExecutables`` block from the + consuming project's ``apm.yml``. ``None`` means no block + exists (nothing approved). + package_key: The approval key (e.g. ``owner/repo#v1.0``). + exec_type: One of ``hooks``, ``mcp``, ``bin``. + + Returns: + ``True`` only when the block contains a matching entry with + ``{exec_type}: true``. + """ + if not allow_executables: + return False + entry = allow_executables.get(package_key) + if not entry or not isinstance(entry, dict): + return False + return bool(entry.get(exec_type, False)) + + +def is_any_type_approved( + allow_executables: dict[str, dict[str, bool]] | None, + package_key: str, +) -> bool: + """Return True if *package_key* is approved for at least one exec type.""" + if not allow_executables: + return False + entry = allow_executables.get(package_key) + if not entry or not isinstance(entry, dict): + return False + return any(entry.get(t, False) for t in ALL_EXEC_TYPES) + + +# ------------------------------------------------------------------- +# Approval key construction +# ------------------------------------------------------------------- + + +def build_approval_key(package_name: str, version: str) -> str: + """Build the ``allowExecutables`` key for a resolved package. + + Uses the format ``#`` which works for all package + sources (marketplace, git, registry). The caller is responsible for + providing the canonical *package_name* (e.g. ``owner/repo`` for git, + ``name@marketplace`` for marketplace packages). + """ + if not version: + return package_name + return f"{package_name}#{version}" + + +# ------------------------------------------------------------------- +# Package scanning +# ------------------------------------------------------------------- + + +def scan_package_executables( + install_path: Path, + package_name: str, + package_version: str, + *, + is_transitive: bool = False, + parent_name: str | None = None, +) -> ExecutableDeclaration: + """Scan a materialised package directory for executable primitives. + + Checks for: + - ``.apm/hooks/*.json`` and ``hooks/*.json`` -- hook definitions + (mirrors :meth:`HookIntegrator.find_hook_files`) + - ``bin/`` directory -- bin executables + - MCP is declared in the package's ``apm.yml`` under + ``dependencies.mcp``, not as files -- so we parse that instead. + + Returns an :class:`ExecutableDeclaration` (may have zero counts if + the package declares no executables). + """ + key = build_approval_key(package_name, package_version) + + # 1. Hooks: .apm/hooks/*.json and hooks/*.json (aligned with + # HookIntegrator.find_hook_files -- only JSON files are actionable). + hook_files: list[Path] = [] + for hook_dir in [install_path / ".apm" / "hooks", install_path / "hooks"]: + if hook_dir.is_dir(): + hook_files.extend( + sorted(f for f in hook_dir.glob("*.json") if f.is_file() and not f.is_symlink()) + ) + hook_details = [f.name for f in hook_files] + + # 2. Bin executables: top-level bin/ AND .apm/skills/*/bin/ + bin_files: list[Path] = [] + for bin_dir in [install_path / "bin"]: + if bin_dir.is_dir(): + bin_files.extend( + f for f in bin_dir.iterdir() if f.is_file() and not f.name.startswith(".") + ) + # Also scan skill-level bin/ directories + apm_skills = install_path / ".apm" / "skills" + if apm_skills.is_dir(): + for skill_dir in apm_skills.iterdir(): + skill_bin = skill_dir / "bin" + if skill_bin.is_dir(): + bin_files.extend( + f for f in skill_bin.iterdir() if f.is_file() and not f.name.startswith(".") + ) + bin_files = sorted(set(bin_files)) + bin_details = [f.name for f in bin_files] + + # 3. MCP servers: parse from apm.yml dependencies.mcp + mcp_count = 0 + mcp_details: list[str] = [] + apm_yml = install_path / "apm.yml" + if apm_yml.is_file(): + try: + from ..utils.yaml_io import load_yaml + + data = load_yaml(apm_yml) + if isinstance(data, dict): + deps = data.get("dependencies", {}) + if isinstance(deps, dict): + mcp_list = deps.get("mcp", []) + if isinstance(mcp_list, list): + mcp_count = len(mcp_list) + for entry in mcp_list: + if isinstance(entry, str): + mcp_details.append(entry) + elif isinstance(entry, dict): + mcp_details.append(entry.get("name", str(entry))) + except Exception: + pass # Non-fatal: if we cannot parse, treat as zero MCP + + return ExecutableDeclaration( + package_key=key, + package_name=package_name, + is_transitive=is_transitive, + parent_name=parent_name, + hook_count=len(hook_files), + mcp_count=mcp_count, + bin_count=len(bin_files), + hook_details=hook_details, + mcp_details=mcp_details, + bin_details=bin_details, + ) + + +# ------------------------------------------------------------------- +# Interactive approval prompt +# ------------------------------------------------------------------- + + +def _is_interactive() -> bool: + """Return True when stdin is a TTY and not suppressed by env vars.""" + if os.environ.get("APM_NON_INTERACTIVE") or os.environ.get("CI"): + return False + return hasattr(sys.stdin, "isatty") and sys.stdin.isatty() + + +def prompt_executable_approval( + declarations: list[ExecutableDeclaration], + *, + allow_executables: dict[str, dict[str, bool]] | None = None, + trust_all: bool = False, + no_executables: bool = False, +) -> dict[str, dict[str, bool]]: + """Run the interactive approval flow for packages with executables. + + Args: + declarations: Executable declarations for packages that need + approval (already filtered to only those with executables). + allow_executables: Existing ``allowExecutables`` block from + ``apm.yml`` (merged into result for packages already approved). + trust_all: When True, auto-approve everything without prompting. + no_executables: When True, deny everything without prompting. + + Returns: + Updated ``allowExecutables`` dict ready to write back to + ``apm.yml``. + + Raises: + SystemExit: In non-interactive mode when unapproved executables + exist and neither *trust_all* nor *no_executables* is set. + """ + import click + + from ..utils.console import _rich_echo, _rich_info, _rich_warning + + result = dict(allow_executables or {}) + + # Filter to only declarations that actually have executables and are + # not already fully approved. + pending = [d for d in declarations if d.has_executables and not _is_fully_approved(result, d)] + + if not pending: + return result + + # --no-executables: deny everything + if no_executables: + return result + + # --trust-all: approve everything + if trust_all: + for decl in pending: + result[decl.package_key] = {t: True for t in decl.exec_types} + return result + + # Non-interactive (CI): hard error + if not _is_interactive(): + _rich_warning( + f"{len(pending)} package(s) declare executable primitives " + "but are not approved in allowExecutables:" + ) + for decl in pending: + provenance = "(transitive)" if decl.is_transitive else "(direct)" + _rich_echo(f" {decl.package_key} {provenance}: {decl.summary_line()}") + _rich_echo("") + _rich_info( + "Run 'apm approve ' to approve, " + "or add entries to allowExecutables in apm.yml.", + symbol="info", + ) + sys.exit(1) + + # Interactive: prompt per-package + _rich_warning(f"{len(pending)} package(s) declare executable primitives:") + _rich_echo("") + + for decl in pending: + provenance = "transitive" if decl.is_transitive else "direct dependency" + if decl.is_transitive and decl.parent_name: + provenance = f"transitive via {decl.parent_name}" + _rich_echo(f" {decl.package_key} ({provenance})") + _rich_echo(f" {decl.summary_line()}") + _rich_echo("") + + _rich_echo(" These will execute code on your machine when triggered by") + _rich_echo(" your IDE or by 'apm run'.") + _rich_echo("") + + for decl in pending: + approved = click.confirm( + f" Trust {decl.package_name}?", + default=False, + ) + if approved: + result[decl.package_key] = {t: True for t in decl.exec_types} + _rich_echo("") + + return result + + +def _is_fully_approved( + allow_executables: dict[str, dict[str, bool]], + decl: ExecutableDeclaration, +) -> bool: + """Return True if all exec types in *decl* are approved.""" + entry = allow_executables.get(decl.package_key) + if not entry or not isinstance(entry, dict): + return False + return all(entry.get(t, False) for t in decl.exec_types) + + +# ------------------------------------------------------------------- +# Manifest read/write helpers +# ------------------------------------------------------------------- + + +def parse_allow_executables(data: dict[str, Any]) -> dict[str, dict[str, bool]] | None: + """Parse the ``allowExecutables`` block from raw apm.yml data. + + Returns ``None`` when the block is absent. Raises ``ValueError`` + on schema violations (non-dict values, unknown exec types with + non-bool values). + """ + raw = data.get("allowExecutables") + if raw is None: + return None + if not isinstance(raw, dict): + raise ValueError( + "allowExecutables must be a mapping of " + "package keys to {hooks: bool, mcp: bool, bin: bool}" + ) + + result: dict[str, dict[str, bool]] = {} + for pkg_key, entry in raw.items(): + if not isinstance(pkg_key, str): + raise ValueError(f"allowExecutables key must be a string, got {type(pkg_key).__name__}") + if not isinstance(entry, dict): + raise ValueError( + f"allowExecutables[{pkg_key!r}] must be a mapping " + f"of exec types to booleans, got {type(entry).__name__}" + ) + parsed_entry: dict[str, bool] = {} + for exec_type, value in entry.items(): + exec_type_str = str(exec_type) + if exec_type_str not in ALL_EXEC_TYPES: + raise ValueError( + f"allowExecutables[{pkg_key!r}]: unknown exec type " + f"{exec_type_str!r} (valid: {', '.join(ALL_EXEC_TYPES)})" + ) + if not isinstance(value, bool): + raise ValueError( + f"allowExecutables[{pkg_key!r}][{exec_type_str!r}] " + f"must be a boolean, got {type(value).__name__}" + ) + parsed_entry[exec_type_str] = value + result[str(pkg_key)] = parsed_entry + + return result + + +def write_allow_executables( + manifest_path: Path, + allow_executables: dict[str, dict[str, bool]], +) -> None: + """Persist *allow_executables* back to the project's ``apm.yml``. + + Reads the existing YAML, updates the ``allowExecutables`` key, and + writes it back using the standard ``dump_yaml`` helper. + """ + from ..utils.yaml_io import dump_yaml, load_yaml + + data = load_yaml(manifest_path) + if not isinstance(data, dict): + return + + if allow_executables: + data["allowExecutables"] = allow_executables + elif "allowExecutables" in data: + del data["allowExecutables"] + + dump_yaml(data, manifest_path) diff --git a/tests/unit/commands/test_approve_deny.py b/tests/unit/commands/test_approve_deny.py new file mode 100644 index 000000000..bf271ee0f --- /dev/null +++ b/tests/unit/commands/test_approve_deny.py @@ -0,0 +1,206 @@ +"""Unit tests for ``apm_cli.commands.approve`` (apm approve / apm deny). + +Covers: +- ``approve_cmd``: no args error, --pending flag, --all flag, named packages +- ``deny_cmd``: exact match, prefix match, not found +- ``_find_matching_key``: exact and prefix matching +""" + +from __future__ import annotations + +from pathlib import Path + +import yaml +from click.testing import CliRunner + +from apm_cli.commands.approve import ( + _find_matching_key, + approve_cmd, + deny_cmd, +) + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _write_manifest(tmpdir: str, extra: dict | None = None) -> Path: + """Write a minimal apm.yml and return its path.""" + data = {"name": "test-project", "version": "1.0"} + if extra: + data.update(extra) + manifest = Path(tmpdir) / "apm.yml" + manifest.write_text(yaml.dump(data)) + return manifest + + +def _create_pkg_with_hooks(apm_modules: Path, name: str) -> None: + """Create a package directory with a hook file.""" + pkg_dir = apm_modules / name + hook_dir = pkg_dir / ".apm" / "hooks" + hook_dir.mkdir(parents=True) + (hook_dir / "pre-tool-use.json").write_text("{}") + (pkg_dir / "apm.yml").write_text(yaml.dump({"name": name, "version": "1.0"})) + + +def _create_pkg_with_bin(apm_modules: Path, name: str) -> None: + """Create a package directory with bin/ executables.""" + pkg_dir = apm_modules / name + bin_dir = pkg_dir / "bin" + bin_dir.mkdir(parents=True) + (bin_dir / "tool").write_text("#!/bin/sh") + (pkg_dir / "apm.yml").write_text(yaml.dump({"name": name, "version": "2.0"})) + + +# --------------------------------------------------------------------------- +# _find_matching_key +# --------------------------------------------------------------------------- + + +class TestFindMatchingKey: + """Tests for _find_matching_key prefix/exact matching.""" + + def test_exact_match(self) -> None: + allow = {"owner/repo#v1.0": {"hooks": True}} + assert _find_matching_key(allow, "owner/repo#v1.0") == "owner/repo#v1.0" + + def test_prefix_match(self) -> None: + allow = {"owner/repo#v1.0": {"hooks": True}} + assert _find_matching_key(allow, "owner/repo") == "owner/repo#v1.0" + + def test_no_match(self) -> None: + allow = {"other/repo#v1.0": {"hooks": True}} + assert _find_matching_key(allow, "owner/repo") is None + + def test_empty_dict(self) -> None: + assert _find_matching_key({}, "anything") is None + + +# --------------------------------------------------------------------------- +# approve_cmd +# --------------------------------------------------------------------------- + + +class TestApproveCmd: + """Tests for the apm approve CLI command.""" + + def test_no_manifest_exits_1(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + result = runner.invoke(approve_cmd, []) + assert result.exit_code != 0 + + def test_no_args_shows_error(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest(".") + result = runner.invoke(approve_cmd, []) + assert result.exit_code != 0 + assert "Specify at least one package" in result.output + + def test_pending_no_packages(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest(".") + result = runner.invoke(approve_cmd, ["--pending"]) + assert result.exit_code == 0 + assert "approved" in result.output.lower() + + def test_pending_with_unapproved_packages(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest(".") + apm_modules = Path("apm_modules") + _create_pkg_with_hooks(apm_modules, "hook-pkg") + + result = runner.invoke(approve_cmd, ["--pending"]) + assert result.exit_code == 0 + assert "hook-pkg" in result.output + + def test_approve_all(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest(".") + apm_modules = Path("apm_modules") + _create_pkg_with_hooks(apm_modules, "hook-pkg") + _create_pkg_with_bin(apm_modules, "bin-pkg") + + result = runner.invoke(approve_cmd, ["--all"]) + assert result.exit_code == 0 + assert "Approved" in result.output + + # Verify it wrote to apm.yml + from apm_cli.utils.yaml_io import load_yaml + + data = load_yaml(Path("apm.yml")) + assert "allowExecutables" in data + + def test_approve_specific_package(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest(".") + apm_modules = Path("apm_modules") + _create_pkg_with_hooks(apm_modules, "hook-pkg") + + result = runner.invoke(approve_cmd, ["hook-pkg"]) + assert result.exit_code == 0 + assert "Approved" in result.output + + def test_approve_unknown_package(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest(".") + Path("apm_modules").mkdir() + + result = runner.invoke(approve_cmd, ["nonexistent"]) + assert result.exit_code == 0 + assert "not found" in result.output + + +# --------------------------------------------------------------------------- +# deny_cmd +# --------------------------------------------------------------------------- + + +class TestDenyCmd: + """Tests for the apm deny CLI command.""" + + def test_deny_existing_entry(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest( + ".", + extra={ + "allowExecutables": {"pkg#1.0": {"hooks": True}}, + }, + ) + result = runner.invoke(deny_cmd, ["pkg#1.0"]) + assert result.exit_code == 0 + assert "Revoked" in result.output + + from apm_cli.utils.yaml_io import load_yaml + + data = load_yaml(Path("apm.yml")) + ae = data.get("allowExecutables", {}) + assert "pkg#1.0" not in ae + + def test_deny_prefix_match(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest( + ".", + extra={ + "allowExecutables": {"owner/repo#v1.0": {"hooks": True}}, + }, + ) + result = runner.invoke(deny_cmd, ["owner/repo"]) + assert result.exit_code == 0 + assert "Revoked" in result.output + + def test_deny_not_found(self) -> None: + runner = CliRunner() + with runner.isolated_filesystem(): + _write_manifest(".", extra={"allowExecutables": {}}) + result = runner.invoke(deny_cmd, ["nonexistent"]) + assert result.exit_code == 0 + assert "not found" in result.output diff --git a/tests/unit/integration/test_skill_integrator_hermetic.py b/tests/unit/integration/test_skill_integrator_hermetic.py index a726fc230..d83f868e4 100644 --- a/tests/unit/integration/test_skill_integrator_hermetic.py +++ b/tests/unit/integration/test_skill_integrator_hermetic.py @@ -884,3 +884,64 @@ def test_remove_error_increments_errors(self, tmp_path: Path) -> None: project_root=tmp_path, ) assert stats["errors"] == 1 + + +# --------------------------------------------------------------------------- +# Executable gate denial path -- skip_bin blocks bin/ during promotion +# --------------------------------------------------------------------------- + + +class TestSkipBinDenialPath: + """When skip_bin=True, bin/ directories must not be copied during + skill promotion. This is the core security invariant: a one-line + deletion of the skip_bin guard would silently deploy unapproved + executables. + """ + + def test_promote_sub_skills_excludes_bin(self, tmp_path: Path) -> None: + """bin/ excluded from promoted sub-skill when skip_bin=True.""" + sub_skills = tmp_path / "src" / ".apm" / "skills" / "risky" + sub_skills.mkdir(parents=True) + (sub_skills / "SKILL.md").write_text("# risky skill") + bin_dir = sub_skills / "bin" + bin_dir.mkdir() + (bin_dir / "helper").write_text("#!/bin/sh\necho exploit") + + dest = tmp_path / "dest" + dest.mkdir() + + count, _deployed = SkillIntegrator._promote_sub_skills( + sub_skills.parent, + dest, + "risky-pkg", + skip_bin=True, + ) + assert count == 1 + promoted_skill = dest / "risky" + assert (promoted_skill / "SKILL.md").exists() + assert not (promoted_skill / "bin").exists(), "bin/ must be excluded when skip_bin=True" + + def test_promote_sub_skills_includes_bin_when_approved(self, tmp_path: Path) -> None: + """bin/ included in promoted sub-skill when skip_bin=False.""" + sub_skills = tmp_path / "src" / ".apm" / "skills" / "risky" + sub_skills.mkdir(parents=True) + (sub_skills / "SKILL.md").write_text("# risky skill") + bin_dir = sub_skills / "bin" + bin_dir.mkdir() + (bin_dir / "helper").write_text("#!/bin/sh\necho ok") + + dest = tmp_path / "dest" + dest.mkdir() + + count, _deployed = SkillIntegrator._promote_sub_skills( + sub_skills.parent, + dest, + "risky-pkg", + skip_bin=False, + ) + assert count == 1 + promoted_skill = dest / "risky" + assert (promoted_skill / "SKILL.md").exists() + assert (promoted_skill / "bin" / "helper").exists(), ( + "bin/ must be included when skip_bin=False" + ) diff --git a/tests/unit/security/__init__.py b/tests/unit/security/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/security/test_executables.py b/tests/unit/security/test_executables.py new file mode 100644 index 000000000..c3056fdc3 --- /dev/null +++ b/tests/unit/security/test_executables.py @@ -0,0 +1,518 @@ +"""Unit tests for ``apm_cli.security.executables``. + +Covers: +- ``ExecutableDeclaration`` data model properties +- ``is_package_approved`` and ``is_any_type_approved`` checking logic +- ``build_approval_key`` construction +- ``scan_package_executables`` filesystem scanning +- ``parse_allow_executables`` validation +- ``write_allow_executables`` round-trip +- ``prompt_executable_approval`` interactive / CI / trust-all / no-executables paths +- ``_is_fully_approved`` helper +""" + +from __future__ import annotations + +import tempfile +from pathlib import Path +from unittest.mock import patch + +import yaml + +from apm_cli.security.executables import ( + EXEC_TYPE_BIN, + EXEC_TYPE_HOOKS, + EXEC_TYPE_MCP, + ExecutableDeclaration, + _is_fully_approved, + build_approval_key, + is_any_type_approved, + is_package_approved, + parse_allow_executables, + prompt_executable_approval, + scan_package_executables, + write_allow_executables, +) + +# --------------------------------------------------------------------------- +# ExecutableDeclaration +# --------------------------------------------------------------------------- + + +class TestExecutableDeclaration: + """Tests for the ExecutableDeclaration data model.""" + + def test_has_executables_false_when_all_zero(self) -> None: + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a") + assert not decl.has_executables + + def test_has_executables_true_with_hooks(self) -> None: + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a", hook_count=2) + assert decl.has_executables + + def test_has_executables_true_with_mcp_only(self) -> None: + """MCP-only packages are not flagged (MCP enforcement deferred).""" + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a", mcp_count=1) + assert not decl.has_executables + + def test_has_executables_true_with_bin(self) -> None: + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a", bin_count=3) + assert decl.has_executables + + def test_exec_types_empty(self) -> None: + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a") + assert decl.exec_types == [] + + def test_exec_types_all(self) -> None: + """exec_types only includes enforced types (hooks, bin); MCP is excluded.""" + decl = ExecutableDeclaration( + package_key="a#1.0", + package_name="a", + hook_count=1, + mcp_count=1, + bin_count=1, + ) + assert decl.exec_types == [EXEC_TYPE_HOOKS, EXEC_TYPE_BIN] + + def test_exec_types_partial(self) -> None: + decl = ExecutableDeclaration( + package_key="a#1.0", package_name="a", hook_count=1, bin_count=2 + ) + assert decl.exec_types == [EXEC_TYPE_HOOKS, EXEC_TYPE_BIN] + + def test_summary_line(self) -> None: + """summary_line only shows enforced types (hooks, bin).""" + decl = ExecutableDeclaration( + package_key="a#1.0", + package_name="a", + hook_count=2, + mcp_count=1, + bin_count=3, + ) + summary = decl.summary_line() + assert "2 hook(s)" in summary + assert "MCP" not in summary + assert "3 bin executable(s)" in summary + + def test_summary_line_hooks_only(self) -> None: + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a", hook_count=1) + assert decl.summary_line() == "1 hook(s)" + + def test_is_frozen(self) -> None: + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a") + try: + decl.hook_count = 5 # type: ignore[misc] + raise AssertionError("Expected FrozenInstanceError") + except AttributeError: + pass + + +# --------------------------------------------------------------------------- +# is_package_approved +# --------------------------------------------------------------------------- + + +class TestIsPackageApproved: + """Tests for is_package_approved.""" + + def test_none_allow_executables_returns_false(self) -> None: + assert not is_package_approved(None, "a#1.0", EXEC_TYPE_HOOKS) + + def test_empty_allow_executables_returns_false(self) -> None: + assert not is_package_approved({}, "a#1.0", EXEC_TYPE_HOOKS) + + def test_missing_key_returns_false(self) -> None: + allow = {"b#1.0": {"hooks": True}} + assert not is_package_approved(allow, "a#1.0", EXEC_TYPE_HOOKS) + + def test_wrong_exec_type_returns_false(self) -> None: + allow = {"a#1.0": {"hooks": True}} + assert not is_package_approved(allow, "a#1.0", EXEC_TYPE_BIN) + + def test_approved_returns_true(self) -> None: + allow = {"a#1.0": {"hooks": True, "bin": True}} + assert is_package_approved(allow, "a#1.0", EXEC_TYPE_HOOKS) + assert is_package_approved(allow, "a#1.0", EXEC_TYPE_BIN) + + def test_false_value_returns_false(self) -> None: + allow = {"a#1.0": {"hooks": False}} + assert not is_package_approved(allow, "a#1.0", EXEC_TYPE_HOOKS) + + def test_non_dict_entry_returns_false(self) -> None: + allow = {"a#1.0": True} # type: ignore[dict-item] + assert not is_package_approved(allow, "a#1.0", EXEC_TYPE_HOOKS) + + +# --------------------------------------------------------------------------- +# is_any_type_approved +# --------------------------------------------------------------------------- + + +class TestIsAnyTypeApproved: + """Tests for is_any_type_approved.""" + + def test_none_returns_false(self) -> None: + assert not is_any_type_approved(None, "a#1.0") + + def test_empty_returns_false(self) -> None: + assert not is_any_type_approved({}, "a#1.0") + + def test_any_true_returns_true(self) -> None: + allow = {"a#1.0": {"mcp": True}} + assert is_any_type_approved(allow, "a#1.0") + + def test_all_false_returns_false(self) -> None: + allow = {"a#1.0": {"hooks": False, "mcp": False, "bin": False}} + assert not is_any_type_approved(allow, "a#1.0") + + +# --------------------------------------------------------------------------- +# build_approval_key +# --------------------------------------------------------------------------- + + +class TestBuildApprovalKey: + """Tests for build_approval_key.""" + + def test_with_version(self) -> None: + assert build_approval_key("owner/repo", "v1.0") == "owner/repo#v1.0" + + def test_empty_version(self) -> None: + assert build_approval_key("owner/repo", "") == "owner/repo" + + def test_marketplace_format(self) -> None: + assert build_approval_key("ci-hooks@acme", "1.2.0") == "ci-hooks@acme#1.2.0" + + +# --------------------------------------------------------------------------- +# scan_package_executables +# --------------------------------------------------------------------------- + + +class TestScanPackageExecutables: + """Tests for scan_package_executables.""" + + def test_empty_directory(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + decl = scan_package_executables(Path(tmpdir), "test-pkg", "1.0") + assert not decl.has_executables + assert decl.package_key == "test-pkg#1.0" + assert decl.hook_count == 0 + assert decl.mcp_count == 0 + assert decl.bin_count == 0 + + def test_detects_hooks(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + hook_dir = Path(tmpdir) / ".apm" / "hooks" + hook_dir.mkdir(parents=True) + (hook_dir / "pre-tool-use.json").write_text("{}") + (hook_dir / "post-tool-use.json").write_text("{}") + + decl = scan_package_executables(Path(tmpdir), "hooks-pkg", "2.0") + assert decl.hook_count == 2 + assert decl.has_executables + assert EXEC_TYPE_HOOKS in decl.exec_types + assert "pre-tool-use.json" in decl.hook_details + assert "post-tool-use.json" in decl.hook_details + + def test_detects_bin(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + bin_dir = Path(tmpdir) / "bin" + bin_dir.mkdir() + (bin_dir / "tool1").write_text("#!/bin/sh\necho hi") + (bin_dir / "tool2").write_text("#!/bin/sh\necho bye") + # Hidden files should be ignored + (bin_dir / ".hidden").write_text("ignored") + + decl = scan_package_executables(Path(tmpdir), "bin-pkg", "3.0") + assert decl.bin_count == 2 + assert EXEC_TYPE_BIN in decl.exec_types + assert "tool1" in decl.bin_details + assert "tool2" in decl.bin_details + + def test_detects_mcp_from_apm_yml(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + apm_yml = Path(tmpdir) / "apm.yml" + apm_yml.write_text( + yaml.dump( + { + "name": "mcp-pkg", + "version": "1.0", + "dependencies": { + "mcp": [ + "server-a", + {"name": "server-b", "command": "node"}, + ] + }, + } + ) + ) + decl = scan_package_executables(Path(tmpdir), "mcp-pkg", "1.0") + assert decl.mcp_count == 2 + # MCP is scanned but not included in enforced exec_types. + assert EXEC_TYPE_MCP not in decl.exec_types + assert "server-a" in decl.mcp_details + + def test_transitive_flag(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + decl = scan_package_executables( + Path(tmpdir), + "trans-pkg", + "1.0", + is_transitive=True, + parent_name="parent-pkg", + ) + assert decl.is_transitive + assert decl.parent_name == "parent-pkg" + + def test_combined_hooks_and_bin(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + hook_dir = Path(tmpdir) / ".apm" / "hooks" + hook_dir.mkdir(parents=True) + (hook_dir / "validate.json").write_text("{}") + + bin_dir = Path(tmpdir) / "bin" + bin_dir.mkdir() + (bin_dir / "cli-tool").write_text("#!/bin/sh") + + decl = scan_package_executables(Path(tmpdir), "combined", "1.0") + assert decl.hook_count == 1 + assert decl.bin_count == 1 + assert decl.exec_types == [EXEC_TYPE_HOOKS, EXEC_TYPE_BIN] + + +# --------------------------------------------------------------------------- +# parse_allow_executables +# --------------------------------------------------------------------------- + + +class TestParseAllowExecutables: + """Tests for parse_allow_executables validation.""" + + def test_absent_returns_none(self) -> None: + assert parse_allow_executables({}) is None + + def test_valid_block(self) -> None: + data = { + "allowExecutables": { + "ci-hooks@acme#1.2.0": {"hooks": True, "bin": False}, + "mcp-tools@corp#2.0": {"mcp": True}, + } + } + result = parse_allow_executables(data) + assert result is not None + assert result["ci-hooks@acme#1.2.0"]["hooks"] is True + assert result["ci-hooks@acme#1.2.0"]["bin"] is False + assert result["mcp-tools@corp#2.0"]["mcp"] is True + + def test_empty_block(self) -> None: + data = {"allowExecutables": {}} + result = parse_allow_executables(data) + assert result == {} + + def test_non_dict_top_level_raises(self) -> None: + data = {"allowExecutables": "invalid"} + try: + parse_allow_executables(data) + raise AssertionError("Expected ValueError") + except ValueError as e: + assert "must be a mapping" in str(e) + + def test_non_dict_entry_raises(self) -> None: + data = {"allowExecutables": {"pkg#1.0": True}} + try: + parse_allow_executables(data) + raise AssertionError("Expected ValueError") + except ValueError as e: + assert "must be a mapping" in str(e) + + def test_non_bool_value_raises(self) -> None: + data = {"allowExecutables": {"pkg#1.0": {"hooks": "yes"}}} + try: + parse_allow_executables(data) + raise AssertionError("Expected ValueError") + except ValueError as e: + assert "must be a boolean" in str(e) + + def test_unknown_exec_type_raises(self) -> None: + data = {"allowExecutables": {"pkg#1.0": {"hokks": True}}} + try: + parse_allow_executables(data) + raise AssertionError("Expected ValueError") + except ValueError as e: + assert "unknown exec type" in str(e) + assert "hokks" in str(e) + + +# --------------------------------------------------------------------------- +# write_allow_executables +# --------------------------------------------------------------------------- + + +class TestWriteAllowExecutables: + """Tests for write_allow_executables round-trip.""" + + def test_writes_block(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + manifest = Path(tmpdir) / "apm.yml" + manifest.write_text(yaml.dump({"name": "my-project", "version": "1.0"})) + + allow = {"pkg#1.0": {"hooks": True}} + write_allow_executables(manifest, allow) + + from apm_cli.utils.yaml_io import load_yaml + + data = load_yaml(manifest) + assert data["allowExecutables"] == {"pkg#1.0": {"hooks": True}} + + def test_removes_block_when_empty(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + manifest = Path(tmpdir) / "apm.yml" + manifest.write_text( + yaml.dump( + { + "name": "my-project", + "allowExecutables": {"old#1.0": {"hooks": True}}, + } + ) + ) + + write_allow_executables(manifest, {}) + + from apm_cli.utils.yaml_io import load_yaml + + data = load_yaml(manifest) + assert "allowExecutables" not in data + + def test_preserves_other_fields(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + manifest = Path(tmpdir) / "apm.yml" + manifest.write_text( + yaml.dump({"name": "my-project", "version": "1.0", "description": "test"}) + ) + + allow = {"pkg#1.0": {"bin": True}} + write_allow_executables(manifest, allow) + + from apm_cli.utils.yaml_io import load_yaml + + data = load_yaml(manifest) + assert data["name"] == "my-project" + assert data["version"] == "1.0" + assert data["description"] == "test" + assert data["allowExecutables"]["pkg#1.0"]["bin"] is True + + +# --------------------------------------------------------------------------- +# _is_fully_approved +# --------------------------------------------------------------------------- + + +class TestIsFullyApproved: + """Tests for _is_fully_approved helper.""" + + def test_all_types_approved(self) -> None: + allow = {"a#1.0": {"hooks": True, "bin": True}} + decl = ExecutableDeclaration( + package_key="a#1.0", package_name="a", hook_count=1, bin_count=1 + ) + assert _is_fully_approved(allow, decl) + + def test_partial_approval(self) -> None: + allow = {"a#1.0": {"hooks": True}} + decl = ExecutableDeclaration( + package_key="a#1.0", package_name="a", hook_count=1, bin_count=1 + ) + assert not _is_fully_approved(allow, decl) + + def test_no_entry(self) -> None: + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a", hook_count=1) + assert not _is_fully_approved({}, decl) + + +# --------------------------------------------------------------------------- +# prompt_executable_approval +# --------------------------------------------------------------------------- + + +class TestPromptExecutableApproval: + """Tests for prompt_executable_approval flow.""" + + def _make_decl( + self, + key: str = "pkg#1.0", + name: str = "pkg", + hooks: int = 1, + bins: int = 0, + ) -> ExecutableDeclaration: + return ExecutableDeclaration( + package_key=key, + package_name=name, + hook_count=hooks, + bin_count=bins, + ) + + def test_trust_all_approves_everything(self) -> None: + decl = self._make_decl() + result = prompt_executable_approval([decl], trust_all=True) + assert "pkg#1.0" in result + assert result["pkg#1.0"]["hooks"] is True + + def test_no_executables_denies_everything(self) -> None: + decl = self._make_decl() + result = prompt_executable_approval([decl], no_executables=True) + assert "pkg#1.0" not in result + + def test_already_approved_skipped(self) -> None: + decl = self._make_decl() + existing = {"pkg#1.0": {"hooks": True}} + result = prompt_executable_approval([decl], allow_executables=existing, trust_all=True) + # Should preserve existing + assert result["pkg#1.0"]["hooks"] is True + + def test_no_pending_returns_existing(self) -> None: + decl = self._make_decl() + existing = {"pkg#1.0": {"hooks": True}} + result = prompt_executable_approval([decl], allow_executables=existing) + assert result == existing + + def test_non_interactive_exits(self) -> None: + decl = self._make_decl() + with patch("apm_cli.security.executables._is_interactive", return_value=False): + try: + prompt_executable_approval([decl]) + raise AssertionError("Expected SystemExit") + except SystemExit as e: + assert e.code == 1 + + def test_empty_declarations_returns_existing(self) -> None: + existing = {"old#1.0": {"hooks": True}} + result = prompt_executable_approval([], allow_executables=existing) + assert result == existing + + def test_no_executable_declarations_returns_existing(self) -> None: + # Declaration with no actual executables + decl = ExecutableDeclaration(package_key="a#1.0", package_name="a") + result = prompt_executable_approval([decl], trust_all=True) + assert "a#1.0" not in result + + def test_trust_all_with_multiple_types(self) -> None: + decl = self._make_decl(hooks=2, bins=3) + result = prompt_executable_approval([decl], trust_all=True) + assert result["pkg#1.0"]["hooks"] is True + assert result["pkg#1.0"]["bin"] is True + + @patch("apm_cli.security.executables._is_interactive", return_value=True) + @patch("click.confirm", return_value=True) + def test_interactive_approve(self, mock_confirm, mock_interactive) -> None: + decl = self._make_decl() + result = prompt_executable_approval([decl]) + assert "pkg#1.0" in result + assert result["pkg#1.0"]["hooks"] is True + + @patch("apm_cli.security.executables._is_interactive", return_value=True) + @patch("click.confirm", return_value=False) + def test_interactive_deny(self, mock_confirm, mock_interactive) -> None: + decl = self._make_decl() + result = prompt_executable_approval([decl]) + assert "pkg#1.0" not in result