-
-
Notifications
You must be signed in to change notification settings - Fork 147
Expand file tree
/
Copy pathconftest.py
More file actions
247 lines (207 loc) · 8.73 KB
/
conftest.py
File metadata and controls
247 lines (207 loc) · 8.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"""
Root pytest configuration for OpenContracts.
This file provides pytest-xdist parallel testing support and ensures proper
worker isolation for tests.
"""
import asyncio
import os
import pytest
from django import db
@pytest.fixture(scope="session", autouse=True)
def disable_document_processing_signals(django_db_setup):
"""
Disable document and annotation processing signals for all tests by default.
This prevents Celery tasks from being triggered when creating test documents,
which would fail because test documents typically don't have actual files.
Tests that specifically need document processing signals (e.g., integration tests
for the processing pipeline) can use the `enable_doc_processing_signals` fixture
to temporarily re-enable them.
The signals are reconnected after all tests complete.
"""
from django.db.models.signals import post_save
from opencontractserver.annotations.models import Annotation
from opencontractserver.annotations.signals import (
ANNOT_CREATE_UID,
process_annot_on_create_atomic,
)
from opencontractserver.documents.models import Document, DocumentPath
from opencontractserver.documents.signals import (
DOC_CREATE_UID,
DOC_PATH_CREATE_UID,
process_doc_on_create_atomic,
process_doc_on_document_path_create,
)
# Disconnect signals
post_save.disconnect(
process_doc_on_create_atomic, sender=Document, dispatch_uid=DOC_CREATE_UID
)
post_save.disconnect(
process_annot_on_create_atomic, sender=Annotation, dispatch_uid=ANNOT_CREATE_UID
)
post_save.disconnect(
process_doc_on_document_path_create,
sender=DocumentPath,
dispatch_uid=DOC_PATH_CREATE_UID,
)
yield
# Reconnect signals after all tests complete
post_save.connect(
process_doc_on_create_atomic, sender=Document, dispatch_uid=DOC_CREATE_UID
)
post_save.connect(
process_annot_on_create_atomic, sender=Annotation, dispatch_uid=ANNOT_CREATE_UID
)
post_save.connect(
process_doc_on_document_path_create,
sender=DocumentPath,
dispatch_uid=DOC_PATH_CREATE_UID,
)
@pytest.fixture
def enable_doc_processing_signals():
"""
Re-enable document processing signals for a specific test.
Use this fixture in tests that specifically need to test document processing
pipeline behavior with signals firing.
Example:
def test_document_processing_flow(enable_doc_processing_signals):
# Signals are enabled for this test
doc = Document.objects.create(...) # Will trigger processing
"""
from django.db.models.signals import post_save
from opencontractserver.annotations.models import Annotation
from opencontractserver.annotations.signals import (
ANNOT_CREATE_UID,
process_annot_on_create_atomic,
)
from opencontractserver.documents.models import Document, DocumentPath
from opencontractserver.documents.signals import (
DOC_CREATE_UID,
DOC_PATH_CREATE_UID,
process_doc_on_create_atomic,
process_doc_on_document_path_create,
)
# Re-enable signals for this test
post_save.connect(
process_doc_on_create_atomic, sender=Document, dispatch_uid=DOC_CREATE_UID
)
post_save.connect(
process_annot_on_create_atomic, sender=Annotation, dispatch_uid=ANNOT_CREATE_UID
)
post_save.connect(
process_doc_on_document_path_create,
sender=DocumentPath,
dispatch_uid=DOC_PATH_CREATE_UID,
)
yield
# Disable signals again after the test
post_save.disconnect(
process_doc_on_create_atomic, sender=Document, dispatch_uid=DOC_CREATE_UID
)
post_save.disconnect(
process_annot_on_create_atomic, sender=Annotation, dispatch_uid=ANNOT_CREATE_UID
)
post_save.disconnect(
process_doc_on_document_path_create,
sender=DocumentPath,
dispatch_uid=DOC_PATH_CREATE_UID,
)
def pytest_configure(config):
"""Configure pytest settings, including xdist worker handling."""
# Ensure the serial marker is registered only once
if not hasattr(config, "_serial_marker_registered"):
config.addinivalue_line(
"markers",
"serial: mark test to run serially (not in parallel with other tests)",
)
config._serial_marker_registered = True
def pytest_collection_modifyitems(config, items):
"""Modify test collection to handle serial tests when running with xdist."""
# Check if running with xdist by looking at numprocesses option
# Note: workerinput is only on workers, but collection happens on controller
numprocesses = getattr(config.option, "numprocesses", None)
if not numprocesses:
return
# When running with xdist, mark serial tests to run on worker gw0 only
# This ensures they run sequentially without interference from other workers
for item in items:
if item.get_closest_marker("serial"):
# Add xdist_group to ensure all serial tests run on same worker
item.add_marker(pytest.mark.xdist_group(name="serial"))
@pytest.fixture(scope="session")
def django_db_modify_db_settings(django_db_modify_db_settings_parallel_suffix):
"""
Fixture to ensure each xdist worker gets its own database.
This is automatically handled by pytest-django when running with xdist,
but we explicitly include it here for clarity.
"""
pass
def pytest_xdist_setupnodes(config, specs):
"""Called before any workers are created. Use for one-time setup."""
pass
def pytest_xdist_make_scheduler(config, log):
"""
Create a scheduler that respects test grouping.
Using LoadScopeScheduling keeps tests from the same class together,
which is important for setUpClass/setUpTestData patterns.
"""
# Return None to use the default scheduler specified by --dist option
# Users should specify --dist loadscope for proper class-level grouping
return None
@pytest.hookimpl(tryfirst=True)
def pytest_runtest_setup(item):
"""Setup hook that runs before each test."""
# Get worker ID for logging (empty string if not using xdist)
worker_id = os.environ.get("PYTEST_XDIST_WORKER", "")
if worker_id:
# Set worker ID in environment for tests that need to know
os.environ["TEST_WORKER_ID"] = worker_id
# NOTE: We intentionally do NOT call db.close_old_connections() here.
# Hooks run BEFORE pytest-django's db fixtures are applied, so calling
# db.close_old_connections() would fail with "Database access not allowed".
# Connection cleanup is handled in pytest_runtest_teardown() instead,
# which runs AFTER the test when database access is available.
# Ensure a fresh event loop is available for each test.
# This prevents "Event loop is closed" errors when using pydantic-ai's
# run_sync() or other async code with pytest-xdist.
try:
loop = asyncio.get_event_loop()
if loop.is_closed():
raise RuntimeError("Event loop is closed")
except RuntimeError:
# Create a new event loop if one doesn't exist or is closed
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
@pytest.hookimpl(trylast=True)
def pytest_runtest_teardown(item, nextitem):
"""Teardown hook that runs after each test."""
# Clean up any event loop that was created during the test
# to prevent resource leaks, but don't close it as the next test may need it
try:
loop = asyncio.get_event_loop()
if loop.is_running():
# If the loop is still running, we can't close it safely
pass
elif not loop.is_closed():
# Cancel any pending tasks
pending = asyncio.all_tasks(loop) if hasattr(asyncio, "all_tasks") else []
for task in pending:
task.cancel()
except RuntimeError:
# No event loop, nothing to clean up
pass
# For serial tests (which use async code with asyncio.run()), close ALL
# database connections to prevent stale/corrupted connections from affecting
# subsequent tests. asyncio.run() can leave connections in a bad state when
# it closes its event loop.
#
# NOTE: Unlike db.close_old_connections() which checks connection state
# (and requires DB access to be allowed), db.connections.close_all() just
# directly closes connections without state checks. This should be safe
# in teardown, but we wrap in try/except for robustness.
if item.get_closest_marker("serial"):
try:
db.connections.close_all()
except Exception:
# If connection cleanup fails, log but don't fail the test
# This can happen in edge cases with pytest-django fixture teardown
pass