Skip to content

Bound async output-sink retries + per-sink circuit breaker#67

Open
haileyok wants to merge 1 commit into
mainfrom
hailey/async-sink-circuit-breaker
Open

Bound async output-sink retries + per-sink circuit breaker#67
haileyok wants to merge 1 commit into
mainfrom
hailey/async-sink-circuit-breaker

Conversation

@haileyok

Copy link
Copy Markdown
Collaborator

Problem

AsyncMultiOutputSink._push_one retries an unhealthy sink with no overall deadline and no circuit breaker. Since AsyncRulesSink processes one message at a time and a single AsyncMultiOutputSink is shared across every stream, one slow downstream — e.g. smite-labels going gRPC UNAVAILABLE during a GKE node-pool auto-upgrade drain — blocks every stream → gRPC flow-control backpressure → fleet ingestion stall (throughput collapse, growing backlog), then snap-recovery once the downstream returns. For the label sink (timeout=5.0, max_retries=5) a single message can block its stream for tens of seconds, and without a breaker every label-effect message keeps paying that cost.

Fix

Bound each per-sink push three independent ways:

  • per-attempt timeout (sink.timeout) with a capped exponential backoff (max_backoff_seconds),
  • an overall per-push deadline (max_total_push_seconds) across all attempts + backoff, and
  • a per-sink circuit breaker: opens after circuit_failure_threshold consecutive failures, skips that sink for circuit_cooldown_seconds, then takes a half-open probe.

New metrics: output_sink.circuit_open, output_sink.circuit_opened, output_sink.deadline_exhausted. A fully-failed push still never propagates, so the one-at-a-time stream loop is preserved. Defaults: max_total_push_seconds=10, circuit_failure_threshold=5, circuit_cooldown_seconds=5, max_backoff_seconds=1.

The breaker lives on the shared AsyncMultiOutputSink (one per process, shared across streams), so once a downstream trips, every stream fast-skips it. No lock is needed: all breaker state mutations are await-free on the single-threaded event loop.

Tradeoffs (intentional)

  • Open circuit drops effects. While a sink's circuit is open its pushes are skipped and the message is still ACKed (dropped, not queued/DLQ'd). This matches prior behavior — it already dropped after retry exhaustion — the breaker just makes it happen faster and in bulk during an outage. A durable queue would be separate work.
  • First-wave cost. Up to concurrency in-flight messages still pay the deadline before the breaker trips. Defaults are conservative; tune per-deployment if the first wave matters.

Testing

9 new unit tests: overall-deadline bound, capped backoff, open-after-threshold + skip, half-open re-probe, half-open → closed recovery, success resets the failure counter, per-sink isolation (tees to healthy siblings while one circuit is open), healthy sink never trips, and "push never raises when all sinks fail." The sink test file passes 17/17, stable across repeated runs. ruff + mypy clean.

Reproduction (red on the prior code, green after this change): a single push to a hard-down sink went from unbounded (601ms in the scaled repro; ~30s with the real label sink) to bounded by the deadline, and the post-trip push from 601ms to ~0.05ms.

Integration (discord/discord, after merge)

Bump the pinned osprey commit to this merge SHA:

  • discord_smite/requirements.in + discord_smite/requirements.txt (osprey-async-worker / osprey-worker / osprey-rpc)
  • discord_devops/docker/discord_smite/osprey_coordinator.Dockerfile (OSPREY_COMMIT)

AsyncMultiOutputSink._push_one retried an unhealthy sink with no overall deadline and no circuit breaker. Because AsyncRulesSink processes one message at a time and the sink is shared across every stream, a single slow downstream (e.g. labels gRPC UNAVAILABLE during a node drain) blocked every stream -> flow-control backpressure -> ingestion stall.

Bound each per-sink push three ways: per-attempt timeout with capped backoff, an overall max_total_push_seconds deadline, and a per-sink circuit breaker (opens after circuit_failure_threshold consecutive failures, half-open probe after circuit_cooldown_seconds). Add output_sink.circuit_open/.circuit_opened/.deadline_exhausted metrics. A fully-failed push still never propagates, so the stream loop is preserved.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant