Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/secuscan/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def _should_run(self, now: datetime, last_run_at: str | None, schedule_seconds:
return elapsed >= schedule_seconds
async def _run_workflow(self, workflow_id: str, steps: List[Dict[str, Any]]):
logger.info("Running workflow %s with %d step(s)", workflow_id, len(steps))
db = await get_db()
for step in steps:
plugin_id = step.get("plugin_id")
inputs = step.get("inputs") or {}
Expand Down
11 changes: 10 additions & 1 deletion frontend/testing/unit/pages/ToolConfigDynamic.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { render, screen, waitFor } from '@testing-library/react'
import userEvent from '@testing-library/user-event'
import { MemoryRouter, Route, Routes } from 'react-router-dom'
import ToolConfig from '../../../src/pages/ToolConfig'
import { getPluginSchema, listPlugins, startTask } from '../../../src/api'
import { getPluginSchema, listPlugins, startTask, getSettings, listTargetPolicies, listCredentialProfiles, listSessionProfiles } from '../../../src/api'
import { routes } from '../../../src/routes'

const addToast = vi.fn()
Expand All @@ -15,6 +15,10 @@ vi.mock('../../../src/api', () => ({
listPlugins: vi.fn(),
getPluginSchema: vi.fn(),
startTask: vi.fn(),
getSettings: vi.fn(),
listTargetPolicies: vi.fn(),
listCredentialProfiles: vi.fn(),
listSessionProfiles: vi.fn(),
}))

describe('ToolConfig dynamic schema flow', () => {
Expand Down Expand Up @@ -74,6 +78,10 @@ describe('ToolConfig dynamic schema flow', () => {
created_at: 'now',
stream_url: '/api/v1/task/task-123/stream',
})
vi.mocked(getSettings).mockResolvedValue(null)
vi.mocked(listTargetPolicies).mockResolvedValue({ items: [], total: 0 })
vi.mocked(listCredentialProfiles).mockResolvedValue({ items: [], total: 0 })
vi.mocked(listSessionProfiles).mockResolvedValue({ items: [], total: 0 })
})

it('renders dynamic fields and submits startTask with consent', async () => {
Expand Down Expand Up @@ -108,6 +116,7 @@ describe('ToolConfig dynamic schema flow', () => {
}),
true,
'quick',
expect.any(Object),
)
})
})
Expand Down
8 changes: 7 additions & 1 deletion frontend/testing/unit/pages/ToolConfigTimeout.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { render, screen } from '@testing-library/react'
import userEvent from '@testing-library/user-event'
import { MemoryRouter, Route, Routes } from 'react-router-dom'
import ToolConfig from '../../../src/pages/ToolConfig'
import { getPluginSchema, listPlugins, startTask, getSettings } from '../../../src/api'
import { getPluginSchema, listPlugins, startTask, getSettings, listTargetPolicies, listCredentialProfiles, listSessionProfiles } from '../../../src/api'
import { routes } from '../../../src/routes'

const addToast = vi.fn()
Expand All @@ -16,6 +16,9 @@ vi.mock('../../../src/api', () => ({
getPluginSchema: vi.fn(),
startTask: vi.fn(),
getSettings: vi.fn(),
listTargetPolicies: vi.fn(),
listCredentialProfiles: vi.fn(),
listSessionProfiles: vi.fn(),
}))

describe('ToolConfig timeout control', () => {
Expand Down Expand Up @@ -60,6 +63,9 @@ describe('ToolConfig timeout control', () => {

vi.mocked(getSettings).mockResolvedValue({ sandbox: { default_timeout: 600 } })
vi.mocked(startTask).mockResolvedValue({ task_id: 'task-1', status: 'queued', created_at: 'now', stream_url: '' })
vi.mocked(listTargetPolicies).mockResolvedValue({ items: [], total: 0 })
vi.mocked(listCredentialProfiles).mockResolvedValue({ items: [], total: 0 })
vi.mocked(listSessionProfiles).mockResolvedValue({ items: [], total: 0 })
})

it('renders integer input with constrained min/max', async () => {
Expand Down
2 changes: 1 addition & 1 deletion frontend/testing/unit/pages/Workflows.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ describe('Workflows — create action', () => {
name: 'Nightly Scan',
schedule_seconds: 7200,
enabled: true,
steps: [{ plugin_id: '', inputs: {} }],
steps: [{ plugin_id: '', inputs: {}, execution_context: { scan_profile: 'standard', validation_mode: 'proof', evidence_level: 'standard' } }],
})
})
})
Expand Down
246 changes: 246 additions & 0 deletions testing/backend/test_api_scanner_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
"""
Contract and parser tests for the api_scanner plugin.

These tests load the real plugins/api_scanner/metadata.json, validate it
through the project PluginMetadataValidator, render commands through the
real PluginManager, and call the real parser.py parse() function.

Assertions are tied to the actual plugin contract: if metadata.json,
the command template, or parser.py drift, these tests will fail.

Related to issue #490: Add parser and contract coverage for plugin `api_scanner`
"""

