-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathVisionSystem.py
More file actions
234 lines (196 loc) · 9.27 KB
/
VisionSystem.py
File metadata and controls
234 lines (196 loc) · 9.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
VisionSystem — High-Performance X11 Framebuffer Capture and YOLOv10 Inference Engine.
Architecture overview
---------------------
This module implements the first two stages of the closed-loop Target Acquisition Pipeline:
Stage 1 — Acquisition
Raw pixel data is read directly from the X11 framebuffer using ``XGetImage`` (ZPixmap
format). A fixed-size Region of Interest (ROI) centred on the configured game-window
position is captured in a single round-trip to the X server, minimising system-call
overhead. Typical capture latency: **< 1 ms** on a local X11 connection.
Stage 2 — Inference
The captured BGR frame is fed synchronously to a YOLOv10 model (PyTorch or ONNX).
GPU acceleration is enabled through CUDA with optional Mixed-Precision (FP16/TF32) and
cuDNN auto-tuning, reducing per-frame inference time to **< 5 ms** on RTX-class hardware.
Threading model
---------------
The system operates in **synchronous mode**: capture and inference run sequentially in the
caller's thread. This avoids producer/consumer queue latency and guarantees that every
captured frame is processed by the neural network.
Detection output schema
-----------------------
Each entry in the returned ``detections`` list is a ``dict`` with the following keys:
* ``id`` — sequential index within the current frame's result set
* ``x`` — bounding-box centre X in ROI coordinates (pixels)
* ``y`` — bounding-box centre Y in ROI coordinates (pixels)
* ``w`` — bounding-box width (pixels)
* ``h`` — bounding-box height (pixels)
* ``conf`` — YOLOv10 detection confidence score ∈ [0, 1]
"""
import Xlib.display
import Xlib.X
import numpy as np
import cv2
# Compatibility shim: if this cv2 build does not expose ``setNumThreads``,
# install a no-op with the same one-argument call shape so that later callers
# of ``cv2.setNumThreads(n)`` do not raise AttributeError.
# NOTE(review): presumably required by one of the imports below — confirm
# before moving this statement.
if not hasattr(cv2, 'setNumThreads'):
    cv2.setNumThreads = lambda x: None
import torch
import functools
import threading
import queue
import screeninfo
import time
from ultralytics import YOLO
class VisionSystem:
    """Real-time capture and neural inference subsystem.

    Combines an X11 framebuffer reader with a YOLOv10 object detector to produce
    per-frame bounding-box detections within a configurable Region of Interest.

    Parameters
    ----------
    model_path : str
        Path to the YOLOv10 weight file (``.pt``). Any path that does not end in
        ``.pt`` triggers a ``RuntimeWarning`` and is replaced by ``yolov10n.pt``.
    fov_size : int
        Side length (pixels) of the square ROI captured from the screen. Inference
        itself is always requested at ``imgsz=416``.
    monitor_index : int
        Zero-based X11 screen index. Falls back to screen 0 if the index is not
        smaller than the number of available screens.
    use_tensorrt : bool
        Reserved for future TensorRT integration (currently unused at runtime).
    game_window_x : int or None
        Pixel X-coordinate of the top-left corner of the capture ROI. When ``None``
        the ROI is centred horizontally on the detected screen.
    game_window_y : int or None
        Pixel Y-coordinate of the top-left corner of the capture ROI. When ``None``
        the ROI is centred vertically on the detected screen.
    conf : float
        Minimum confidence threshold passed to the detector.
    min_y : int
        Detections whose top edge (``y1``) is above this pixel row are discarded.
    """

    # Detections whose bottom edge lies closer than this many pixels to the
    # bottom border of the ROI are treated as clipped and dropped.
    _BOTTOM_EDGE_MARGIN = 5

    # Square inference resolution handed to ``model.predict``.
    _INFERENCE_IMGSZ = 416

    def __init__(self, model_path="yolov10n.pt", fov_size=416, monitor_index=1, use_tensorrt=True, game_window_x=None, game_window_y=None, conf=0.25, min_y=0):
        """Open the X11 connection, compute the capture ROI and load the model.

        See the class docstring for the meaning of each parameter.
        """
        # NOTE(security): this rebinds ``torch.load`` for the whole process and
        # disables the ``weights_only`` safeguard, so checkpoints are unpickled
        # without restriction. Only load weight files from trusted sources.
        torch.load = functools.partial(torch.load, weights_only=False)

        # Force PyTorch weights — ONNX float16/float32 mixing causes inference errors
        if not model_path.endswith('.pt'):
            import warnings
            warnings.warn(
                f"ONNX model path '{model_path}' is not supported due to float16/float32 "
                "precision constraints. Falling back to 'yolov10n.pt'.",
                RuntimeWarning,
                stacklevel=2,
            )
            model_path = "yolov10n.pt"

        # --- Stage 1: X11 connection ---
        self.display = Xlib.display.Display()
        if monitor_index >= self.display.screen_count():
            monitor_index = 0
        self.screen = self.display.screen(monitor_index)
        self.target_drawable = self.screen.root
        geo = self.target_drawable.get_geometry()
        self.screen_w = geo.width
        self.screen_h = geo.height
        print(f"🖥️ Xlib Screen {monitor_index} detected: {self.screen_w}x{self.screen_h}")

        # --- ROI computation ---
        self.fov_size = fov_size
        self.left, self.top = self._compute_roi_origin(
            self.screen_w, self.screen_h, fov_size, game_window_x, game_window_y
        )
        print(f"🎯 Calibrated ROI origin: ({self.left}, {self.top})")

        # --- Stage 2: YOLOv10 model (synchronous mode for reliability) ---
        self.model = YOLO(model_path)

        # Remember once whether CUDA is usable so that detect_targets() can
        # pick the matching device / precision instead of assuming a GPU.
        self._use_cuda = torch.cuda.is_available()
        if self._use_cuda:
            self.model.to('cuda')
            torch.backends.cudnn.benchmark = True  # auto-tune cuDNN kernels
            torch.backends.cuda.matmul.allow_tf32 = True  # TF32 on Ampere/Ada GPUs
            torch.backends.cudnn.allow_tf32 = True
            print(f"⚡ GPU: {torch.cuda.get_device_name(0)} | VRAM: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB")

        self.running = True
        self.conf = conf
        self.min_y = min_y
        print("🧵 Synchronous mode: capture + YOLOv10 in the main loop")

    @staticmethod
    def _compute_roi_origin(screen_w, screen_h, fov_size, window_x=None, window_y=None):
        """Return the clamped ``(left, top)`` origin of the square capture ROI.

        Each axis is resolved independently: an explicit coordinate is used when
        given, otherwise the ROI is centred on that axis. The result is then
        clamped to ``[0, extent - fov_size]``.

        Note that when ``fov_size`` is larger than the screen extent the clamp
        yields ``0``, so the ROI still overflows the screen on that axis.
        """
        def place(requested, extent):
            if requested is None:
                requested = (extent // 2) - (fov_size // 2)
            return max(0, min(requested, extent - fov_size))

        return place(window_x, screen_w), place(window_y, screen_h)

    def capture_frame(self):
        """Capture a single ROI frame from the X11 framebuffer.

        Requests a ``fov_size × fov_size`` ZPixmap image starting at
        ``(self.left, self.top)`` on the root window, views the raw buffer as
        4-byte pixels and converts BGRA to BGR.

        Returns
        -------
        numpy.ndarray
            BGR image of shape ``(fov_size, fov_size, 3)``, dtype ``uint8``.
        """
        side = self.fov_size
        raw_img = self.target_drawable.get_image(
            self.left, self.top,
            side, side,
            Xlib.X.ZPixmap, 0xffffffff
        )
        # The reshape assumes 4 bytes per pixel in the returned image data.
        pixels = np.frombuffer(raw_img.data, dtype=np.uint8).reshape(side, side, 4)
        return cv2.cvtColor(pixels, cv2.COLOR_BGRA2BGR)

    def detect_targets(self):
        """Capture one frame and run synchronous YOLOv10 inference.

        When CUDA was available at construction time the prediction runs on
        device ``0`` with ``half=True`` inside a CUDA autocast region. Otherwise
        it runs on the CPU in full precision with autocast disabled.

        Detections are filtered by:

        * ``conf`` threshold (set in constructor)
        * ``min_y`` upper-boundary mask (discards detections too high in the ROI)
        * Bottom-edge guard: detections whose ``y2 > fov_size - 5`` are dropped.

        Returns
        -------
        tuple[list[dict], numpy.ndarray]
            ``(detections, frame)`` where ``detections`` is a list of dicts with
            keys ``id``, ``x``, ``y``, ``w``, ``h``, ``conf`` and ``frame`` is the
            BGR capture used for the inference.
        """
        frame = self.capture_frame()
        use_cuda = self._use_cuda

        # Autocast is only enabled on CUDA; on CPU the context is a no-op.
        with torch.autocast(device_type="cuda" if use_cuda else "cpu", enabled=use_cuda):
            results = self.model.predict(
                frame,
                conf=self.conf,
                classes=[0],
                imgsz=self._INFERENCE_IMGSZ,
                device=0 if use_cuda else "cpu",
                half=use_cuda,
                verbose=False,
                augment=False,
                agnostic_nms=True,
                max_det=10,
            )

        if len(results) > 0:
            detections = self._boxes_to_detections(results[0].boxes)
        else:
            detections = []
        return detections, frame

    def _boxes_to_detections(self, boxes):
        """Convert an iterable of single-box results into detection dicts.

        Each element must expose ``xyxy`` (indexable, first row convertible with
        ``tolist()`` to ``[x1, y1, x2, y2]``) and ``conf`` (indexable, first
        element convertible to ``float``). The ``id`` of a detection is its
        position in ``boxes`` before filtering, so ids may have gaps.
        """
        bottom_limit = self.fov_size - self._BOTTOM_EDGE_MARGIN
        detections = []
        for i, box in enumerate(boxes):
            x1, y1, x2, y2 = box.xyxy[0].tolist()
            # Drop partial detections clipped by the ROI bottom edge
            if y2 > bottom_limit:
                continue
            # Drop detections above the configured vertical threshold
            if y1 < self.min_y:
                continue
            detections.append({
                'id': i,
                'x': int((x1 + x2) / 2),
                'y': int((y1 + y2) / 2),
                'w': int(x2 - x1),
                'h': int(y2 - y1),
                'conf': float(box.conf[0]),
            })
        return detections

    def stop(self):
        """Signal the vision system to stop processing.

        Only sets the ``running`` flag to ``False`` so that callers can poll it;
        no other state is changed.
        """
        self.running = False
        print("🛑 VisionSystem stopped")