Surface NATS consumer/queue lifecycle events in the job logger so users can see what's happening from the UI #1220

@mihow

Description

Summary

Today the NATS orchestration layer (ami/ml/orchestration/nats_queue.py) logs its activity to the module logger (logger = logging.getLogger(__name__)), which means its messages go to stdout / New Relic / docker logs but do not appear in the job's own log stream that users see in the UI.

The result is that when a job misbehaves, users looking at its log see:

  • Successfully queued N/N images to stream for job X
  • A gap
  • Then either normal result messages, or silence, or a cryptic failure

… with no visibility into what the NATS layer was actually doing during that gap. This makes triage (by users, maintainers, and on-call) slow and error-prone because the most important state transitions in the async pipeline are invisible from the record users have.

What to log via job.logger

The per-job logger pattern already exists in cleanup_async_job_resources(job_id, job_logger); extend it to the rest of the lifecycle.
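
One way to extend that pattern is a small helper that always writes to the module logger and additionally writes to the per-job logger when one exists. This is a minimal sketch; the name log_both and its signature are illustrative, not the project's actual API:

```python
import logging

module_logger = logging.getLogger(__name__)

def log_both(job_logger, level, msg, *args):
    """Log to the module logger always, and to the per-job logger when one
    exists. job_logger may be None (e.g. the background advisory listener),
    in which case only the module logger receives the record."""
    module_logger.log(level, msg, *args)
    if job_logger is not None:
        job_logger.log(level, msg, *args)
```

Call sites then pass the job logger through wherever a job context exists, and pass None from background paths.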

Candidate events to surface on the job logger (not just the module logger), with structured context where possible:

Setup

  • Stream creation / reuse: "Created NATS stream job_<id>" or "Reusing existing NATS stream job_<id> (last_seq=N)"
  • Consumer creation / reuse, with config snapshot: max_deliver, ack_wait, max_ack_pending, deliver_policy, ack_policy
  • Advisory stream / DLQ consumer setup result
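
The config snapshot could be rendered as query-friendly key=value pairs rather than prose. A sketch, assuming the config arrives as a mapping whose keys mirror nats-py's ConsumerConfig fields (the function name is hypothetical):

```python
def consumer_config_line(job_id, cfg):
    """Render the consumer-config snapshot as key=value pairs.

    cfg is assumed to be a mapping whose keys mirror nats-py's
    ConsumerConfig fields; only the fields actually present are emitted.
    """
    fields = ("max_deliver", "ack_wait", "max_ack_pending",
              "deliver_policy", "ack_policy")
    kv = " ".join(f"{name}={cfg[name]}" for name in fields if name in cfg)
    return f"Created NATS consumer for job_{job_id}: {kv}"
```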

Publishing

  • Per-batch (not per-message) progress if more than N messages: "Queued batch 3/12 (100 images) to job_<id>, stream_seq 201-300"
  • Final tally including any publish_task failures (today these just return False with a module-level error)
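
The final tally could collapse the per-message outcomes the publish loop already collects into one job-log line, counting failures instead of letting them vanish into the module log. A sketch under the assumption that publish results are gathered as a list of booleans (as implied by publish_task returning False on failure); the helper name is illustrative:

```python
def publish_tally(job_id, results):
    """Collapse per-message publish outcomes into one job-log line.

    results is assumed to be the list of booleans the publish loop
    collects; failures are counted rather than silently dropped.
    """
    ok = sum(1 for r in results if r)
    failed = len(results) - ok
    line = f"Queued {ok}/{len(results)} images to stream for job_{job_id}"
    if failed:
        line += f" ({failed} publish failures)"
    return line
```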

Fetching / delivery (worker side)

  • First fetch for a job: "Worker <W> fetched first batch (size=8) for job_<id> from stream, consumer_seq=1-8"
  • When reserve_tasks() returns 0 but the worker expected >0: log consumer state (num_pending, num_ack_pending, num_redelivered, delivered.consumer_seq, ack_floor.stream_seq) so a "nothing happening" moment self-diagnoses
  • When a redelivery is observed: "Redelivery of seq=N on job_<id> (attempt 2/5)" — surfaces crash loops early
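
Both of those worker-side lines are pure formatting over consumer state the code already has. A sketch, assuming the state is available in the shape of nats-py's ConsumerInfo (dict access and helper names here are illustrative):

```python
def empty_fetch_line(job_id, info):
    """Render consumer state when reserve_tasks() unexpectedly returns 0,
    so a "nothing happening" moment self-diagnoses.

    info is assumed to mirror the shape of nats-py's ConsumerInfo.
    """
    return (
        f"reserve_tasks() returned 0 for job_{job_id}: "
        f"num_pending={info['num_pending']} "
        f"num_ack_pending={info['num_ack_pending']} "
        f"num_redelivered={info['num_redelivered']} "
        f"delivered.consumer_seq={info['delivered']['consumer_seq']} "
        f"ack_floor.stream_seq={info['ack_floor']['stream_seq']}"
    )

def redelivery_line(job_id, seq, attempt, max_deliver):
    """One line per observed redelivery surfaces crash loops early."""
    return f"Redelivery of seq={seq} on job_{job_id} (attempt {attempt}/{max_deliver})"
```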

Cleanup

  • Already logs on cleanup success — also log the final consumer stats before deletion: "Deleting job_<id> consumer (delivered=<n>, ack_floor=<n>, redelivered=<n>)". This is the single most useful forensic line for "what happened to this job" after the fact.
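
The cleanup step could fetch the stats immediately before deletion. A minimal async sketch, assuming js exposes nats-py JetStreamContext's consumer_info() and delete_consumer() and that attribute names follow nats-py's ConsumerInfo; the wrapper function itself is hypothetical:

```python
async def delete_consumer_with_stats(js, stream, consumer, job_logger):
    """Log the final consumer stats to the job logger, then delete.

    js is assumed to expose nats-py JetStreamContext's consumer_info()
    and delete_consumer(); attribute names follow nats-py's ConsumerInfo.
    """
    info = await js.consumer_info(stream, consumer)
    job_logger.info(
        "Deleting %s consumer (delivered=%s, ack_floor=%s, redelivered=%s)",
        stream,
        info.delivered.consumer_seq,
        info.ack_floor.stream_seq,
        info.num_redelivered,
    )
    await js.delete_consumer(stream, consumer)
```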

Error paths

  • Every return False or silent swallow in the NATS code should log the exception with context against the job logger, not just the module logger
  • Especially: publish_task failures on individual messages (currently logged only to module logger)
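
A sketch of what the error path could look like, keeping the existing return False contract while adding job-logger visibility (the function name and message format are illustrative, not the actual project API):

```python
import logging

module_logger = logging.getLogger(__name__)

def report_publish_failure(job_logger, job_id, image_id, exc):
    """Record a per-message publish failure with context on both loggers
    instead of silently returning False. Names are illustrative."""
    msg = "publish_task failed for image %s on job_%s: %r"
    # Full traceback to the module logger for ops dashboards...
    module_logger.error(msg, image_id, job_id, exc, exc_info=exc)
    # ...and a contextual one-liner to the user's job log, when available.
    if job_logger is not None:
        job_logger.error(msg, image_id, job_id, exc)
    return False
```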

Why this matters

A concrete recent example: a job was killed by a single ConnectionResetError on Redis during update_state. The user's log contains exactly one line about the failure:

ERROR Job <id> marked as FAILURE: Redis state missing for job

That message is inaccurate (the state wasn't missing; Redis briefly dropped the connection) and offers zero actionable detail. The real error, Error while reading from redis:6379 : (104, 'Connection reset by peer'), was only visible by shelling into the Celery worker and grepping raw docker logs, which is not something a user can do.

The same pattern applies to NATS: when a consumer exhausts max_deliver (#1168), the user sees nothing in the job log. When a publish fails for 1 message out of 900, the user sees nothing. When a redelivery happens because a worker crashed, the user sees nothing.

Implementation notes

  • logger = logging.getLogger(__name__) stays — those logs still go to stdout/New Relic and are useful for ops dashboards that aren't per-job.
  • Add an optional job_logger parameter to TaskQueueManager.__init__ or to each method that currently does per-job work, so the manager can log to both the module logger and the job logger when a job context exists.
  • Where a job logger isn't available (e.g., advisory listener running in the background), fall back to the module logger only — don't leak cross-job writes.
  • Do NOT increase log volume in per-message paths — stay batch-granular or event-granular. Per-message logging in a 10k-image job is noise.
  • Key-value structured fields are better than prose when log aggregation is involved (so downstream tools can query on consumer_seq, stream_seq, redelivered, etc.).
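
A tiny helper can enforce the key-value convention across all of the lines above. A sketch; the kv name is hypothetical:

```python
def kv(msg, **fields):
    """Append sorted key=value pairs to a log message so downstream
    aggregation tools can query on consumer_seq, stream_seq,
    redelivered, etc."""
    if not fields:
        return msg
    pairs = " ".join(f"{k}={v}" for k, v in sorted(fields.items()))
    return f"{msg} {pairs}"
```

Sorting the fields keeps the line format stable across calls, which matters for log-aggregation queries.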