import asyncio
import json
import sys
from pathlib import Path

import pytest

REPO_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(REPO_ROOT))

from backend.secuscan.plugin_validator import PluginMetadataValidator
from backend.secuscan.plugins import PluginManager
from plugins.api_scanner.parser import parse

PLUGIN_DIR = REPO_ROOT / "plugins" / "api_scanner"
PLUGINS_DIR = REPO_ROOT / "plugins"


# ---------------------------------------------------------------------------
# Metadata contract tests
# ---------------------------------------------------------------------------


def test_api_scanner_metadata_file_exists():
"""metadata.json must exist at the expected plugin path."""
assert (PLUGIN_DIR / "metadata.json").exists()


def test_api_scanner_metadata_is_valid_json():
"""metadata.json must be valid, parseable JSON."""
raw = (PLUGIN_DIR / "metadata.json").read_text(encoding="utf-8")
data = json.loads(raw)
assert isinstance(data, dict)


def test_api_scanner_passes_validator():
"""
The full PluginMetadataValidator must accept the plugin without errors.

This will fail if any required field is missing, the engine type or safety
level is invalid, the command template references an undeclared field, or
the checksum field is absent or malformed.
"""
result = PluginMetadataValidator(PLUGIN_DIR).validate()
assert result.valid, (
"Plugin validation errors:\n"
+ "\n".join(e.display() for e in result.errors)
)


def test_api_scanner_metadata_id_matches_directory():
"""Plugin id in metadata.json must match the directory name."""
data = json.loads((PLUGIN_DIR / "metadata.json").read_text(encoding="utf-8"))
assert data["id"] == "api_scanner"


def test_api_scanner_engine_is_nuclei():
"""Engine binary must be 'nuclei' -- update this if the underlying tool changes."""
data = json.loads((PLUGIN_DIR / "metadata.json").read_text(encoding="utf-8"))
assert data["engine"]["type"] == "cli"
assert data["engine"]["binary"] == "nuclei"


def test_api_scanner_has_required_target_field():
"""Plugin must declare a required 'target' field for the API base URL."""
data = json.loads((PLUGIN_DIR / "metadata.json").read_text(encoding="utf-8"))
fields = {f["id"]: f for f in data["fields"]}
assert "target" in fields, "Missing required field: target"
assert fields["target"]["required"] is True


def test_api_scanner_target_field_requires_http_url():
"""The 'target' field must have a validation pattern requiring http(s)://."""
data = json.loads((PLUGIN_DIR / "metadata.json").read_text(encoding="utf-8"))
fields = {f["id"]: f for f in data["fields"]}
target_validation = fields["target"].get("validation", {})
pattern = target_validation.get("pattern", "")
assert "https?" in pattern or "http" in pattern, (
"target field must validate for HTTP(S) URL format"
)


def test_api_scanner_output_parser_is_custom():
"""Parser type must be 'custom', backed by parser.py."""
data = json.loads((PLUGIN_DIR / "metadata.json").read_text(encoding="utf-8"))
assert data["output"]["parser"] == "custom"


def test_api_scanner_parser_file_exists():
"""parser.py must exist alongside metadata.json."""
assert (PLUGIN_DIR / "parser.py").exists()


def test_api_scanner_requires_consent():
"""API scanning is intrusive and must require user consent."""
data = json.loads((PLUGIN_DIR / "metadata.json").read_text(encoding="utf-8"))
assert data["safety"]["requires_consent"] is True
assert data["safety"]["consent_message"], "consent_message must not be empty"


# ---------------------------------------------------------------------------
# Command rendering tests via real PluginManager
# ---------------------------------------------------------------------------


def test_api_scanner_command_renders_with_target(setup_test_environment):
"""
PluginManager must produce the correct nuclei command for an API target.

This test will fail if command_template in metadata.json changes or a
placeholder becomes mismatched.
"""
manager = PluginManager(str(PLUGINS_DIR))
asyncio.run(manager.load_plugins())

