diff --git a/Dockerfile b/Dockerfile index 40d6fad3..574d38ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ FROM base AS builder ENV POETRY_VERSION=2.2.1 # deps are required to build cffi -RUN apk add --no-cache --virtual .build-deps gcc=14.2.0-r4 libffi-dev=3.4.7-r0 musl-dev=1.2.5-r9 && \ +RUN apk add --no-cache --virtual .build-deps gcc=14.2.0-r4 libffi-dev=3.4.7-r0 musl-dev=1.2.5-r11 && \ pip install --no-cache-dir "poetry==$POETRY_VERSION" "poetry-dynamic-versioning[plugin]" && \ apk del .build-deps gcc libffi-dev musl-dev diff --git a/README.md b/README.md index 780d947b..806390d0 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,11 @@ This guide walks you through both installation and usage. 2. [Available Options](#available-options) 3. [MCP Tools](#mcp-tools) 4. [Usage Examples](#usage-examples) -5. [Scan Command](#scan-command) +5. [Platform Command](#platform-command-beta) + 1. [Discovering Commands](#discovering-commands) + 2. [Examples](#platform-examples) + 3. [Notes & Limitations](#platform-notes--limitations) +6. [Scan Command](#scan-command) 1. [Running a Scan](#running-a-scan) 1. [Options](#options) 1. [Severity Threshold](#severity-option) @@ -605,6 +609,64 @@ This information can be helpful when: - Debugging transport-specific issues +# Platform Command \[BETA\] + +> [!WARNING] +> The `platform` command is in **beta**. Commands, arguments, and output formats are generated dynamically from the Cycode API spec and may change between releases without notice. Do not rely on them in production automation yet. + +The `cycode platform` command exposes the Cycode platform's read APIs as CLI commands. It groups endpoints by resource (e.g. `projects`, `violations`, `workflows`) and turns each endpoint's parameters into typed CLI arguments and `--option` flags. + +```bash +cycode platform projects list --page-size 50 +cycode platform violations count +cycode platform workflows view +``` + +The OpenAPI spec is fetched from the Cycode API on first use and cached at `~/.cycode/openapi-spec.json` for 24 hours. Unrelated commands (`cycode scan`, `cycode status`, etc.) do not trigger a fetch. + +> [!NOTE] +> You must be authenticated (`cycode auth` or `CYCODE_CLIENT_ID` / `CYCODE_CLIENT_SECRET` environment variables) for `cycode platform` to discover and run commands. Other Cycode CLI commands work without authentication. + +## Discovering Commands + +Because commands are generated from the spec, the source of truth for what's available is `--help`: + +```bash +cycode platform --help # list all resource groups +cycode platform projects --help # list actions on a resource +cycode platform projects list --help # list options/arguments for an action +``` + +## Platform Examples + +```bash +# List projects with pagination +cycode platform projects list --page-size 25 + +# View a single project by ID +cycode platform projects view + +# Count violations across the tenant +cycode platform violations count + +# Filter using query parameters (see `--help` for what each endpoint supports) +cycode platform violations list --severity CRITICAL +``` + +All output is JSON by default — pipe it through `jq` for ad-hoc filtering: + +```bash +cycode platform projects list --page-size 100 | jq '.items[].name' +``` + +## Platform Notes & Limitations + +- **Read-only today.** Only `GET` endpoints are exposed in this beta. +- **Spec-driven.** Adding a new endpoint to the API surfaces it automatically the next time the cache is refreshed. +- **No bundled spec.** The first `cycode platform` invocation after install (or after the 24h cache expires) performs a network fetch. On slow connections this first call may take a few seconds; subsequent calls are near-instant until the cache expires. +- **Override the cache TTL** with `CYCODE_SPEC_CACHE_TTL=`. + + # Scan Command ## Running a Scan diff --git a/cycode/cli/app.py b/cycode/cli/app.py index 0e9f9c7b..103e8b86 100644 --- a/cycode/cli/app.py +++ b/cycode/cli/app.py @@ -2,6 +2,7 @@ import sys from typing import Annotated, Optional +import click import typer from typer import rich_utils from typer._completion_classes import completion_init @@ -10,6 +11,7 @@ from cycode import __version__ from cycode.cli.apps import ai_guardrails, ai_remediation, auth, configure, ignore, report, report_import, scan, status +from cycode.cli.apps.api import get_platform_group if sys.version_info >= (3, 10): from cycode.cli.apps import mcp @@ -56,6 +58,27 @@ if sys.version_info >= (3, 10): app.add_typer(mcp.app) +# Register the `platform` command group (dynamically built from the OpenAPI spec). +# The group itself is constructed cheaply at import time; the spec is only fetched +# when the user actually invokes `cycode platform ...`. Unrelated commands like +# `cycode scan` and `cycode status` never trigger a spec fetch. +# +# Typer doesn't support adding native Click groups directly, so we monkey-patch +# typer.main.get_group to inject our `platform` group into the resolved Click group. +# The `app_typer is app` guard ensures we only modify our own app. +_platform_group = get_platform_group() +_original_get_group = typer.main.get_group + + +def _get_group_with_platform(app_typer: typer.Typer) -> click.Group: + group = _original_get_group(app_typer) + if app_typer is app and _platform_group.name not in group.commands: + group.add_command(_platform_group, _platform_group.name) + return group + + +typer.main.get_group = _get_group_with_platform + def check_latest_version_on_close(ctx: typer.Context) -> None: output = ctx.obj.get('output') diff --git a/cycode/cli/apps/api/__init__.py b/cycode/cli/apps/api/__init__.py new file mode 100644 index 00000000..e65f9c6f --- /dev/null +++ b/cycode/cli/apps/api/__init__.py @@ -0,0 +1,69 @@ +"""Cycode platform API CLI commands. + +Dynamically builds CLI command groups from the Cycode API v4 OpenAPI spec. +The spec is fetched lazily — only when the user invokes `cycode platform ...` — +and cached locally for 24 hours. +""" + +from typing import Any, Optional + +import click + +from cycode.logger import get_logger + +logger = get_logger('Platform') + +_PLATFORM_HELP = ( + '[BETA] Access the Cycode platform.\n\n' + 'Commands are generated dynamically from the Cycode API spec and may change ' + 'between releases. The spec is fetched on first use and cached for 24 hours.' +) + + +class PlatformGroup(click.Group): + """Lazy-loading Click group for `cycode platform` subcommands. + + The OpenAPI spec is only fetched when the user actually invokes + `cycode platform ...` (or asks for its help). Unrelated commands like + `cycode scan` or `cycode status` never trigger a spec fetch. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self._loaded: bool = False + + def _ensure_loaded(self, ctx: Optional[click.Context]) -> None: + if self._loaded: + return + self._loaded = True # set first to avoid re-entrancy on errors + + client_id = client_secret = None + if ctx is not None: + root = ctx.find_root() + if root.obj: + client_id = root.obj.get('client_id') + client_secret = root.obj.get('client_secret') + + try: + from cycode.cli.apps.api.api_command import build_api_command_groups + + for sub_group, name in build_api_command_groups(client_id, client_secret): + if name not in self.commands: + self.add_command(sub_group, name) + except Exception as e: + logger.debug('Could not load platform commands: %s', e) + # Surface the error to the user only when they're inside `platform` + click.echo(f'Error loading Cycode platform commands: {e}', err=True) + + def list_commands(self, ctx: click.Context) -> list[str]: + self._ensure_loaded(ctx) + return super().list_commands(ctx) + + def get_command(self, ctx: click.Context, cmd_name: str) -> Optional[click.Command]: + self._ensure_loaded(ctx) + return super().get_command(ctx, cmd_name) + + +def get_platform_group() -> click.Group: + """Return the top-level `platform` Click group (lazy-loading).""" + return PlatformGroup(name='platform', help=_PLATFORM_HELP, no_args_is_help=True) diff --git a/cycode/cli/apps/api/api_command.py b/cycode/cli/apps/api/api_command.py new file mode 100644 index 00000000..1926c93c --- /dev/null +++ b/cycode/cli/apps/api/api_command.py @@ -0,0 +1,271 @@ +"""OpenAPI-to-Typer translator: dynamically builds CLI commands from the Cycode API v4 spec.""" + +import json +import re +from typing import Any, Optional + +import click + +from cycode.cli.apps.api.openapi_spec import OpenAPISpecError, get_openapi_spec, parse_spec_commands +from cycode.logger import get_logger + +logger = get_logger('API Command') + +# Map OpenAPI parameter types to Click types +_CLICK_TYPE_MAP: dict[str, click.ParamType] = { + 'string': click.STRING, + 'integer': click.INT, + 'number': click.FLOAT, + 'boolean': click.BOOL, +} + + +def _normalize_tag(tag: str) -> str: + """Normalize an OpenAPI tag to a CLI-friendly command name. + + 'Scan Statistics' -> 'scan-statistics' + 'CLI scan statistics' -> 'cli-scan-statistics' + """ + return re.sub(r'[^a-z0-9]+', '-', tag.lower()).strip('-') + + +def _find_common_prefix(paths: list[str]) -> str: + """Find the longest common path prefix shared by all paths.""" + if not paths: + return '' + if len(paths) == 1: + # For single-path tags, use the parent directory as prefix + return '/'.join(paths[0].split('/')[:-1]) + + common = paths[0] + for p in paths[1:]: + while not p.startswith(common + '/') and common != p: + common = '/'.join(common.split('/')[:-1]) + return common + + +def _path_to_command_name(path: str, common_prefix: str, has_path_params: bool) -> str: + """Derive a CLI command name from an API path relative to the tag's common prefix. + + Rules: + 1. Strip the common prefix shared by all endpoints in the tag + 2. Remove path parameter segments ({id}) + 3. If nothing remains: 'list' (no path params) or 'view' (has path params) + 4. Otherwise: use remaining segments joined with hyphens + + Examples: + /v4/projects (prefix=/v4/projects) -> list + /v4/projects/{id} (prefix=/v4/projects) -> view + /v4/projects/assets (prefix=/v4/projects) -> assets + /v4/violations/count (prefix=/v4/violations) -> count + """ + # Strip common prefix + relative = path[len(common_prefix) :] if path.startswith(common_prefix) else path + relative = relative.strip('/') + + # Remove path parameter segments and empty parts + parts = [p for p in relative.split('/') if p and not p.startswith('{')] + + if not parts: + return 'view' if has_path_params else 'list' + + # Join remaining segments with hyphens, normalize to kebab-case + return re.sub(r'[^a-z0-9]+', '-', '-'.join(parts).lower()).strip('-') + + +def _param_to_option_name(name: str) -> str: + """Convert an OpenAPI parameter name to a CLI option name. + + 'page_size' -> '--page-size' + 'pageSize' -> '--page-size' + 'filter.status' -> '--filter-status' + """ + s = re.sub(r'([a-z])([A-Z])', r'\1-\2', name) + # Replace any non-alphanumeric characters with hyphens + s = re.sub(r'[^a-z0-9]+', '-', s.lower()).strip('-') + return f'--{s}' + + +def _make_api_request( + endpoint_path: str, + method: str, + path_params: dict[str, str], + query_params: dict[str, Any], + client_id: Optional[str] = None, + client_secret: Optional[str] = None, +) -> dict: + """Execute an API request using the CLI's standard auth client.""" + from urllib.parse import quote + + from cycode.cli.apps.api.openapi_spec import resolve_credentials + from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient + + cid, csecret = resolve_credentials(client_id, client_secret) + client = CycodeTokenBasedClient(cid, csecret) + + # Substitute path parameters (URL-encoded to prevent path traversal) + url_path = endpoint_path + for param_name, param_value in path_params.items(): + url_path = url_path.replace(f'{{{param_name}}}', quote(str(param_value), safe='')) + + filtered_query = {k: v for k, v in query_params.items() if v is not None} + + response = client.get(url_path.lstrip('/'), params=filtered_query) + return response.json() + + +def build_api_command_groups( + client_id: Optional[str] = None, + client_secret: Optional[str] = None, +) -> list[tuple[click.Group, str]]: + """Build Click command groups from the OpenAPI spec. + + Returns a list of (click_group, command_name) tuples. + """ + try: + spec = get_openapi_spec(client_id, client_secret) + except OpenAPISpecError as e: + logger.warning('Could not load OpenAPI spec: %s', e) + return [] + + groups = parse_spec_commands(spec) + result = [] + + for tag, endpoints in groups.items(): + tag_name = _normalize_tag(tag) + + group = click.Group(name=tag_name, help=f'[BETA] {tag}') + + # Compute common prefix from all GET (non-deprecated) endpoint paths in this tag + get_endpoints = [ep for ep in endpoints if ep['method'] == 'get' and not ep.get('deprecated')] + if not get_endpoints: + continue + + clean_paths = [re.sub(r'/\{[^}]+\}', '', ep['path']) for ep in get_endpoints] + common_prefix = _find_common_prefix(clean_paths) + + used_names: dict[str, int] = {} + + for endpoint in get_endpoints: + has_path_params = bool(endpoint['path_params']) + cmd_name = _path_to_command_name(endpoint['path'], common_prefix, has_path_params) + + # Fix redundancy: if command name matches the tag name, use list/view + # e.g. "cycode groups groups" -> "cycode groups list" + if cmd_name == tag_name: + cmd_name = 'view' if has_path_params else 'list' + + # Handle duplicate names (e.g. deprecated + new endpoint for same resource) + if cmd_name in used_names: + used_names[cmd_name] += 1 + cmd_name = f'{cmd_name}-v{used_names[cmd_name]}' + else: + used_names[cmd_name] = 1 + + cmd = _build_endpoint_command(cmd_name, endpoint) + group.add_command(cmd, cmd_name) + + result.append((group, tag_name)) + + return result + + +def _build_click_params(endpoint: dict) -> list[click.Parameter]: + """Build Click parameters from OpenAPI endpoint definition.""" + params: list[click.Parameter] = [] + + # Path parameters -> required arguments + for p in endpoint['path_params']: + param_type = _CLICK_TYPE_MAP.get(p.get('schema', {}).get('type', 'string'), click.STRING) + params.append( + click.Argument( + [p['name'].replace('-', '_')], + type=param_type, + required=True, + ) + ) + + # Query parameters -> --option flags + for p in endpoint['query_params']: + param_type = _CLICK_TYPE_MAP.get(p.get('schema', {}).get('type', 'string'), click.STRING) + option_name = _param_to_option_name(p['name']) + required = p.get('required', False) + default = p.get('schema', {}).get('default') + + schema = p.get('schema', {}) + if 'enum' in schema: + param_type = click.Choice(schema['enum']) + + params.append( + click.Option( + [option_name], + type=param_type, + required=required, + default=default, + help=p.get('description', ''), + show_default=default is not None, + ) + ) + + return params + + +def _build_endpoint_command(cmd_name: str, endpoint: dict) -> click.Command: + """Build a Click command for an API endpoint. + + Path parameters become required CLI arguments. + Query parameters become --option flags with proper types. + """ + ep_path = endpoint['path'] + ep_method = endpoint['method'] + ep_path_params = list(endpoint['path_params']) + ep_query_params = list(endpoint['query_params']) + ep_description = endpoint['description'] or endpoint['summary'] + + # Build a mapping from Click's normalized kwarg name to original OpenAPI param name + _path_param_map = {p['name'].replace('-', '_').lower(): p['name'] for p in ep_path_params} + _query_param_map = {re.sub(r'[^a-z0-9]+', '_', p['name'].lower()).strip('_'): p['name'] for p in ep_query_params} + + def _callback(**kwargs: Any) -> None: + ctx = click.get_current_context() + + # Extract path param values using the mapping + path_values = {} + for kwarg_key, original_name in _path_param_map.items(): + if kwarg_key in kwargs and kwargs[kwarg_key] is not None: + path_values[original_name] = kwargs[kwarg_key] + + # Extract query param values (skip None) + query_values = {} + for kwarg_key, original_name in _query_param_map.items(): + value = kwargs.get(kwarg_key) + if value is not None: + query_values[original_name] = value + + # Get auth from root context (set by app_callback) + root_ctx = ctx.find_root() + client_id = root_ctx.obj.get('client_id') if root_ctx.obj else None + client_secret = root_ctx.obj.get('client_secret') if root_ctx.obj else None + + try: + result = _make_api_request( + ep_path, + ep_method, + path_values, + query_values, + client_id=client_id, + client_secret=client_secret, + ) + except Exception as e: + click.echo(f'Error: {e}', err=True) + raise click.Abort from e + + click.echo(json.dumps(result, indent=2)) + + return click.Command( + name=cmd_name, + callback=_callback, + help=ep_description, + short_help=endpoint['summary'], + params=_build_click_params(endpoint), + ) diff --git a/cycode/cli/apps/api/openapi_spec.py b/cycode/cli/apps/api/openapi_spec.py new file mode 100644 index 00000000..74ffdb69 --- /dev/null +++ b/cycode/cli/apps/api/openapi_spec.py @@ -0,0 +1,182 @@ +"""OpenAPI spec manager: fetch, cache, and parse the Cycode API v4 spec.""" + +import json +import os +import time +from pathlib import Path +from typing import Optional + +from cycode.cli.consts import CYCODE_CONFIGURATION_DIRECTORY +from cycode.cli.user_settings.credentials_manager import CredentialsManager +from cycode.cyclient import config as cyclient_config +from cycode.logger import get_logger + +logger = get_logger('OpenAPI Spec') + +_CACHE_DIR = Path.home() / CYCODE_CONFIGURATION_DIRECTORY +_CACHE_FILE = _CACHE_DIR / 'openapi-spec.json' +_CACHE_TTL_SECONDS = int(os.getenv('CYCODE_SPEC_CACHE_TTL', str(24 * 60 * 60))) # 24h default + +_OPENAPI_SPEC_PATH = '/v4/api-docs/cycode-api-swagger.json' + + +def get_openapi_spec(client_id: Optional[str] = None, client_secret: Optional[str] = None) -> dict: + """Get the OpenAPI spec, using cache if fresh, otherwise fetching from API. + + The spec is only fetched when the user actually invokes `cycode platform ...`. + Fetch uses the HTTP client's default timeout; on a slow connection the first + invocation will block accordingly. Once cached, subsequent invocations within + the TTL are near-instant. + + Args: + client_id: Optional client ID override (from CLI flags). + client_secret: Optional client secret override (from CLI flags). + + Returns: + Parsed OpenAPI specification dictionary. + + Raises: + OpenAPISpecError: If spec cannot be loaded from cache or API. + """ + cached = _load_cached_spec() + if cached is not None: + return cached + + return _fetch_and_cache_spec(client_id, client_secret) + + +def _load_cached_spec() -> Optional[dict]: + """Load spec from local cache if it exists and is fresh.""" + if not _CACHE_FILE.exists(): + return None + + try: + mtime = _CACHE_FILE.stat().st_mtime + if time.time() - mtime > _CACHE_TTL_SECONDS: + logger.debug('Cached OpenAPI spec is stale (age > %ds)', _CACHE_TTL_SECONDS) + return None + + spec = json.loads(_CACHE_FILE.read_text(encoding='utf-8')) + logger.debug('Using cached OpenAPI spec from %s', _CACHE_FILE) + return spec + except Exception as e: + logger.warning('Failed to load cached OpenAPI spec: %s', e) + return None + + +def resolve_credentials(client_id: Optional[str] = None, client_secret: Optional[str] = None) -> tuple[str, str]: + """Resolve credentials from args or the CLI's standard credential chain.""" + if not client_id or not client_secret: + credentials_manager = CredentialsManager() + cred_id, cred_secret = credentials_manager.get_credentials() + client_id = client_id or cred_id + client_secret = client_secret or cred_secret + + if not client_id or not client_secret: + raise OpenAPISpecError( + 'Cycode credentials not found. Run `cycode auth` first, ' + 'or set CYCODE_CLIENT_ID and CYCODE_CLIENT_SECRET environment variables.' + ) + + return client_id, client_secret + + +def _fetch_and_cache_spec(client_id: Optional[str] = None, client_secret: Optional[str] = None) -> dict: + """Fetch OpenAPI spec from API and cache to disk. + + Uses CycodeTokenBasedClient for auth and retries. The spec is served from the app URL, + so we create a client with app_url as base instead of the default api_url. + """ + from cycode.cyclient.cycode_token_based_client import CycodeTokenBasedClient + + cid, csecret = resolve_credentials(client_id, client_secret) + + # The spec is served from app.cycode.com, but token refresh POSTs to api.cycode.com. + # Ensure the token is fresh BEFORE overriding the base URL so that refresh + # targets the correct host. + client = CycodeTokenBasedClient(cid, csecret) + client.get_access_token() + client.api_url = cyclient_config.cycode_app_url + + spec_path = _OPENAPI_SPEC_PATH.lstrip('/') + logger.info('Fetching OpenAPI spec from %s/%s', cyclient_config.cycode_app_url, spec_path) + + try: + response = client.get(spec_path) + spec = response.json() + except Exception as e: + raise OpenAPISpecError( + f'Failed to fetch OpenAPI spec. Check your authentication and network connectivity. Error: {e}' + ) from e + + if not isinstance(spec, dict) or 'paths' not in spec: + raise OpenAPISpecError('Response does not look like a valid OpenAPI spec (missing "paths" key).') + + # Override server URL with API URL (supports on-premise installations) + spec['servers'] = [{'url': cyclient_config.cycode_api_url}] + + # Cache to disk + _cache_spec(spec) + + return spec + + +def _cache_spec(spec: dict) -> None: + """Write spec to local cache file atomically (write to temp file, then rename).""" + try: + _CACHE_DIR.mkdir(parents=True, exist_ok=True) + tmp_file = _CACHE_FILE.with_suffix('.json.tmp') + tmp_file.write_text(json.dumps(spec), encoding='utf-8') + tmp_file.replace(_CACHE_FILE) # atomic on POSIX and Windows + logger.debug('Cached OpenAPI spec to %s', _CACHE_FILE) + except Exception as e: + logger.warning('Failed to cache OpenAPI spec: %s', e) + + +def parse_spec_commands(spec: dict) -> dict[str, list[dict]]: + """Parse OpenAPI spec into resource groups with their endpoints. + + Groups endpoints by their first tag, returning a dict of: + {tag_name: [endpoint_info, ...]} + + Each endpoint_info contains: + - path: API path (e.g., '/v4/projects/{projectId}') + - method: HTTP method (e.g., 'get') + - summary: Human-readable summary + - description: Detailed description + - operation_id: Unique operation ID + - path_params: List of path parameter definitions + - query_params: List of query parameter definitions + """ + groups: dict[str, list[dict]] = {} + + for path, methods in spec.get('paths', {}).items(): + for method, details in methods.items(): + tags = details.get('tags', ['other']) + tag = tags[0] if tags else 'other' + + # Separate path and query parameters + parameters = details.get('parameters', []) + path_params = [p for p in parameters if p.get('in') == 'path'] + query_params = [p for p in parameters if p.get('in') == 'query'] + + endpoint_info = { + 'path': path, + 'method': method, + 'summary': details.get('summary', ''), + 'description': details.get('description', ''), + 'operation_id': details.get('operationId', ''), + 'path_params': path_params, + 'query_params': query_params, + 'deprecated': details.get('deprecated', False), + } + + if tag not in groups: + groups[tag] = [] + groups[tag].append(endpoint_info) + + return groups + + +class OpenAPISpecError(Exception): + """Raised when the OpenAPI spec cannot be loaded.""" diff --git a/poetry.lock b/poetry.lock index 3b793f4a..582ee5a5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. [[package]] name = "altgraph" diff --git a/tests/cli/apps/api/__init__.py b/tests/cli/apps/api/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/cli/apps/api/test_api_command.py b/tests/cli/apps/api/test_api_command.py new file mode 100644 index 00000000..0e981f33 --- /dev/null +++ b/tests/cli/apps/api/test_api_command.py @@ -0,0 +1,110 @@ +"""Tests for the OpenAPI-to-Click translator.""" + +from cycode.cli.apps.api.api_command import ( + _find_common_prefix, + _normalize_tag, + _param_to_option_name, + _path_to_command_name, +) + +# --- _normalize_tag --- + + +def test_normalize_tag_simple() -> None: + assert _normalize_tag('Projects') == 'projects' + + +def test_normalize_tag_multi_word() -> None: + assert _normalize_tag('Scan Statistics') == 'scan-statistics' + + +def test_normalize_tag_with_special_chars() -> None: + assert _normalize_tag('CLI scan statistics') == 'cli-scan-statistics' + + +def test_normalize_tag_strips_leading_trailing_separators() -> None: + assert _normalize_tag(' Projects ') == 'projects' + + +# --- _param_to_option_name --- + + +def test_param_to_option_name_snake_case() -> None: + assert _param_to_option_name('page_size') == '--page-size' + + +def test_param_to_option_name_camel_case() -> None: + assert _param_to_option_name('pageSize') == '--page-size' + + +def test_param_to_option_name_with_dot() -> None: + assert _param_to_option_name('filter.status') == '--filter-status' + + +def test_param_to_option_name_already_kebab() -> None: + assert _param_to_option_name('page-size') == '--page-size' + + +# --- _find_common_prefix --- + + +def test_find_common_prefix_empty() -> None: + assert _find_common_prefix([]) == '' + + +def test_find_common_prefix_single_path() -> None: + # Single path: use parent directory as prefix + assert _find_common_prefix(['/v4/projects']) == '/v4' + + +def test_find_common_prefix_two_paths_with_common_parent() -> None: + assert _find_common_prefix(['/v4/projects', '/v4/projects/assets']) == '/v4/projects' + + +def test_find_common_prefix_two_paths_with_grandparent() -> None: + assert _find_common_prefix(['/v4/projects', '/v4/members']) == '/v4' + + +def test_find_common_prefix_identical_paths() -> None: + assert _find_common_prefix(['/v4/projects', '/v4/projects']) == '/v4/projects' + + +# --- _path_to_command_name --- + + +def test_path_to_command_name_collection() -> None: + # /v4/projects with prefix /v4/projects -> nothing left -> 'list' + assert _path_to_command_name('/v4/projects', '/v4/projects', has_path_params=False) == 'list' + + +def test_path_to_command_name_single_resource() -> None: + # /v4/projects/{id} with prefix /v4/projects -> only path param left -> 'view' + assert _path_to_command_name('/v4/projects/{projectId}', '/v4/projects', has_path_params=True) == 'view' + + +def test_path_to_command_name_sub_resource() -> None: + # /v4/projects/assets with prefix /v4/projects -> 'assets' + assert _path_to_command_name('/v4/projects/assets', '/v4/projects', has_path_params=False) == 'assets' + + +def test_path_to_command_name_sub_resource_count() -> None: + # /v4/violations/count with prefix /v4/violations -> 'count' + assert _path_to_command_name('/v4/violations/count', '/v4/violations', has_path_params=False) == 'count' + + +def test_path_to_command_name_multi_segment() -> None: + # /v4/projects/collisions/count with prefix /v4/projects -> 'collisions-count' + assert ( + _path_to_command_name('/v4/projects/collisions/count', '/v4/projects', has_path_params=False) + == 'collisions-count' + ) + + +def test_path_to_command_name_with_path_param_in_middle() -> None: + # /v4/workflows/{id}/jobs with prefix /v4/workflows -> 'jobs' (path param stripped) + assert _path_to_command_name('/v4/workflows/{workflowId}/jobs', '/v4/workflows', has_path_params=True) == 'jobs' + + +def test_path_to_command_name_kebab_case_normalization() -> None: + # Path with underscores or special chars -> kebab-case + assert _path_to_command_name('/v4/brokers/broker_metrics', '/v4/brokers', has_path_params=False) == 'broker-metrics' diff --git a/tests/cli/apps/api/test_openapi_spec.py b/tests/cli/apps/api/test_openapi_spec.py new file mode 100644 index 00000000..1b863d89 --- /dev/null +++ b/tests/cli/apps/api/test_openapi_spec.py @@ -0,0 +1,72 @@ +"""Tests for the OpenAPI spec parser.""" + +from cycode.cli.apps.api.openapi_spec import parse_spec_commands + + +def test_parse_spec_commands_groups_by_tag() -> None: + spec = { + 'paths': { + '/v4/projects': { + 'get': {'tags': ['Projects'], 'summary': 'Get projects'}, + }, + '/v4/violations': { + 'get': {'tags': ['Violations'], 'summary': 'Get violations'}, + }, + } + } + groups = parse_spec_commands(spec) + assert set(groups.keys()) == {'Projects', 'Violations'} + + +def test_parse_spec_commands_extracts_path_params() -> None: + spec = { + 'paths': { + '/v4/projects/{projectId}': { + 'get': { + 'tags': ['Projects'], + 'parameters': [ + {'name': 'projectId', 'in': 'path', 'required': True}, + {'name': 'page_size', 'in': 'query', 'required': False}, + ], + }, + }, + } + } + groups = parse_spec_commands(spec) + ep = groups['Projects'][0] + assert len(ep['path_params']) == 1 + assert ep['path_params'][0]['name'] == 'projectId' + assert len(ep['query_params']) == 1 + assert ep['query_params'][0]['name'] == 'page_size' + + +def test_parse_spec_commands_captures_deprecated_flag() -> None: + spec = { + 'paths': { + '/v4/old': { + 'get': {'tags': ['T'], 'summary': 'old', 'deprecated': True}, + }, + '/v4/new': { + 'get': {'tags': ['T'], 'summary': 'new'}, + }, + } + } + groups = parse_spec_commands(spec) + by_path = {ep['path']: ep for ep in groups['T']} + assert by_path['/v4/old']['deprecated'] is True + assert by_path['/v4/new']['deprecated'] is False + + +def test_parse_spec_commands_no_tags_uses_other() -> None: + spec = { + 'paths': { + '/v4/foo': {'get': {}}, + } + } + groups = parse_spec_commands(spec) + assert 'other' in groups + + +def test_parse_spec_commands_empty_spec() -> None: + assert parse_spec_commands({}) == {} + assert parse_spec_commands({'paths': {}}) == {}