Experimental asyncio-native worker + metrics & engine/coordinator improvements#341
Conversation
|
Warning Review limit reached
More reviews will be available in 20 minutes and 47 seconds. Learn how PR review limits work. Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file). ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits. 🚦 How do rate limits work?CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability. For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Plus Run ID: ⛔ Files ignored due to path filters (1)
📒 Files selected for processing (2)
📝 WalkthroughWalkthroughThis PR adds async worker packaging and runtime support, shared worker runtime changes, coordinator shutdown/service-name wiring, and a UI checkbox update. It also updates documentation, workspace configuration, example data, and test coverage across the async and sync worker paths. ChangesAsync worker, shared runtime, stdlib, and coordinator updates
Feature selection checkbox update
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…oordinator improvements (#60) * Engine executor improvements - execution_context: synthesize entity verdicts from LabelEffects in the sync ProcessAction response so callers see verdicts without a separate lookup - execution_visualizer: accept the engine as a parameter to render_graph - extract external_service_utils_base for reuse by alternate executors - binary_operation_executor: safe division to avoid ZeroDivisionError in rules - minor grammar / executor / type cleanups * Stdlib UDF additions and fixes - add count_regex_matches UDF (count regex matches in a string) + tests - coerce_type defaults to True for JsonData/EntityJson string parsing - string / categories / experiments / rules UDF cleanups - remove dead WEBHOOKS_USE_PUBSUB feature-flag constant * Add CountOver query UDF and Druid SQL translator - CountOver QueryUdfBase for sliding-window burst detection over Druid - validate_count_over: reject nested/multiple/invalid CountOver usages - ast_druid_translator: lower CountOver to SQL with selectable engine (window functions vs self-join) and output modes (SCAN/TIMESERIES/top_n); interpolate a caller-supplied datasource name into the FROM clause - minor FeatureSelectModal cleanup * Self-heal etcd source reloads and harden the watch mux - dedup applied config on the hash actually applied, not the received hash, so a dropped/failed apply self-heals on the next identical etcd event - WatchMux: dedup duplicate watch responses - watcherd: downgrade a noisy reconnect log to debug - extract sources_provider_base to share reload logic across worker variants * Coordinator: graceful shutdown, sync-RPC fast-reject, configurable leases - fast-reject new sync RPCs with Unavailable once the pod is draining so the client retries on a healthier pod instead of timing out - nack queued sync + async actions on shutdown; extend the drain window to 30s - make pubsub lease-extension durations configurable; default the minimum to 30s * Metrics and observability improvements - instruments: rule audit logging and an enforcement-gap metric - monitored_rules_metrics: emit per-rule metrics from the label output sink - tag worker metrics with worker_type; instrument the multi-output sink - coordinator_metrics: additional counters for sync/async paths * Add experimental asyncio-native worker An alternative worker built on asyncio instead of gevent. Runs the rules engine, input stream, sinks, and publisher on the event loop, and talks to the coordinator over a bidirectional gRPC stream (pigeon client). - osprey_async_worker package: executor, engine, sinks (input/output/rules), coordinator input stream, etcd sources provider, async MX-lookup stdlib UDF, and a first-party plugin hook to bootstrap async UDFs - self-contained pigeon client with vendored exceptions/interceptors so the package does not depend on worker-internal gRPC plumbing - break AST refcycles on source swap; recompile off the event loop - extract acking_contexts_base so sync and async workers share ack handling - coordinator: bidirectional stream + pigeon module support - wire osprey_async_worker into the uv workspace; enable pytest asyncio_mode * Add pluggable LLM provider hook with tool calling - osprey.worker.lib.llm: provider-agnostic base, a tool-calling loop, and tool definitions, with a register_llm_provider plugin hook - example_plugins: Anthropic provider implementation + tests - docs: LLM integration guide
The async worker's etcd-backed discovery imported an external `hash_ring` package (a compiled C extension) that isn't part of open-source osprey. Replace it with a small, stdlib-only consistent-hash ring (MD5 virtual points) that is a drop-in for the API discovery uses — HashRingNode(name, num_replicas), HashRing(n).add_nodes(...), find_node(key)->name, find_nodes(key, count)->names. This makes the async worker self-contained and buildable without any private dependency. Placement is an internal detail (only fleet self-consistency matters), so an exact match to any prior implementation is unnecessary.
- osprey_worker/Dockerfile: ADD the osprey_async_worker workspace member (pyproject + source) so `uv sync --locked` resolves the full 192-package workspace. Without it the locked sync rejected the lockfile and integration-tests failed at the Docker build stage. - ruff / ruff-format: fix E402 import ordering (execution_context, async pigeon client) and format the re-expressed code to upstream style. - mypy: type-correctness across the async worker, engine, and external service helpers (annotations / isinstance-narrowing / casts only; no behavior change). - parse_int: add the missing UdfCategories.CAST member it references (was an import-time AttributeError). - fawltydeps: ignore pytest-asyncio (a pytest plugin used via marker, not imported).
1fe672a to
958592c
Compare
JsonData's `coerce_type` now defaults to True (12e8ed2), so `Entity[str]` reading an int coerces (123 -> "123") instead of raising InvalidJsonType. This upstream strict-type-check test relied on the old False default and expected 2 errors. Make it opt into `coerce_type=False` so it still verifies strict type checking (both A and B mismatch -> 2 errors); the coercion path is covered separately by test_entity_can_coerce_type.
Keep the stable gevent worker image free of the experimental asyncio worker, while giving users an opt-in way to run it. - osprey_worker/Dockerfile (gevent, stable): keep osprey_async_worker/pyproject.toml present (required for `uv sync --locked` to validate the workspace lockfile) but exclude it from install via `--no-install-package osprey-async-worker` and stop copying its source. `osprey.async_worker` is then not importable, so execution_context's optional async import cleanly falls back to None — i.e. this image behaves exactly as it did before the asyncio worker existed. - osprey_async_worker/Dockerfile (NEW, EXPERIMENTAL): installs the full workspace (the only image that does) and defaults to the asyncio worker. Phase-0: static/ JSONL input + stdout sink; no Kafka/coordinator input wired upstream yet. - entrypoint.sh: add an `osprey-async-worker` command. - docker-compose: add an `osprey-async-worker` service gated behind the `experimental` profile, so it is never built or started by default (or by the `test` profile used in CI).
…he box The Phase-0 asyncio worker uses the stdlib-only engine, but the entrypoint and compose default pointed at ./example_rules, which needs plugin UDFs (TextContains, BanUser) the stdlib engine doesn't provide — so the image failed rule validation out of the box. - Add osprey_async_worker/example_rules/: a stdlib-only ruleset (LongMessage — message text > 100 chars) + minimal config + sample_actions.jsonl. - entrypoint.sh: default OSPREY_RULES_PATH to the bundled example and support an optional OSPREY_INPUT_FILE (JSONL actions). - docker-compose: point the experimental osprey-async-worker service at the bundled rules + sample actions. Verified: `docker run` of the async image with no mounts boots, compiles the ruleset, and processes both sample actions to the stdout sink (LongMessage false/true), then exits cleanly.
The asyncio image was based on the gevent worker's Dockerfile and inherited ENV/EXPOSE lines that don't apply to the Phase-0 async worker: - ETCD_PEERS: the async worker reads rules from --rules-path and never reads ETCD_PEERS (confirmed unused anywhere in osprey_async_worker). The http://etcd:2379 default also only resolves inside the compose network. - PORT + EXPOSE: the worker is a CLI runner (static/JSONL input -> stdout sink); it does not bind a port. Keep PYTHONPATH (image-structural) and the jemalloc allocator tuning. Actual runtime config (rules path, input file, DD flags) is supplied at run time via compose / `docker run -e`, not baked into the image.
|
Oooh thank you for the PR! This is great, I know gevent has caused headaches and been the topic of many discussions in the past. I'm also especially interested in the pluggable LLM provider hook :) @juanmrad do you think you'd be able to give this a review? |
|
Also @cmttt any chance you can break this into smaller PRs to make it easier to review? |
…root_cache before nulling AST parents _break_old_graph_cycles nulls `parent` back-pointers on a discarded graph's AST nodes so refcount can reclaim it without waiting for gen-2 GC. But those Root objects are memoized in the module-level, never-evicted parsed_ast_root_cache (keyed by Source content). When a file's exact content RECURS -- e.g. a rule revert -- the cache hands back the same parent-nulled Root, and validation fails with "`Rule(...)` must be assigned to a variable" (the Rule call's parent Assign is gone), failing the recompile and wedging the worker on stale rules until restart. Fix: pop the source from parsed_ast_root_cache before nulling its parents, so any future recurrence of that content re-parses a fresh, intact Root. Adds a regression test that fails without the eviction. Reproduced + validated at the AST level: null-only => re-parse cache-hit, Rule parent None (corrupt); evict+null => re-parse cache-miss, Rule parent Assign (ok).
…compile Adds _freeze_resident_graph() (gc.collect(); gc.freeze()), called at boot and after each recompile's swap + cycle-break. Moves the resident rule graph + long-lived objects into the permanent GC generation so per-message gen-2 collections stay cheap as rules recompile -- otherwise refcycle-held AST objects (parsed_ast_root_cache) accumulate as permanent gen-2 survivors and per-message GC CPU climbs with uptime. On by default. Graduates the freeze from the smite-side worker (a polling background loop) into the engine, inline + serialized with the recompile path. Updates the now-stale "we deliberately do NOT call gc.collect()" comment.
Drops the experimental CountOver sliding-window burst-detection feature: the count_over/validate_count_over UDFs, the SQL-lowering paths in DruidQueryTransformer, the parse_query_filter SQL routing, and their tests/snapshots. Reverts the touched engine files to their main state.
Adds AsyncKafkaInputStream, mirroring the gevent KafkaInputStream envelope decode but polling kafka-python off the event loop via asyncio.to_thread (no gevent, no new dependency). Wires it into osprey-async-cli run behind --input-source kafka with --kafka-* options, so the async worker can consume an Osprey-format topic directly (e.g. from at-kafka) without a coordinator.
Wire osprey-async-cli run --with-plugins to the async plugin system (bootstrap_async_udfs / _ast_validators / _output_sinks + AsyncOspreyEngine) instead of the sync gevent engine, removing the engine type-mismatch cast. Add a reference async plugin in example_plugins, registered under the osprey_async_plugin entry-point group: the pure-computation TextContains UDF (runs inline in the async executor) and an example AsyncBaseOutputSink, plus tests. This makes the asyncio worker's example plugins actually available.
There was a problem hiding this comment.
Actionable comments posted: 1
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (8)
osprey_coordinator/src/priority_queue.rs (1)
214-223:⚠️ Potential issue | 🟠 Major | ⚡ Quick winStop logging full action payloads when nack send fails.
println!("... {:?}", action.action)can expose action data/secret data in logs, and this helper now covers both sync and async drains.Suggested fix
- Err(_) => println!( - "tried to nack {:?} and the nacking receiver was dropped", - action.action - ), + Err(_) => tracing::warn!( + ack_id = action.action.ack_id, + action_id = action.action.action_id, + "failed to nack action because the receiver was dropped" + ),🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_coordinator/src/priority_queue.rs` around lines 214 - 223, The nack_all function currently logs the full action payload using println! with debug formatting on action.action when the nack send fails, which can expose sensitive data in logs. Remove the action.action payload from the error log message in the Err branch of the match statement where action.acking_oneshot_sender.send(AckOrNack::Nack) is called, and replace it with a generic error message that does not include any action details or payload information.osprey_coordinator/src/osprey_bidirectional_stream.rs (1)
89-97:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winIncrement
bidi_acks_receivedonly after parsing a valid ack payload.Right now the counter increments before
ack_or_nackvalidation, so malformed messages are counted as received acks.Suggested fix
- metrics.bidi_acks_received.incr(); let duration = Instant::now().duration_since(state.send_time); metrics.action_outstanding_duration.record(duration); - state.action_acker.ack_or_nack( - ack_or_nack - .ack_or_nack - .context("no `ack_or_nack` in proto")?, - ); + let ack_or_nack = ack_or_nack + .ack_or_nack + .context("no `ack_or_nack` in proto")?; + state.action_acker.ack_or_nack(ack_or_nack); + metrics.bidi_acks_received.incr();🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_coordinator/src/osprey_bidirectional_stream.rs` around lines 89 - 97, The metrics.bidi_acks_received counter is being incremented before the ack_or_nack payload is validated, which means malformed messages that fail the context check will still be counted. Move the metrics.bidi_acks_received.incr() call to execute only after the ack_or_nack validation succeeds (after the context("no `ack_or_nack` in proto")? check completes successfully), ensuring that only valid ack payloads are counted in the metrics.osprey_coordinator/src/pub_sub_streaming_pull/flow_control.rs (1)
70-77:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winUpdate default value in the doc comment to match runtime behavior.
The doc says default min lease extension is 10s, but
FlowControl::new()now defaults to 30s.Suggested fix
- /// Default: (min = 10 seconds, max = 600 seconds) + /// Default: (min = 30 seconds, max = 600 seconds)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_coordinator/src/pub_sub_streaming_pull/flow_control.rs` around lines 70 - 77, The documentation comment for the lease extension settings on the FlowControl struct states a default minimum of 10 seconds in the Default section, but the actual implementation in FlowControl::new() sets the default minimum to 30 seconds. Update the doc comment's Default line to reflect the actual default minimum value of 30 seconds to match the runtime behavior implemented in FlowControl::new().osprey_worker/src/osprey/worker/lib/sources_provider.py (1)
60-71:⚠️ Potential issue | 🟠 Major | ⚡ Quick winEnsure paused input streams are always resumed when watcher callbacks fail.
At Line 67, an exception from
self._sources_watcher_callback()exits before the resume block (Lines 69-71), which can leave ingestion paused until restart. Wrap update/callback intry/finallyso resume is guaranteed.Proposed fix
if self._input_stream_ready_signaler is not None: logging.info('Pausing input streams') self._input_stream_ready_signaler.pause_input_stream() self._input_stream_ready_signaler.wait_for_input_stream_to_pause() - self._current_sources = new_sources - if self._sources_watcher_callback: - self._sources_watcher_callback() - - if self._input_stream_ready_signaler is not None: - logging.info('Restarting input streams') - self._input_stream_ready_signaler.resume_input_stream() + try: + self._current_sources = new_sources + if self._sources_watcher_callback: + self._sources_watcher_callback() + finally: + if self._input_stream_ready_signaler is not None: + logging.info('Restarting input streams') + self._input_stream_ready_signaler.resume_input_stream()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/worker/lib/sources_provider.py` around lines 60 - 71, The resume_input_stream call in the _input_stream_ready_signaler block is not guaranteed to execute if self._sources_watcher_callback() raises an exception, leaving input streams paused indefinitely. Refactor the code by wrapping the self._current_sources assignment and self._sources_watcher_callback() invocation in a try block, and move the input stream resume block (the logging and resume_input_stream call) into a corresponding finally block to ensure the streams are always resumed regardless of callback failures.osprey_async_worker/src/osprey/async_worker/tests/test_external_service.py (1)
201-222: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick winAdd regression tests for duplicate-key batches and cancellation cleanup.
Please add tests for:
batch_get(['a', 'a'])duplicate-key behavior, and- cancellation during in-flight fetch ensuring cache doesn’t retain unresolved futures.
As per coding guidelines (AGENTS.md), “New behavior requires a test; bug fixes require a regression test.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/tests/test_external_service.py` around lines 201 - 222, Add two new test functions after test_batch_get_uses_cache to cover missing regression cases. First, create a test that calls batch_get with duplicate keys (e.g., ['a', 'a']) to verify the behavior when the same key appears multiple times in a single batch request. Second, create a test that triggers cancellation of an in-flight batch_get request to ensure the cache properly cleans up and does not retain unresolved futures from the cancelled operation. Both tests should follow the same pytest.mark.asyncio pattern and assertion style as the existing tests, verifying the service call counts and cache state appropriately.Source: Coding guidelines
entrypoint.sh (1)
11-52:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
entrypoint.shis a restricted file and needs explicit human approval.Please gate merge on recorded manual approval for this entrypoint change.
As per coding guidelines, AGENTS.md lists
entrypoint.shunder human-approval-required actions.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@entrypoint.sh` around lines 11 - 52, The entrypoint.sh file is restricted and requires explicit human approval before merging, as documented in AGENTS.md under human-approval-required actions. Obtain recorded manual approval from the appropriate stakeholder for the changes made to the entrypoint.sh file (the cli-osprey-ui-api, cli-osprey-worker, and cli-osprey-async-worker functions) and ensure this approval is documented on the pull request before proceeding with the merge.Source: Coding guidelines
osprey_async_worker/Dockerfile (2)
11-73:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRun the image as a non-root user.
There is no
USERdirective, so runtime defaults to root (Line 71 onward still executes as root). This weakens container isolation.🔧 Suggested hardening
FROM python:3.11-slim ARG TARGETPLATFORM @@ WORKDIR /osprey @@ +RUN addgroup --system osprey && \ + adduser --system --ingroup osprey --home /osprey osprey && \ + chown -R osprey:osprey /osprey @@ COPY entrypoint.sh /osprey/entrypoint.sh +USER osprey🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/Dockerfile` around lines 11 - 73, The Dockerfile lacks a USER directive, causing the container to run as root by default which presents a security risk. Add a non-root user creation command (using RUN to invoke useradd or similar) after the source code is added but before the ENTRYPOINT directive, then add a USER directive specifying the created user immediately before the ENTRYPOINT and CMD lines to ensure the container executes with restricted privileges.Source: Linters/SAST tools
1-73:⚠️ Potential issue | 🟠 Major | ⚡ Quick winThis Dockerfile change needs explicit human approval before merge.
This file is in the restricted set for manual approval in repo policy.
As per coding guidelines, AGENTS.md marks
**/Dockerfilechanges as “Human-approval-required actions.”🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/Dockerfile` around lines 1 - 73, This Dockerfile change requires explicit human approval before it can be merged per repository policy as documented in AGENTS.md. As the agent/intern, you cannot automatically fix this - you must request explicit approval from a human maintainer or project lead who has authority to review and approve Dockerfile changes. Do not attempt to modify the Dockerfile further as a workaround; instead, ensure proper approval is obtained through the established review process before this change can be merged.Source: Coding guidelines
🟡 Minor comments (16)
osprey_coordinator/src/sync_action_rpc.rs-95-100 (1)
95-100:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAvoid retrying the intentional drain rejection.
This branch returns
Unavailableby design, butprocess_actionretries all errors once, so draining requests are immediately re-executed and counted again. Consider skipping the second attempt when shutdown is active.Suggested fix
- match self.try_process_action(ack_id, &action_request).await { + match self.try_process_action(ack_id, &action_request).await { response @ Ok(_) => response, + Err(e) if self.is_shutting_down.load(Ordering::Acquire) => Err(e), Err(e) => { tracing::error!("initial process_action attempt failed, retrying: {}", e); self.try_process_action(ack_id, &action_request).await } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_coordinator/src/sync_action_rpc.rs` around lines 95 - 100, When the coordinator is shutting down, the code returns an `Unavailable` error in the `is_shutting_down` check, but the `process_action` function automatically retries all errors once, causing draining requests to be executed and counted twice. Instead of returning `Unavailable` which is retryable, return a non-retryable error status code such as `FailedPrecondition` so that `process_action` will skip the retry attempt when shutdown is active.osprey_worker/src/osprey/engine/stdlib/udfs/count_regex_matches.py-14-17 (1)
14-17:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winImplementation currently counts duplicate patterns, despite “distinct patterns” contract.
The code iterates the raw
patternslist, so duplicates increment the count multiple times. Either dedupe before compile, or update the UDF docs to match current behavior.💡 Proposed fix (dedupe while preserving order)
- for pattern in arguments.patterns.value: + for pattern in dict.fromkeys(arguments.patterns.value): try: self._compiled.append(re.compile(pattern, flags))Also applies to: 42-53
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/engine/stdlib/udfs/count_regex_matches.py` around lines 14 - 17, The CountRegexMatchesArguments class documentation states it counts "distinct patterns", but the implementation processes all patterns in the input list including duplicates, causing them to be counted multiple times. Modify the CountRegexMatchesArguments class to deduplicate the patterns list while preserving order before they are processed, ensuring the actual behavior matches the documented contract of counting only distinct patterns. This deduplication should be applied where the patterns are stored or validated in the class, and ensure it also applies to any processing logic in the referenced code section at lines 42-53.osprey_worker/src/osprey/worker/sinks/sink/rules_sink.py-146-148 (1)
146-148:⚠️ Potential issue | 🟡 MinorConfirm skip-flag migration with upstream producers.
At Line 146, skip behavior now keys only on
osprey_skip_async. The codebase contains no references to the legacyosprey_v2_skip_async_classificationflag in Python source code, and action data is deserialized from external message sources (Kafka). If any upstream producer still emits only the old flag, those actions will process without being skipped. Verify that all producers have migrated to the new flag name before merging.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/worker/sinks/sink/rules_sink.py` around lines 146 - 148, The skip condition at line 146 in rules_sink.py currently only checks for the new 'osprey_skip_async' flag, but upstream Kafka producers may still be emitting the legacy 'osprey_v2_skip_async_classification' flag. Update the condition in the action.data.get() call to check for both the new flag 'osprey_skip_async' and the legacy flag 'osprey_v2_skip_async_classification' using a boolean OR operation, ensuring that actions from producers that haven't yet migrated to the new flag name will still be skipped correctly during the transition period.osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches.py-39-43 (1)
39-43:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winEscape
targetsafely when building DSL test input.
target="{target}"will break for values containing quotes/escapes, causing parser failures unrelated to UDF behavior. Use a repr-safe literal instead.Proposed fix
- Count = CountRegexMatches(patterns={patterns!r}, target="{target}") + Count = CountRegexMatches(patterns={patterns!r}, target={target!r}) @@ - result = execute(f'Count = CountRegexMatches(patterns={patterns!r}, target="{target}"{extra_args})') + result = execute(f'Count = CountRegexMatches(patterns={patterns!r}, target={target!r}{extra_args})')Also applies to: 68-72
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches.py` around lines 39 - 43, The target variable in the DSL test input string is being directly interpolated without proper escaping, which will cause parser failures if the target value contains quotes or escape characters. In the execute() function call within the test_count_regex_matches function, change the target parameter from target="{target}" to target={target!r} to use repr() formatting, which will properly escape special characters and quotes just like the patterns parameter already does with {patterns!r}. This same fix should be applied to all similar occurrences throughout the test file.docs/development/workflow.md-31-33 (1)
31-33:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winFix the mypy path in the documented command.
Line 32 points to
osprey_worker/src/osprey_worker/lib, which doesn’t match the repo path shape (osprey_worker/src/osprey/worker/lib) and is likely to fail when copied.Proposed fix
-uv run mypy osprey_worker/src/osprey_worker/lib +uv run mypy osprey_worker/src/osprey/worker/lib🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/development/workflow.md` around lines 31 - 33, The mypy command path in the Type checking documentation is incorrect. Update the path in the uv run mypy command from osprey_worker/src/osprey_worker/lib to osprey_worker/src/osprey/worker/lib to match the actual repository directory structure. This ensures the documented command will work correctly when users copy it.osprey_async_worker/src/osprey/async_worker/executor.py-1-9 (1)
1-9:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winModule docstring contradicts runtime behavior.
The header says there is no thread pool /
run_in_executor, but legacy paths below do userun_in_executor. Please align the docs with actual behavior.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/executor.py` around lines 1 - 9, The module docstring at the beginning of the executor.py file incorrectly states that there is no thread pool and no run_in_executor, but the actual implementation below uses run_in_executor for legacy sync UDF execution paths. Update the module docstring to accurately document all execution paths including the thread pool and run_in_executor usage for handling legacy sync UDFs, so the documentation reflects the true runtime behavior of the executor.osprey_async_worker/src/osprey/async_worker/lib/pigeon/client.py-389-389 (1)
389-389:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winUse a targeted type-ignore code (or explicit protocol) instead of a bare ignore.
Line 389 introduces a broad
# type: ignore, which makes type suppression hard to audit.🔧 Suggested fix
- client = self._stub_cls(channel) # type: ignore + client = self._stub_cls(channel) # type: ignore[call-arg] # Generated grpc stubs accept Channel at runtime.As per coding guidelines, new Python suppressions must be narrowly scoped (
# type: ignore[<code>]) with justification.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/lib/pigeon/client.py` at line 389, The bare `# type: ignore` comment on line 389 where `client = self._stub_cls(channel)` is assigned needs to be replaced with a specific type error code. First, determine the exact type error by temporarily removing the ignore comment and letting your type checker (mypy) report the specific error code. Then replace the bare `# type: ignore` with a narrowly scoped `# type: ignore[error_code]` format (for example, `# type: ignore[assignment]` or `# type: ignore[arg-type]`) and add a brief comment explaining why the specific type suppression is necessary for the _stub_cls instantiation.Source: Coding guidelines
osprey_async_worker/src/osprey/async_worker/lib/discovery/async_directory.py-268-268 (1)
268-268:⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
tolerate_drainingis ignored in the fallback branch.When no visible instances exist, the fallback always excludes draining instances, even if
tolerate_draining=True. That makes tolerant selection unexpectedly fail during visibility delays.Suggested fix
- return services if services else [w.service for w in self._instances.values() if not w.service.draining] + if services: + return services + if tolerate_draining: + return [w.service for w in self._instances.values()] + return [w.service for w in self._instances.values() if not w.service.draining]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/lib/discovery/async_directory.py` at line 268, In the fallback branch of the return statement in the method containing line 268, the filter condition is unconditionally excluding draining instances with `if not w.service.draining`, which ignores the `tolerate_draining` parameter. Modify the filter condition in the list comprehension to check the `tolerate_draining` parameter: if `tolerate_draining` is True, include all instances regardless of draining status; if False, maintain the current behavior of excluding draining instances. This ensures that when `tolerate_draining=True`, draining instances are included in the fallback list during visibility delays.osprey_async_worker/src/osprey/async_worker/stdlib_udfs/async_mx_lookup.py-33-33 (1)
33-33:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAdd justification (or remove) the new class-level
# type: ignore[misc].Line 33 introduces a new ignore without an inline rationale. Please either remove it by fixing the underlying typing issue or add a concise justification for this exact ignore code.
As per coding guidelines: “Do not disable lint or type rules to silence errors; fix the underlying issue or use narrowly-scoped
# type: ignore[<code>]with explanatory comments.”🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/stdlib_udfs/async_mx_lookup.py` at line 33, The MXLookup class has a type: ignore[misc] comment without justification. Either remove this ignore by fixing the underlying typing issue (such as properly typing the generic parameters or the class inheritance), or add a concise inline comment explaining why the specific misc type error cannot be resolved. If keeping the ignore, add a comment like # type: ignore[misc] # <reason> to document the necessity of this suppression.Source: Coding guidelines
osprey_async_worker/src/osprey/async_worker/cli/main.py-364-365 (1)
364-365:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winFix misleading benchmark fallback output.
On Line 365, when gevent is unavailable the CLI prints “Same performance,” which is inaccurate because no comparison was run.
Suggested fix
- else: - click.echo(' Same performance') + else: + click.echo('Comparison skipped: gevent benchmark unavailable')🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/cli/main.py` around lines 364 - 365, The else block in main.py that prints "Same performance" is misleading because this message is displayed when gevent is unavailable, but it incorrectly suggests that a performance comparison was actually performed. Replace the click.echo statement with a message that accurately reflects the situation, such as indicating that gevent is not available and therefore no comparison could be made, rather than claiming the performance is the same.osprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/baggage.py-34-36 (1)
34-36:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winConstructor-configured baggage values are never applied.
Line 34 defines
baggage_header/baggage, but Line 60 injects only span baggage and Line 61 appends those headers;self._baggage_headerandself._baggageare currently unused in request metadata generation.Proposed fix
def _intercept_call(self, continuation, client_call_details, request_or_iterator): @@ - baggage_headers = {} - baggage_propagator.inject(get_baggage(current_span()), baggage_headers) + baggage_headers = {} + merged_baggage = {**self._baggage, **get_baggage(current_span())} + baggage_propagator.inject(merged_baggage, baggage_headers) + if self._baggage_header != BAGGAGE_HEADER and BAGGAGE_HEADER in baggage_headers: + baggage_headers[self._baggage_header] = baggage_headers.pop(BAGGAGE_HEADER) metadata.extend([(k, v) for k, v in baggage_headers.items()])Also applies to: 60-61
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/baggage.py` around lines 34 - 36, The BaggageInterceptor constructor accepts and stores baggage_header and baggage parameters in self._baggage_header and self._baggage, but these values are never applied when generating request metadata. In the code section where span baggage is injected into request headers (around lines 60-61), you need to merge the constructor-configured self._baggage with the span baggage, and use self._baggage_header instead of just relying on the span baggage. Update the baggage injection logic to include and apply both the constructor-configured baggage dictionary and the appropriate baggage header name when building the request metadata headers.osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py-63-83 (1)
63-83:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winSame missing
_channel_poolissue.This test will also fail when
stop()is called without_channel_poolbeing mocked.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py` around lines 63 - 83, The test_input_stream_shutdown_event_unblocks test is missing the initialization of the _channel_pool attribute on the stream object. When stream.stop() is called, the stop() method attempts to access _channel_pool but it doesn't exist since only _shutdown_event was initialized using __new__. Add a mock or initialization for stream._channel_pool (similar to how stream._shutdown_event is set up) before calling await stream.stop() to ensure the stop() method has all required attributes available.osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py-37-46 (1)
37-46:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winTest uses non-existent attribute
_request_queue.The test assigns
stream._request_queuebutOspreyCoordinatorBiDirectionalStreamuses_outgoing_queue. This test passes but doesn't validate the actual class behavior.🐛 Proposed fix
`@pytest.mark.asyncio` async def test_bidirectional_stream_queue_based(): """Stream uses asyncio.Queue for sending requests.""" stream = OspreyCoordinatorBiDirectionalStream.__new__(OspreyCoordinatorBiDirectionalStream) - stream._request_queue = asyncio.Queue() - stream._should_run = True + stream._outgoing_queue = asyncio.Queue() + stream._stopped = False - await stream._request_queue.put('test_request') - item = await stream._request_queue.get() + await stream._outgoing_queue.put('test_request') + item = await stream._outgoing_queue.get() assert item == 'test_request'🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py` around lines 37 - 46, The test method test_bidirectional_stream_queue_based is assigning and accessing a non-existent attribute _request_queue, but the OspreyCoordinatorBiDirectionalStream class actually uses the attribute name _outgoing_queue. Replace all occurrences of _request_queue in this test with _outgoing_queue to ensure the test validates the actual class behavior and uses the correct attribute that the class implements.osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py-257-259 (1)
257-259:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAssert on user-driven path risks crash.
get_uptime()usesassert self._connect_time is not None, which will crash the process if called before a connection is established. Per coding guidelines, prefer explicit error handling on user/RPC paths.🛡️ Proposed fix
def get_uptime(self) -> float: - assert self._connect_time is not None, 'This was called before a connection was established' + if self._connect_time is None: + return 0.0 return time.time() - self._connect_time🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py` around lines 257 - 259, The get_uptime() method uses an assert statement to validate that self._connect_time is not None, which is unsafe for user-driven or RPC paths since asserts can be disabled at runtime. Replace the assert with explicit error handling by raising an appropriate exception (such as RuntimeError) with the same descriptive error message when self._connect_time is None, ensuring the error is always caught regardless of Python optimization flags.osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py-52-60 (1)
52-60:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winTest for
stop()is incomplete — missing_channel_poolmock.
OspreyCoordinatorInputStream.stop()callsawait self._channel_pool.close(), but this test doesn't mock_channel_pool, so it will raiseAttributeErrorwhen run.🐛 Proposed fix
`@pytest.mark.asyncio` async def test_input_stream_stop(): """Stop sets the shutdown event.""" stream = OspreyCoordinatorInputStream.__new__(OspreyCoordinatorInputStream) stream._shutdown_event = asyncio.Event() + stream._channel_pool = MagicMock() + stream._channel_pool.close = MagicMock(return_value=asyncio.coroutine(lambda: None)()) assert not stream._shutdown_event.is_set() await stream.stop() assert stream._shutdown_event.is_set()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py` around lines 52 - 60, The test_input_stream_stop test creates an OspreyCoordinatorInputStream instance and sets _shutdown_event, but the stop() method also calls await self._channel_pool.close(), which is not mocked. Add a mock object with an async close method and assign it to stream._channel_pool before calling await stream.stop() to prevent an AttributeError when the test runs.osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py-47-57 (1)
47-57:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winTimeout retries lack backoff sleep.
When a
TimeoutErroroccurs (lines 47-51), the loop retries immediately without any delay, unlike the generic exception path (line 57) which sleeps before retrying. This could cause rapid retry storms against an already-struggling sink.🔧 Proposed fix to add backoff on timeout
except TimeoutError: logger.warning(f'Timeout pushing to {sink_name} (attempt {attempt}/{attempts})') metrics.increment('output_sink.timeout', tags=[f'sink:{sink_name}']) if attempt == attempts: metrics.increment('output_sink.timeout_exhausted', tags=[f'sink:{sink_name}']) + else: + await asyncio.sleep(0.5 * attempt) except Exception as exc:🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py` around lines 47 - 57, The TimeoutError exception handler (which catches TimeoutError at line 47) is missing the backoff sleep that exists in the generic Exception handler path. To fix this, add an await asyncio.sleep call with the same exponential backoff formula (0.5 * attempt) to the TimeoutError block, ensuring it sleeps before retrying on timeout just like the generic Exception handler does. This sleep should be added after the metrics increment and before checking if the attempt has exhausted the maximum number of retries (before the if attempt == attempts check).
🧹 Nitpick comments (9)
osprey_worker/src/osprey/worker/lib/llm/tests/test_base.py (1)
77-84: 💤 Low valueConsider using
pytest.raisesfor consistency.The try/except/else pattern works correctly, but
pytest.raises(TypeError)would be more idiomatic and consistent with typical pytest style. This is purely a style preference.♻️ Optional refactor
def test_base_provider_is_abstract_and_subclassable() -> None: # BaseLLMProvider cannot be instantiated directly. - try: - BaseLLMProvider() # type: ignore[abstract] - except TypeError: - pass - else: - raise AssertionError('expected BaseLLMProvider to be abstract') + import pytest + with pytest.raises(TypeError): + BaseLLMProvider() # type: ignore[abstract]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/worker/lib/llm/tests/test_base.py` around lines 77 - 84, The test function `test_base_provider_is_abstract_and_subclassable` uses a manual try/except/else pattern to verify that `BaseLLMProvider` raises a TypeError when instantiated directly. Replace this pattern with the `pytest.raises(TypeError)` context manager that wraps the instantiation of `BaseLLMProvider()` to align with pytest conventions and improve readability.osprey_worker/src/osprey/worker/lib/llm/tools.py (1)
43-57: 💤 Low value
Noneas sentinel prevents declaring explicitnulldefaults.Using
Noneto mean "no default" makes it impossible to emit a JSON Schemadefault: nullfor parameters wherenullis a valid default value. If this is intentional (null defaults not needed), the current approach is fine. Otherwise, consider a sentinel like_UNSET = object().This is a minor edge case that may never arise in practice.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/worker/lib/llm/tools.py` around lines 43 - 57, The issue is that using `None` as the sentinel value for "no default" in the ToolParameter dataclass prevents explicitly setting a JSON Schema default value of null. Replace the `None` sentinel with a dedicated sentinel object (e.g., `_UNSET = object()`) defined at module level. Update the default parameter initialization in the ToolParameter dataclass to use this new sentinel instead of `None`, and change the condition in the build_input_schema function that checks `if parameter.default is not None:` to instead check against the new sentinel object to determine whether to include a default value in the schema.osprey_worker/src/osprey/worker/sinks/sink/output_sink.py (1)
190-197: ⚡ Quick winDefensively copy
monitored_labelson assignment.This currently stores the caller’s set by reference, so later external mutations can change sink behavior unexpectedly.
💡 Proposed fix
def __init__(self, labels_provider: LabelsProvider, monitored_labels: set[str] | None = None) -> None: self._labels_provider = labels_provider - self._monitored_labels: set[str] = monitored_labels or set() + self._monitored_labels: set[str] = set(monitored_labels or set()) def set_monitored_labels(self, monitored_labels: set[str]) -> None: """Set monitored labels for metrics emission. Can be called post-engine compilation.""" - self._monitored_labels = monitored_labels + self._monitored_labels = set(monitored_labels)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/worker/sinks/sink/output_sink.py` around lines 190 - 197, The __init__ method and set_monitored_labels method both store direct references to the caller's monitored_labels set instead of making defensive copies. This allows external mutations to the original set to unexpectedly change the sink's behavior. Fix this by creating a defensive copy of the monitored_labels parameter when assigning it to self._monitored_labels in both locations - use the set() constructor to create a copy of the passed set in the __init__ method when monitored_labels is not None, and similarly in the set_monitored_labels method when storing the provided monitored_labels.osprey_worker/src/osprey/worker/lib/etcd/watcher/watcherd_impl.py (1)
147-152: ⚡ Quick winPreserve traceback context on retry warnings.
Switching to
log.warningdrops stack traces here, which makes recurring stream/reset failures harder to root-cause.Proposed patch
- log.warning( + log.warning( '%r: etcd-watcherd stream raised an error (%r). sleeping for %.2f sec before retrying', self, e, delay, + exc_info=True, ) @@ - log.warning('%r: etcd raised an error (%r). sleeping for %.2f sec before retrying', self, e, delay) + log.warning( + '%r: etcd raised an error (%r). sleeping for %.2f sec before retrying', + self, + e, + delay, + exc_info=True, + )Also applies to: 169-170
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/src/osprey/worker/lib/etcd/watcher/watcherd_impl.py` around lines 147 - 152, The log.warning calls for etcd-watcherd stream errors drop stack traces, making debugging harder. Replace the log.warning call in the error handling block (around line 147) with log.exception to preserve the full traceback context when logging the error. Apply the same fix to the similar error handling at line 169-170 as noted in the comment. This will ensure that stack traces are captured and logged, making it easier to root-cause recurring stream/reset failures.osprey_worker/Dockerfile (1)
39-42: ⚡ Quick winUse
COPYinstead ofADDfor these local file copies.These lines are plain local copies;
COPYis the clearer/safer primitive and aligns with lint output.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_worker/Dockerfile` around lines 39 - 42, Replace all four ADD instructions with COPY instructions for the pyproject.toml file copies. Change the ADD commands that copy osprey_rpc/pyproject.toml, osprey_worker/pyproject.toml, osprey_async_worker/pyproject.toml, and example_plugins/pyproject.toml from ADD to COPY, as COPY is the appropriate primitive for copying local files and is clearer and safer than ADD for this use case.Source: Linters/SAST tools
osprey_async_worker/Dockerfile (1)
38-47: ⚡ Quick winPrefer
COPYoverADDfor plain file/directory copies.These
ADDinstructions are simple filesystem copies (no archive extraction / remote URL fetch), soCOPYis clearer and matches lint expectations.Also applies to: 63-67
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/Dockerfile` around lines 38 - 47, Replace all ADD instructions with COPY instructions in the Dockerfile. The ADD commands for uv.lock, pyproject.toml, README.md, LICENSE.md, and the various project-specific pyproject.toml files (osprey_rpc, osprey_worker, osprey_async_worker, example_plugins) are simple filesystem copies and should use COPY instead, as ADD is only needed when extracting archives or fetching remote URLs. Apply this change to all ADD instructions including those at lines 63-67 as well.Source: Linters/SAST tools
osprey_async_worker/src/osprey/async_worker/tests/test_pigeon_client.py (1)
13-25: ⚡ Quick winHarden contextvar tests against state leakage between tests.
These tests reset
skip_rate_limit_context.skipmanually, but a mid-test failure can leave shared state behind for subsequent tests. A small fixture (ortry/finally) would make this deterministic.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/tests/test_pigeon_client.py` around lines 13 - 25, The tests test_skip_rate_limit_set_and_get and test_skip_rate_limit_property_api manually reset skip_rate_limit_context.skip but lack proper cleanup if a test fails mid-execution, potentially leaving the context in a dirty state for subsequent tests. Refactor these tests to use either a pytest fixture that automatically resets skip_rate_limit_context.skip to its initial state before and after each test, or wrap the test logic in a try/finally block to ensure cleanup occurs regardless of test outcome.osprey_async_worker/src/osprey/async_worker/tests/test_no_gevent_imports.py (1)
32-37: ⚡ Quick winKeep gevent-allowed importers single-sourced.
Line 76 hardcodes the allowlist instead of using
_ALLOWED_GEVENT_IMPORTERS, so the test can drift if one side changes.Suggested fix
- f'allowed = {{"sentry_sdk", "ddtrace"}}\n' + f'allowed = {set(_ALLOWED_GEVENT_IMPORTERS)!r}\n'Also applies to: 76-76
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/tests/test_no_gevent_imports.py` around lines 32 - 37, The allowlist for gevent importers is defined once in the `_ALLOWED_GEVENT_IMPORTERS` constant but is hardcoded again at line 76, creating a maintenance issue where changes to one location won't sync with the other. Replace the hardcoded allowlist at line 76 with a reference to the `_ALLOWED_GEVENT_IMPORTERS` constant to ensure the test uses a single-sourced allowlist and prevents the test from drifting if the allowlist is updated in the future.osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py (1)
59-61: 💤 Low valueConsider parallel stop for faster shutdown.
stop()awaits each sink sequentially. For faster graceful shutdown (especially with many sinks or slow-to-stop sinks), consider usingasyncio.gather:♻️ Proposed refactor
async def stop(self) -> None: - for sink in self._sinks: - await sink.stop() + await asyncio.gather(*(sink.stop() for sink in self._sinks), return_exceptions=True)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py` around lines 59 - 61, The stop() method in the OutputSink class currently awaits each sink's stop operation sequentially, which is inefficient for shutdown performance. Refactor the stop() method to use asyncio.gather() to run all sink.stop() calls concurrently instead of sequentially, allowing multiple sinks to stop in parallel rather than waiting for each one to complete before moving to the next.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 5ab76602-d02e-4274-ac63-0680a261abed
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (133)
docker-compose.yamldocs/DEVELOPMENT.mddocs/SUMMARY.mddocs/development/workflow.mddocs/llm.mdentrypoint.shexample_plugins/pyproject.tomlexample_plugins/src/async_sinks/__init__.pyexample_plugins/src/async_sinks/example_async_output_sink.pyexample_plugins/src/llm/__init__.pyexample_plugins/src/llm/anthropic_provider.pyexample_plugins/src/llm/tests/__init__.pyexample_plugins/src/llm/tests/test_anthropic_provider.pyexample_plugins/src/register_async_plugins.pyexample_plugins/src/register_plugins.pyexample_plugins/src/tests/__init__.pyexample_plugins/src/tests/test_register_async_plugins.pyosprey_async_worker/Dockerfileosprey_async_worker/example_rules/config/example_config.yamlosprey_async_worker/example_rules/main.smlosprey_async_worker/example_rules/rules/long_message.smlosprey_async_worker/example_rules/sample_actions.jsonlosprey_async_worker/pyproject.tomlosprey_async_worker/src/osprey/async_worker/__init__.pyosprey_async_worker/src/osprey/async_worker/adaptor/__init__.pyosprey_async_worker/src/osprey/async_worker/adaptor/constants.pyosprey_async_worker/src/osprey/async_worker/adaptor/hookspecs.pyosprey_async_worker/src/osprey/async_worker/adaptor/interfaces.pyosprey_async_worker/src/osprey/async_worker/adaptor/plugin_manager.pyosprey_async_worker/src/osprey/async_worker/cli/__init__.pyosprey_async_worker/src/osprey/async_worker/cli/main.pyosprey_async_worker/src/osprey/async_worker/engine.pyosprey_async_worker/src/osprey/async_worker/executor.pyosprey_async_worker/src/osprey/async_worker/lib/__init__.pyosprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.pyosprey_async_worker/src/osprey/async_worker/lib/discovery/__init__.pyosprey_async_worker/src/osprey/async_worker/lib/discovery/async_directory.pyosprey_async_worker/src/osprey/async_worker/lib/discovery/hash_ring.pyosprey_async_worker/src/osprey/async_worker/lib/etcd/__init__.pyosprey_async_worker/src/osprey/async_worker/lib/etcd/sources_provider.pyosprey_async_worker/src/osprey/async_worker/lib/external_service.pyosprey_async_worker/src/osprey/async_worker/lib/pigeon/__init__.pyosprey_async_worker/src/osprey/async_worker/lib/pigeon/client.pyosprey_async_worker/src/osprey/async_worker/lib/pigeon/exceptions.pyosprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/__init__.pyosprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/baggage.pyosprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/metadata.pyosprey_async_worker/src/osprey/async_worker/lib/pigeon/skip_rate_limit.pyosprey_async_worker/src/osprey/async_worker/lib/publisher.pyosprey_async_worker/src/osprey/async_worker/lib/utils/__init__.pyosprey_async_worker/src/osprey/async_worker/singletons.pyosprey_async_worker/src/osprey/async_worker/sinks/__init__.pyosprey_async_worker/src/osprey/async_worker/sinks/sink/__init__.pyosprey_async_worker/src/osprey/async_worker/sinks/sink/input_stream.pyosprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.pyosprey_async_worker/src/osprey/async_worker/sinks/sink/rules_sink.pyosprey_async_worker/src/osprey/async_worker/stdlib_udfs/__init__.pyosprey_async_worker/src/osprey/async_worker/stdlib_udfs/_async_stdlib_plugin.pyosprey_async_worker/src/osprey/async_worker/stdlib_udfs/async_mx_lookup.pyosprey_async_worker/src/osprey/async_worker/tests/__init__.pyosprey_async_worker/src/osprey/async_worker/tests/conftest.pyosprey_async_worker/src/osprey/async_worker/tests/test_async_executor.pyosprey_async_worker/src/osprey/async_worker/tests/test_async_sinks.pyosprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.pyosprey_async_worker/src/osprey/async_worker/tests/test_engine.pyosprey_async_worker/src/osprey/async_worker/tests/test_etcd_sources_provider.pyosprey_async_worker/src/osprey/async_worker/tests/test_external_service.pyosprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.pyosprey_async_worker/src/osprey/async_worker/tests/test_no_gevent_imports.pyosprey_async_worker/src/osprey/async_worker/tests/test_pigeon_client.pyosprey_async_worker/src/osprey/async_worker/tests/test_plugin_manager.pyosprey_async_worker/src/osprey/async_worker/tests/test_publisher.pyosprey_async_worker/test_data/input.jsonlosprey_async_worker/test_data/rules/main.smlosprey_async_worker/test_data/rules/rules/test_rule.smlosprey_coordinator/src/consumer/pubsub.rsosprey_coordinator/src/coordinator_metrics.rsosprey_coordinator/src/main.rsosprey_coordinator/src/osprey_bidirectional_stream.rsosprey_coordinator/src/pigeon/mod.rsosprey_coordinator/src/priority_queue.rsosprey_coordinator/src/pub_sub_streaming_pull/flow_control.rsosprey_coordinator/src/shutdown_handler.rsosprey_coordinator/src/sync_action_rpc.rsosprey_ui/src/components/event_stream/FeatureSelectModal.tsxosprey_worker/Dockerfileosprey_worker/src/osprey/engine/ast/grammar.pyosprey_worker/src/osprey/engine/executor/execution_context.pyosprey_worker/src/osprey/engine/executor/execution_visualizer.pyosprey_worker/src/osprey/engine/executor/executor.pyosprey_worker/src/osprey/engine/executor/external_service_utils.pyosprey_worker/src/osprey/engine/executor/external_service_utils_base.pyosprey_worker/src/osprey/engine/executor/node_executor/binary_operation_executor.pyosprey_worker/src/osprey/engine/executor/tests/test_get_verdicts_pb2_proto.pyosprey_worker/src/osprey/engine/executor/udf_execution_helpers.pyosprey_worker/src/osprey/engine/stdlib/configs/feature_flags_config.pyosprey_worker/src/osprey/engine/stdlib/udfs/categories.pyosprey_worker/src/osprey/engine/stdlib/udfs/count_regex_matches.pyosprey_worker/src/osprey/engine/stdlib/udfs/experiments.pyosprey_worker/src/osprey/engine/stdlib/udfs/json_data.pyosprey_worker/src/osprey/engine/stdlib/udfs/rules.pyosprey_worker/src/osprey/engine/stdlib/udfs/string.pyosprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches.pyosprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches/test_rejects_invalid_regex.txtosprey_worker/src/osprey/engine/stdlib/udfs/tests/test_entity.pyosprey_worker/src/osprey/engine/stdlib/udfs/tests/test_experiments.pyosprey_worker/src/osprey/engine/utils/types.pyosprey_worker/src/osprey/worker/_stdlibplugin/udf_register.pyosprey_worker/src/osprey/worker/adaptor/hookspecs/osprey_hooks.pyosprey_worker/src/osprey/worker/adaptor/plugin_manager.pyosprey_worker/src/osprey/worker/cli/sinks.pyosprey_worker/src/osprey/worker/lib/etcd/tests/test_watcher_mux.pyosprey_worker/src/osprey/worker/lib/etcd/watcher/_mux.pyosprey_worker/src/osprey/worker/lib/etcd/watcher/watcherd_impl.pyosprey_worker/src/osprey/worker/lib/instruments/__init__.pyosprey_worker/src/osprey/worker/lib/llm/__init__.pyosprey_worker/src/osprey/worker/lib/llm/base.pyosprey_worker/src/osprey/worker/lib/llm/loop.pyosprey_worker/src/osprey/worker/lib/llm/tests/__init__.pyosprey_worker/src/osprey/worker/lib/llm/tests/test_base.pyosprey_worker/src/osprey/worker/lib/llm/tests/test_loop.pyosprey_worker/src/osprey/worker/lib/llm/tests/test_tools.pyosprey_worker/src/osprey/worker/lib/llm/tools.pyosprey_worker/src/osprey/worker/lib/osprey_engine.pyosprey_worker/src/osprey/worker/lib/sources_provider.pyosprey_worker/src/osprey/worker/lib/sources_provider_base.pyosprey_worker/src/osprey/worker/lib/tests/test_sources_provider.pyosprey_worker/src/osprey/worker/sinks/sink/monitored_rules_metrics.pyosprey_worker/src/osprey/worker/sinks/sink/output_sink.pyosprey_worker/src/osprey/worker/sinks/sink/rules_sink.pyosprey_worker/src/osprey/worker/sinks/utils/acking_contexts.pyosprey_worker/src/osprey/worker/sinks/utils/acking_contexts_base.pypyproject.toml
💤 Files with no reviewable changes (1)
- osprey_worker/src/osprey/engine/stdlib/configs/feature_flags_config.py
| # 4. Store audit state for execute() to consume. | ||
| # Safe because WhenRules is synchronous (execute_async=False) and CallExecutor.execute() | ||
| # calls resolve_arguments() then execute() without yielding to the event loop. | ||
| self._failed_rule_names = failed_rule_names | ||
| self._then_failed = then_failed | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Locate CallExecutor and UDFBase implementations =="
fd -t f 'call_executor.py|base.py|executor.py|udf/base.py' .
echo
echo "== Inspect CallExecutor execute flow and resolve->execute ordering =="
rg -n -C4 'class CallExecutor|def execute\(|resolve_arguments\(|unresolved_arguments|UDFBase' .
echo
echo "== Inspect UDFBase for shared-state indicators (__slots__, caching, singleton patterns) =="
rg -n -C4 'class UDFBase|__slots__|`@lru_cache`|singleton|execute_async' .
echo
echo "== Inspect worker concurrency model hints (gevent/threading) =="
rg -n -C3 'gevent|greenlet|threading|ThreadPool|concurrent' osprey_workerRepository: roostorg/osprey
Length of output: 50372
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Locate the rules.py file ==="
find . -name rules.py -type f | grep -v __pycache__
echo
echo "=== Inspect WhenRules class definition and audit state usage ==="
rg -n 'class WhenRules|_failed_rule_names|_then_failed' --max-count=30 osprey_worker/src/osprey/engine/stdlib/udfs/
echo
echo "=== Inspect CallExecutor.execute() method signature and flow ==="
rg -n -A20 'class CallExecutor|def execute\(' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.py | head -80
echo
echo "=== Search for WhenRules instantiation patterns (new/creation) ==="
rg -n 'WhenRules\(' osprey_worker --max-count=20
echo
echo "=== Check if UDFs are instantiated per-request or cached ==="
rg -n 'instantiate|create.*udf|_udf.*=' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.py | head -20Repository: roostorg/osprey
Length of output: 5831
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Check CallExecutor instantiation and lifecycle ==="
rg -n 'CallExecutor(' osprey_worker/src/osprey/engine/executor --max-count=20
echo
echo "=== Understand how udf_map is built and where UDFs come from ==="
rg -n 'ValidateCallKwargs|udf_map' osprey_worker/src/osprey/engine --max-count=15
echo
echo "=== Check if UDF instances are created per execution or reused ==="
rg -n 'class.*Executor.*execute\(|_udf.*=' osprey_worker/src/osprey/engine/executor/executor.py | head -30
echo
echo "=== Inspect the validation context to see if UDFs are instantiated ==="
fd -t f -e py 'validator' osprey_worker/src/osprey/engine/ast_validator | head -10Repository: roostorg/osprey
Length of output: 209
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Check CallExecutor instantiation and lifecycle ==="
rg -n 'CallExecutor\(' osprey_worker/src/osprey/engine/executor --max-count=20
echo
echo "=== Understand how udf_map is built and where UDFs come from ==="
rg -n 'ValidateCallKwargs|udf_map' osprey_worker/src/osprey/engine --max-count=15
echo
echo "=== Check if UDF instances are created per execution or reused ==="
rg -n 'class.*Executor|def execute' osprey_worker/src/osprey/engine/executor/executor.py | head -30
echo
echo "=== Inspect the validation context to see if UDFs are instantiated ==="
fd -t f -e py 'validator' osprey_worker/src/osprey/engine/ast_validator | head -10
echo
echo "=== Search for where _udf is instantiated (created) ==="
rg -n '_udf\s*=' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.pyRepository: roostorg/osprey
Length of output: 16335
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Inspect ValidateCallKwargs validator to see where UDF instances are created ==="
fd -t f 'validate_call_kwargs.py' osprey_worker/src/osprey/engine/ast_validator/validators/
echo
echo "=== Read the ValidateCallKwargs file ==="
wc -l osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py
echo
echo "=== Look at UDF instantiation in ValidateCallKwargs ==="
rg -n 'def visit|UDFBase|instantiat|registry\.get' osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py | head -40Repository: roostorg/osprey
Length of output: 685
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Read ValidateCallKwargs to understand UDF instance creation ==="
cat -n osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py | head -100Repository: roostorg/osprey
Length of output: 5162
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Continue reading ValidateCallKwargs (lines 100-211) ==="
cat -n osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py | sed -n '100,211p'Repository: roostorg/osprey
Length of output: 7003
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Check when ValidateCallKwargs runs and if ValidatedSources are reused ==="
rg -n 'ValidatedSources|validate\(' osprey_worker/src/osprey/engine/executor/executor.py | head -30
echo
echo "=== Search for where ASTs are compiled/cached ==="
rg -n 'parse|compile|ValidatedSources\(' osprey_worker/src/osprey/worker/lib/osprey_engine.py | head -20
echo
echo "=== Check concurrency model in executor ==="
rg -n 'gevent|threading|asyncio|concurrent|Thread' osprey_worker/src/osprey/engine/executor/executor.py | head -20
echo
echo "=== Inspect the actual worker execution flow ==="
fd -t f 'osprey_engine.py' osprey_worker/src/osprey/worker/lib/ | xargs wc -lRepository: roostorg/osprey
Length of output: 137
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Look at where execute() is called in the main executor ==="
rg -n 'def execute\(' osprey_worker/src/osprey/engine/executor/executor.py
echo
echo "=== Check how execution graph is built and sources are validated ==="
rg -n 'validated_sources|execution_graph|ExecutionGraph' osprey_worker/src/osprey/engine/executor/executor.py | head -30
echo
echo "=== Check the concurrency model with gevent hints ==="
rg -n 'gevent|Greenlet|Pool' osprey_worker/src/osprey/engine/executor/executor.py | head -20
echo
echo "=== Inspect worker lifecycle - when parsing/validation happens ==="
rg -n 'def execute|validated_sources|parse' osprey_worker/src/osprey/worker/lib/osprey_engine.py | head -40Repository: roostorg/osprey
Length of output: 2916
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Read the complete WhenRules class around lines 166-193 ==="
sed -n '116,200p' osprey_worker/src/osprey/engine/stdlib/udfs/rules.py | cat -n
echo
echo "=== Check if there are any synchronization primitives (locks) used ==="
rg -n 'Lock|RLock|Semaphore|Event' osprey_worker/src/osprey/engine/stdlib/udfs/rules.pyRepository: roostorg/osprey
Length of output: 5033
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Check what execution_context.resolved() does ==="
rg -n 'def resolved\(' osprey_worker/src/osprey/engine/executor/execution_context.py
echo
echo "=== Check if execution_context has any gevent-aware operations ==="
fd -t f 'execution_context.py' osprey_worker/src/osprey/engine/executor/ | xargs wc -l
echo
echo "=== Check for any gevent-aware calls in resolve_arguments or execution context ==="
rg -n 'gevent|sleep|join|wait' osprey_worker/src/osprey/engine/executor/execution_context.pyRepository: roostorg/osprey
Length of output: 913
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Read execution_context.resolved() implementation ==="
sed -n '178,220p' osprey_worker/src/osprey/engine/executor/execution_context.py | cat -n
echo
echo "=== Check how CallExecutor fits into the executor flow and when greenlets yield ==="
rg -n -B5 -A5 'call_executor.execute\(\)|resolved_arguments.*=.*_udf.resolve' osprey_worker/src/osprey/engine/executor/executor.py | head -60
echo
echo "=== Look for gevent.sleep or other yields in the executor loop ==="
rg -n 'gevent.sleep\|greenlet.*spawn\|gevent.wait' osprey_worker/src/osprey/engine/executor/executor.py | head -20Repository: roostorg/osprey
Length of output: 2660
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Verify if CallExecutor instances are created fresh each execution or reused ==="
rg -n 'BaseNodeExecutor\|class.*Executor.*__init__' osprey_worker/src/osprey/engine/executor/node_executor/_base_node_executor.py | head -20
echo
echo "=== Check how execution graph creates executors (fresh vs reused) ==="
rg -n 'ExecutionGraph\|_executors\|executor.*=' osprey_worker/src/osprey/engine/executor/execution_graph.py | head -30
echo
echo "=== Confirm: same AST node = same executor = same UDF instance ==="
rg -n 'executor.node\|id\(node\)' osprey_worker/src/osprey/engine/executor/executor.py | head -20Repository: roostorg/osprey
Length of output: 143
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Search for execution graph and executor caching ==="
fd -t f 'execution_graph.py' osprey_worker/src/osprey/engine/executor/
echo
echo "=== Read how executors are managed in execution_graph ==="
head -100 osprey_worker/src/osprey/engine/executor/execution_graph.py | cat -n
echo
echo "=== Verify that CallExecutor uses SAME UDF instance across multiple rule executions ==="
grep -n 'class CallExecutor\|_udf.*=' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.pyRepository: roostorg/osprey
Length of output: 5720
Move per-invocation audit state off self to prevent cross-contamination under gevent concurrency.
self._failed_rule_names and self._then_failed are set during resolve_arguments() and read during execute(). While a single CallExecutor.execute() call sequences these atomically, the same WhenRules instance is reused across multiple concurrent gevent greenlet executions. Greenlets can interleave between different invocations—if greenlet A calls resolve_arguments() (setting state), then greenlet B overwrites the state, greenlet A will read wrong values in execute().
Store audit state in the resolved arguments object instead. For example, extend WhenRulesArguments with failed_rule_names and then_failed fields, set them in resolve_arguments() alongside the other fields, and read them from arguments in execute() rather than from self.
Also applies to: lines 190–193
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@osprey_worker/src/osprey/engine/stdlib/udfs/rules.py` around lines 166 - 171,
Move the audit state variables from instance attributes to the arguments object
to avoid cross-contamination under gevent concurrency. Extend the
WhenRulesArguments class with new fields for failed_rule_names and then_failed,
then update the resolve_arguments method to assign these values to the arguments
object (e.g., arguments.failed_rule_names = failed_rule_names) instead of to
self._failed_rule_names and self._then_failed. Finally, update the execute
method (lines 190-193) to read these values from the arguments parameter
(arguments.failed_rule_names and arguments.then_failed) instead of from self.
Drops the pluggable LLM provider surface entirely: the worker lib/llm package (base, tool-calling loop, tools + tests), the register_llm_provider hookspec and bootstrap_llm_provider, the example Anthropic provider, docs/llm.md and its doc references, and the anthropic dependency (re-locked out of uv.lock).
…put) - etcd sources provider: dedup on the APPLIED hash and override mark_sources_applied so a failed compile self-heals on the next identical etcd re-delivery instead of wedging on stale rules (+ regression test). - Kafka input: manual offset commit after each processed batch (at-least-once) instead of auto-commit-before-processing that could lose actions. - coordinator stream: send a Nack when the context was marked nack instead of always acking; add BaseAckingContext.should_nack accessor. - async CLI: surface a fatal sink-task error instead of reporting clean shutdown; fall back to the stdout sink (with a warning) when --with-plugins finds no registered async output sink.
The normal ack path consulted should_nack, but the shutdown / rule-reload / uptime graceful-disconnect branches still always-acked. Compute ack/verdicts once after the yield and use it on every finalize path, so a nacked action is nacked regardless of which path finalizes it. Adds a graceful-disconnect nack test.
- AsyncKafkaInputStream funnels poll/commit/close through a single worker thread (ThreadPoolExecutor max_workers=1) so stop()'s close() can never race an in-flight poll() on the thread-unsafe kafka-python consumer (+ regression test that blocks in poll while stop() is requested). - test_discovery_pool_creates_channels: make async so grpc.aio.insecure_channel binds to a running loop (was order-dependent 'no running event loop'). - coordinator stop() tests: provide an AsyncMock channel pool so stop()'s channel_pool.close() is awaited (and assert it).
On SIGTERM/SIGINT, stop the input stream first so the in-flight action can finalize through the stream's own shutdown path (the coordinator stream acks/nacks and graceful-disconnects after the current yield), then drain the sink task with a bounded timeout, cancelling only as a fallback. Previously the sink task was cancelled outright, which could skip the finalize block and leave an in-flight action unacked.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py`:
- Line 158: The call to poll_entered.wait() in the run_in_executor block at line
158 does not check or assert the return value, which means if the wait times out
the test will continue without verifying that the poll actually started. Capture
the boolean return value from the poll_entered.wait(2.0) call and assert that it
is True to ensure the synchronization point was reached before the timeout,
confirming an in-flight poll existed before proceeding with the stop-path
assertion.
- Around line 34-40: Replace the type: ignore comment and type violations with a
Protocol-based approach. Create a Protocol class that defines the interface for
the stream object with a _stopped attribute, then change the self.stream type
annotation from object to use this Protocol instead of object. Remove the #
type: ignore[attr-defined] comment from the poll method where
self.stream._stopped is accessed. Additionally, locate any Sequence[Any] type
annotations in the file (specifically around line 139) and replace Any with the
appropriate concrete type or a Protocol definition to comply with the
disallow_any_generics mypy configuration.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: cd8bc8bb-dd6b-4fc0-81dc-2c2e8ec0cfa8
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (13)
docker-compose.yamldocs/development/README.mdexample_plugins/pyproject.tomlosprey_async_worker/src/osprey/async_worker/cli/main.pyosprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.pyosprey_async_worker/src/osprey/async_worker/lib/etcd/sources_provider.pyosprey_async_worker/src/osprey/async_worker/sinks/sink/input_stream.pyosprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.pyosprey_async_worker/src/osprey/async_worker/tests/test_etcd_sources_provider.pyosprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.pyosprey_worker/src/osprey/worker/adaptor/hookspecs/osprey_hooks.pyosprey_worker/src/osprey/worker/adaptor/plugin_manager.pyosprey_worker/src/osprey/worker/sinks/utils/acking_contexts_base.py
💤 Files with no reviewable changes (3)
- osprey_worker/src/osprey/worker/adaptor/hookspecs/osprey_hooks.py
- example_plugins/pyproject.toml
- osprey_worker/src/osprey/worker/adaptor/plugin_manager.py
✅ Files skipped from review due to trivial changes (1)
- docs/development/README.md
🚧 Files skipped from review as they are similar to previous changes (6)
- osprey_worker/src/osprey/worker/sinks/utils/acking_contexts_base.py
- osprey_async_worker/src/osprey/async_worker/sinks/sink/input_stream.py
- docker-compose.yaml
- osprey_async_worker/src/osprey/async_worker/cli/main.py
- osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py
- osprey_async_worker/src/osprey/async_worker/lib/etcd/sources_provider.py
| self.stream: object = None # set by the test to enable auto-stop | ||
|
|
||
| def poll(self, timeout_ms: int = 0, max_records: int = 0) -> Mapping[str, Sequence[_FakeRecord]]: | ||
| if self._batches: | ||
| return {'tp-0': self._batches.pop(0)} | ||
| if self.stream is not None: | ||
| self.stream._stopped = True # type: ignore[attr-defined] |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Verify repository mypy typing constraints relevant to Any / ignores.
fd -a '^pyproject.toml$' | xargs -r rg -n '^\[tool\.mypy\]|disallow.*any|warn.*ignore|strict'
# Verify introduced Any / type-ignore occurrences in this changed test file.
rg -n --type=py '\bAny\b|#\s*type:\s*ignore' osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.pyRepository: roostorg/osprey
Length of output: 990
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Read the test file to see context around lines 34-40 and 139
echo "=== Context around lines 34-40 ==="
sed -n '25,50p' osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py
echo -e "\n=== Context around lines 139 ==="
sed -n '130,150p' osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py
echo -e "\n=== Full imports and class definitions ==="
head -n 50 osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.pyRepository: roostorg/osprey
Length of output: 3323
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check git status and recent changes to the test file
echo "=== Git diff for the test file ==="
git diff HEAD -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py | head -200
echo -e "\n=== Check if there are staged changes ==="
git diff --cached -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py | head -200Repository: roostorg/osprey
Length of output: 137
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Check git log for recent changes to this file
echo "=== Recent commits touching the test file ==="
git log --oneline -20 -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py
echo -e "\n=== Show the last commit that touched this file ==="
git log -1 -p -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py | head -300Repository: roostorg/osprey
Length of output: 7953
Address mypy violations: replace Sequence[Any] and # type: ignore with proper typing
This new test file introduces typing escapes at lines 40 and 139 that conflict with the repository's mypy configuration (disallow_any_generics = true):
- Line 40:
# type: ignore[attr-defined]suppresses a valid type error instead of using proper typing - Line 139:
Sequence[Any]violatesdisallow_any_genericssinceAnyis used in a generic type signature
Follow the coding guideline to avoid new # type: ignore and Any. Replace these with a Protocol-based approach and object type:
Suggested cleanup
-from typing import Any, List, Mapping, Sequence
+from typing import List, Mapping, Protocol, Sequence
+
+
+class _SupportsStoppedFlag(Protocol):
+ _stopped: bool
@@
- self.stream: object = None # set by the test to enable auto-stop
+ self.stream: _SupportsStoppedFlag | None = None # set by the test to enable auto-stop
@@
- self.stream._stopped = True # type: ignore[attr-defined]
+ self.stream._stopped = True
@@
- def poll(self, timeout_ms: int = 0, max_records: int = 0) -> Mapping[str, Sequence[Any]]:
+ def poll(self, timeout_ms: int = 0, max_records: int = 0) -> Mapping[str, Sequence[object]]:🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py`
around lines 34 - 40, Replace the type: ignore comment and type violations with
a Protocol-based approach. Create a Protocol class that defines the interface
for the stream object with a _stopped attribute, then change the self.stream
type annotation from object to use this Protocol instead of object. Remove the #
type: ignore[attr-defined] comment from the poll method where
self.stream._stopped is accessed. Additionally, locate any Sequence[Any] type
annotations in the file (specifically around line 139) and replace Any with the
appropriate concrete type or a Protocol definition to comply with the
disallow_any_generics mypy configuration.
Source: Coding guidelines
| anext_task = asyncio.create_task(stream.__aiter__().__anext__()) | ||
|
|
||
| # Wait until poll() is actually executing on the consumer's worker thread. | ||
| await asyncio.get_running_loop().run_in_executor(None, poll_entered.wait, 2.0) |
There was a problem hiding this comment.
Assert poll-start synchronization explicitly to avoid false-pass behavior
Line 158 drops the boolean result of poll_entered.wait(...). If the wait times out, the test can continue without proving an in-flight poll existed before the stop-path assertion.
Suggested fix
- await asyncio.get_running_loop().run_in_executor(None, poll_entered.wait, 2.0)
+ started = await asyncio.get_running_loop().run_in_executor(None, poll_entered.wait, 2.0)
+ assert started, 'poll() did not start within timeout'🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py`
at line 158, The call to poll_entered.wait() in the run_in_executor block at
line 158 does not check or assert the return value, which means if the wait
times out the test will continue without verifying that the poll actually
started. Capture the boolean return value from the poll_entered.wait(2.0) call
and assert that it is True to ensure the synchronization point was reached
before the timeout, confirming an in-flight poll existed before proceeding with
the stop-path assertion.
The graceful drain timeout only covered awaiting the sink task; rules_sink.stop() ran outside it, so a slow/stuck consumer.close() (Kafka) could hang shutdown indefinitely. Run stop() + drain together under one wait_for budget, cancelling both the drain and the sink task if it overruns.
4.13+ switched its metadata to the SPDX id PSF-2.0, which the allow-list doesn't recognize (it lists the equivalent Python-2.0 / Python Software Foundation License classifier). 4.12.2 still reports the allowed classifier and satisfies pytest-asyncio 1.4.0's typing-extensions>=4.12 floor, so pytest-asyncio stays at 1.4.0 (0.23.x is incompatible with pytest 9).
example_plugins/src/tests/test_register_async_plugins.py imports osprey.async_worker, which the worker image deliberately excludes (--no-install-package osprey-async-worker). In the integration-tests container that import is unresolvable, so it aborted collection of the whole suite. Relocate it to the async worker's own test package, where osprey.async_worker is available.
haileyok
left a comment
There was a problem hiding this comment.
i most closely looked at things that touch files still relevant to gevent, and those changes seem fine. the async code runs well against bluesky firehose (ran for ~24 hours without problems) and is quite a bit more efficient than gevnt which is cool.
we'll want to add some docs about what this is and how to use it, but esp. since this is still experimental we can do that in a followup.
What this is
A themed series re-expressing worker/engine/coordinator improvements on top of
main, intended for upstreaming. The headline is an experimental asyncio-native worker (an alternative to the gevent worker), plus a metrics/observability pass and several independently-useful engine and coordinator improvements. Deployment-specific references have been removed so the diff reflects only generally-applicable changes.Commits
Upstream-ready series— the themed body of the contribution (see Highlights below).async worker: self-contained consistent-hash ring for discovery— the async worker's etcd-backed discovery originally imported a Discord-internalhash_ringpackage (a compiled C extension that isn't part of open-source osprey). This commit replaces it with a small, stdlib-only consistent-hash ring (osprey_async_worker/.../lib/discovery/hash_ring.py; MD5 virtual points) that is a drop-in for the API discovery uses —HashRingNode(name, num_replicas),HashRing(n).add_nodes(...),find_node(key) -> name,find_nodes(key, count) -> names. The async worker now has no external or private dependency for discovery.Highlights
ProcessActionresponse,render_graph(engine=...),external_service_utils_baseextraction for reuse by alternate executors, safe division inbinary_operation_executor.count_regex_matchesUDF;coerce_typedefaults toTrueforJsonData/EntityJson.sources_provider_baseextraction shared across worker variants.Unavailablewhile draining; nack queued actions on shutdown; configurable pubsub lease-extension durations.worker_typetags, additional coordinator counters.osprey_async_workerpackage (executor, engine, input/output/rules sinks, coordinator input stream over a bidirectional gRPC stream, etcd sources provider, async MX-lookup stdlib UDF, and a plugin hook). Recompiles off the event loop; breaks AST refcycles on source swap.register_llm_providerhook; an example Anthropic provider; docs.Notes
main, so they are battle-tested in practice.mainvia 3-way merge (not wholesale file replacement), so files changed concurrently upstream keep their versions.Summary by CodeRabbit
New Features
Bug Fixes