Summary
Today the NATS orchestration layer (ami/ml/orchestration/nats_queue.py) logs its activity to the module logger (logger = logging.getLogger(__name__)), which means its messages go to stdout / New Relic / docker logs but do not appear in the job's own log stream that users see in the UI.
The result is that when a job misbehaves, users looking at its log see:
- "Successfully queued N/N images to stream for job X"
- A gap
- Then either normal result messages, or silence, or a cryptic failure
… with no visibility into what the NATS layer was actually doing during that gap. This makes triage (by users, maintainers, and on-call) slow and error-prone because the most important state transitions in the async pipeline are invisible from the record users have.
What to log via job.logger
The _logger pattern used in cleanup_async_job_resources(job_id, job_logger) already exists — extend it to the rest of the lifecycle.
Candidate events to surface on the job logger (not just the module logger), with structured context where possible:
Setup
- Stream creation / reuse:
"Created NATS stream job_<id>" or "Reusing existing NATS stream job_<id> (last_seq=N)"
- Consumer creation / reuse, with config snapshot:
max_deliver, ack_wait, max_ack_pending, deliver_policy, ack_policy
- Advisory stream / DLQ consumer setup result
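A rough sketch of the setup-side events above, assuming the nats-py JetStream client; ensure_stream, the job_<id>.tasks subject, the job_<id>_workers consumer name, and the config values are illustrative placeholders, not the existing nats_queue.py API:

```python
import logging

from nats.js.api import AckPolicy, ConsumerConfig, DeliverPolicy
from nats.js.errors import NotFoundError

logger = logging.getLogger(__name__)


async def ensure_stream(js, job_id: int, job_logger: logging.Logger | None = None) -> None:
    """Create or reuse the per-job stream and consumer, reporting the outcome on both loggers."""
    name = f"job_{job_id}"
    try:
        info = await js.stream_info(name)
        msg = f"Reusing existing NATS stream {name} (last_seq={info.state.last_seq})"
    except NotFoundError:
        await js.add_stream(name=name, subjects=[f"{name}.tasks"])
        msg = f"Created NATS stream {name}"
    logger.info(msg)
    if job_logger:
        job_logger.info(msg)

    # Consumer creation with a config snapshot, so the job log records the exact settings in force.
    config = ConsumerConfig(
        durable_name=f"{name}_workers",
        max_deliver=5,
        ack_wait=60,
        max_ack_pending=100,
        deliver_policy=DeliverPolicy.ALL,
        ack_policy=AckPolicy.EXPLICIT,
    )
    await js.add_consumer(name, config)
    snapshot = (
        f"max_deliver={config.max_deliver} ack_wait={config.ack_wait} "
        f"max_ack_pending={config.max_ack_pending} "
        f"deliver_policy={config.deliver_policy} ack_policy={config.ack_policy}"
    )
    msg = f"Created consumer for {name} ({snapshot})"
    logger.info(msg)
    if job_logger:
        job_logger.info(msg)
```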
Publishing
- Per-batch (not per-message) progress if more than N messages:
"Queued batch 3/12 (100 images) to job_<id>, stream_seq 201-300"
- Final tally including any publish_task failures (today these just return False with a module-level error); see the sketch below
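A batch-granular publishing sketch with a final tally; it assumes js.publish() returns a nats-py PubAck with a seq field, and publish_tasks, the subject, and BATCH_SIZE are placeholders rather than the real publish_task code path:

```python
import logging

logger = logging.getLogger(__name__)

BATCH_SIZE = 100  # assumption: report progress per batch, never per message


async def publish_tasks(js, job_id: int, payloads: list[bytes], job_logger=None) -> int:
    """Publish all task payloads, logging batch progress and a final tally to the job log."""
    name = f"job_{job_id}"
    total = len(payloads)
    batches = (total + BATCH_SIZE - 1) // BATCH_SIZE
    published = failed = 0

    for batch_idx in range(batches):
        chunk = payloads[batch_idx * BATCH_SIZE:(batch_idx + 1) * BATCH_SIZE]
        first_seq = last_seq = None
        for payload in chunk:
            try:
                ack = await js.publish(f"{name}.tasks", payload)
            except Exception:
                # Per-message detail stays on the module logger; the tally below
                # surfaces the failure count in the job log.
                failed += 1
                logger.exception("Failed to publish a task for %s", name)
                continue
            published += 1
            first_seq = first_seq or ack.seq
            last_seq = ack.seq
        msg = (
            f"Queued batch {batch_idx + 1}/{batches} ({len(chunk)} images) to {name}, "
            f"stream_seq {first_seq}-{last_seq}"
        )
        logger.info(msg)
        if job_logger:
            job_logger.info(msg)

    tally = f"Queued {published}/{total} images to {name} ({failed} publish failures)"
    logger.info(tally)
    if job_logger:
        job_logger.info(tally)
    return published
```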
Fetching / delivery (worker side)
- First fetch for a job:
"Worker <W> fetched first batch (size=8) for job_<id> from stream, consumer_seq=1-8"
- When reserve_tasks() returns 0 but the worker expected >0: log consumer state (num_pending, num_ack_pending, num_redelivered, delivered.consumer_seq, ack_floor.stream_seq) so a "nothing happening" moment self-diagnoses (see the sketch after this list)
- When a redelivery is observed:
"Redelivery of seq=N on job_<id> (attempt 2/5)" — surfaces crash loops early
Cleanup
- Already logs on cleanup success — also log the final consumer stats before deletion:
"Deleting job_<id> consumer (delivered=<n>, ack_floor=<n>, redelivered=<n>)". This is the single most useful forensic line for "what happened to this job" after the fact.
Error paths
- Every return False or silent swallow in the NATS code should log the exception with context against the job logger, not just the module logger
- Especially: publish_task failures on individual messages (currently logged only to the module logger); see the sketch below
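A minimal sketch of that convention: keep the existing return False contract, but make the failure visible on the job logger as well (publish_one is a hypothetical stand-in for the per-message publish helper):

```python
import logging

logger = logging.getLogger(__name__)


async def publish_one(js, subject: str, payload: bytes, job_logger=None) -> bool:
    """Publish a single message; log failures to both loggers instead of swallowing them."""
    try:
        await js.publish(subject, payload)
        return True
    except Exception as exc:
        # Module logger keeps the full traceback for ops dashboards...
        logger.exception("Failed to publish to %s", subject)
        # ...while the job log gets a one-line, user-readable record of the same failure.
        if job_logger:
            job_logger.error(f"Failed to publish a task to {subject}: {exc!r}")
        return False
```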
Why this matters
A concrete example of how this plays out today: a job was killed by a single ConnectionResetError on Redis during update_state. The user's log contains exactly one line about the failure:
ERROR Job <id> marked as FAILURE: Redis state missing for job
That message is inaccurate (the state wasn't missing; Redis briefly dropped the connection) and offers zero actionable detail. The real error, Error while reading from redis:6379 : (104, 'Connection reset by peer'), was only visible by shelling into the celery worker and grepping raw docker logs — not something a user can do.
The same pattern applies to NATS: when a consumer exhausts max_deliver (#1168), the user sees nothing in the job log. When a publish fails for 1 message out of 900, the user sees nothing. When a redelivery happens because a worker crashed, the user sees nothing.
Implementation notes
- logger = logging.getLogger(__name__) stays — those logs still go to stdout/New Relic and are useful for ops dashboards that aren't per-job.
- Add an optional job_logger parameter to TaskQueueManager.__init__ or to each method that currently does per-job work, so the manager can log to both the module logger and the job logger when a job context exists (see the sketch after this list).
- Where a job logger isn't available (e.g., advisory listener running in the background), fall back to the module logger only — don't leak cross-job writes.
- Do NOT increase log volume in per-message paths — stay batch-granular or event-granular. Per-message logging in a 10k-image job is noise.
- Key-value structured fields are better than prose when log aggregation is involved (so downstream tools can query on consumer_seq, stream_seq, redelivered, etc.).
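Putting the notes above together, a minimal sketch of the dual-logger plumbing; only the logging pieces are shown, and the real TaskQueueManager constructor signature is not reproduced here:

```python
import logging

logger = logging.getLogger(__name__)


class TaskQueueManager:
    """Sketch: accept an optional job logger and write every per-job event to both loggers."""

    def __init__(self, job_logger: logging.Logger | None = None):
        self.job_logger = job_logger

    def _log(self, level: int, msg: str, **fields) -> None:
        """Log to the module logger always, and to the job logger when a job context exists.

        Key=value fields are appended so downstream tools can query on consumer_seq,
        stream_seq, redelivered, etc.
        """
        suffix = " ".join(f"{key}={value}" for key, value in fields.items())
        line = f"{msg} {suffix}".strip()
        logger.log(level, line)
        if self.job_logger is not None:
            self.job_logger.log(level, line)


# Illustrative usage from a per-job code path:
# manager = TaskQueueManager(job_logger=job.logger)
# manager._log(logging.INFO, "Reusing existing NATS stream job_123", last_seq=900)
```

Handlers that ship JSON to an aggregator could receive the same fields via extra= instead, but appending key=value pairs keeps the UI log readable either way.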
Related