Context

While exercising #1222 end-to-end against real ML workers, the ADC worker's POST /api/v2/jobs/<id>/result/ calls repeatedly failed with HTTP 413 (Request Entity Too Large), and after raising body size limits, with HTTP 503 once each request started exceeding gunicorn's default 30 s worker timeout. The 413 was the first symptom, but after reading the current code path it's clear the underlying issue is broader than "the body limit is too low." This issue captures both the observation and the design direction we're leaning toward.
Observed behavior
Payload sizes (proxy error log, two small async_api jobs using 18 source images each, global_moths_2024 pipeline):
| Job | Worker | POST body size |
|---|---|---|
| 18 images | Worker A | 321 MB |
| 18 images | Worker B | 139–301 MB |
Per image, that's ~8–17 MB of result JSON, about an order of magnitude larger than what the default 100 MB DATA_UPLOAD_MAX_MEMORY_SIZE assumes.
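For orientation, the Django-side knob in question looks like this (a sketch of the settings line only; the 500 MB figure is the unblock-testing bump discussed later in this issue, and the proxy in front has its own, separate limit that produced the 413s):

```python
# settings.py (sketch). Django raises RequestDataTooBig for non-file request
# bodies above this size.
DATA_UPLOAD_MAX_MEMORY_SIZE = 100 * 1024 * 1024    # project default referenced above
# DATA_UPLOAD_MAX_MEMORY_SIZE = 500 * 1024 * 1024  # temporary unblock-testing bump
```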
Redelivery cascade observed on a larger run. A 636-image job was canceled mid-flight after ~16 minutes of struggling with 503s. The #1222 forensic cleanup snapshot captured this consumer state:
Reading that: the consumer had 920 delivery attempts for 636 tasks, only 169 had been acked, 43 were in-flight, and 141 of the attempts were redeliveries because the first attempt never got acked in time. Workers were fine — GPUs busy, inference running — but result POSTs were either 413ing, 503ing, or taking so long that the ~30 s ack_wait window passed before the ack made it back to NATS. The redelivery factor (920/636 ≈ 1.45×) is the cost of that: about 45% of the work was being redone.
This is exactly the kind of silent "workers look busy, job doesn't progress" pattern #1220 was asking for visibility into, and the snapshot is the receipt.
Current architecture (after code read)
The current result-handling path is not quite what the surface behavior suggests. Reading ami/jobs/views.py:302-356 and ami/jobs/tasks.py:50-178:
1. Worker POSTs {results: [PipelineTaskResult, ...]} with the full reserved batch (currently 139–321 MB).
2. DRF parses the JSON body synchronously — this blocks the uvicorn worker event loop for as long as the parse takes on a body this size.
3. MLJobResultsRequestSerializer(...).is_valid(raise_exception=True) runs Pydantic validation over the whole list.
4. For each result in the validated list, Django calls process_nats_pipeline_result.delay(job_id, result_data, reply_subject) — one celery enqueue per task, via RabbitMQ.
5. Django returns 200 {"status": "accepted", "results_queued": N} without waiting for any save to happen.
6. A celery worker picks up each task, validates again (PipelineResultsResponse(**result_data)), runs job.pipeline.save_results(...), and then calls _ack_task_via_nats(reply_subject). The NATS ack happens inside the celery task, not at HTTP time.
7. The celery task has max_retries=0 — the comment (tasks.py:50) explicitly defers retry semantics to NATS's max_deliver/ack_wait redelivery.
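Condensed into code, the path looks roughly like this (identifiers are the ones named above; the bodies are paraphrased from the code read, not a verbatim excerpt):

```python
# Sketch of ami/jobs/views.py:302-356 and ami/jobs/tasks.py:50-178, simplified.
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

class JobViewSet(viewsets.GenericViewSet):
    @action(detail=True, methods=["post"], url_path="result")
    def result(self, request, pk=None):
        # Steps 1-3: touching request.data forces DRF to parse the full
        # 139-321 MB JSON body synchronously; Pydantic then validates the list.
        serializer = MLJobResultsRequestSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        results = serializer.validated_data["results"]

        # Step 4: one celery enqueue per task result; each RabbitMQ message
        # carries the full result_data dict. (reply_subject is assumed here
        # to ride along inside each result.)
        for result_data in results:
            process_nats_pipeline_result.delay(pk, result_data, result_data["reply_subject"])

        # Step 5: respond immediately -- nothing has been saved yet.
        return Response({"status": "accepted", "results_queued": len(results)})

@celery_app.task(max_retries=0)  # step 7: retries are NATS's job (max_deliver/ack_wait)
def process_nats_pipeline_result(job_id, result_data, reply_subject):
    # Step 6: re-validate, save, then ack NATS -- the ack happens here,
    # inside the celery task, not at HTTP time.
    response = PipelineResultsResponse(**result_data)
    job = Job.objects.get(pk=job_id)
    job.pipeline.save_results(job=job, results=response)
    _ack_task_via_nats(reply_subject)
```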
So the celery-backed save is already in place. The HTTP request is not waiting on save_results(). What it is waiting on is the JSON parse of a ~200 MB body, plus the per-task fan-out to RabbitMQ inside the request handler. Both are expensive enough on large payloads to blow through gunicorn's worker timeout on a non-trivial fraction of requests, which is what produces the 503 → NATS redelivery cascade.
Also relevant: the per-task result_data dict is being shipped to RabbitMQ as the celery task argument. That's using the message broker as a heavyweight object store for N copies of the same large payload that was just uploaded over HTTP — a re-serialization step that a durable intermediate would eliminate.
Requirements (from discussion)
The refactor should satisfy, in order of importance:
1. POSTs should be fast and the worker should keep moving. The worker should not be holding a connection open for tens of seconds per batch just to hand over bytes.
2. Results should be saved safely before ingest. There needs to be a durable place where a payload lives between "worker says it's done" and "Django has ingested it into the relational database."
3. Results should look structurally valid, and invalid ones should survive. A payload that fails schema validation should be preserved (with a reason) rather than silently dropped.
Design directions

These are directions to discuss, not a decided plan. Listed roughly in order of how cleanly they satisfy the three requirements above.
A. Durable intermediate storage between worker and Django ingest
Worker writes the raw result payload to an object store (S3-compatible; the same storage used for source images could be reused, or a dedicated bucket for ingest queuing), then POSTs a short JSON to Django with {reply_subject, bucket_key, content_length, content_hash}. Django enqueues a celery task that reads the object from the bucket, validates it, saves it, and acks NATS. The HTTP POST handler never touches a body larger than a few KB regardless of pipeline output density.
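A hypothetical worker-side flow, to make the shape concrete. The bucket name, endpoint, key layout, and API base are illustrative assumptions — only the {reply_subject, bucket_key, content_length, content_hash} fields come from the description above:

```python
# Sketch of option (A) from the ADC worker's side; not an agreed API.
import hashlib
import json

import boto3
import requests

def post_result_via_bucket(api_base: str, job_id: int, reply_subject: str, payload: dict) -> None:
    body = json.dumps(payload).encode()
    key = f"ingest/{job_id}/{reply_subject.replace('.', '_')}.json"  # assumed key layout

    # 1. Park the large payload in S3-compatible object storage.
    s3 = boto3.client("s3", endpoint_url="https://storage.example.org")  # assumed endpoint
    s3.put_object(Bucket="ami-result-ingest", Key=key, Body=body)        # assumed bucket

    # 2. POST a pointer of a few hundred bytes; Django never sees the big body.
    resp = requests.post(
        f"{api_base}/api/v2/jobs/{job_id}/result/",
        json={
            "reply_subject": reply_subject,
            "bucket_key": key,
            "content_length": len(body),
            "content_hash": hashlib.sha256(body).hexdigest(),
        },
        timeout=30,
    )
    resp.raise_for_status()
```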
Pros: Fully satisfies all three requirements. POST is cheap. Payload is durable and inspectable before ingest. Retry becomes "re-run the celery task against the same object" — fully idempotent if save_results() is idempotent per source image. Failed objects can be moved to a failed/ prefix as a native DLQ. The on-disk object is a great artifact for post-mortem investigation of malformed results.
Cons: Biggest change. Requires coordinated ADC-side and Django-side work. Bucket lifecycle policy needed (how long do successfully-ingested objects stick around?). Validation needs to move: pre-POST for schema sanity, post-POST in the celery task for semantic checks. Requires the worker to have bucket credentials or a presigned-URL flow.
B. Keep HTTP POST, but stop using RabbitMQ as a bulk carrier
The HTTP POST already returns before saves happen — that part isn't broken. What's broken is that (i) DRF parses 200 MB of JSON on the event loop, and (ii) the full result_data dict gets shipped to RabbitMQ once per task. A middle-ground refactor (sketched after the list):

- Stream-parse the request body (or hand the raw body to a celery task as a blob, rather than a Python dict)
- Write the parsed result_data for each task to a local disk queue or a postgres BLOB, and enqueue a small celery task that references the stored record by id instead of carrying the dict
- Keep the NATS ack inside the celery task, where it already is
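A minimal sketch of the postgres-staging variant, assuming a hypothetical staging model and task name (neither exists today; this is the idea, not a design):

```python
# Hypothetical staging table for option (B). The celery message shrinks to a
# single integer id; the payload never transits RabbitMQ.
from django.db import models

class StagedPipelineResult(models.Model):
    job = models.ForeignKey("jobs.Job", on_delete=models.CASCADE)
    reply_subject = models.CharField(max_length=255)
    payload = models.JSONField()  # or BinaryField if we keep the raw bytes
    created_at = models.DateTimeField(auto_now_add=True)

# In the result view, per task result (sketch):
#   staged = StagedPipelineResult.objects.create(
#       job_id=job_id, reply_subject=reply_subject, payload=result_data)
#   process_staged_result.delay(staged.id)  # carries an id, not 8-17 MB of JSON
```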
Pros: Smaller change than (A). Doesn't require an object store. Keeps the existing POST contract for the ADC side.
Cons: Still puts the full body through the HTTP layer (so nginx/Django still need generous body limits). Doesn't give us the "failed object frozen in a bucket for inspection" win from (A). Postgres-BLOB-as-queue has its own operational costs.
C. Worker-side adaptive batching (tactical, not architectural)
Worker posts incrementally — ideally one result at a time, or in small fixed-size chunks (e.g. 10 results or 10 MB per POST, whichever is smaller). Adaptive: on 5xx, halve the batch size and retry; on sustained success, grow it back.
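The halve/grow loop described above, as a sketch (the 10-result ceiling mirrors the example; the "or 10 MB, whichever is smaller" byte cap and a retry limit for persistent 5xx are omitted for brevity):

```python
# Illustrative ADC-side adaptive batching for option (C); not an agreed contract.
import time

import requests

def post_results_adaptively(url: str, results: list[dict], max_batch: int = 10) -> None:
    session = requests.Session()
    batch_size, i = max_batch, 0
    while i < len(results):
        batch = results[i : i + batch_size]
        resp = session.post(url, json={"results": batch}, timeout=60)
        if resp.status_code >= 500:
            # Server struggling (503, proxy timeout): halve the batch, back off,
            # and retry the same slice. Results already posted stay acked, so
            # nothing done so far is unwound.
            batch_size = max(1, batch_size // 2)
            time.sleep(2)
            continue
        resp.raise_for_status()  # a 4xx is a real error, not a sizing problem
        i += len(batch)
        batch_size = min(max_batch, batch_size * 2)  # sustained success: grow back
```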
Pros: Smallest change. Only touches ADC. Fits under today's server-side contract. Lets each successful result get acked to NATS independently so a mid-batch failure doesn't unwind work already done. Gives the per-job log from #1222 meaningful progress granularity ("1/18 results posted" instead of a 30 s silent gap followed by a 300 MB upload).
Cons: Doesn't solve the fundamental mismatch between "big payload" and "HTTP POST as the carrier." Larger pipelines with denser detections will eventually produce a single-task result that's still 20-30 MB; at that point batching further doesn't help and we're back to needing (A) or (B). Treat as a stopgap, not the end state.
What we still need to verify
Before committing to any of the above, concrete things to answer:
- Is job.pipeline.save_results() idempotent per source image? The retry story in (A) and (B) depends on "re-run the task against the same payload" being safe. If it's idempotent only in the happy path and creates duplicate detections/classifications on re-run, we need to fix that first (see the sketch after this list).
- What's driving the 8–17 MB per-image payload? Is it unusually dense detections for this project, or is every pipeline output this large? If it's project-specific, (C) alone buys more time than it appears to.
- Does the NATS consumer's ack_wait interact badly with our current 30 s gunicorn timeout? On the canceled 636-image run, the symptom was specifically that redelivery fired before successful POSTs could complete. A longer ack_wait (or a smaller batch per POST) would bend the same curve.
- Streaming vs. buffered request body in DRF + UvicornWorker — whether DRF even supports stream-parsing an array of Pydantic-typed results without materializing the whole list. If not, (B) is significantly harder than it looks.
- Bucket credentials from the ADC worker side — does the worker already have credentials for the project's object storage, or would that be net-new surface area? Affects the cost of (A).
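On the first question, the property we need looks roughly like this hypothetical pattern — NOT the actual save_results() implementation; the Detection model usage and the (source image, bbox) natural key are assumptions for illustration:

```python
# What "idempotent per source image" would mean concretely (sketch).
from django.db import transaction

def save_detections_idempotently(job, response):
    with transaction.atomic():
        for det in response.detections:
            # Keyed update_or_create: re-running the same payload updates
            # existing rows in place instead of inserting duplicates.
            Detection.objects.update_or_create(
                source_image_id=det.source_image_id,
                bbox=det.bbox,  # assumed natural key: image + bounding box
                defaults={"score": det.score, "job": job},
            )
```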
Relationship to other tickets

- PSv2: Async jobs hang forever when NATS tasks exhaust max_deliver without posting results #1168 (jobs hang when tasks exhaust max_deliver) — the failure-detection side. With (A) or (B) in place, #1168's scope should expand: detection also needs to cover "objects sitting in the bucket that the celery task has failed N times on." Likely co-dependent.
- PSv2: Refactor /tasks and /result endpoints — GET→POST, proper serializers, service name cleanup #1141 (refactor /tasks and /result endpoints — GET→POST, DRF serializers, service name cleanup) — contract cleanup that touches the same files. If this issue moves forward with (A), the endpoint contract will change anyway (small body, bucket pointer), and the two refactors should be sequenced together rather than landed independently.
- async_api jobs killed by transient Redis errors during update_state — RedisError and "state actually missing" are conflated into a single fatal path #1219 (transient Redis errors conflated into fatal path) — already makes the happy path more resilient. Not a prerequisite for this refactor but reduces the noise that would otherwise compete with any new retry logic.
- Evaluate Celery result backend and broker architecture #1189 (evaluate Celery result backend and broker architecture) — adjacent platform work. Not a prerequisite, but the outcome of that evaluation may influence which of (A)/(B) is easiest.
What this issue is NOT

- Not a plan to just keep bumping body size limits. The existing bump to 500 MB was an unblock-testing patch, not a design decision; that 500 MB ceiling will get blown through by the next pipeline with denser detections or a larger reserved batch size.
- Not a plan to batch-compress. Adds a CPU/memory hit on the worker and opaque failure modes when the receiving side can't decompress.
- Not scoped to fix NATS semantics or retry counts. max_deliver and ack_wait are tuning knobs, not the root cause; tuning them is a last resort if none of (A)/(B)/(C) is viable.
Where to look (for implementers)
- ami-data-companion result-posting code path (the worker-side caller of POST /api/v2/jobs/<id>/result/)
- How reserved tasks are grouped into a result payload in the ADC worker — whether one reserve_tasks call becomes one POST /result/ by design or by accident
- ami/jobs/views.py:302-356 — the current result action on JobViewSet
- ami/jobs/tasks.py:50-178 — process_nats_pipeline_result (the celery task that actually does the saving)
- Whatever job.pipeline.save_results() resolves to on the Pipeline model — idempotency is load-bearing for any retry scheme built on top of this