command = manager.build_command("api_scanner", {"target": "https://api.example.com"})

assert command is not None, "build_command returned None for valid inputs"
assert command[0] == "nuclei"
assert "-u" in command
assert "https://api.example.com" in command
assert "-silent" in command


def test_api_scanner_command_full_token_sequence(setup_test_environment):
"""Full rendered command must exactly match the command_template token sequence."""
manager = PluginManager(str(PLUGINS_DIR))
asyncio.run(manager.load_plugins())

command = manager.build_command("api_scanner", {"target": "https://api.secuscan.in"})

assert command == ["nuclei", "-u", "https://api.secuscan.in", "-silent"], (
f"Command template drift detected. Got: {command}"
)


def test_api_scanner_drops_target_token_when_absent(setup_test_environment):
"""
When the 'target' field is omitted, the renderer drops the unresolved
{target} token rather than emitting an empty value or literal placeholder.
"""
manager = PluginManager(str(PLUGINS_DIR))
asyncio.run(manager.load_plugins())

rendered = manager.build_command("api_scanner", {})

assert rendered is not None
assert not any("{" in token for token in rendered), "Unresolved placeholder leaked"
assert rendered == ["nuclei", "-u", "-silent"]

populated = manager.build_command("api_scanner", {"target": "https://api.example.com"})
assert "https://api.example.com" in populated
assert len(populated) == len(rendered) + 1


def test_api_scanner_loaded_by_plugin_manager(setup_test_environment):
"""PluginManager must successfully load api_scanner from the real plugins directory."""
manager = PluginManager(str(PLUGINS_DIR))
asyncio.run(manager.load_plugins())

plugin = manager.get_plugin("api_scanner")
assert plugin is not None
assert plugin.id == "api_scanner"
assert plugin.name == "API Scanner"


# ---------------------------------------------------------------------------
# Parser contract tests against the real parser.py
# ---------------------------------------------------------------------------

_API_SCAN_TEXT_FIXTURE = (
"https://api.example.com/v1/users [GET] [critical] [exposed]\n"
"https://api.example.com/admin [GET] [injection] [critical]\n"
"https://api.example.com/health [GET] [200 OK]\n"
"https://api.example.com/graphql [POST] [warning] [detected]\n"
)


def test_api_scanner_parser_returns_required_keys():
"""parse() must return a dict with 'findings', 'count', and 'items' keys."""
result = parse(_API_SCAN_TEXT_FIXTURE)
assert isinstance(result, dict)
assert "findings" in result
assert "count" in result
assert "items" in result


def test_api_scanner_parser_count_matches_findings():
"""'count' must equal len(findings)."""
result = parse(_API_SCAN_TEXT_FIXTURE)
assert result["count"] == len(result["findings"])


def test_api_scanner_parser_finding_has_required_keys():
"""Each finding must have title, category, severity, description, remediation, metadata."""
result = parse(_API_SCAN_TEXT_FIXTURE)
assert result["findings"], "Expected at least one finding"
for finding in result["findings"]:
for key in ("title", "category", "severity", "description", "remediation", "metadata"):
assert key in finding, f"Finding missing key: {key}"


def test_api_scanner_parser_critical_keyword_raises_severity():
"""Lines containing 'critical' or 'injection' must be classified as 'high' severity."""
result = parse(_API_SCAN_TEXT_FIXTURE)
high_findings = [f for f in result["findings"] if f["severity"] == "high"]
assert len(high_findings) >= 1, "Expected at least one high-severity finding from critical/injection lines"


def test_api_scanner_parser_low_severity_for_exposed():
"""Lines containing 'exposed' or 'found' but not critical keywords must be 'low' severity."""
result = parse(_API_SCAN_TEXT_FIXTURE)
exposed_lines = [f for f in result["findings"] if "exposed" in f["description"].lower()]
for finding in exposed_lines:
assert finding["severity"] in ("low", "high"), (
f"Unexpected severity '{finding['severity']}' for exposed finding"
)


def test_api_scanner_parser_empty_output():
"""Parser must handle empty input and return empty findings without raising."""
result = parse("")
assert result["findings"] == []
assert result["count"] == 0
assert result["items"] == []


def test_api_scanner_parser_preserves_raw_line_in_metadata():
"""Each finding's metadata.raw must match the original output line."""
single_line = "https://api.example.com/v1/tokens [GET] [exposed]\n"
result = parse(single_line)
assert result["findings"]
assert result["findings"][0]["metadata"]["raw"] == "https://api.example.com/v1/tokens [GET] [exposed]"
Loading