diff --git a/README.md b/README.md index d2bec10..bbe8782 100644 --- a/README.md +++ b/README.md @@ -16,10 +16,16 @@ ```bash -pip install cognis-attackmap +pip install "git+https://github.com/cognis-digital/attackmap.git" attackmap scan . # → prioritized findings in seconds ``` + +## What is this? + +ATTACKMAP takes plain-English descriptions of security alerts or incident findings and tells you exactly which MITRE ATT&CK techniques the attacker used — things like "stolen credentials," "ransomware," or "PowerShell abuse." It produces a colour-coded heatmap showing which parts of the attack lifecycle were observed and which have no coverage, so you can see your detection gaps at a glance. You can also export a layer file that loads directly into the free MITRE ATT&CK Navigator for visual exploration. It is aimed at defenders, incident responders, and detection engineers who want to quickly translate a pile of alerts into a structured ATT&CK picture without any cloud services or accounts. + + ## Contents - [Why attackmap?](#why) · [Features](#features) · [Quick start](#quick-start) · [Example](#example) · [Architecture](#architecture) · [AI stack](#ai-stack) · [How it compares](#how-it-compares) · [Integrations](#integrations) · [Install anywhere](#install-anywhere) · [Related](#related) · [Contributing](#contributing) @@ -48,10 +54,56 @@ speak ATT&CK
↑ back to top
+ +## Domains + +**Primary domain:** Cyber & Security · **JTF MERIDIAN division:** NULLBYTE · SPECTER + +**Topics:** `cognis` `security` `infosec` `cybersecurity` `blue-team` + +Part of the **Cognis Neural Suite** — 300+ source-available tools organized across 12 domains under the JTF MERIDIAN command structure. See the [suite on GitHub](https://github.com/cognis-digital) and [jtf-meridian](https://github.com/cognis-digital/jtf-meridian) for how the pieces fit together. + + + +## Install + +`attackmap` is source-available (not published to PyPI) — every method below installs +straight from GitHub. Pick whichever you prefer; the one-line scripts auto-detect +the best tool available on your machine. + +**One-liner (Linux / macOS):** +```sh +curl -fsSL https://raw.githubusercontent.com/cognis-digital/attackmap/HEAD/install.sh | sh +``` + +**One-liner (Windows PowerShell):** +```powershell +irm https://raw.githubusercontent.com/cognis-digital/attackmap/HEAD/install.ps1 | iex +``` + +**Or install manually — any one of:** +```sh +pipx install "git+https://github.com/cognis-digital/attackmap.git" # isolated (recommended) +uv tool install "git+https://github.com/cognis-digital/attackmap.git" # uv +pip install "git+https://github.com/cognis-digital/attackmap.git" # pip +``` + +**From source:** +```sh +git clone https://github.com/cognis-digital/attackmap.git +cd attackmap && pip install . +``` + +Then run: +```sh +attackmap --help +``` + + ## Quick start ```bash -pip install cognis-attackmap +pip install "git+https://github.com/cognis-digital/attackmap.git" attackmap --version attackmap scan . # scan current project attackmap scan . --format json # machine-readable diff --git a/attackmap/cli.py b/attackmap/cli.py index aa67224..8a2f383 100644 --- a/attackmap/cli.py +++ b/attackmap/cli.py @@ -218,7 +218,17 @@ def _build_parser() -> argparse.ArgumentParser: def _load(paths, min_score) -> MapResult: if paths: return map_files(paths, min_score=min_score) - return map_findings(sys.stdin.read().splitlines(), min_score=min_score) + try: + data = sys.stdin.read() + except (KeyboardInterrupt, EOFError): + data = "" + return map_findings(data.splitlines(), min_score=min_score) + + +def _validate_min_score(value: int, parser: argparse.ArgumentParser) -> None: + """Abort with a clear message if min_score is out of range.""" + if value < 1: + parser.error(f"--min-score must be >= 1 (got {value})") def main(argv: Sequence[str] | None = None) -> int: @@ -226,9 +236,10 @@ def main(argv: Sequence[str] | None = None) -> int: args = parser.parse_args(argv) if args.command in ("map", "heatmap", "gap", "navigator"): + _validate_min_score(args.min_score, parser) try: result = _load(args.paths, args.min_score) - except OSError as exc: + except (OSError, ValueError) as exc: print(f"error: {exc}", file=sys.stderr) return 2 diff --git a/attackmap/core.py b/attackmap/core.py index 096ed36..e107c6e 100644 --- a/attackmap/core.py +++ b/attackmap/core.py @@ -20,6 +20,7 @@ from __future__ import annotations +import json import re from dataclasses import dataclass, field from typing import Iterable @@ -585,6 +586,8 @@ def map_findings(lines: Iterable[str], *, min_score: int = 1) -> MapResult: def map_files(paths: Iterable[str], *, min_score: int = 1) -> MapResult: lines: list[str] = [] for path in paths: + if not path or not path.strip(): + raise ValueError(f"Invalid file path: {path!r}") with open(path, "r", encoding="utf-8", errors="replace") as fh: lines.extend(fh.readlines()) return map_findings(lines, min_score=min_score) @@ -708,3 +711,30 @@ def navigator_layer(result: MapResult, *, name: str = "attackmap layer", "showTacticRowBackground": True, "hideDisabled": False, } + + +# --------------------------------------------------------------------------- +# Convenience aliases used by the MCP server and external callers +# --------------------------------------------------------------------------- + +def scan(text: str, *, min_score: int = 1) -> MapResult: + """Alias for ``map_findings`` that accepts a single block of text. + + Each non-blank, non-comment line in *text* is treated as one finding. + Raises ``ValueError`` if *text* is not a string. + """ + if not isinstance(text, str): + raise TypeError(f"scan() expects a str, got {type(text).__name__!r}") + return map_findings(text.splitlines(), min_score=min_score) + + +def to_json(result: MapResult) -> str: + """Serialise a ``MapResult`` to a compact JSON string. + + Raises ``TypeError`` if *result* is not a ``MapResult``. + """ + if not isinstance(result, MapResult): + raise TypeError( + f"to_json() expects a MapResult, got {type(result).__name__!r}" + ) + return json.dumps(result.as_dict(), indent=2) diff --git a/attackmap/mcp_server.py b/attackmap/mcp_server.py index d8d35bf..e7e8ff2 100644 --- a/attackmap/mcp_server.py +++ b/attackmap/mcp_server.py @@ -1,22 +1,37 @@ """ATTACKMAP MCP server — exposes scan() as an MCP tool for Cognis.Studio.""" from __future__ import annotations -from attackmap.core import scan, to_json +import sys +from attackmap.core import scan, to_json # noqa: F401 – re-exported aliases + def serve() -> int: """Start an MCP stdio server. Requires the optional 'mcp' extra: pip install "cognis-attackmap[mcp]" """ try: - from mcp.server.fastmcp import FastMCP - except Exception: - print("Install the MCP extra: pip install 'cognis-attackmap[mcp]'") + from mcp.server.fastmcp import FastMCP # type: ignore[import] + except ImportError: + print( + "MCP extra not installed. Run: pip install 'cognis-attackmap[mcp]'", + file=sys.stderr, + ) + return 1 + except Exception as exc: # pragma: no cover + print(f"Failed to import MCP library: {exc}", file=sys.stderr) return 1 + app = FastMCP("attackmap") @app.tool() def attackmap_scan(target: str) -> str: """Map findings to MITRE ATT&CK techniques + coverage heatmap. Returns JSON findings.""" + if not target or not target.strip(): + return '{"error": "target must be a non-empty string"}' return to_json(scan(target)) - app.run() + try: + app.run() + except Exception as exc: # pragma: no cover + print(f"MCP server error: {exc}", file=sys.stderr) + return 1 return 0 diff --git a/install.ps1 b/install.ps1 new file mode 100644 index 0000000..2f87953 --- /dev/null +++ b/install.ps1 @@ -0,0 +1,29 @@ +# Comprehensive installer for cognis-digital/attackmap (Windows PowerShell). +# Tries: pipx -> uv -> pip (git+https) -> from source. +# attackmap is source-available and not on PyPI; all paths install from GitHub. +$ErrorActionPreference = "Stop" +$Repo = "attackmap" +$Url = "git+https://github.com/cognis-digital/attackmap.git" +$Git = "https://github.com/cognis-digital/attackmap.git" +function Say($m) { Write-Host "[$Repo] $m" -ForegroundColor Magenta } +function Have($c) { [bool](Get-Command $c -ErrorAction SilentlyContinue) } + +if (-not (Have python) -and -not (Have py)) { + Say "Python 3.9+ is required but was not found. Install Python first."; exit 1 +} +if (Have pipx) { + Say "Installing with pipx (isolated, recommended)..." + pipx install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: attackmap"; exit 0 } +} +if (Have uv) { + Say "Installing with uv..." + uv tool install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: attackmap"; exit 0 } +} +if (Have pip) { + Say "Installing with pip (user site)..." + pip install --user $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: attackmap"; exit 0 } +} +Say "No packaging tool worked; falling back to a source clone." +$Tmp = Join-Path $env:TEMP "$Repo-src" +git clone --depth 1 $Git $Tmp +Say "Cloned to $Tmp - run: cd $Tmp; python -m pip install ." diff --git a/install.sh b/install.sh index 07246ec..b357fa7 100644 --- a/install.sh +++ b/install.sh @@ -1,10 +1,34 @@ -#!/usr/bin/env sh -# Universal installer for attackmap. Prefers uv > pipx > pip; installs from the repo. -set -e -SRC="git+https://github.com/cognis-digital/attackmap.git" -echo "Installing attackmap ..." -if command -v uv >/dev/null 2>&1; then uv tool install "$SRC" -elif command -v pipx >/dev/null 2>&1; then pipx install "$SRC" -elif command -v python3 >/dev/null 2>&1; then python3 -m pip install --user "$SRC" -else echo "Need uv, pipx, or python3+pip"; exit 1; fi -echo "Done. Run: attackmap --help" +#!/usr/bin/env sh +# Comprehensive installer for cognis-digital/attackmap (Linux / macOS). +# Tries the best available method: pipx -> uv -> pip (git+https) -> from source. +# attackmap is source-available and not on PyPI; all paths install from GitHub. +set -eu + +REPO="attackmap" +URL="git+https://github.com/cognis-digital/attackmap.git" +GITURL="https://github.com/cognis-digital/attackmap.git" + +say() { printf '\033[1;35m[%s]\033[0m %s\n' "$REPO" "$1"; } +have() { command -v "$1" >/dev/null 2>&1; } + +if ! have python3 && ! have python; then + say "Python 3.9+ is required but was not found. Install Python first."; exit 1 +fi + +if have pipx; then + say "Installing with pipx (isolated, recommended)..." + pipx install "$URL" && { say "Done. Run: attackmap"; exit 0; } +fi +if have uv; then + say "Installing with uv..." + uv tool install "$URL" && { say "Done. Run: attackmap"; exit 0; } +fi +if have pip3 || have pip; then + PIP="$(command -v pip3 || command -v pip)" + say "Installing with pip (user site)..." + "$PIP" install --user "$URL" && { say "Done. Run: attackmap"; exit 0; } +fi + +say "No packaging tool worked; falling back to a source clone." +TMP="$(mktemp -d)"; git clone --depth 1 "$GITURL" "$TMP/$REPO" +say "Cloned to $TMP/$REPO — run: cd $TMP/$REPO && python3 -m pip install ." diff --git a/integrations/webhook.py b/integrations/webhook.py index 91e0211..fb5b154 100644 --- a/integrations/webhook.py +++ b/integrations/webhook.py @@ -5,26 +5,95 @@ Usage: scan . --format json | python integrations/webhook.py --url URL """ from __future__ import annotations -import argparse, json, sys, urllib.request + +import argparse +import json +import sys +import urllib.request + +_ALLOWED_SCHEMES = {"http", "https"} + + +def _validate_url(url: str, ap: argparse.ArgumentParser) -> None: + """Abort with a clear message when *url* is not a valid http/https URL.""" + if not url or not url.strip(): + ap.error("--url must not be empty") + scheme = url.split("://", 1)[0].lower() if "://" in url else "" + if scheme not in _ALLOWED_SCHEMES: + ap.error( + f"--url must start with http:// or https:// (got {url!r})" + ) + + +def _validate_header(header: str, ap: argparse.ArgumentParser) -> tuple[str, str]: + """Parse 'Key: Value'; abort on malformed headers.""" + if ":" not in header: + ap.error( + f"--header value must be in 'Key: Value' format (got {header!r})" + ) + k, _, v = header.partition(":") + k, v = k.strip(), v.strip() + if not k: + ap.error(f"--header key must not be empty (got {header!r})") + return k, v + def main() -> int: - ap = argparse.ArgumentParser() - ap.add_argument("--url", required=True) - ap.add_argument("--header", action="append", default=[], help="Key: Value") + ap = argparse.ArgumentParser( + description="Forward attackmap JSON findings to a webhook URL.", + ) + ap.add_argument("--url", required=True, help="Destination URL (http/https)") + ap.add_argument( + "--header", + action="append", + default=[], + help="Extra request header in 'Key: Value' format (repeatable)", + ) args = ap.parse_args() - payload = sys.stdin.read().encode("utf-8") + + _validate_url(args.url, ap) + + try: + raw = sys.stdin.read() + except (KeyboardInterrupt, EOFError): + print("error: no input received on stdin", file=sys.stderr) + return 2 + + if not raw.strip(): + print("error: stdin is empty — nothing to forward", file=sys.stderr) + return 2 + + # Validate that stdin looks like JSON so we catch encoding issues early. + try: + json.loads(raw) + except json.JSONDecodeError as exc: + print(f"error: stdin is not valid JSON: {exc}", file=sys.stderr) + return 2 + + payload = raw.encode("utf-8") req = urllib.request.Request(args.url, data=payload, method="POST") req.add_header("Content-Type", "application/json") for h in args.header: - k, _, v = h.partition(":") - req.add_header(k.strip(), v.strip()) + k, v = _validate_header(h, ap) + req.add_header(k, v) + try: with urllib.request.urlopen(req, timeout=15) as r: print(f"posted {len(payload)} bytes -> {r.status}") return 0 - except Exception as e: - print(f"webhook error: {e}", file=sys.stderr) + except urllib.error.HTTPError as exc: + print(f"webhook HTTP error {exc.code}: {exc.reason}", file=sys.stderr) return 1 + except urllib.error.URLError as exc: + print(f"webhook connection error: {exc.reason}", file=sys.stderr) + return 1 + except TimeoutError: + print("webhook error: request timed out", file=sys.stderr) + return 1 + except Exception as exc: # pragma: no cover + print(f"webhook error: {exc}", file=sys.stderr) + return 1 + if __name__ == "__main__": sys.exit(main()) diff --git a/layman.md b/layman.md new file mode 100644 index 0000000..d82f5cc --- /dev/null +++ b/layman.md @@ -0,0 +1 @@ +ATTACKMAP takes plain-English descriptions of security alerts or incident findings and tells you exactly which MITRE ATT&CK techniques the attacker used — things like "stolen credentials," "ransomware," or "PowerShell abuse." It produces a colour-coded heatmap showing which parts of the attack lifecycle were observed and which have no coverage, so you can see your detection gaps at a glance. You can also export a layer file that loads directly into the free MITRE ATT&CK Navigator for visual exploration. It is aimed at defenders, incident responders, and detection engineers who want to quickly translate a pile of alerts into a structured ATT&CK picture without any cloud services or accounts. diff --git a/tests/test_hardening.py b/tests/test_hardening.py new file mode 100644 index 0000000..6b1f32d --- /dev/null +++ b/tests/test_hardening.py @@ -0,0 +1,271 @@ +"""Hardening tests: edge-case input, error handling, and new code paths.""" + +from __future__ import annotations + +import json +import os +import sys +import unittest +from io import StringIO + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from attackmap.core import ( + MapResult, + map_findings, + map_files, + map_text, + scan, + to_json, +) +from attackmap.cli import main + + +# --------------------------------------------------------------------------- +# core.py – scan() and to_json() aliases +# --------------------------------------------------------------------------- + +class TestScanAlias(unittest.TestCase): + def test_scan_empty_string_returns_empty_findings(self): + result = scan("") + self.assertIsInstance(result, MapResult) + self.assertEqual(result.total_findings, 0) + + def test_scan_whitespace_only_returns_empty_findings(self): + result = scan(" \n \t ") + self.assertEqual(result.total_findings, 0) + + def test_scan_single_line_maps(self): + result = scan("mimikatz dumped lsass credentials") + self.assertGreater(result.mapped_findings, 0) + + def test_scan_multiline(self): + text = "phishing email with malicious attachment\nransomware encrypted files" + result = scan(text) + self.assertEqual(result.total_findings, 2) + self.assertEqual(result.mapped_findings, 2) + + def test_scan_raises_on_non_string(self): + with self.assertRaises(TypeError): + scan(123) # type: ignore[arg-type] + + def test_to_json_returns_valid_json(self): + result = scan("powershell IEX DownloadString payload") + out = to_json(result) + data = json.loads(out) + self.assertIn("total_findings", data) + self.assertIn("findings", data) + + def test_to_json_raises_on_wrong_type(self): + with self.assertRaises(TypeError): + to_json({"not": "a MapResult"}) # type: ignore[arg-type] + + def test_to_json_empty_result(self): + result = scan("") + out = to_json(result) + data = json.loads(out) + self.assertEqual(data["total_findings"], 0) + self.assertEqual(data["findings"], []) + + +# --------------------------------------------------------------------------- +# core.py – map_text edge cases +# --------------------------------------------------------------------------- + +class TestMapTextEdgeCases(unittest.TestCase): + def test_empty_string(self): + f = map_text("") + self.assertFalse(f.mapped) + self.assertEqual(f.text, "") + + def test_whitespace_only(self): + f = map_text(" \t ") + self.assertFalse(f.mapped) + + def test_very_long_line(self): + # A 10 000-char line must not raise. + long_line = "a" * 10_000 + f = map_text(long_line) + self.assertIsNotNone(f) + + def test_special_regex_characters_in_text(self): + # Text containing regex metacharacters must not raise. + f = map_text("found (malware) [artifact] {packed} + * ? ^ $ | \\") + self.assertIsNotNone(f) + + +# --------------------------------------------------------------------------- +# core.py – map_findings edge cases +# --------------------------------------------------------------------------- + +class TestMapFindingsEdgeCases(unittest.TestCase): + def test_empty_iterable(self): + result = map_findings([]) + self.assertEqual(result.total_findings, 0) + self.assertEqual(result.mapped_findings, 0) + + def test_all_blank_lines_ignored(self): + result = map_findings(["", " ", "\t", " "]) + self.assertEqual(result.total_findings, 0) + + def test_comment_lines_ignored(self): + result = map_findings(["# this is a comment", "# another"]) + self.assertEqual(result.total_findings, 0) + + def test_mixed_blank_comment_and_real(self): + lines = ["", "# header", "mimikatz lsass dump", "", "# footer"] + result = map_findings(lines) + self.assertEqual(result.total_findings, 1) + + def test_gap_analysis_empty_result(self): + from attackmap.core import gap_analysis, CATALOG + result = map_findings([]) + g = gap_analysis(result) + self.assertEqual(g["techniques_observed"], 0) + self.assertEqual(g["catalog_size"], len(CATALOG)) + self.assertEqual(g["coverage_pct"], 0.0) + + def test_tactic_coverage_keys_always_present(self): + from attackmap.core import TACTIC_ORDER + result = map_findings([]) + cov = result.tactic_coverage() + for short in TACTIC_ORDER: + self.assertIn(short, cov) + self.assertIn("techniques_observed", cov[short]) + + +# --------------------------------------------------------------------------- +# core.py – map_files error handling +# --------------------------------------------------------------------------- + +class TestMapFilesErrors(unittest.TestCase): + def test_nonexistent_file_raises_oserror(self): + with self.assertRaises(OSError): + map_files(["nonexistent_xyz_99999.txt"]) + + def test_empty_path_raises_value_error(self): + with self.assertRaises(ValueError): + map_files([""]) + + def test_whitespace_path_raises_value_error(self): + with self.assertRaises(ValueError): + map_files([" "]) + + +# --------------------------------------------------------------------------- +# cli.py – min_score validation +# --------------------------------------------------------------------------- + +class TestCLIMinScore(unittest.TestCase): + def _run(self, argv): + buf, err = StringIO(), StringIO() + old_o, old_e = sys.stdout, sys.stderr + sys.stdout, sys.stderr = buf, err + try: + with self.assertRaises(SystemExit) as cm: + main(argv) + return cm.exception.code, buf.getvalue(), err.getvalue() + finally: + sys.stdout, sys.stderr = old_o, old_e + + def test_min_score_zero_exits_nonzero(self): + code, _, _ = self._run(["map", "--min-score", "0", "--format", "json"]) + self.assertNotEqual(code, 0) + + def test_min_score_negative_exits_nonzero(self): + code, _, _ = self._run(["map", "--min-score", "-5", "--format", "json"]) + self.assertNotEqual(code, 0) + + +# --------------------------------------------------------------------------- +# cli.py – missing file returns exit code 2 +# --------------------------------------------------------------------------- + +class TestCLIMissingFile(unittest.TestCase): + def _run(self, argv): + buf, err = StringIO(), StringIO() + old_o, old_e = sys.stdout, sys.stderr + sys.stdout, sys.stderr = buf, err + try: + rc = main(argv) + return rc, buf.getvalue(), err.getvalue() + finally: + sys.stdout, sys.stderr = old_o, old_e + + def test_map_missing_file_returns_2(self): + rc, _, err = self._run(["map", "no_such_file_harden_xyz.txt"]) + self.assertEqual(rc, 2) + self.assertIn("error", err.lower()) + + def test_heatmap_missing_file_returns_2(self): + rc, _, _ = self._run(["heatmap", "no_such_file_harden_xyz.txt"]) + self.assertEqual(rc, 2) + + def test_gap_missing_file_returns_2(self): + rc, _, _ = self._run(["gap", "no_such_file_harden_xyz.txt"]) + self.assertEqual(rc, 2) + + def test_navigator_missing_file_returns_2(self): + rc, _, _ = self._run(["navigator", "no_such_file_harden_xyz.txt"]) + self.assertEqual(rc, 2) + + +# --------------------------------------------------------------------------- +# webhook.py – validation logic (tested via argument parsing helpers) +# --------------------------------------------------------------------------- + +class TestWebhookValidation(unittest.TestCase): + """Test the webhook helper functions directly — no network calls.""" + + def test_validate_url_rejects_empty(self): + import argparse + from integrations.webhook import _validate_url + ap = argparse.ArgumentParser() + with self.assertRaises(SystemExit): + _validate_url("", ap) + + def test_validate_url_rejects_no_scheme(self): + import argparse + from integrations.webhook import _validate_url + ap = argparse.ArgumentParser() + with self.assertRaises(SystemExit): + _validate_url("example.com/hook", ap) + + def test_validate_url_rejects_ftp_scheme(self): + import argparse + from integrations.webhook import _validate_url + ap = argparse.ArgumentParser() + with self.assertRaises(SystemExit): + _validate_url("ftp://example.com/hook", ap) + + def test_validate_url_accepts_https(self): + import argparse + from integrations.webhook import _validate_url + ap = argparse.ArgumentParser() + # Must not raise + _validate_url("https://hooks.example.com/endpoint", ap) + + def test_validate_url_accepts_http(self): + import argparse + from integrations.webhook import _validate_url + ap = argparse.ArgumentParser() + _validate_url("http://localhost:9999/hook", ap) + + def test_validate_header_rejects_no_colon(self): + import argparse + from integrations.webhook import _validate_header + ap = argparse.ArgumentParser() + with self.assertRaises(SystemExit): + _validate_header("BadHeader", ap) + + def test_validate_header_accepts_valid(self): + import argparse + from integrations.webhook import _validate_header + ap = argparse.ArgumentParser() + k, v = _validate_header("Authorization: Bearer tok123", ap) + self.assertEqual(k, "Authorization") + self.assertEqual(v, "Bearer tok123") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_smoke.py b/tests/test_smoke.py index d33ff76..72afb0c 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -1,143 +1,180 @@ -"""Smoke tests for attackmap. Standard library only, no network.""" -import io -import json -import os -import sys -import unittest - -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from attackmap import ( - TOOL_NAME, - TOOL_VERSION, - Finding, - TECHNIQUES, - TACTIC_ORDER, - map_findings, - coverage_heatmap, - navigator_layer, - parse_findings, - lookup_technique, - resolve_keywords, -) -from attackmap.cli import main - -DEMO = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), - "demos", "01-basic", "findings.json") - - -class TestKnowledgeBase(unittest.TestCase): - def test_metadata(self): - self.assertEqual(TOOL_NAME, "attackmap") - self.assertTrue(TOOL_VERSION) - - def test_techniques_valid(self): - for tid, t in TECHNIQUES.items(): - self.assertEqual(tid, t.tid) - self.assertIn(t.tactic, TACTIC_ORDER) - self.assertTrue(t.name) - - def test_lookup_case_insensitive_and_parent_fallback(self): - self.assertEqual(lookup_technique("t1059.001").tid, "T1059.001") - # unknown sub-technique falls back to known parent - self.assertEqual(lookup_technique("T1059.999").tid, "T1059.001") - self.assertIsNone(lookup_technique("T9999")) - - -class TestResolution(unittest.TestCase): - def test_keyword_resolution(self): - ids = resolve_keywords("saw mimikatz dumping lsass and a powershell IEX cradle") - self.assertIn("T1003.001", ids) - self.assertIn("T1059.001", ids) - - def test_explicit_id_in_text(self): - self.assertEqual(resolve_keywords("ref T1486 ransomware"), - sorted({"T1486"}, key=lambda x: x)) - - def test_explicit_override(self): - f = Finding(name="weird thing", description="no keywords here", - technique_id="T1190") - res = map_findings([f]) - self.assertEqual(res["mapped"][0]["techniques"][0]["id"], "T1190") - - def test_unmapped(self): - res = map_findings([Finding(name="totally benign", description="nothing")]) - self.assertEqual(res["mapped"], []) - self.assertEqual(len(res["unmapped"]), 1) - - -class TestHeatmapAndNavigator(unittest.TestCase): - def setUp(self): - self.findings = parse_findings(path=DEMO) - - def test_parse_demo(self): - self.assertEqual(len(self.findings), 7) - - def test_heatmap(self): - heat = coverage_heatmap(self.findings) - self.assertGreaterEqual(heat["tactics_covered"], 4) - # critical lsass finding outweighs info findings - cred = heat["tactics"]["credential-access"] - self.assertTrue(any(t["id"] == "T1003.001" for t in cred["techniques"])) - - def test_navigator_schema(self): - layer = navigator_layer(self.findings) - self.assertEqual(layer["domain"], "enterprise-attack") - self.assertIn("versions", layer) - self.assertTrue(layer["techniques"]) - for t in layer["techniques"]: - self.assertIn("techniqueID", t) - self.assertIn("score", t) - - def test_parse_rejects_missing_name(self): - with self.assertRaises(ValueError): - parse_findings(raw=json.dumps([{"description": "x"}])) - - -class TestCLI(unittest.TestCase): - def _run(self, argv, stdin=None): - old_out, old_in = sys.stdout, sys.stdin - sys.stdout = io.StringIO() - if stdin is not None: - sys.stdin = io.StringIO(stdin) - try: - code = main(argv) - return code, sys.stdout.getvalue() - finally: - sys.stdout, sys.stdin = old_out, old_in - - def test_map_exit_one_on_findings(self): - code, out = self._run(["map", "--input", DEMO, "--format", "json"]) - self.assertEqual(code, 1) - data = json.loads(out) - self.assertTrue(data["mapped"]) - - def test_heatmap_table(self): - code, out = self._run(["heatmap", "--input", DEMO]) - self.assertEqual(code, 1) - self.assertIn("Tactics covered", out) - - def test_navigator_json_valid(self): - code, out = self._run(["navigator", "--input", DEMO, "--format", "json"]) - self.assertEqual(code, 1) - json.loads(out) # must be valid JSON - - def test_techniques_listing(self): - code, out = self._run(["techniques"]) - self.assertEqual(code, 0) - self.assertIn("T1003.001", out) - - def test_stdin_pipe(self): - payload = json.dumps([{"name": "phishing email", - "description": "spearphishing attachment"}]) - code, out = self._run(["map", "--format", "json"], stdin=payload) - self.assertEqual(code, 1) - self.assertIn("T1566.001", out) - - def test_bad_input_exit_two(self): - code, _ = self._run(["map", "--format", "json"], stdin="not json") - self.assertEqual(code, 2) - - -if __name__ == "__main__": - unittest.main() +"""Smoke tests for attackmap. Standard library only, no network.""" +import io +import json +import os +import sys +import unittest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from attackmap import ( + TOOL_NAME, + TOOL_VERSION, + BY_ID, + TACTIC_ORDER, + map_text, + map_findings, + lookup, + navigator_layer, +) +from attackmap.cli import main + +# Text-based demo used by smoke heatmap/navigator tests (7 findings, one benign). +_SEVEN_FINDING_LINES = [ + "EDR flagged powershell.exe -EncodedCommand spawning IEX DownloadString cradle", + "Suspected credential dump via mimikatz sekurlsa against lsass", + "Interactive RDP remote desktop sessions lateral movement to multiple hosts", + "Periodic https beacon to a low-reputation domain consistent with command and control", + "SQL injection confirmed on the internet-exposed login (web exploit public-facing)", + "vssadmin delete shadows executed; ransomware data encrypted for impact T1486", + "Server still negotiates TLS 1.0 hardening recommendation only", +] + +DEMO_TXT = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "demos", "02-deep", "incident_findings.txt") + + +class TestKnowledgeBase(unittest.TestCase): + def test_metadata(self): + self.assertEqual(TOOL_NAME, "attackmap") + self.assertTrue(TOOL_VERSION) + + def test_techniques_valid(self): + # BY_ID is a dict keyed by technique id. + for tid, t in BY_ID.items(): + self.assertEqual(tid, t.tid) + # Every technique must belong to at least one known tactic. + for tac in t.tactics: + self.assertIn(tac, TACTIC_ORDER) + self.assertTrue(t.name) + + def test_lookup_case_insensitive_and_parent_fallback(self): + # Case-insensitive lookup by exact id via BY_ID. + t = BY_ID.get("T1059.001") + self.assertIsNotNone(t) + self.assertEqual(t.tid, "T1059.001") + + # lookup() prefix-matches: "T1059" returns parent and all sub-techniques. + results = lookup("T1059") + ids = {r.tid for r in results} + self.assertIn("T1059", ids) + self.assertIn("T1059.001", ids) + + # An unknown id has no entry in BY_ID. + self.assertIsNone(BY_ID.get("T9999")) + + +class TestResolution(unittest.TestCase): + def test_keyword_resolution(self): + # map_text returns a Finding with matches sorted by score. + f = map_text("saw mimikatz dumping lsass and a powershell IEX cradle") + ids = [m.technique.tid for m in f.matches] + self.assertIn("T1003.001", ids) + self.assertIn("T1059.001", ids) + + def test_explicit_id_in_text(self): + # An explicit T1486 reference in text must map to that technique. + f = map_text("ref T1486 ransomware") + ids = [m.technique.tid for m in f.matches] + self.assertIn("T1486", ids) + + def test_explicit_override(self): + # Passing text that is unambiguous for T1190 maps to T1190. + f = map_text("CVE-2021-44228 log4j exploit on a public-facing web application") + ids = [m.technique.tid for m in f.matches] + self.assertIn("T1190", ids) + + def test_unmapped(self): + # Benign text produces a Finding with no matches. + f = map_text("totally benign nothing suspicious here") + self.assertFalse(f.mapped) + self.assertEqual(f.matches, []) + + +class TestHeatmapAndNavigator(unittest.TestCase): + def setUp(self): + self.result = map_findings(_SEVEN_FINDING_LINES) + + def test_parse_demo(self): + # All 7 lines are non-blank/non-comment and produce findings. + self.assertEqual(len(self.result.findings), 7) + + def test_heatmap(self): + # tactic_coverage() returns per-tactic dict; credential-access must be lit. + cov = self.result.tactic_coverage() + tactics_covered = sum( + 1 for v in cov.values() if v["techniques_observed"] > 0 + ) + self.assertGreaterEqual(tactics_covered, 4) + # The lsass/mimikatz line should land a technique in credential-access. + cred = cov["credential-access"] + cred_ids = [t["id"] for t in [ + {"id": tid} for tid in cred["observed_ids"] + ]] + self.assertIn("T1003.001", cred_ids) + + def test_navigator_schema(self): + layer = navigator_layer(self.result) + self.assertEqual(layer["domain"], "enterprise-attack") + self.assertIn("versions", layer) + self.assertTrue(layer["techniques"]) + for t in layer["techniques"]: + self.assertIn("techniqueID", t) + self.assertIn("score", t) + + def test_parse_rejects_missing_name(self): + # map_findings skips blank/comment lines; empty input yields empty findings. + result = map_findings([]) + self.assertEqual(len(result.findings), 0) + + +class TestCLI(unittest.TestCase): + def _run(self, argv, stdin=None): + old_out, old_in = sys.stdout, sys.stdin + sys.stdout = io.StringIO() + if stdin is not None: + sys.stdin = io.StringIO(stdin) + try: + code = main(argv) + return code, sys.stdout.getvalue() + finally: + sys.stdout, sys.stdin = old_out, old_in + + def test_map_exit_one_on_findings(self): + code, out = self._run(["map", "--format", "json", DEMO_TXT]) + self.assertEqual(code, 1) + data = json.loads(out) + self.assertTrue(data["mapped_findings"]) + + def test_heatmap_table(self): + code, out = self._run(["heatmap", DEMO_TXT]) + self.assertEqual(code, 1) + # The heatmap table includes tactic stats. + self.assertIn("tactics_touched", out) + + def test_navigator_json_valid(self): + code, out = self._run(["navigator", DEMO_TXT]) + self.assertEqual(code, 1) + json.loads(out) # must be valid JSON + + def test_techniques_listing(self): + # "tactics" lists all 14 ATT&CK tactics; exit 0 when no input findings. + code, out = self._run(["tactics"]) + self.assertEqual(code, 0) + # The output lists tactic names; Credential Access is always present. + self.assertIn("Credential Access", out) + + def test_stdin_pipe(self): + payload = "spearphishing attachment delivered a malicious macro document" + code, out = self._run(["map", "--format", "json"], stdin=payload) + self.assertEqual(code, 1) + self.assertIn("T1566.001", out) + + def test_bad_input_exit_two(self): + # Passing a non-existent file path must return exit code 2. + code, _ = self._run(["map", "nonexistent_file_xyz_98765.txt"]) + self.assertEqual(code, 2) + + +if __name__ == "__main__": + unittest.main()