diff --git a/README.md b/README.md
index d2bec10..bbe8782 100644
--- a/README.md
+++ b/README.md
@@ -16,10 +16,16 @@
```bash
-pip install cognis-attackmap
+pip install "git+https://github.com/cognis-digital/attackmap.git"
attackmap scan . # → prioritized findings in seconds
```
+
+## What is this?
+
+ATTACKMAP takes plain-English descriptions of security alerts or incident findings and tells you exactly which MITRE ATT&CK techniques the attacker used — things like "stolen credentials," "ransomware," or "PowerShell abuse." It produces a colour-coded heatmap showing which parts of the attack lifecycle were observed and which have no coverage, so you can see your detection gaps at a glance. You can also export a layer file that loads directly into the free MITRE ATT&CK Navigator for visual exploration. It is aimed at defenders, incident responders, and detection engineers who want to quickly translate a pile of alerts into a structured ATT&CK picture without any cloud services or accounts.
+
+
## Contents
- [Why attackmap?](#why) · [Features](#features) · [Quick start](#quick-start) · [Example](#example) · [Architecture](#architecture) · [AI stack](#ai-stack) · [How it compares](#how-it-compares) · [Integrations](#integrations) · [Install anywhere](#install-anywhere) · [Related](#related) · [Contributing](#contributing)
@@ -48,10 +54,56 @@ speak ATT&CK
+
+## Domains
+
+**Primary domain:** Cyber & Security · **JTF MERIDIAN division:** NULLBYTE · SPECTER
+
+**Topics:** `cognis` `security` `infosec` `cybersecurity` `blue-team`
+
+Part of the **Cognis Neural Suite** — 300+ source-available tools organized across 12 domains under the JTF MERIDIAN command structure. See the [suite on GitHub](https://github.com/cognis-digital) and [jtf-meridian](https://github.com/cognis-digital/jtf-meridian) for how the pieces fit together.
+
+
+
+## Install
+
+`attackmap` is source-available (not published to PyPI) — every method below installs
+straight from GitHub. Pick whichever you prefer; the one-line scripts auto-detect
+the best tool available on your machine.
+
+**One-liner (Linux / macOS):**
+```sh
+curl -fsSL https://raw.githubusercontent.com/cognis-digital/attackmap/HEAD/install.sh | sh
+```
+
+**One-liner (Windows PowerShell):**
+```powershell
+irm https://raw.githubusercontent.com/cognis-digital/attackmap/HEAD/install.ps1 | iex
+```
+
+**Or install manually — any one of:**
+```sh
+pipx install "git+https://github.com/cognis-digital/attackmap.git" # isolated (recommended)
+uv tool install "git+https://github.com/cognis-digital/attackmap.git" # uv
+pip install "git+https://github.com/cognis-digital/attackmap.git" # pip
+```
+
+**From source:**
+```sh
+git clone https://github.com/cognis-digital/attackmap.git
+cd attackmap && pip install .
+```
+
+Then run:
+```sh
+attackmap --help
+```
+
+
## Quick start
```bash
-pip install cognis-attackmap
+pip install "git+https://github.com/cognis-digital/attackmap.git"
attackmap --version
attackmap scan . # scan current project
attackmap scan . --format json # machine-readable
diff --git a/attackmap/cli.py b/attackmap/cli.py
index aa67224..8a2f383 100644
--- a/attackmap/cli.py
+++ b/attackmap/cli.py
@@ -218,7 +218,17 @@ def _build_parser() -> argparse.ArgumentParser:
def _load(paths, min_score) -> MapResult:
if paths:
return map_files(paths, min_score=min_score)
- return map_findings(sys.stdin.read().splitlines(), min_score=min_score)
+ try:
+ data = sys.stdin.read()
+ except (KeyboardInterrupt, EOFError):
+ data = ""
+ return map_findings(data.splitlines(), min_score=min_score)
+
+
+def _validate_min_score(value: int, parser: argparse.ArgumentParser) -> None:
+ """Abort with a clear message if min_score is out of range."""
+ if value < 1:
+ parser.error(f"--min-score must be >= 1 (got {value})")
def main(argv: Sequence[str] | None = None) -> int:
@@ -226,9 +236,10 @@ def main(argv: Sequence[str] | None = None) -> int:
args = parser.parse_args(argv)
if args.command in ("map", "heatmap", "gap", "navigator"):
+ _validate_min_score(args.min_score, parser)
try:
result = _load(args.paths, args.min_score)
- except OSError as exc:
+ except (OSError, ValueError) as exc:
print(f"error: {exc}", file=sys.stderr)
return 2
diff --git a/attackmap/core.py b/attackmap/core.py
index 096ed36..e107c6e 100644
--- a/attackmap/core.py
+++ b/attackmap/core.py
@@ -20,6 +20,7 @@
from __future__ import annotations
+import json
import re
from dataclasses import dataclass, field
from typing import Iterable
@@ -585,6 +586,8 @@ def map_findings(lines: Iterable[str], *, min_score: int = 1) -> MapResult:
def map_files(paths: Iterable[str], *, min_score: int = 1) -> MapResult:
lines: list[str] = []
for path in paths:
+ if not path or not path.strip():
+ raise ValueError(f"Invalid file path: {path!r}")
with open(path, "r", encoding="utf-8", errors="replace") as fh:
lines.extend(fh.readlines())
return map_findings(lines, min_score=min_score)
@@ -708,3 +711,30 @@ def navigator_layer(result: MapResult, *, name: str = "attackmap layer",
"showTacticRowBackground": True,
"hideDisabled": False,
}
+
+
+# ---------------------------------------------------------------------------
+# Convenience aliases used by the MCP server and external callers
+# ---------------------------------------------------------------------------
+
+def scan(text: str, *, min_score: int = 1) -> MapResult:
+ """Alias for ``map_findings`` that accepts a single block of text.
+
+ Each non-blank, non-comment line in *text* is treated as one finding.
+ Raises ``ValueError`` if *text* is not a string.
+ """
+ if not isinstance(text, str):
+ raise TypeError(f"scan() expects a str, got {type(text).__name__!r}")
+ return map_findings(text.splitlines(), min_score=min_score)
+
+
+def to_json(result: MapResult) -> str:
+ """Serialise a ``MapResult`` to a compact JSON string.
+
+ Raises ``TypeError`` if *result* is not a ``MapResult``.
+ """
+ if not isinstance(result, MapResult):
+ raise TypeError(
+ f"to_json() expects a MapResult, got {type(result).__name__!r}"
+ )
+ return json.dumps(result.as_dict(), indent=2)
diff --git a/attackmap/mcp_server.py b/attackmap/mcp_server.py
index d8d35bf..e7e8ff2 100644
--- a/attackmap/mcp_server.py
+++ b/attackmap/mcp_server.py
@@ -1,22 +1,37 @@
"""ATTACKMAP MCP server — exposes scan() as an MCP tool for Cognis.Studio."""
from __future__ import annotations
-from attackmap.core import scan, to_json
+import sys
+from attackmap.core import scan, to_json # noqa: F401 – re-exported aliases
+
def serve() -> int:
"""Start an MCP stdio server. Requires the optional 'mcp' extra:
pip install "cognis-attackmap[mcp]"
"""
try:
- from mcp.server.fastmcp import FastMCP
- except Exception:
- print("Install the MCP extra: pip install 'cognis-attackmap[mcp]'")
+ from mcp.server.fastmcp import FastMCP # type: ignore[import]
+ except ImportError:
+ print(
+ "MCP extra not installed. Run: pip install 'cognis-attackmap[mcp]'",
+ file=sys.stderr,
+ )
+ return 1
+ except Exception as exc: # pragma: no cover
+ print(f"Failed to import MCP library: {exc}", file=sys.stderr)
return 1
+
app = FastMCP("attackmap")
@app.tool()
def attackmap_scan(target: str) -> str:
"""Map findings to MITRE ATT&CK techniques + coverage heatmap. Returns JSON findings."""
+ if not target or not target.strip():
+ return '{"error": "target must be a non-empty string"}'
return to_json(scan(target))
- app.run()
+ try:
+ app.run()
+ except Exception as exc: # pragma: no cover
+ print(f"MCP server error: {exc}", file=sys.stderr)
+ return 1
return 0
diff --git a/install.ps1 b/install.ps1
new file mode 100644
index 0000000..2f87953
--- /dev/null
+++ b/install.ps1
@@ -0,0 +1,29 @@
+# Comprehensive installer for cognis-digital/attackmap (Windows PowerShell).
+# Tries: pipx -> uv -> pip (git+https) -> from source.
+# attackmap is source-available and not on PyPI; all paths install from GitHub.
+$ErrorActionPreference = "Stop"
+$Repo = "attackmap"
+$Url = "git+https://github.com/cognis-digital/attackmap.git"
+$Git = "https://github.com/cognis-digital/attackmap.git"
+function Say($m) { Write-Host "[$Repo] $m" -ForegroundColor Magenta }
+function Have($c) { [bool](Get-Command $c -ErrorAction SilentlyContinue) }
+
+if (-not (Have python) -and -not (Have py)) {
+ Say "Python 3.9+ is required but was not found. Install Python first."; exit 1
+}
+if (Have pipx) {
+ Say "Installing with pipx (isolated, recommended)..."
+ pipx install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: attackmap"; exit 0 }
+}
+if (Have uv) {
+ Say "Installing with uv..."
+ uv tool install $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: attackmap"; exit 0 }
+}
+if (Have pip) {
+ Say "Installing with pip (user site)..."
+ pip install --user $Url; if ($LASTEXITCODE -eq 0) { Say "Done. Run: attackmap"; exit 0 }
+}
+Say "No packaging tool worked; falling back to a source clone."
+$Tmp = Join-Path $env:TEMP "$Repo-src"
+git clone --depth 1 $Git $Tmp
+Say "Cloned to $Tmp - run: cd $Tmp; python -m pip install ."
diff --git a/install.sh b/install.sh
index 07246ec..b357fa7 100644
--- a/install.sh
+++ b/install.sh
@@ -1,10 +1,34 @@
-#!/usr/bin/env sh
-# Universal installer for attackmap. Prefers uv > pipx > pip; installs from the repo.
-set -e
-SRC="git+https://github.com/cognis-digital/attackmap.git"
-echo "Installing attackmap ..."
-if command -v uv >/dev/null 2>&1; then uv tool install "$SRC"
-elif command -v pipx >/dev/null 2>&1; then pipx install "$SRC"
-elif command -v python3 >/dev/null 2>&1; then python3 -m pip install --user "$SRC"
-else echo "Need uv, pipx, or python3+pip"; exit 1; fi
-echo "Done. Run: attackmap --help"
+#!/usr/bin/env sh
+# Comprehensive installer for cognis-digital/attackmap (Linux / macOS).
+# Tries the best available method: pipx -> uv -> pip (git+https) -> from source.
+# attackmap is source-available and not on PyPI; all paths install from GitHub.
+set -eu
+
+REPO="attackmap"
+URL="git+https://github.com/cognis-digital/attackmap.git"
+GITURL="https://github.com/cognis-digital/attackmap.git"
+
+say() { printf '\033[1;35m[%s]\033[0m %s\n' "$REPO" "$1"; }
+have() { command -v "$1" >/dev/null 2>&1; }
+
+if ! have python3 && ! have python; then
+ say "Python 3.9+ is required but was not found. Install Python first."; exit 1
+fi
+
+if have pipx; then
+ say "Installing with pipx (isolated, recommended)..."
+ pipx install "$URL" && { say "Done. Run: attackmap"; exit 0; }
+fi
+if have uv; then
+ say "Installing with uv..."
+ uv tool install "$URL" && { say "Done. Run: attackmap"; exit 0; }
+fi
+if have pip3 || have pip; then
+ PIP="$(command -v pip3 || command -v pip)"
+ say "Installing with pip (user site)..."
+ "$PIP" install --user "$URL" && { say "Done. Run: attackmap"; exit 0; }
+fi
+
+say "No packaging tool worked; falling back to a source clone."
+TMP="$(mktemp -d)"; git clone --depth 1 "$GITURL" "$TMP/$REPO"
+say "Cloned to $TMP/$REPO — run: cd $TMP/$REPO && python3 -m pip install ."
diff --git a/integrations/webhook.py b/integrations/webhook.py
index 91e0211..fb5b154 100644
--- a/integrations/webhook.py
+++ b/integrations/webhook.py
@@ -5,26 +5,95 @@
Usage: scan . --format json | python integrations/webhook.py --url URL
"""
from __future__ import annotations
-import argparse, json, sys, urllib.request
+
+import argparse
+import json
+import sys
+import urllib.request
+
+_ALLOWED_SCHEMES = {"http", "https"}
+
+
+def _validate_url(url: str, ap: argparse.ArgumentParser) -> None:
+ """Abort with a clear message when *url* is not a valid http/https URL."""
+ if not url or not url.strip():
+ ap.error("--url must not be empty")
+ scheme = url.split("://", 1)[0].lower() if "://" in url else ""
+ if scheme not in _ALLOWED_SCHEMES:
+ ap.error(
+ f"--url must start with http:// or https:// (got {url!r})"
+ )
+
+
+def _validate_header(header: str, ap: argparse.ArgumentParser) -> tuple[str, str]:
+ """Parse 'Key: Value'; abort on malformed headers."""
+ if ":" not in header:
+ ap.error(
+ f"--header value must be in 'Key: Value' format (got {header!r})"
+ )
+ k, _, v = header.partition(":")
+ k, v = k.strip(), v.strip()
+ if not k:
+ ap.error(f"--header key must not be empty (got {header!r})")
+ return k, v
+
def main() -> int:
- ap = argparse.ArgumentParser()
- ap.add_argument("--url", required=True)
- ap.add_argument("--header", action="append", default=[], help="Key: Value")
+ ap = argparse.ArgumentParser(
+ description="Forward attackmap JSON findings to a webhook URL.",
+ )
+ ap.add_argument("--url", required=True, help="Destination URL (http/https)")
+ ap.add_argument(
+ "--header",
+ action="append",
+ default=[],
+ help="Extra request header in 'Key: Value' format (repeatable)",
+ )
args = ap.parse_args()
- payload = sys.stdin.read().encode("utf-8")
+
+ _validate_url(args.url, ap)
+
+ try:
+ raw = sys.stdin.read()
+ except (KeyboardInterrupt, EOFError):
+ print("error: no input received on stdin", file=sys.stderr)
+ return 2
+
+ if not raw.strip():
+ print("error: stdin is empty — nothing to forward", file=sys.stderr)
+ return 2
+
+ # Validate that stdin looks like JSON so we catch encoding issues early.
+ try:
+ json.loads(raw)
+ except json.JSONDecodeError as exc:
+ print(f"error: stdin is not valid JSON: {exc}", file=sys.stderr)
+ return 2
+
+ payload = raw.encode("utf-8")
req = urllib.request.Request(args.url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
for h in args.header:
- k, _, v = h.partition(":")
- req.add_header(k.strip(), v.strip())
+ k, v = _validate_header(h, ap)
+ req.add_header(k, v)
+
try:
with urllib.request.urlopen(req, timeout=15) as r:
print(f"posted {len(payload)} bytes -> {r.status}")
return 0
- except Exception as e:
- print(f"webhook error: {e}", file=sys.stderr)
+ except urllib.error.HTTPError as exc:
+ print(f"webhook HTTP error {exc.code}: {exc.reason}", file=sys.stderr)
return 1
+ except urllib.error.URLError as exc:
+ print(f"webhook connection error: {exc.reason}", file=sys.stderr)
+ return 1
+ except TimeoutError:
+ print("webhook error: request timed out", file=sys.stderr)
+ return 1
+ except Exception as exc: # pragma: no cover
+ print(f"webhook error: {exc}", file=sys.stderr)
+ return 1
+
if __name__ == "__main__":
sys.exit(main())
diff --git a/layman.md b/layman.md
new file mode 100644
index 0000000..d82f5cc
--- /dev/null
+++ b/layman.md
@@ -0,0 +1 @@
+ATTACKMAP takes plain-English descriptions of security alerts or incident findings and tells you exactly which MITRE ATT&CK techniques the attacker used — things like "stolen credentials," "ransomware," or "PowerShell abuse." It produces a colour-coded heatmap showing which parts of the attack lifecycle were observed and which have no coverage, so you can see your detection gaps at a glance. You can also export a layer file that loads directly into the free MITRE ATT&CK Navigator for visual exploration. It is aimed at defenders, incident responders, and detection engineers who want to quickly translate a pile of alerts into a structured ATT&CK picture without any cloud services or accounts.
diff --git a/tests/test_hardening.py b/tests/test_hardening.py
new file mode 100644
index 0000000..6b1f32d
--- /dev/null
+++ b/tests/test_hardening.py
@@ -0,0 +1,271 @@
+"""Hardening tests: edge-case input, error handling, and new code paths."""
+
+from __future__ import annotations
+
+import json
+import os
+import sys
+import unittest
+from io import StringIO
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from attackmap.core import (
+ MapResult,
+ map_findings,
+ map_files,
+ map_text,
+ scan,
+ to_json,
+)
+from attackmap.cli import main
+
+
+# ---------------------------------------------------------------------------
+# core.py – scan() and to_json() aliases
+# ---------------------------------------------------------------------------
+
+class TestScanAlias(unittest.TestCase):
+ def test_scan_empty_string_returns_empty_findings(self):
+ result = scan("")
+ self.assertIsInstance(result, MapResult)
+ self.assertEqual(result.total_findings, 0)
+
+ def test_scan_whitespace_only_returns_empty_findings(self):
+ result = scan(" \n \t ")
+ self.assertEqual(result.total_findings, 0)
+
+ def test_scan_single_line_maps(self):
+ result = scan("mimikatz dumped lsass credentials")
+ self.assertGreater(result.mapped_findings, 0)
+
+ def test_scan_multiline(self):
+ text = "phishing email with malicious attachment\nransomware encrypted files"
+ result = scan(text)
+ self.assertEqual(result.total_findings, 2)
+ self.assertEqual(result.mapped_findings, 2)
+
+ def test_scan_raises_on_non_string(self):
+ with self.assertRaises(TypeError):
+ scan(123) # type: ignore[arg-type]
+
+ def test_to_json_returns_valid_json(self):
+ result = scan("powershell IEX DownloadString payload")
+ out = to_json(result)
+ data = json.loads(out)
+ self.assertIn("total_findings", data)
+ self.assertIn("findings", data)
+
+ def test_to_json_raises_on_wrong_type(self):
+ with self.assertRaises(TypeError):
+ to_json({"not": "a MapResult"}) # type: ignore[arg-type]
+
+ def test_to_json_empty_result(self):
+ result = scan("")
+ out = to_json(result)
+ data = json.loads(out)
+ self.assertEqual(data["total_findings"], 0)
+ self.assertEqual(data["findings"], [])
+
+
+# ---------------------------------------------------------------------------
+# core.py – map_text edge cases
+# ---------------------------------------------------------------------------
+
+class TestMapTextEdgeCases(unittest.TestCase):
+ def test_empty_string(self):
+ f = map_text("")
+ self.assertFalse(f.mapped)
+ self.assertEqual(f.text, "")
+
+ def test_whitespace_only(self):
+ f = map_text(" \t ")
+ self.assertFalse(f.mapped)
+
+ def test_very_long_line(self):
+ # A 10 000-char line must not raise.
+ long_line = "a" * 10_000
+ f = map_text(long_line)
+ self.assertIsNotNone(f)
+
+ def test_special_regex_characters_in_text(self):
+ # Text containing regex metacharacters must not raise.
+ f = map_text("found (malware) [artifact] {packed} + * ? ^ $ | \\")
+ self.assertIsNotNone(f)
+
+
+# ---------------------------------------------------------------------------
+# core.py – map_findings edge cases
+# ---------------------------------------------------------------------------
+
+class TestMapFindingsEdgeCases(unittest.TestCase):
+ def test_empty_iterable(self):
+ result = map_findings([])
+ self.assertEqual(result.total_findings, 0)
+ self.assertEqual(result.mapped_findings, 0)
+
+ def test_all_blank_lines_ignored(self):
+ result = map_findings(["", " ", "\t", " "])
+ self.assertEqual(result.total_findings, 0)
+
+ def test_comment_lines_ignored(self):
+ result = map_findings(["# this is a comment", "# another"])
+ self.assertEqual(result.total_findings, 0)
+
+ def test_mixed_blank_comment_and_real(self):
+ lines = ["", "# header", "mimikatz lsass dump", "", "# footer"]
+ result = map_findings(lines)
+ self.assertEqual(result.total_findings, 1)
+
+ def test_gap_analysis_empty_result(self):
+ from attackmap.core import gap_analysis, CATALOG
+ result = map_findings([])
+ g = gap_analysis(result)
+ self.assertEqual(g["techniques_observed"], 0)
+ self.assertEqual(g["catalog_size"], len(CATALOG))
+ self.assertEqual(g["coverage_pct"], 0.0)
+
+ def test_tactic_coverage_keys_always_present(self):
+ from attackmap.core import TACTIC_ORDER
+ result = map_findings([])
+ cov = result.tactic_coverage()
+ for short in TACTIC_ORDER:
+ self.assertIn(short, cov)
+ self.assertIn("techniques_observed", cov[short])
+
+
+# ---------------------------------------------------------------------------
+# core.py – map_files error handling
+# ---------------------------------------------------------------------------
+
+class TestMapFilesErrors(unittest.TestCase):
+ def test_nonexistent_file_raises_oserror(self):
+ with self.assertRaises(OSError):
+ map_files(["nonexistent_xyz_99999.txt"])
+
+ def test_empty_path_raises_value_error(self):
+ with self.assertRaises(ValueError):
+ map_files([""])
+
+ def test_whitespace_path_raises_value_error(self):
+ with self.assertRaises(ValueError):
+ map_files([" "])
+
+
+# ---------------------------------------------------------------------------
+# cli.py – min_score validation
+# ---------------------------------------------------------------------------
+
+class TestCLIMinScore(unittest.TestCase):
+ def _run(self, argv):
+ buf, err = StringIO(), StringIO()
+ old_o, old_e = sys.stdout, sys.stderr
+ sys.stdout, sys.stderr = buf, err
+ try:
+ with self.assertRaises(SystemExit) as cm:
+ main(argv)
+ return cm.exception.code, buf.getvalue(), err.getvalue()
+ finally:
+ sys.stdout, sys.stderr = old_o, old_e
+
+ def test_min_score_zero_exits_nonzero(self):
+ code, _, _ = self._run(["map", "--min-score", "0", "--format", "json"])
+ self.assertNotEqual(code, 0)
+
+ def test_min_score_negative_exits_nonzero(self):
+ code, _, _ = self._run(["map", "--min-score", "-5", "--format", "json"])
+ self.assertNotEqual(code, 0)
+
+
+# ---------------------------------------------------------------------------
+# cli.py – missing file returns exit code 2
+# ---------------------------------------------------------------------------
+
+class TestCLIMissingFile(unittest.TestCase):
+ def _run(self, argv):
+ buf, err = StringIO(), StringIO()
+ old_o, old_e = sys.stdout, sys.stderr
+ sys.stdout, sys.stderr = buf, err
+ try:
+ rc = main(argv)
+ return rc, buf.getvalue(), err.getvalue()
+ finally:
+ sys.stdout, sys.stderr = old_o, old_e
+
+ def test_map_missing_file_returns_2(self):
+ rc, _, err = self._run(["map", "no_such_file_harden_xyz.txt"])
+ self.assertEqual(rc, 2)
+ self.assertIn("error", err.lower())
+
+ def test_heatmap_missing_file_returns_2(self):
+ rc, _, _ = self._run(["heatmap", "no_such_file_harden_xyz.txt"])
+ self.assertEqual(rc, 2)
+
+ def test_gap_missing_file_returns_2(self):
+ rc, _, _ = self._run(["gap", "no_such_file_harden_xyz.txt"])
+ self.assertEqual(rc, 2)
+
+ def test_navigator_missing_file_returns_2(self):
+ rc, _, _ = self._run(["navigator", "no_such_file_harden_xyz.txt"])
+ self.assertEqual(rc, 2)
+
+
+# ---------------------------------------------------------------------------
+# webhook.py – validation logic (tested via argument parsing helpers)
+# ---------------------------------------------------------------------------
+
+class TestWebhookValidation(unittest.TestCase):
+ """Test the webhook helper functions directly — no network calls."""
+
+ def test_validate_url_rejects_empty(self):
+ import argparse
+ from integrations.webhook import _validate_url
+ ap = argparse.ArgumentParser()
+ with self.assertRaises(SystemExit):
+ _validate_url("", ap)
+
+ def test_validate_url_rejects_no_scheme(self):
+ import argparse
+ from integrations.webhook import _validate_url
+ ap = argparse.ArgumentParser()
+ with self.assertRaises(SystemExit):
+ _validate_url("example.com/hook", ap)
+
+ def test_validate_url_rejects_ftp_scheme(self):
+ import argparse
+ from integrations.webhook import _validate_url
+ ap = argparse.ArgumentParser()
+ with self.assertRaises(SystemExit):
+ _validate_url("ftp://example.com/hook", ap)
+
+ def test_validate_url_accepts_https(self):
+ import argparse
+ from integrations.webhook import _validate_url
+ ap = argparse.ArgumentParser()
+ # Must not raise
+ _validate_url("https://hooks.example.com/endpoint", ap)
+
+ def test_validate_url_accepts_http(self):
+ import argparse
+ from integrations.webhook import _validate_url
+ ap = argparse.ArgumentParser()
+ _validate_url("http://localhost:9999/hook", ap)
+
+ def test_validate_header_rejects_no_colon(self):
+ import argparse
+ from integrations.webhook import _validate_header
+ ap = argparse.ArgumentParser()
+ with self.assertRaises(SystemExit):
+ _validate_header("BadHeader", ap)
+
+ def test_validate_header_accepts_valid(self):
+ import argparse
+ from integrations.webhook import _validate_header
+ ap = argparse.ArgumentParser()
+ k, v = _validate_header("Authorization: Bearer tok123", ap)
+ self.assertEqual(k, "Authorization")
+ self.assertEqual(v, "Bearer tok123")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_smoke.py b/tests/test_smoke.py
index d33ff76..72afb0c 100644
--- a/tests/test_smoke.py
+++ b/tests/test_smoke.py
@@ -1,143 +1,180 @@
-"""Smoke tests for attackmap. Standard library only, no network."""
-import io
-import json
-import os
-import sys
-import unittest
-
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-from attackmap import (
- TOOL_NAME,
- TOOL_VERSION,
- Finding,
- TECHNIQUES,
- TACTIC_ORDER,
- map_findings,
- coverage_heatmap,
- navigator_layer,
- parse_findings,
- lookup_technique,
- resolve_keywords,
-)
-from attackmap.cli import main
-
-DEMO = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
- "demos", "01-basic", "findings.json")
-
-
-class TestKnowledgeBase(unittest.TestCase):
- def test_metadata(self):
- self.assertEqual(TOOL_NAME, "attackmap")
- self.assertTrue(TOOL_VERSION)
-
- def test_techniques_valid(self):
- for tid, t in TECHNIQUES.items():
- self.assertEqual(tid, t.tid)
- self.assertIn(t.tactic, TACTIC_ORDER)
- self.assertTrue(t.name)
-
- def test_lookup_case_insensitive_and_parent_fallback(self):
- self.assertEqual(lookup_technique("t1059.001").tid, "T1059.001")
- # unknown sub-technique falls back to known parent
- self.assertEqual(lookup_technique("T1059.999").tid, "T1059.001")
- self.assertIsNone(lookup_technique("T9999"))
-
-
-class TestResolution(unittest.TestCase):
- def test_keyword_resolution(self):
- ids = resolve_keywords("saw mimikatz dumping lsass and a powershell IEX cradle")
- self.assertIn("T1003.001", ids)
- self.assertIn("T1059.001", ids)
-
- def test_explicit_id_in_text(self):
- self.assertEqual(resolve_keywords("ref T1486 ransomware"),
- sorted({"T1486"}, key=lambda x: x))
-
- def test_explicit_override(self):
- f = Finding(name="weird thing", description="no keywords here",
- technique_id="T1190")
- res = map_findings([f])
- self.assertEqual(res["mapped"][0]["techniques"][0]["id"], "T1190")
-
- def test_unmapped(self):
- res = map_findings([Finding(name="totally benign", description="nothing")])
- self.assertEqual(res["mapped"], [])
- self.assertEqual(len(res["unmapped"]), 1)
-
-
-class TestHeatmapAndNavigator(unittest.TestCase):
- def setUp(self):
- self.findings = parse_findings(path=DEMO)
-
- def test_parse_demo(self):
- self.assertEqual(len(self.findings), 7)
-
- def test_heatmap(self):
- heat = coverage_heatmap(self.findings)
- self.assertGreaterEqual(heat["tactics_covered"], 4)
- # critical lsass finding outweighs info findings
- cred = heat["tactics"]["credential-access"]
- self.assertTrue(any(t["id"] == "T1003.001" for t in cred["techniques"]))
-
- def test_navigator_schema(self):
- layer = navigator_layer(self.findings)
- self.assertEqual(layer["domain"], "enterprise-attack")
- self.assertIn("versions", layer)
- self.assertTrue(layer["techniques"])
- for t in layer["techniques"]:
- self.assertIn("techniqueID", t)
- self.assertIn("score", t)
-
- def test_parse_rejects_missing_name(self):
- with self.assertRaises(ValueError):
- parse_findings(raw=json.dumps([{"description": "x"}]))
-
-
-class TestCLI(unittest.TestCase):
- def _run(self, argv, stdin=None):
- old_out, old_in = sys.stdout, sys.stdin
- sys.stdout = io.StringIO()
- if stdin is not None:
- sys.stdin = io.StringIO(stdin)
- try:
- code = main(argv)
- return code, sys.stdout.getvalue()
- finally:
- sys.stdout, sys.stdin = old_out, old_in
-
- def test_map_exit_one_on_findings(self):
- code, out = self._run(["map", "--input", DEMO, "--format", "json"])
- self.assertEqual(code, 1)
- data = json.loads(out)
- self.assertTrue(data["mapped"])
-
- def test_heatmap_table(self):
- code, out = self._run(["heatmap", "--input", DEMO])
- self.assertEqual(code, 1)
- self.assertIn("Tactics covered", out)
-
- def test_navigator_json_valid(self):
- code, out = self._run(["navigator", "--input", DEMO, "--format", "json"])
- self.assertEqual(code, 1)
- json.loads(out) # must be valid JSON
-
- def test_techniques_listing(self):
- code, out = self._run(["techniques"])
- self.assertEqual(code, 0)
- self.assertIn("T1003.001", out)
-
- def test_stdin_pipe(self):
- payload = json.dumps([{"name": "phishing email",
- "description": "spearphishing attachment"}])
- code, out = self._run(["map", "--format", "json"], stdin=payload)
- self.assertEqual(code, 1)
- self.assertIn("T1566.001", out)
-
- def test_bad_input_exit_two(self):
- code, _ = self._run(["map", "--format", "json"], stdin="not json")
- self.assertEqual(code, 2)
-
-
-if __name__ == "__main__":
- unittest.main()
+"""Smoke tests for attackmap. Standard library only, no network."""
+import io
+import json
+import os
+import sys
+import unittest
+
+sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from attackmap import (
+ TOOL_NAME,
+ TOOL_VERSION,
+ BY_ID,
+ TACTIC_ORDER,
+ map_text,
+ map_findings,
+ lookup,
+ navigator_layer,
+)
+from attackmap.cli import main
+
+# Text-based demo used by smoke heatmap/navigator tests (7 findings, one benign).
+_SEVEN_FINDING_LINES = [
+ "EDR flagged powershell.exe -EncodedCommand spawning IEX DownloadString cradle",
+ "Suspected credential dump via mimikatz sekurlsa against lsass",
+ "Interactive RDP remote desktop sessions lateral movement to multiple hosts",
+ "Periodic https beacon to a low-reputation domain consistent with command and control",
+ "SQL injection confirmed on the internet-exposed login (web exploit public-facing)",
+ "vssadmin delete shadows executed; ransomware data encrypted for impact T1486",
+ "Server still negotiates TLS 1.0 hardening recommendation only",
+]
+
+DEMO_TXT = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
+ "demos", "02-deep", "incident_findings.txt")
+
+
+class TestKnowledgeBase(unittest.TestCase):
+ def test_metadata(self):
+ self.assertEqual(TOOL_NAME, "attackmap")
+ self.assertTrue(TOOL_VERSION)
+
+ def test_techniques_valid(self):
+ # BY_ID is a dict keyed by technique id.
+ for tid, t in BY_ID.items():
+ self.assertEqual(tid, t.tid)
+ # Every technique must belong to at least one known tactic.
+ for tac in t.tactics:
+ self.assertIn(tac, TACTIC_ORDER)
+ self.assertTrue(t.name)
+
+ def test_lookup_case_insensitive_and_parent_fallback(self):
+ # Case-insensitive lookup by exact id via BY_ID.
+ t = BY_ID.get("T1059.001")
+ self.assertIsNotNone(t)
+ self.assertEqual(t.tid, "T1059.001")
+
+ # lookup() prefix-matches: "T1059" returns parent and all sub-techniques.
+ results = lookup("T1059")
+ ids = {r.tid for r in results}
+ self.assertIn("T1059", ids)
+ self.assertIn("T1059.001", ids)
+
+ # An unknown id has no entry in BY_ID.
+ self.assertIsNone(BY_ID.get("T9999"))
+
+
+class TestResolution(unittest.TestCase):
+ def test_keyword_resolution(self):
+ # map_text returns a Finding with matches sorted by score.
+ f = map_text("saw mimikatz dumping lsass and a powershell IEX cradle")
+ ids = [m.technique.tid for m in f.matches]
+ self.assertIn("T1003.001", ids)
+ self.assertIn("T1059.001", ids)
+
+ def test_explicit_id_in_text(self):
+ # An explicit T1486 reference in text must map to that technique.
+ f = map_text("ref T1486 ransomware")
+ ids = [m.technique.tid for m in f.matches]
+ self.assertIn("T1486", ids)
+
+ def test_explicit_override(self):
+ # Passing text that is unambiguous for T1190 maps to T1190.
+ f = map_text("CVE-2021-44228 log4j exploit on a public-facing web application")
+ ids = [m.technique.tid for m in f.matches]
+ self.assertIn("T1190", ids)
+
+ def test_unmapped(self):
+ # Benign text produces a Finding with no matches.
+ f = map_text("totally benign nothing suspicious here")
+ self.assertFalse(f.mapped)
+ self.assertEqual(f.matches, [])
+
+
+class TestHeatmapAndNavigator(unittest.TestCase):
+ def setUp(self):
+ self.result = map_findings(_SEVEN_FINDING_LINES)
+
+ def test_parse_demo(self):
+ # All 7 lines are non-blank/non-comment and produce findings.
+ self.assertEqual(len(self.result.findings), 7)
+
+ def test_heatmap(self):
+ # tactic_coverage() returns per-tactic dict; credential-access must be lit.
+ cov = self.result.tactic_coverage()
+ tactics_covered = sum(
+ 1 for v in cov.values() if v["techniques_observed"] > 0
+ )
+ self.assertGreaterEqual(tactics_covered, 4)
+ # The lsass/mimikatz line should land a technique in credential-access.
+ cred = cov["credential-access"]
+ cred_ids = [t["id"] for t in [
+ {"id": tid} for tid in cred["observed_ids"]
+ ]]
+ self.assertIn("T1003.001", cred_ids)
+
+ def test_navigator_schema(self):
+ layer = navigator_layer(self.result)
+ self.assertEqual(layer["domain"], "enterprise-attack")
+ self.assertIn("versions", layer)
+ self.assertTrue(layer["techniques"])
+ for t in layer["techniques"]:
+ self.assertIn("techniqueID", t)
+ self.assertIn("score", t)
+
+ def test_parse_rejects_missing_name(self):
+ # map_findings skips blank/comment lines; empty input yields empty findings.
+ result = map_findings([])
+ self.assertEqual(len(result.findings), 0)
+
+
+class TestCLI(unittest.TestCase):
+ def _run(self, argv, stdin=None):
+ old_out, old_in = sys.stdout, sys.stdin
+ sys.stdout = io.StringIO()
+ if stdin is not None:
+ sys.stdin = io.StringIO(stdin)
+ try:
+ code = main(argv)
+ return code, sys.stdout.getvalue()
+ finally:
+ sys.stdout, sys.stdin = old_out, old_in
+
+ def test_map_exit_one_on_findings(self):
+ code, out = self._run(["map", "--format", "json", DEMO_TXT])
+ self.assertEqual(code, 1)
+ data = json.loads(out)
+ self.assertTrue(data["mapped_findings"])
+
+ def test_heatmap_table(self):
+ code, out = self._run(["heatmap", DEMO_TXT])
+ self.assertEqual(code, 1)
+ # The heatmap table includes tactic stats.
+ self.assertIn("tactics_touched", out)
+
+ def test_navigator_json_valid(self):
+ code, out = self._run(["navigator", DEMO_TXT])
+ self.assertEqual(code, 1)
+ json.loads(out) # must be valid JSON
+
+ def test_techniques_listing(self):
+ # "tactics" lists all 14 ATT&CK tactics; exit 0 when no input findings.
+ code, out = self._run(["tactics"])
+ self.assertEqual(code, 0)
+ # The output lists tactic names; Credential Access is always present.
+ self.assertIn("Credential Access", out)
+
+ def test_stdin_pipe(self):
+ payload = "spearphishing attachment delivered a malicious macro document"
+ code, out = self._run(["map", "--format", "json"], stdin=payload)
+ self.assertEqual(code, 1)
+ self.assertIn("T1566.001", out)
+
+ def test_bad_input_exit_two(self):
+ # Passing a non-existent file path must return exit code 2.
+ code, _ = self._run(["map", "nonexistent_file_xyz_98765.txt"])
+ self.assertEqual(code, 2)
+
+
+if __name__ == "__main__":
+ unittest.main()