Decouple ML result ingestion from the HTTP POST: large bodies, no durable handoff, no retry-record path #1223

@mihow

Description

Context

While exercising #1222 end-to-end against real ML workers, the ADC worker's POST /api/v2/jobs/<id>/result/ calls repeatedly failed with HTTP 413 (Request Entity Too Large), and after raising body size limits, with HTTP 503 once each request started exceeding gunicorn's default 30 s worker timeout. The 413 was the first symptom, but after reading the current code path it's clear the underlying issue is broader than "the body limit is too low." This issue captures both the observation and the design direction we're leaning toward.

Observed behavior

Payload sizes (proxy error log, two small async_api jobs using 18 source images each, global_moths_2024 pipeline):

Job         Worker    POST body size
18 images   Worker A  321 MB
18 images   Worker B  139–301 MB

Per image, that's ~8-17 MB of result JSON, about an order of magnitude larger than what the default 100 MB DATA_UPLOAD_MAX_MEMORY_SIZE limit assumes.

Redelivery cascade observed on a larger run. A 636-image job was canceled mid-flight after ~16 minutes of struggling with 503s. The #1222 forensic cleanup snapshot captured this consumer state:

```
Finalizing NATS consumer job-<id>-consumer before deletion
  (delivered=920 ack_floor=169 num_pending=366 num_ack_pending=43 num_redelivered=141)
```

Reading that: the consumer had 920 delivery attempts for 636 tasks, only 169 had been acked, 43 were in-flight, and 141 of the attempts were redeliveries because the first attempt never got acked in time. Workers were fine — GPUs busy, inference running — but result POSTs were either 413ing, 503ing, or taking so long that the ~30 s ack_wait window passed before the ack made it back to NATS. The redelivery factor (920/636 ≈ 1.45×) is the cost of that: about 45% of the work was being redone.
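
The arithmetic behind that reading can be checked directly from the snapshot figures (values copied from the consumer-state line above):

```python
# Consumer-state figures from the forensic snapshot above.
delivered = 920        # total delivery attempts
total_tasks = 636      # tasks in the job
num_redelivered = 141  # attempts that NATS marked as redeliveries

# Redelivery factor: delivery attempts per task across the whole job.
redelivery_factor = delivered / total_tasks
print(f"redelivery factor: {redelivery_factor:.2f}x")  # prints "redelivery factor: 1.45x"

# Extra attempts beyond one-per-task, as a share of the job: the redone work.
redone = (delivered - total_tasks) / total_tasks
print(f"redone work: {redone:.0%}")  # prints "redone work: 45%"
```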

This is exactly the kind of silent "workers look busy, job doesn't progress" pattern #1220 was asking for visibility into, and the snapshot is the receipt.

Current architecture (after code read)

The current result-handling path is not quite what the surface behavior suggests. Reading ami/jobs/views.py:302-356 and ami/jobs/tasks.py:50-178:

  1. Worker POSTs {results: [PipelineTaskResult, ...]} with the full reserved batch (currently 139-321 MB).
  2. DRF parses the JSON body synchronously — this blocks the uvicorn worker event loop for as long as the parse takes on a body this size.
  3. MLJobResultsRequestSerializer(...).is_valid(raise_exception=True) runs Pydantic validation over the whole list.
  4. For each result in the validated list, Django calls process_nats_pipeline_result.delay(job_id, result_data, reply_subject) — one celery enqueue per task, via RabbitMQ.
  5. Django returns 200 {"status":"accepted","results_queued":N} without waiting for any save to happen.
  6. A celery worker picks up each task, validates again (PipelineResultsResponse(**result_data)), runs job.pipeline.save_results(...), and then calls _ack_task_via_nats(reply_subject). NATS ack happens inside the celery task, not at HTTP time.
  7. The celery task has max_retries=0 — the comment (tasks.py:50) explicitly defers retry semantics to NATS's max_deliver/ack_wait redelivery.
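
The request-time path above can be condensed into a runnable sketch. Names follow the issue; the broker, ack transport, and validation are stubbed with in-memory stand-ins, so this illustrates the shape of the flow, not the real view:

```python
import json

CELERY_QUEUE = []   # stand-in for RabbitMQ: one message per result
ACKED = []          # stand-in for NATS acks

def handle_result_post(job_id, body_bytes):
    """Sketch of the current view: parse, validate, fan out, return."""
    # (2) synchronous JSON parse — blocks for the whole parse on a ~200 MB body
    payload = json.loads(body_bytes)
    results = payload["results"]
    # (3) whole-list validation, standing in for the Pydantic serializer
    for r in results:
        assert "reply_subject" in r and "data" in r
    # (4) one broker message per result, carrying the full result dict
    for r in results:
        CELERY_QUEUE.append((job_id, r["data"], r["reply_subject"]))
    # (5) respond before any save happens
    return {"status": "accepted", "results_queued": len(results)}

def run_celery_worker(save_results):
    """Sketch of process_nats_pipeline_result: save first, then ack NATS."""
    while CELERY_QUEUE:
        job_id, data, reply_subject = CELERY_QUEUE.pop(0)
        save_results(job_id, data)   # (6) DB write happens here
        ACKED.append(reply_subject)  # (6) NATS ack only after the save
        # (7) max_retries=0: a failure here relies on NATS redelivery
```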

So the celery-backed save is already in place. The HTTP request is not waiting on save_results(). What it is waiting on is the JSON parse of a ~200 MB body, plus the per-task fan-out to RabbitMQ inside the request handler. Both are expensive enough on large payloads to blow through gunicorn's worker timeout on a non-trivial fraction of requests, which is what produces the 503 → NATS redelivery cascade.

Also relevant: the per-task result_data dict is being shipped to RabbitMQ as the celery task argument. That's using the message broker as a heavyweight object store for N copies of the same large payload that was just uploaded over HTTP — a re-serialization step that a durable intermediate would eliminate.

Requirements (from discussion)

The refactor should satisfy, in order of importance:

  1. POSTs should be fast and the worker should keep moving. The worker should not be holding a connection open for tens of seconds per batch just to hand over bytes.
  2. Results should be saved safely before ingest, and should look structurally valid. There needs to be a durable place where a payload lives between "worker says it's done" and "Django has ingested it into the relational database." A payload that fails schema validation should be preserved (with a reason) rather than silently dropped.
  3. Results should actually be ingested into the database, or the failure should be recorded. If ingest fails, the system should either retry it idempotently or surface the failure explicitly on the job record. Not "job stays STARTED forever" (which is what #1168, "PSv2: Async jobs hang forever when NATS tasks exhaust max_deliver without posting results", is about today) and not "results silently vanish." The per-job log from #1222 ("feat(jobs): make NATS queue activity visible in async ML job logs") is the right surface for the user-visible side of this.

Design directions

These are directions to discuss, not a decided plan. Listed roughly in order of how cleanly they satisfy the three requirements above.

A. Durable intermediate storage between worker and Django ingest

Worker writes the raw result payload to an object store (S3-compatible; the same storage used for source images could be reused, or a dedicated bucket for ingest queuing), then POSTs a short JSON to Django with {reply_subject, bucket_key, content_length, content_hash}. Django enqueues a celery task that reads the object from the bucket, validates it, saves it, and acks NATS. The HTTP POST handler never touches a body larger than a few KB regardless of pipeline output density.
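
A minimal sketch of that handoff, with the object store stubbed as a dict and the key layout, endpoint path, and helper names purely illustrative (none of this is a decided API):

```python
import hashlib
import json

BUCKET = {}  # stand-in for the S3-compatible object store

def post_result_via_bucket(job_id, reply_subject, result_bytes, http_post):
    """Worker side of option (A): upload the payload, POST a tiny manifest."""
    key = f"ingest/{job_id}/{reply_subject}.json"
    BUCKET[key] = result_bytes  # would be an S3 put_object in practice
    manifest = {
        "reply_subject": reply_subject,
        "bucket_key": key,
        "content_length": len(result_bytes),
        "content_hash": hashlib.sha256(result_bytes).hexdigest(),
    }
    # The HTTP body is now a few hundred bytes regardless of payload size.
    return http_post(f"/api/v2/jobs/{job_id}/result/", json.dumps(manifest))

def ingest_from_bucket(manifest, save_results, ack):
    """Django-side celery task sketch: verify, load, save, ack."""
    raw = BUCKET[manifest["bucket_key"]]
    if hashlib.sha256(raw).hexdigest() != manifest["content_hash"]:
        # Freeze the bad object under a failed/ prefix instead of dropping it.
        BUCKET["failed/" + manifest["bucket_key"]] = BUCKET.pop(manifest["bucket_key"])
        return "hash-mismatch"
    save_results(json.loads(raw))
    ack(manifest["reply_subject"])
    return "ok"
```

Retry here is "call ingest_from_bucket again with the same manifest," which is safe exactly when save_results is idempotent — the open question flagged below.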

Pros: Fully satisfies all three requirements. POST is cheap. Payload is durable and inspectable before ingest. Retry becomes "re-run the celery task against the same object" — fully idempotent if save_results() is idempotent per source image. Failed objects can be moved to a failed/ prefix as a native DLQ. The on-disk object is a great artifact for post-mortem investigation of malformed results.

Cons: Biggest change. Requires coordinated ADC-side and Django-side work. Bucket lifecycle policy needed (how long do successfully-ingested objects stick around?). Validation needs to move: pre-POST for schema sanity, post-POST in the celery task for semantic checks. Requires the worker to have bucket credentials or a presigned-URL flow.

B. Keep HTTP POST, but stop using RabbitMQ as a bulk carrier

The HTTP POST already returns before saves happen — that part isn't broken. What's broken is that (i) DRF parses 200 MB of JSON on the event loop, and (ii) the full result_data dict gets shipped to RabbitMQ once per task. A middle-ground refactor:

  • Stream-parse the request body (or hand the raw body to a celery task as a blob, rather than as a parsed Python dict)
  • Write the parsed result_data for each task to a local disk queue or postgres BLOB, enqueue a small celery task that references the stored record by id instead of carrying the dict
  • Keep NATS ack inside the celery task, where it already is
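
The staging step can be sketched with in-memory stand-ins for the staging store and the broker (the record layout and function names are illustrative assumptions):

```python
import itertools
import json

RESULT_STORE = {}   # stand-in for a postgres staging table or disk queue
BROKER_QUEUE = []   # broker now carries only small reference messages
_next_id = itertools.count(1)

def stage_and_enqueue(job_id, result_data, reply_subject):
    """Option (B): persist the heavy dict once, ship only its id to the broker."""
    record_id = next(_next_id)
    RESULT_STORE[record_id] = json.dumps(result_data)  # single staging write
    BROKER_QUEUE.append({"job_id": job_id, "record_id": record_id,
                         "reply_subject": reply_subject})  # ~100 bytes, not ~MB

def process_staged_result(message, save_results, ack):
    """Celery task sketch: fetch by id, save, ack; the dict never rode RabbitMQ."""
    data = json.loads(RESULT_STORE[message["record_id"]])
    save_results(message["job_id"], data)
    ack(message["reply_subject"])
```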

Pros: Smaller change than (A). Doesn't require an object store. Keeps the existing POST contract for the ADC side.

Cons: Still puts the full body through the HTTP layer (so nginx/Django still need generous body limits). Doesn't give us the "failed object frozen in a bucket for inspection" win from (A). Postgres-BLOB-as-queue has its own operational costs.

C. Worker-side adaptive batching (tactical, not architectural)

Worker posts incrementally — ideally one result at a time, or in small fixed-size chunks (e.g. 10 results or 10 MB per POST, whichever is smaller). Adaptive: on 5xx, halve the batch size and retry; on sustained success, grow it back.
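
The adaptive loop can be sketched as follows; the thresholds are the illustrative numbers from this issue, `http_post` is a hypothetical callable returning an HTTP status code, and a real implementation would also need a retry cap so a permanently failing endpoint doesn't loop forever:

```python
def post_results_adaptively(results, http_post, start_batch=10,
                            min_batch=1, max_batch=10):
    """Option (C) sketch: halve batch size on 5xx, grow it back on success."""
    batch_size = start_batch
    i = 0
    while i < len(results):
        batch = results[i:i + batch_size]
        status = http_post(batch)
        if status >= 500:
            # Back off: retry the same unacked work in smaller pieces.
            batch_size = max(min_batch, batch_size // 2)
            continue
        # Success: these results can be acked independently server-side,
        # so a later failure doesn't unwind them. Grow the batch back.
        i += len(batch)
        batch_size = min(max_batch, batch_size + 1)
    return batch_size
```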

Pros: Smallest change. Only touches ADC. Fits under today's server-side contract. Lets each successful result get acked to NATS independently so a mid-batch failure doesn't unwind work already done. Gives the per-job log from #1222 meaningful progress granularity ("1/18 results posted" instead of a 30 s silent gap followed by a 300 MB upload).

Cons: Doesn't solve the fundamental mismatch between "big payload" and "HTTP POST as the carrier." Larger pipelines with denser detections will eventually produce a single-task result that's still 20-30 MB; at that point batching further doesn't help and we're back to needing (A) or (B). Treat as a stopgap, not the end state.

What we still need to verify

Before committing to any of the above, concrete things to answer:

  • Is job.pipeline.save_results() idempotent per source image? The retry story in (A) and (B) depends on "re-run the task against the same payload" being safe. If it's idempotent only in the happy path and creates duplicate detections/classifications on re-run, we need to fix that first.
  • What's driving the 8-17 MB per-image payload? Is it unusually dense detections for this project, or is every pipeline output this large? If it's project-specific, (C) alone buys us more time than it looks.
  • Does the NATS consumer's ack_wait interact badly with our current 30 s gunicorn timeout? On the canceled 636-image run, the symptom was specifically that redelivery fired before successful POSTs could complete. A longer ack_wait (or a smaller batch per POST) would bend the same curve.
  • Streaming vs. buffered request body in DRF + UvicornWorker — whether DRF even supports stream-parsing an array of Pydantic-typed results without materializing the whole list. If not, (B) is significantly harder than it looks.
  • Bucket credentials from the ADC worker side — does the worker already have credentials for the project's object storage, or would that be net-new surface area? Affects the cost of (A).
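
On the first bullet, one common way to get the idempotency property is to upsert on a natural key; sketched here purely to illustrate what "safe to re-run" means, with a dict standing in for the detections table and all field names hypothetical (the real models behind save_results() are the thing to check):

```python
DETECTIONS = {}  # stand-in for the detections table, keyed like a unique constraint

def save_results_idempotent(job_id, results):
    """Upsert keyed on (job_id, source_image_id, detection_index).

    With a key like this, re-running the task against the same payload
    rewrites the same rows instead of duplicating them — the property the
    retry stories in (A) and (B) depend on.
    """
    for r in results:
        for idx, det in enumerate(r["detections"]):
            key = (job_id, r["source_image_id"], idx)
            DETECTIONS[key] = det  # update_or_create in Django terms
```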

Relationship to other tickets

What this issue is NOT

  • Not a plan to just keep bumping body size limits. The existing bump to 500 MB was an unblock-testing patch, not a design decision; that 500 MB ceiling will get blown through by the next pipeline with denser detections or a larger reserved batch size.
  • Not a plan to batch-compress. Adds a CPU/memory hit on the worker and opaque failure modes when the receiving side can't decompress.
  • Not scoped to fix NATS semantics or retry counts. max_deliver and ack_wait are tuning knobs, not the root cause; tuning them is a last resort if none of (A)/(B)/(C) is viable.

Where to look (for implementers)

  • ami-data-companion result-posting code path (the worker-side caller of POST /api/v2/jobs/<id>/result/)
  • How reserved tasks are grouped into a result payload in the ADC worker — whether one reserve_tasks call becomes one POST /result/ by design or by accident
  • ami/jobs/views.py:302-356 — the current result action on JobViewSet
  • ami/jobs/tasks.py:50-178: process_nats_pipeline_result (the celery task that actually does the saving)
  • Whatever job.pipeline.save_results() resolves to on the Pipeline model — idempotency is load-bearing for any retry scheme built on top of this
