fix: KeyError on concurrent same-key job attempts in job_task_contexts#293
Conversation
|
i don't really understand the purpose of this, there must be a simpler way, job.key is globally unique, id() is not stable this fix doesn't seem correct, i don't see why job context can't be keyed on job key |
|
|
||
| async def test_job_task_contexts_same_key_isolation(self) -> None: | ||
| """Two concurrent attempts of a same-``key`` job keep isolated contexts.""" | ||
| from saq.types import JobTaskContext |
There was a problem hiding this comment.
this test seems contrived, i don't really get how it happens in real life
ee41d15 to
139dc0b
Compare
|
please fix the build |
|
Sorry, I'm working on the new implementation as you asked... By the way, it is a real bug. I had ~30 times in 2 days, before I introduced my fix, in production and this was the reason of my PR: Note The race:
Keying on
No |
139dc0b to
c869b18
Compare
Job.__hash__ is derived from .key, so two in-flight attempts of the
same-key job (e.g. a sweep-driven retry of a job whose previous attempt
is still winding down) share one slot in Worker.job_task_contexts.
Whichever attempt's finally pops the slot first leaves the other to
raise KeyError at the completion check:
File ".../saq/worker.py", line 374, in process
if self.job_task_contexts[job]["aborted"] is None:
KeyError: Job<... key: chat:1, attempts: 2, error: cancelled, status: active>
Keep the dict keyed by Job (i.e. by .key), but hold a local reference to
the JobTaskContext inside process() and read "aborted" through it. In
the finally, only remove the slot if it still holds our ctx -- a
successor attempt's slot is preserved. abort() is unchanged.
Adds a regression test that reproduces the race by running two
process() coroutines with same-key jobs and asserting both complete.
c869b18 to
de0cdd4
Compare
|
Now it pass any test again: https://github.com/drizzt/saq/actions/runs/24740433208 |
What and why
Job.__hash__is derived from.key, so two in-flight attempts of the same-key job collapse onto a single slot inWorker.job_task_contexts. This happens in practice when a sweep or rolling-restart retries a job whose previous attempt is still winding down. The first attempt'sfinallyblock pops the second attempt's entry, and the second attempt then crashes at the completion check:Fix
Key
job_task_contextsbyid(job)(unique per PythonJobobject) instead of theJobitself, and carry theJobinsideJobTaskContextsoabort()can still correlate dict entries with refreshed queue state. Each attempt now occupies its own slot and thefinallypop is scoped to that attempt.abort()fetches fresh queue state once perkeyand applies it to every local attempt tracking that key, preserving the existing "one refresh per key" network pattern.Tests
Adds
tests/test_worker.py::test_job_task_contexts_same_key_isolationcovering the two-attempts-same-key case.