Worker-connection-gated readiness probe for the coordinator #36
Open
cmttt wants to merge 9 commits into
Adds an asyncio-native osprey worker as a sibling package to the gevent worker. Includes coordinator input stream, sinks (rules, output), pigeon client + interceptors, etcd-backed sources provider, executor, and engine. Coordinator gains a bidirectional stream and pigeon module. Subsequent tuning to match the gevent baseline:
- Move recompile off the event loop; force gc.collect after engine swap
- Match gevent's compile CPU profile (and later remove the gc.collect hack)
- Empirically widen pause-input-stream jitter (final: 0-600s)
- Preserve WatchMux dedup state across watcher iteration
Engine: AST/grammar tweaks, executor and execution-context refinements, UDF helpers, stdlib UDF updates (categories, experiments, json_data, rules, string), external service utils. Worker: udf_register, osprey_engine, sources_provider (incl. skipping engine recompile on no-op etcd sources events), output/rules sinks. Plus repo housekeeping: CODEOWNERS, PR template, dev workflow doc, and a small UI polish in FeatureSelectModal.
- _mux.py / test_watcher_mux.py: preserve dedup state correctly across watcher iteration.
- watcherd_impl.py: downgrade etcd watcher reconnect logs from exception to warning (these reconnects are routine, not errors).
- action.proto + generated pb2/pb2.pyi: add GuildBatchMLScore action.
add and register count regex matches udf
* Bootstrap async UDFs through first-party plugin hook
  Move the async-stdlib MXLookup override from a hardcoded list inside bootstrap_async_udfs into a small first-party pluggy plugin module (stdlib_udfs/_async_stdlib_plugin.py). The plugin registers async-native stdlib replacements through the same register_udfs hook used by discord_osprey_async_plugins. _deduplicate_udfs already handles override-by-class-name, so MXLookup (async) automatically shadows MXLookup (sync), with no special path.
  Adding a new async stdlib override now means: drop the class file in stdlib_udfs/ and append it to _async_stdlib_plugin.register_udfs(). No edits to bootstrap_async_udfs required.
* Tests for async UDF bootstrap behavior
  Lock down that bootstrap_async_udfs:
  - resolves MXLookup to the async-native class (not sync stdlib)
  - doesn't leak SyncMXLookup into the registry
  - preserves non-overridden stdlib UDFs (JsonData, Rule)
  - registers the first-party async-stdlib plugin via the same hook used by third-party plugins
  - emits AsyncMXLookup through the register_udfs hook
The K8s readiness probe on osprey-coordinator-asyncio currently returns
SERVING the moment pigeon binds port 80 — well before any async worker
has dialed in via the bidi stream. discord_api → coord process_action
calls land on a freshly-Ready pod, get enqueued onto a priority queue
with no consumers, and time out with DEADLINE_EXCEEDED.
This change wires a custom HealthChecker on the sync_action service
that reports healthy only when at least one worker is connected via
bidi:
* `OspreyCoordinatorServer` (the bidi server) now owns an
`Arc<AtomicI64>` connected-worker counter.
* Each bidi stream handler increments on entry and decrements via a
`WorkerConnectionGuard` Drop impl, so disconnects (graceful, panic,
cancellation) all release the count.
* `pigeon::Server::with_health_checker` is added so a closure can be
attached to every gRPC service registered on the Server.
* `main.rs` swaps `pigeon::serve(sync_action, ...)` for an explicit
`Server::new(...).with_health_checker(...).serve()` chain that
gates SERVING on `connected_workers > 0`.
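
As a rough illustration of the counter-and-guard pattern in the list above, here is a minimal standalone sketch using only the Rust standard library. The guard's constructor, its field name, and the `handle_stream` function are hypothetical stand-ins and are not taken from this PR's diff; the real handler is an async bidi stream, while this sketch uses a plain function and a simulated panic.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the guard: increments the shared counter when
/// created and decrements it when dropped.
struct WorkerConnectionGuard {
    connected_workers: Arc<AtomicI64>,
}

impl WorkerConnectionGuard {
    fn new(connected_workers: Arc<AtomicI64>) -> Self {
        connected_workers.fetch_add(1, Ordering::SeqCst);
        Self { connected_workers }
    }
}

impl Drop for WorkerConnectionGuard {
    fn drop(&mut self) {
        // Runs on normal scope exit and during panic unwinding alike.
        self.connected_workers.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Hypothetical stand-in for the body of a bidi stream handler.
fn handle_stream(connected_workers: Arc<AtomicI64>, should_panic: bool) {
    let _guard = WorkerConnectionGuard::new(connected_workers);
    if should_panic {
        panic!("simulated handler failure");
    }
}

fn main() {
    let counter = Arc::new(AtomicI64::new(0));

    // Graceful path: the count is released when the guard leaves scope.
    {
        let _guard = WorkerConnectionGuard::new(Arc::clone(&counter));
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
    assert_eq!(counter.load(Ordering::SeqCst), 0);

    // Panic path: unwinding still drops the guard, so the count returns to 0.
    let shared = Arc::clone(&counter);
    let result = std::panic::catch_unwind(move || handle_stream(shared, true));
    assert!(result.is_err());
    assert_eq!(counter.load(Ordering::SeqCst), 0);
}
```

Because the decrement lives in `Drop`, no exit path of the handler needs its own bookkeeping. This relies on the default unwinding panic strategy; with `panic = "abort"` the process would exit instead of running the destructor.
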
Note that pigeon's grpc.health.v1 still fast-paths empty service names
to SERVING (liveness semantics), so the K8s readinessProbe must
specify the sync_action service name to actually invoke this checker.
The cluster-side change ships in a companion discord/discord PR.
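
The empty-service-name behaviour noted above can be pictured with a small model of the health-check lookup. This is only a sketch under stated assumptions: `HealthRegistry`, `ServingStatus`, `register`, and the handling of unregistered names are hypothetical and do not reproduce pigeon's actual `health.rs`. The service name is the one used in this PR's test plan.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

#[derive(Debug, PartialEq, Clone, Copy)]
enum ServingStatus {
    Serving,
    NotServing,
    // Assumption: how pigeon answers for unregistered names is not stated in this PR.
    ServiceUnknown,
}

type HealthChecker = Box<dyn Fn() -> bool + Send + Sync>;

/// Hypothetical model of a per-service health checker table.
struct HealthRegistry {
    checkers: HashMap<String, HealthChecker>,
}

impl HealthRegistry {
    fn new() -> Self {
        Self { checkers: HashMap::new() }
    }

    fn register<F>(&mut self, service: &str, checker: F)
    where
        F: Fn() -> bool + Send + Sync + 'static,
    {
        self.checkers.insert(service.to_string(), Box::new(checker));
    }

    fn check(&self, service: &str) -> ServingStatus {
        // Liveness fast-path: an empty service name never consults a checker.
        if service.is_empty() {
            return ServingStatus::Serving;
        }
        match self.checkers.get(service) {
            Some(checker) if checker() => ServingStatus::Serving,
            Some(_) => ServingStatus::NotServing,
            None => ServingStatus::ServiceUnknown,
        }
    }
}

const SYNC_ACTION: &str =
    "osprey.rpc.osprey_coordinator.sync_action.v1.OspreyCoordinatorSyncActionService";

fn main() {
    let connected_workers = Arc::new(AtomicI64::new(0));
    let counter = Arc::clone(&connected_workers);

    let mut registry = HealthRegistry::new();
    registry.register(SYNC_ACTION, move || counter.load(Ordering::SeqCst) > 0);

    // A probe with an empty service name reports SERVING even with no workers.
    assert_eq!(registry.check(""), ServingStatus::Serving);
    // A probe naming the service reaches the checker.
    assert_eq!(registry.check(SYNC_ACTION), ServingStatus::NotServing);
    connected_workers.fetch_add(1, Ordering::SeqCst);
    assert_eq!(registry.check(SYNC_ACTION), ServingStatus::Serving);
}
```

In this model, a probe that leaves the service name empty stays green regardless of worker count, which is why the readiness probe has to name the sync_action service.
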
Summary
Adds a custom `HealthChecker` on the coordinator's sync_action gRPC service that returns SERVING only when at least one async worker is connected via the bidi stream. Without this, new coord pods accept `discord_api → coord` `process_action` calls before any worker has dialed in, those calls enqueue onto a priority queue with no consumers, and discord_api times out with `DEADLINE_EXCEEDED`.
Why
Every rolling deploy of `osprey-coordinator-asyncio-prd` produces ~2,500–3,000 `DEADLINE_EXCEEDED` errors on `discord_api.rpc_client.osprey_coordinator_asyncio.process_action`. We've worked through the cluster-config knobs over the last few PRs (surge=1, 30s startupProbe delay, 30s preStop sleep, +grace period) and total failures are roughly unchanged because they all address the drain side. The actual gap is on the warmup side: pigeon's `grpc.health.v1` answers SERVING the moment port 80 binds (~2s after container start), well before the bidi service has been announced in etcd and workers have connected. K8s readiness flips green at the 30s startupProbe boundary, traffic flows, and the first ~5–15 seconds of inbound calls have nowhere to go.
How
* `pigeon::Server::with_health_checker`: new builder method that pushes a `HealthChecker` (clone-able closure form: `Fn() -> bool + Clone + Send + Sync + 'static`) onto every gRPC service registered on the Server. No existing call sites are touched.
* `OspreyCoordinatorServer`: an `Arc<AtomicI64>` connected-worker counter, owned by the bidi server. Each bidi stream handler increments the counter on entry and decrements it via a `WorkerConnectionGuard` `Drop` impl, so disconnects (graceful, panic, cancellation) all release the count without explicit decrement code at every break site.
* `main.rs` swap: `pigeon::serve(sync_action, ...)` is replaced with an explicit `pigeon::Server::new(...).with_health_checker(|| count > 0).serve()` chain, mirroring everything `pigeon::serve` does plus the new checker. The bidi service still uses the unmodified `pigeon::serve` (no readiness gating; workers don't K8s-probe it).
Net diff: ~76 lines, 3 files, 0 unsafe code.
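
To make the closure bounds concrete, here is a toy sketch of a builder that clones one checker onto every registered service. The `Server` and `Service` types, the `is_healthy` helper, and the choice to treat services without a checker as healthy are all hypothetical and defined only for this example. Only the `with_health_checker` name and its `Fn() -> bool + Clone + Send + Sync + 'static` bound come from the description above.

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a registered gRPC service.
struct Service {
    name: String,
    health_checker: Option<Box<dyn Fn() -> bool + Send + Sync>>,
}

/// Hypothetical stand-in for the server builder.
struct Server {
    services: Vec<Service>,
}

impl Server {
    fn new(service_names: &[&str]) -> Self {
        let services = service_names
            .iter()
            .map(|name| Service { name: name.to_string(), health_checker: None })
            .collect();
        Self { services }
    }

    /// The `Clone` bound lets one closure be copied onto every service.
    fn with_health_checker<F>(mut self, checker: F) -> Self
    where
        F: Fn() -> bool + Clone + Send + Sync + 'static,
    {
        for service in &mut self.services {
            service.health_checker = Some(Box::new(checker.clone()));
        }
        self
    }

    /// Assumption for this sketch: a service with no checker counts as healthy.
    fn is_healthy(&self, name: &str) -> Option<bool> {
        self.services
            .iter()
            .find(|service| service.name == name)
            .map(|service| service.health_checker.as_ref().map_or(true, |check| check()))
    }
}

fn main() {
    let connected_workers = Arc::new(AtomicI64::new(0));
    let count = Arc::clone(&connected_workers);

    // A closure capturing only an `Arc` is itself `Clone + Send + Sync`.
    let server = Server::new(&["sync_action"])
        .with_health_checker(move || count.load(Ordering::SeqCst) > 0);

    assert_eq!(server.is_healthy("sync_action"), Some(false));
    connected_workers.fetch_add(1, Ordering::SeqCst);
    assert_eq!(server.is_healthy("sync_action"), Some(true));
}
```

The closure reads the counter each time it is called, so the reported health tracks worker connects and disconnects without the server being rebuilt.
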
Important caveat — companion change required
Pigeon's `grpc.health.v1.Health/Check` returns SERVING for any request with an empty service name (intentional: that's the liveness fast-path, `health.rs:101-104`). K8s' built-in gRPC probe defaults to an empty service name, so shipping this PR alone does nothing: the K8s probe still goes through the fast-path and ignores the new checker.
The accompanying discord/discord cluster config must change the readiness probe to specify the sync_action service name.
Once both ship, the K8s readiness probe will route through the registered service-name path → `HealthServer::check` → our checker → `connected_workers > 0`.
Test plan
* `cargo check` clean (211 pre-existing warnings, no errors / no new warnings from this change)
* `grpcurl -d '{"service":"osprey.rpc.osprey_coordinator.sync_action.v1.OspreyCoordinatorSyncActionService"}' localhost:80 grpc.health.v1.Health/Check` before any worker connects → expect `NOT_SERVING`; after worker connects → expect `SERVING`
* Confirm `connection_guard` decrements correctly under panic and cancellation by checking the counter after a worker disconnects mid-stream
* `process_action.failure{exception_type:deadline_exceeded}` rate during rollout drops materially (target: <100 vs the current ~2,500–3,000 baseline)