Skip to content

Experimental asyncio-native worker + metrics & engine/coordinator improvements#341

Merged
cmttt merged 25 commits into
roostorg:mainfrom
cmttt:roostorg-main-snapshot
Jun 25, 2026
Merged

Experimental asyncio-native worker + metrics & engine/coordinator improvements#341
cmttt merged 25 commits into
roostorg:mainfrom
cmttt:roostorg-main-snapshot

Conversation

@cmttt

@cmttt cmttt commented Jun 6, 2026

Copy link
Copy Markdown
Collaborator

What this is

A themed series re-expressing worker/engine/coordinator improvements on top of main, intended for upstreaming. The headline is an experimental asyncio-native worker (an alternative to the gevent worker), plus a metrics/observability pass and several independently-useful engine and coordinator improvements. Deployment-specific references have been removed so the diff reflects only generally-applicable changes.

Commits

  1. Upstream-ready series — the themed body of the contribution (see Highlights below).
  2. async worker: self-contained consistent-hash ring for discovery — the async worker's etcd-backed discovery originally imported a Discord-internal hash_ring package (a compiled C extension that isn't part of open-source osprey). This commit replaces it with a small, stdlib-only consistent-hash ring (osprey_async_worker/.../lib/discovery/hash_ring.py; MD5 virtual points) that is a drop-in for the API discovery uses — HashRingNode(name, num_replicas), HashRing(n).add_nodes(...), find_node(key) -> name, find_nodes(key, count) -> names. The async worker now has no external or private dependency for discovery.

Highlights

  1. Engine executor improvements — verdict synthesis in the sync ProcessAction response, render_graph(engine=...), external_service_utils_base extraction for reuse by alternate executors, safe division in binary_operation_executor.
  2. Stdlib UDF additions & fixescount_regex_matches UDF; coerce_type defaults to True for JsonData/EntityJson.
  3. CountOver query UDF + Druid SQL translator — sliding-window burst detection lowered to Druid SQL (window-function vs self-join engines; SCAN/TIMESERIES/top_n output modes).
  4. Self-heal etcd source reloads — dedup on the applied config hash so a dropped/failed apply self-heals on the next identical event; watch-mux dedup; sources_provider_base extraction shared across worker variants.
  5. Coordinator: graceful shutdown, sync-RPC fast-reject, configurable leases — fast-reject new sync RPCs with Unavailable while draining; nack queued actions on shutdown; configurable pubsub lease-extension durations.
  6. Metrics & observability — rule audit logging, enforcement-gap metric, monitored-rules metrics, worker_type tags, additional coordinator counters.
  7. Experimental asyncio-native worker — a new osprey_async_worker package (executor, engine, input/output/rules sinks, coordinator input stream over a bidirectional gRPC stream, etcd sources provider, async MX-lookup stdlib UDF, and a plugin hook). Recompiles off the event loop; breaks AST refcycles on source swap.
  8. Pluggable LLM provider hook — a provider-agnostic base + tool-calling loop with a register_llm_provider hook; an example Anthropic provider; docs.

Notes

  • The async worker is self-contained — no external or private dependencies (see commit 2 re: discovery ring).
  • These changes have been running in production on top of main, so they are battle-tested in practice.
  • Each theme was produced by applying changes onto main via 3-way merge (not wholesale file replacement), so files changed concurrently upstream keep their versions.

Summary by CodeRabbit

  • New Features

    • Added an experimental async worker with its own CLI, container image, and compose service.
    • Introduced async plugin support, including async UDFs, output sinks, and discovery hooks.
    • Added async input sources for files, Kafka, and coordinator streams.
    • New async rules execution path now supports benchmarking and concurrent processing.
  • Bug Fixes

    • Improved shutdown handling and source-update deduplication.
    • Added safer external-service caching and better error handling.
    • Updated coordinator/worker metrics and routing behavior for more reliable operation.

@cmttt cmttt requested review from a team, EXBreder, ayubun, haileyok and vinaysrao1 as code owners June 6, 2026 20:45
@coderabbitai

coderabbitai Bot commented Jun 6, 2026

Copy link
Copy Markdown
Contributor

Review Change Stack

Warning

Review limit reached

@haileyok, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 20 minutes and 47 seconds. Learn how PR review limits work.

Your organization has used up its prepaid credits, and credit purchases are no longer available. Enable the review add-on in the billing tab to keep reviews running — you're only billed for reviews past your plan's rate limits ($0.25/file).

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

To avoid repeated limits, reduce automatic review volume by pausing incremental auto-reviews earlier, using label-based review opt-in, excluding WIP or generated PR titles, or requesting reviews manually when the PR is ready. If your team needs uninterrupted high-volume reviews, an organization admin can enable usage-based credits.

🚦 How do rate limits work?

CodeRabbit enforces per-developer PR review limits for each organization. Most developers receive the normal plan review availability.

For paid Pro and Pro+ PR reviews, CodeRabbit uses adaptive limits for sustained high-volume activity. When a developer's recent PR review activity reaches the 95th percentile or higher among CodeRabbit users, additional reviews become available more gradually as earlier reviews age out of the rolling window.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: fadb651c-f4b5-4cfc-a958-dd38ac4bc3a0

📥 Commits

Reviewing files that changed from the base of the PR and between 0180e36 and 6ac0694.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (2)
  • osprey_async_worker/src/osprey/async_worker/tests/test_register_async_plugins.py
  • pyproject.toml
📝 Walkthrough

Walkthrough

This PR adds async worker packaging and runtime support, shared worker runtime changes, coordinator shutdown/service-name wiring, and a UI checkbox update. It also updates documentation, workspace configuration, example data, and test coverage across the async and sync worker paths.

Changes

Async worker, shared runtime, stdlib, and coordinator updates

Layer / File(s) Summary
Async packaging, examples, and docs
pyproject.toml, docs/development/*, entrypoint.sh, example_plugins/*, osprey_async_worker/{Dockerfile,pyproject.toml,example_rules/*,test_data/*}, docker-compose.async-worker.yaml
uv workspace config, async-worker packaging, example plugins and rules, and contributor docs are updated together.
Async plugin contracts
osprey_async_worker/src/osprey/async_worker/{adaptor/*,stdlib_udfs/*,tests/test_plugin_manager.py}
Async hook specs, async UDF interfaces, plugin bootstrap, the async MXLookup override, and plugin-manager tests are added.
Async CLI and execution runtime
osprey_async_worker/src/osprey/async_worker/{cli/main.py,engine.py,executor.py,singletons.py,sinks/sink/*}
The async CLI, engine, executor, singleton bootstrap, and core input/output/rules streams are wired together.
Async discovery and service clients
osprey_async_worker/src/osprey/async_worker/lib/{discovery/*,etcd/sources_provider.py,external_service.py,pigeon/*,publisher.py,coordinator_input_stream.py}, osprey_async_worker/src/osprey/async_worker/tests/{test_external_service.py,test_pigeon_client.py,test_publisher.py,test_coordinator_input_stream.py,test_etcd_sources_provider.py}
Async discovery, routing, external-service, Pub/Sub, and coordinator-stream clients are added with supporting tests.
Async worker test coverage
osprey_async_worker/src/osprey/async_worker/tests/{conftest.py,test_async_executor.py,test_async_sinks.py,test_engine.py,test_kafka_input_stream.py,test_no_gevent_imports.py}
Async worker fixtures and tests cover execution, sink fan-out, engine recompilation, Kafka input handling, and gevent import checks.
Shared worker runtime and plugin support
osprey_worker/Dockerfile, osprey_worker/src/osprey/{adaptor/*,engine/executor/*,lib/{etcd/{watcher/*,tests/test_watcher_mux.py},tests/test_sources_provider.py,instruments/*,osprey_engine.py,sources_provider*.py},sinks/{sink/*,utils/*},cli/sinks.py}
Shared worker source-provider, execution-context/result, ack-context, validation-exporter, watcher, sink, and sync-worker Dockerfile plumbing is updated.
Sync stdlib behavior and fixes
osprey_worker/src/osprey/engine/{ast/grammar.py,executor/{node_executor/*,tests/test_get_verdicts_pb2_proto.py},stdlib/{configs/*,udfs/*,udfs/tests/*},utils/types.py}, osprey_worker/src/osprey/worker/_stdlibplugin/udf_register.py
Sync stdlib UDF behavior, string UDF docstrings, and registration/tests are updated, along with utility and watcher fixes.
Coordinator service naming and shutdown handling
osprey_coordinator/src/{consumer/pubsub.rs,coordinator_metrics.rs,main.rs,osprey_bidirectional_stream.rs,pigeon/mod.rs,priority_queue.rs,pub_sub_streaming_pull/flow_control.rs,shutdown_handler.rs,sync_action_rpc.rs}
Coordinator Pub/Sub tuning, service naming, shutdown signaling, queue draining, and bidi stream counters are updated.

Feature selection checkbox update

Layer / File(s) Summary
Feature checkbox selection
osprey_ui/src/components/event_stream/FeatureSelectModal.tsx
The checkbox checked state uses selectedFeatures.has(feature) directly, and the helper fallback is removed.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Suggested labels

documentation

Suggested reviewers

  • EXBreder
  • vinaysrao1
  • haileyok
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 32.74% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title captures the experimental async worker and the broader engine/coordinator/metrics work reflected in the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands.

cmttt and others added 3 commits June 8, 2026 17:33
…oordinator improvements (#60)

* Engine executor improvements

- execution_context: synthesize entity verdicts from LabelEffects in the sync
  ProcessAction response so callers see verdicts without a separate lookup
- execution_visualizer: accept the engine as a parameter to render_graph
- extract external_service_utils_base for reuse by alternate executors
- binary_operation_executor: safe division to avoid ZeroDivisionError in rules
- minor grammar / executor / type cleanups

* Stdlib UDF additions and fixes

- add count_regex_matches UDF (count regex matches in a string) + tests
- coerce_type defaults to True for JsonData/EntityJson string parsing
- string / categories / experiments / rules UDF cleanups
- remove dead WEBHOOKS_USE_PUBSUB feature-flag constant

* Add CountOver query UDF and Druid SQL translator

- CountOver QueryUdfBase for sliding-window burst detection over Druid
- validate_count_over: reject nested/multiple/invalid CountOver usages
- ast_druid_translator: lower CountOver to SQL with selectable engine
  (window functions vs self-join) and output modes (SCAN/TIMESERIES/top_n);
  interpolate a caller-supplied datasource name into the FROM clause
- minor FeatureSelectModal cleanup

* Self-heal etcd source reloads and harden the watch mux

- dedup applied config on the hash actually applied, not the received hash,
  so a dropped/failed apply self-heals on the next identical etcd event
- WatchMux: dedup duplicate watch responses
- watcherd: downgrade a noisy reconnect log to debug
- extract sources_provider_base to share reload logic across worker variants

* Coordinator: graceful shutdown, sync-RPC fast-reject, configurable leases

- fast-reject new sync RPCs with Unavailable once the pod is draining so the
  client retries on a healthier pod instead of timing out
- nack queued sync + async actions on shutdown; extend the drain window to 30s
- make pubsub lease-extension durations configurable; default the minimum to 30s

* Metrics and observability improvements

- instruments: rule audit logging and an enforcement-gap metric
- monitored_rules_metrics: emit per-rule metrics from the label output sink
- tag worker metrics with worker_type; instrument the multi-output sink
- coordinator_metrics: additional counters for sync/async paths

* Add experimental asyncio-native worker

An alternative worker built on asyncio instead of gevent. Runs the rules
engine, input stream, sinks, and publisher on the event loop, and talks to
the coordinator over a bidirectional gRPC stream (pigeon client).

- osprey_async_worker package: executor, engine, sinks (input/output/rules),
  coordinator input stream, etcd sources provider, async MX-lookup stdlib UDF,
  and a first-party plugin hook to bootstrap async UDFs
- self-contained pigeon client with vendored exceptions/interceptors so the
  package does not depend on worker-internal gRPC plumbing
- break AST refcycles on source swap; recompile off the event loop
- extract acking_contexts_base so sync and async workers share ack handling
- coordinator: bidirectional stream + pigeon module support
- wire osprey_async_worker into the uv workspace; enable pytest asyncio_mode

* Add pluggable LLM provider hook with tool calling

- osprey.worker.lib.llm: provider-agnostic base, a tool-calling loop, and
  tool definitions, with a register_llm_provider plugin hook
- example_plugins: Anthropic provider implementation + tests
- docs: LLM integration guide
The async worker's etcd-backed discovery imported an external `hash_ring`
package (a compiled C extension) that isn't part of open-source osprey. Replace
it with a small, stdlib-only consistent-hash ring (MD5 virtual points) that is a
drop-in for the API discovery uses — HashRingNode(name, num_replicas),
HashRing(n).add_nodes(...), find_node(key)->name, find_nodes(key, count)->names.

This makes the async worker self-contained and buildable without any private
dependency. Placement is an internal detail (only fleet self-consistency
matters), so an exact match to any prior implementation is unnecessary.
- osprey_worker/Dockerfile: ADD the osprey_async_worker workspace member
  (pyproject + source) so `uv sync --locked` resolves the full 192-package
  workspace. Without it the locked sync rejected the lockfile and
  integration-tests failed at the Docker build stage.
- ruff / ruff-format: fix E402 import ordering (execution_context, async
  pigeon client) and format the re-expressed code to upstream style.
- mypy: type-correctness across the async worker, engine, and external
  service helpers (annotations / isinstance-narrowing / casts only; no
  behavior change).
- parse_int: add the missing UdfCategories.CAST member it references
  (was an import-time AttributeError).
- fawltydeps: ignore pytest-asyncio (a pytest plugin used via marker, not
  imported).
@cmttt cmttt force-pushed the roostorg-main-snapshot branch from 1fe672a to 958592c Compare June 8, 2026 19:43
cmttt and others added 5 commits June 8, 2026 19:54
JsonData's `coerce_type` now defaults to True (12e8ed2), so `Entity[str]`
reading an int coerces (123 -> "123") instead of raising InvalidJsonType.
This upstream strict-type-check test relied on the old False default and
expected 2 errors. Make it opt into `coerce_type=False` so it still
verifies strict type checking (both A and B mismatch -> 2 errors); the
coercion path is covered separately by test_entity_can_coerce_type.
Keep the stable gevent worker image free of the experimental asyncio worker,
while giving users an opt-in way to run it.

- osprey_worker/Dockerfile (gevent, stable): keep osprey_async_worker/pyproject.toml
  present (required for `uv sync --locked` to validate the workspace lockfile) but
  exclude it from install via `--no-install-package osprey-async-worker` and stop
  copying its source. `osprey.async_worker` is then not importable, so
  execution_context's optional async import cleanly falls back to None — i.e. this
  image behaves exactly as it did before the asyncio worker existed.
- osprey_async_worker/Dockerfile (NEW, EXPERIMENTAL): installs the full workspace
  (the only image that does) and defaults to the asyncio worker. Phase-0: static/
  JSONL input + stdout sink; no Kafka/coordinator input wired upstream yet.
- entrypoint.sh: add an `osprey-async-worker` command.
- docker-compose: add an `osprey-async-worker` service gated behind the
  `experimental` profile, so it is never built or started by default (or by the
  `test` profile used in CI).
…he box

The Phase-0 asyncio worker uses the stdlib-only engine, but the entrypoint and
compose default pointed at ./example_rules, which needs plugin UDFs
(TextContains, BanUser) the stdlib engine doesn't provide — so the image failed
rule validation out of the box.

- Add osprey_async_worker/example_rules/: a stdlib-only ruleset (LongMessage —
  message text > 100 chars) + minimal config + sample_actions.jsonl.
- entrypoint.sh: default OSPREY_RULES_PATH to the bundled example and support an
  optional OSPREY_INPUT_FILE (JSONL actions).
- docker-compose: point the experimental osprey-async-worker service at the
  bundled rules + sample actions.

Verified: `docker run` of the async image with no mounts boots, compiles the
ruleset, and processes both sample actions to the stdout sink (LongMessage
false/true), then exits cleanly.
The asyncio image was based on the gevent worker's Dockerfile and inherited
ENV/EXPOSE lines that don't apply to the Phase-0 async worker:

- ETCD_PEERS: the async worker reads rules from --rules-path and never reads
  ETCD_PEERS (confirmed unused anywhere in osprey_async_worker). The
  http://etcd:2379 default also only resolves inside the compose network.
- PORT + EXPOSE: the worker is a CLI runner (static/JSONL input -> stdout sink);
  it does not bind a port.

Keep PYTHONPATH (image-structural) and the jemalloc allocator tuning. Actual
runtime config (rules path, input file, DD flags) is supplied at run time via
compose / `docker run -e`, not baked into the image.
@julietshen

Copy link
Copy Markdown
Member

Oooh thank you for the PR! This is great, I know gevent has caused headaches and been the topic of many discussions in the past. I'm also especially interested in the pluggable LLM provider hook :)

@juanmrad do you think you'd be able to give this a review?

@julietshen

Copy link
Copy Markdown
Member

Also @cmttt any chance you can break this into smaller PRs to make it easier to review?

cmttt and others added 6 commits June 15, 2026 07:09
…root_cache before nulling AST parents

_break_old_graph_cycles nulls `parent` back-pointers on a discarded graph's AST
nodes so refcount can reclaim it without waiting for gen-2 GC. But those Root
objects are memoized in the module-level, never-evicted parsed_ast_root_cache
(keyed by Source content). When a file's exact content RECURS -- e.g. a rule
revert -- the cache hands back the same parent-nulled Root, and validation fails
with "`Rule(...)` must be assigned to a variable" (the Rule call's parent Assign
is gone), failing the recompile and wedging the worker on stale rules until restart.

Fix: pop the source from parsed_ast_root_cache before nulling its parents, so any
future recurrence of that content re-parses a fresh, intact Root.

Adds a regression test that fails without the eviction. Reproduced + validated at
the AST level: null-only => re-parse cache-hit, Rule parent None (corrupt);
evict+null => re-parse cache-miss, Rule parent Assign (ok).
…compile

Adds _freeze_resident_graph() (gc.collect(); gc.freeze()), called at boot and after
each recompile's swap + cycle-break. Moves the resident rule graph + long-lived objects
into the permanent GC generation so per-message gen-2 collections stay cheap as rules
recompile -- otherwise refcycle-held AST objects (parsed_ast_root_cache) accumulate as
permanent gen-2 survivors and per-message GC CPU climbs with uptime. On by default.

Graduates the freeze from the smite-side worker (a polling background loop) into the
engine, inline + serialized with the recompile path. Updates the now-stale
"we deliberately do NOT call gc.collect()" comment.
Drops the experimental CountOver sliding-window burst-detection feature:
the count_over/validate_count_over UDFs, the SQL-lowering paths in
DruidQueryTransformer, the parse_query_filter SQL routing, and their
tests/snapshots. Reverts the touched engine files to their main state.
Adds AsyncKafkaInputStream, mirroring the gevent KafkaInputStream envelope
decode but polling kafka-python off the event loop via asyncio.to_thread
(no gevent, no new dependency). Wires it into osprey-async-cli run behind
--input-source kafka with --kafka-* options, so the async worker can consume
an Osprey-format topic directly (e.g. from at-kafka) without a coordinator.
Wire osprey-async-cli run --with-plugins to the async plugin system
(bootstrap_async_udfs / _ast_validators / _output_sinks + AsyncOspreyEngine)
instead of the sync gevent engine, removing the engine type-mismatch cast.

Add a reference async plugin in example_plugins, registered under the
osprey_async_plugin entry-point group: the pure-computation TextContains UDF
(runs inline in the async executor) and an example AsyncBaseOutputSink, plus
tests. This makes the asyncio worker's example plugins actually available.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Note

Due to the large number of review comments, Critical severity comments were prioritized as inline comments.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
osprey_coordinator/src/priority_queue.rs (1)

214-223: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Stop logging full action payloads when nack send fails.

println!("... {:?}", action.action) can expose action data/secret data in logs, and this helper now covers both sync and async drains.

Suggested fix
-                    Err(_) => println!(
-                        "tried to nack {:?} and the nacking receiver was dropped",
-                        action.action
-                    ),
+                    Err(_) => tracing::warn!(
+                        ack_id = action.action.ack_id,
+                        action_id = action.action.action_id,
+                        "failed to nack action because the receiver was dropped"
+                    ),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_coordinator/src/priority_queue.rs` around lines 214 - 223, The
nack_all function currently logs the full action payload using println! with
debug formatting on action.action when the nack send fails, which can expose
sensitive data in logs. Remove the action.action payload from the error log
message in the Err branch of the match statement where
action.acking_oneshot_sender.send(AckOrNack::Nack) is called, and replace it
with a generic error message that does not include any action details or payload
information.
osprey_coordinator/src/osprey_bidirectional_stream.rs (1)

89-97: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Increment bidi_acks_received only after parsing a valid ack payload.

Right now the counter increments before ack_or_nack validation, so malformed messages are counted as received acks.

Suggested fix
-            metrics.bidi_acks_received.incr();
             let duration = Instant::now().duration_since(state.send_time);
             metrics.action_outstanding_duration.record(duration);
-            state.action_acker.ack_or_nack(
-                ack_or_nack
-                    .ack_or_nack
-                    .context("no `ack_or_nack` in proto")?,
-            );
+            let ack_or_nack = ack_or_nack
+                .ack_or_nack
+                .context("no `ack_or_nack` in proto")?;
+            state.action_acker.ack_or_nack(ack_or_nack);
+            metrics.bidi_acks_received.incr();
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_coordinator/src/osprey_bidirectional_stream.rs` around lines 89 - 97,
The metrics.bidi_acks_received counter is being incremented before the
ack_or_nack payload is validated, which means malformed messages that fail the
context check will still be counted. Move the metrics.bidi_acks_received.incr()
call to execute only after the ack_or_nack validation succeeds (after the
context("no `ack_or_nack` in proto")? check completes successfully), ensuring
that only valid ack payloads are counted in the metrics.
osprey_coordinator/src/pub_sub_streaming_pull/flow_control.rs (1)

70-77: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update default value in the doc comment to match runtime behavior.

The doc says default min lease extension is 10s, but FlowControl::new() now defaults to 30s.

Suggested fix
-    /// Default: (min = 10 seconds, max = 600 seconds)
+    /// Default: (min = 30 seconds, max = 600 seconds)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_coordinator/src/pub_sub_streaming_pull/flow_control.rs` around lines
70 - 77, The documentation comment for the lease extension settings on the
FlowControl struct states a default minimum of 10 seconds in the Default
section, but the actual implementation in FlowControl::new() sets the default
minimum to 30 seconds. Update the doc comment's Default line to reflect the
actual default minimum value of 30 seconds to match the runtime behavior
implemented in FlowControl::new().
osprey_worker/src/osprey/worker/lib/sources_provider.py (1)

60-71: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Ensure paused input streams are always resumed when watcher callbacks fail.

At Line 67, an exception from self._sources_watcher_callback() exits before the resume block (Lines 69-71), which can leave ingestion paused until restart. Wrap update/callback in try/finally so resume is guaranteed.

Proposed fix
         if self._input_stream_ready_signaler is not None:
             logging.info('Pausing input streams')
             self._input_stream_ready_signaler.pause_input_stream()
             self._input_stream_ready_signaler.wait_for_input_stream_to_pause()
 
-        self._current_sources = new_sources
-        if self._sources_watcher_callback:
-            self._sources_watcher_callback()
-
-        if self._input_stream_ready_signaler is not None:
-            logging.info('Restarting input streams')
-            self._input_stream_ready_signaler.resume_input_stream()
+        try:
+            self._current_sources = new_sources
+            if self._sources_watcher_callback:
+                self._sources_watcher_callback()
+        finally:
+            if self._input_stream_ready_signaler is not None:
+                logging.info('Restarting input streams')
+                self._input_stream_ready_signaler.resume_input_stream()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/worker/lib/sources_provider.py` around lines 60 -
71, The resume_input_stream call in the _input_stream_ready_signaler block is
not guaranteed to execute if self._sources_watcher_callback() raises an
exception, leaving input streams paused indefinitely. Refactor the code by
wrapping the self._current_sources assignment and
self._sources_watcher_callback() invocation in a try block, and move the input
stream resume block (the logging and resume_input_stream call) into a
corresponding finally block to ensure the streams are always resumed regardless
of callback failures.
osprey_async_worker/src/osprey/async_worker/tests/test_external_service.py (1)

201-222: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add regression tests for duplicate-key batches and cancellation cleanup.

Please add tests for:

  1. batch_get(['a', 'a']) duplicate-key behavior, and
  2. cancellation during in-flight fetch ensuring cache doesn’t retain unresolved futures.

As per coding guidelines (AGENTS.md), “New behavior requires a test; bug fixes require a regression test.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/tests/test_external_service.py`
around lines 201 - 222, Add two new test functions after
test_batch_get_uses_cache to cover missing regression cases. First, create a
test that calls batch_get with duplicate keys (e.g., ['a', 'a']) to verify the
behavior when the same key appears multiple times in a single batch request.
Second, create a test that triggers cancellation of an in-flight batch_get
request to ensure the cache properly cleans up and does not retain unresolved
futures from the cancelled operation. Both tests should follow the same
pytest.mark.asyncio pattern and assertion style as the existing tests, verifying
the service call counts and cache state appropriately.

Source: Coding guidelines

entrypoint.sh (1)

11-52: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

entrypoint.sh is a restricted file and needs explicit human approval.

Please gate merge on recorded manual approval for this entrypoint change.

As per coding guidelines, AGENTS.md lists entrypoint.sh under human-approval-required actions.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@entrypoint.sh` around lines 11 - 52, The entrypoint.sh file is restricted and
requires explicit human approval before merging, as documented in AGENTS.md
under human-approval-required actions. Obtain recorded manual approval from the
appropriate stakeholder for the changes made to the entrypoint.sh file (the
cli-osprey-ui-api, cli-osprey-worker, and cli-osprey-async-worker functions) and
ensure this approval is documented on the pull request before proceeding with
the merge.

Source: Coding guidelines

osprey_async_worker/Dockerfile (2)

11-73: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Run the image as a non-root user.

There is no USER directive, so runtime defaults to root (Line 71 onward still executes as root). This weakens container isolation.

🔧 Suggested hardening
 FROM python:3.11-slim
 ARG TARGETPLATFORM
@@
 WORKDIR /osprey
@@
+RUN addgroup --system osprey && \
+    adduser --system --ingroup osprey --home /osprey osprey && \
+    chown -R osprey:osprey /osprey
@@
 COPY entrypoint.sh /osprey/entrypoint.sh
+USER osprey
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/Dockerfile` around lines 11 - 73, The Dockerfile lacks a
USER directive, causing the container to run as root by default which presents a
security risk. Add a non-root user creation command (using RUN to invoke useradd
or similar) after the source code is added but before the ENTRYPOINT directive,
then add a USER directive specifying the created user immediately before the
ENTRYPOINT and CMD lines to ensure the container executes with restricted
privileges.

Source: Linters/SAST tools


1-73: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

This Dockerfile change needs explicit human approval before merge.

This file is in the restricted set for manual approval in repo policy.

As per coding guidelines, AGENTS.md marks **/Dockerfile changes as “Human-approval-required actions.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/Dockerfile` around lines 1 - 73, This Dockerfile change
requires explicit human approval before it can be merged per repository policy
as documented in AGENTS.md. As the agent/intern, you cannot automatically fix
this - you must request explicit approval from a human maintainer or project
lead who has authority to review and approve Dockerfile changes. Do not attempt
to modify the Dockerfile further as a workaround; instead, ensure proper
approval is obtained through the established review process before this change
can be merged.

Source: Coding guidelines

🟡 Minor comments (16)
osprey_coordinator/src/sync_action_rpc.rs-95-100 (1)

95-100: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Avoid retrying the intentional drain rejection.

This branch returns Unavailable by design, but process_action retries all errors once, so draining requests are immediately re-executed and counted again. Consider skipping the second attempt when shutdown is active.

Suggested fix
-        match self.try_process_action(ack_id, &action_request).await {
+        match self.try_process_action(ack_id, &action_request).await {
             response @ Ok(_) => response,
+            Err(e) if self.is_shutting_down.load(Ordering::Acquire) => Err(e),
             Err(e) => {
                 tracing::error!("initial process_action attempt failed, retrying: {}", e);
                 self.try_process_action(ack_id, &action_request).await
             }
         }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_coordinator/src/sync_action_rpc.rs` around lines 95 - 100, When the
coordinator is shutting down, the code returns an `Unavailable` error in the
`is_shutting_down` check, but the `process_action` function automatically
retries all errors once, causing draining requests to be executed and counted
twice. Instead of returning `Unavailable` which is retryable, return a
non-retryable error status code such as `FailedPrecondition` so that
`process_action` will skip the retry attempt when shutdown is active.
osprey_worker/src/osprey/engine/stdlib/udfs/count_regex_matches.py-14-17 (1)

14-17: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Implementation currently counts duplicate patterns, despite “distinct patterns” contract.

The code iterates the raw patterns list, so duplicates increment the count multiple times. Either dedupe before compile, or update the UDF docs to match current behavior.

💡 Proposed fix (dedupe while preserving order)
-        for pattern in arguments.patterns.value:
+        for pattern in dict.fromkeys(arguments.patterns.value):
             try:
                 self._compiled.append(re.compile(pattern, flags))

Also applies to: 42-53

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/engine/stdlib/udfs/count_regex_matches.py` around
lines 14 - 17, The CountRegexMatchesArguments class documentation states it
counts "distinct patterns", but the implementation processes all patterns in the
input list including duplicates, causing them to be counted multiple times.
Modify the CountRegexMatchesArguments class to deduplicate the patterns list
while preserving order before they are processed, ensuring the actual behavior
matches the documented contract of counting only distinct patterns. This
deduplication should be applied where the patterns are stored or validated in
the class, and ensure it also applies to any processing logic in the referenced
code section at lines 42-53.
osprey_worker/src/osprey/worker/sinks/sink/rules_sink.py-146-148 (1)

146-148: ⚠️ Potential issue | 🟡 Minor

Confirm skip-flag migration with upstream producers.

At Line 146, skip behavior now keys only on osprey_skip_async. The codebase contains no references to the legacy osprey_v2_skip_async_classification flag in Python source code, and action data is deserialized from external message sources (Kafka). If any upstream producer still emits only the old flag, those actions will process without being skipped. Verify that all producers have migrated to the new flag name before merging.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/worker/sinks/sink/rules_sink.py` around lines 146 -
148, The skip condition at line 146 in rules_sink.py currently only checks for
the new 'osprey_skip_async' flag, but upstream Kafka producers may still be
emitting the legacy 'osprey_v2_skip_async_classification' flag. Update the
condition in the action.data.get() call to check for both the new flag
'osprey_skip_async' and the legacy flag 'osprey_v2_skip_async_classification'
using a boolean OR operation, ensuring that actions from producers that haven't
yet migrated to the new flag name will still be skipped correctly during the
transition period.
osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches.py-39-43 (1)

39-43: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Escape target safely when building DSL test input.

target="{target}" will break for values containing quotes/escapes, causing parser failures unrelated to UDF behavior. Use a repr-safe literal instead.

Proposed fix
-    Count = CountRegexMatches(patterns={patterns!r}, target="{target}")
+    Count = CountRegexMatches(patterns={patterns!r}, target={target!r})
@@
-    result = execute(f'Count = CountRegexMatches(patterns={patterns!r}, target="{target}"{extra_args})')
+    result = execute(f'Count = CountRegexMatches(patterns={patterns!r}, target={target!r}{extra_args})')

Also applies to: 68-72

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches.py`
around lines 39 - 43, The target variable in the DSL test input string is being
directly interpolated without proper escaping, which will cause parser failures
if the target value contains quotes or escape characters. In the execute()
function call within the test_count_regex_matches function, change the target
parameter from target="{target}" to target={target!r} to use repr() formatting,
which will properly escape special characters and quotes just like the patterns
parameter already does with {patterns!r}. This same fix should be applied to all
similar occurrences throughout the test file.
docs/development/workflow.md-31-33 (1)

31-33: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix the mypy path in the documented command.

Line 32 points to osprey_worker/src/osprey_worker/lib, which doesn’t match the repo path shape (osprey_worker/src/osprey/worker/lib) and is likely to fail when copied.

Proposed fix
-uv run mypy osprey_worker/src/osprey_worker/lib
+uv run mypy osprey_worker/src/osprey/worker/lib
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/development/workflow.md` around lines 31 - 33, The mypy command path in
the Type checking documentation is incorrect. Update the path in the uv run mypy
command from osprey_worker/src/osprey_worker/lib to
osprey_worker/src/osprey/worker/lib to match the actual repository directory
structure. This ensures the documented command will work correctly when users
copy it.
osprey_async_worker/src/osprey/async_worker/executor.py-1-9 (1)

1-9: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Module docstring contradicts runtime behavior.

The header says there is no thread pool / run_in_executor, but legacy paths below do use run_in_executor. Please align the docs with actual behavior.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/executor.py` around lines 1 - 9,
The module docstring at the beginning of the executor.py file incorrectly states
that there is no thread pool and no run_in_executor, but the actual
implementation below uses run_in_executor for legacy sync UDF execution paths.
Update the module docstring to accurately document all execution paths including
the thread pool and run_in_executor usage for handling legacy sync UDFs, so the
documentation reflects the true runtime behavior of the executor.
osprey_async_worker/src/osprey/async_worker/lib/pigeon/client.py-389-389 (1)

389-389: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use a targeted type-ignore code (or explicit protocol) instead of a bare ignore.

Line 389 introduces a broad # type: ignore, which makes type suppression hard to audit.

🔧 Suggested fix
-            client = self._stub_cls(channel)  # type: ignore
+            client = self._stub_cls(channel)  # type: ignore[call-arg]  # Generated grpc stubs accept Channel at runtime.

As per coding guidelines, new Python suppressions must be narrowly scoped (# type: ignore[<code>]) with justification.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/lib/pigeon/client.py` at line
389, The bare `# type: ignore` comment on line 389 where `client =
self._stub_cls(channel)` is assigned needs to be replaced with a specific type
error code. First, determine the exact type error by temporarily removing the
ignore comment and letting your type checker (mypy) report the specific error
code. Then replace the bare `# type: ignore` with a narrowly scoped `# type:
ignore[error_code]` format (for example, `# type: ignore[assignment]` or `#
type: ignore[arg-type]`) and add a brief comment explaining why the specific
type suppression is necessary for the _stub_cls instantiation.

Source: Coding guidelines

osprey_async_worker/src/osprey/async_worker/lib/discovery/async_directory.py-268-268 (1)

268-268: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

tolerate_draining is ignored in the fallback branch.

When no visible instances exist, the fallback always excludes draining instances, even if tolerate_draining=True. That makes tolerant selection unexpectedly fail during visibility delays.

Suggested fix
-        return services if services else [w.service for w in self._instances.values() if not w.service.draining]
+        if services:
+            return services
+        if tolerate_draining:
+            return [w.service for w in self._instances.values()]
+        return [w.service for w in self._instances.values() if not w.service.draining]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/lib/discovery/async_directory.py`
at line 268, In the fallback branch of the return statement in the method
containing line 268, the filter condition is unconditionally excluding draining
instances with `if not w.service.draining`, which ignores the
`tolerate_draining` parameter. Modify the filter condition in the list
comprehension to check the `tolerate_draining` parameter: if `tolerate_draining`
is True, include all instances regardless of draining status; if False, maintain
the current behavior of excluding draining instances. This ensures that when
`tolerate_draining=True`, draining instances are included in the fallback list
during visibility delays.
osprey_async_worker/src/osprey/async_worker/stdlib_udfs/async_mx_lookup.py-33-33 (1)

33-33: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add justification (or remove) the new class-level # type: ignore[misc].

Line 33 introduces a new ignore without an inline rationale. Please either remove it by fixing the underlying typing issue or add a concise justification for this exact ignore code.

As per coding guidelines: “Do not disable lint or type rules to silence errors; fix the underlying issue or use narrowly-scoped # type: ignore[<code>] with explanatory comments.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/stdlib_udfs/async_mx_lookup.py`
at line 33, The MXLookup class has a type: ignore[misc] comment without
justification. Either remove this ignore by fixing the underlying typing issue
(such as properly typing the generic parameters or the class inheritance), or
add a concise inline comment explaining why the specific misc type error cannot
be resolved. If keeping the ignore, add a comment like # type: ignore[misc] #
<reason> to document the necessity of this suppression.

Source: Coding guidelines

osprey_async_worker/src/osprey/async_worker/cli/main.py-364-365 (1)

364-365: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix misleading benchmark fallback output.

On Line 365, when gevent is unavailable the CLI prints “Same performance,” which is inaccurate because no comparison was run.

Suggested fix
-    else:
-        click.echo('  Same performance')
+    else:
+        click.echo('Comparison skipped: gevent benchmark unavailable')
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/cli/main.py` around lines 364 -
365, The else block in main.py that prints "Same performance" is misleading
because this message is displayed when gevent is unavailable, but it incorrectly
suggests that a performance comparison was actually performed. Replace the
click.echo statement with a message that accurately reflects the situation, such
as indicating that gevent is not available and therefore no comparison could be
made, rather than claiming the performance is the same.
osprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/baggage.py-34-36 (1)

34-36: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Constructor-configured baggage values are never applied.

Line 34 defines baggage_header/baggage, but Line 60 injects only span baggage and Line 61 appends those headers; self._baggage_header and self._baggage are currently unused in request metadata generation.

Proposed fix
     def _intercept_call(self, continuation, client_call_details, request_or_iterator):
@@
-        baggage_headers = {}
-        baggage_propagator.inject(get_baggage(current_span()), baggage_headers)
+        baggage_headers = {}
+        merged_baggage = {**self._baggage, **get_baggage(current_span())}
+        baggage_propagator.inject(merged_baggage, baggage_headers)
+        if self._baggage_header != BAGGAGE_HEADER and BAGGAGE_HEADER in baggage_headers:
+            baggage_headers[self._baggage_header] = baggage_headers.pop(BAGGAGE_HEADER)
         metadata.extend([(k, v) for k, v in baggage_headers.items()])

Also applies to: 60-61

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@osprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/baggage.py`
around lines 34 - 36, The BaggageInterceptor constructor accepts and stores
baggage_header and baggage parameters in self._baggage_header and self._baggage,
but these values are never applied when generating request metadata. In the code
section where span baggage is injected into request headers (around lines
60-61), you need to merge the constructor-configured self._baggage with the span
baggage, and use self._baggage_header instead of just relying on the span
baggage. Update the baggage injection logic to include and apply both the
constructor-configured baggage dictionary and the appropriate baggage header
name when building the request metadata headers.
osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py-63-83 (1)

63-83: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Same missing _channel_pool issue.

This test will also fail when stop() is called without _channel_pool being mocked.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py`
around lines 63 - 83, The test_input_stream_shutdown_event_unblocks test is
missing the initialization of the _channel_pool attribute on the stream object.
When stream.stop() is called, the stop() method attempts to access _channel_pool
but it doesn't exist since only _shutdown_event was initialized using __new__.
Add a mock or initialization for stream._channel_pool (similar to how
stream._shutdown_event is set up) before calling await stream.stop() to ensure
the stop() method has all required attributes available.
osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py-37-46 (1)

37-46: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test uses non-existent attribute _request_queue.

The test assigns stream._request_queue but OspreyCoordinatorBiDirectionalStream uses _outgoing_queue. This test passes but doesn't validate the actual class behavior.

🐛 Proposed fix
 `@pytest.mark.asyncio`
 async def test_bidirectional_stream_queue_based():
     """Stream uses asyncio.Queue for sending requests."""
     stream = OspreyCoordinatorBiDirectionalStream.__new__(OspreyCoordinatorBiDirectionalStream)
-    stream._request_queue = asyncio.Queue()
-    stream._should_run = True
+    stream._outgoing_queue = asyncio.Queue()
+    stream._stopped = False

-    await stream._request_queue.put('test_request')
-    item = await stream._request_queue.get()
+    await stream._outgoing_queue.put('test_request')
+    item = await stream._outgoing_queue.get()
     assert item == 'test_request'
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py`
around lines 37 - 46, The test method test_bidirectional_stream_queue_based is
assigning and accessing a non-existent attribute _request_queue, but the
OspreyCoordinatorBiDirectionalStream class actually uses the attribute name
_outgoing_queue. Replace all occurrences of _request_queue in this test with
_outgoing_queue to ensure the test validates the actual class behavior and uses
the correct attribute that the class implements.
osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py-257-259 (1)

257-259: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert on user-driven path risks crash.

get_uptime() uses assert self._connect_time is not None, which will crash the process if called before a connection is established. Per coding guidelines, prefer explicit error handling on user/RPC paths.

🛡️ Proposed fix
     def get_uptime(self) -> float:
-        assert self._connect_time is not None, 'This was called before a connection was established'
+        if self._connect_time is None:
+            return 0.0
         return time.time() - self._connect_time
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py`
around lines 257 - 259, The get_uptime() method uses an assert statement to
validate that self._connect_time is not None, which is unsafe for user-driven or
RPC paths since asserts can be disabled at runtime. Replace the assert with
explicit error handling by raising an appropriate exception (such as
RuntimeError) with the same descriptive error message when self._connect_time is
None, ensuring the error is always caught regardless of Python optimization
flags.
osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py-52-60 (1)

52-60: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Test for stop() is incomplete — missing _channel_pool mock.

OspreyCoordinatorInputStream.stop() calls await self._channel_pool.close(), but this test doesn't mock _channel_pool, so it will raise AttributeError when run.

🐛 Proposed fix
 `@pytest.mark.asyncio`
 async def test_input_stream_stop():
     """Stop sets the shutdown event."""
     stream = OspreyCoordinatorInputStream.__new__(OspreyCoordinatorInputStream)
     stream._shutdown_event = asyncio.Event()
+    stream._channel_pool = MagicMock()
+    stream._channel_pool.close = MagicMock(return_value=asyncio.coroutine(lambda: None)())

     assert not stream._shutdown_event.is_set()
     await stream.stop()
     assert stream._shutdown_event.is_set()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py`
around lines 52 - 60, The test_input_stream_stop test creates an
OspreyCoordinatorInputStream instance and sets _shutdown_event, but the stop()
method also calls await self._channel_pool.close(), which is not mocked. Add a
mock object with an async close method and assign it to stream._channel_pool
before calling await stream.stop() to prevent an AttributeError when the test
runs.
osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py-47-57 (1)

47-57: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Timeout retries lack backoff sleep.

When a TimeoutError occurs (lines 47-51), the loop retries immediately without any delay, unlike the generic exception path (line 57) which sleeps before retrying. This could cause rapid retry storms against an already-struggling sink.

🔧 Proposed fix to add backoff on timeout
             except TimeoutError:
                 logger.warning(f'Timeout pushing to {sink_name} (attempt {attempt}/{attempts})')
                 metrics.increment('output_sink.timeout', tags=[f'sink:{sink_name}'])
                 if attempt == attempts:
                     metrics.increment('output_sink.timeout_exhausted', tags=[f'sink:{sink_name}'])
+                else:
+                    await asyncio.sleep(0.5 * attempt)
             except Exception as exc:
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py` around
lines 47 - 57, The TimeoutError exception handler (which catches TimeoutError at
line 47) is missing the backoff sleep that exists in the generic Exception
handler path. To fix this, add an await asyncio.sleep call with the same
exponential backoff formula (0.5 * attempt) to the TimeoutError block, ensuring
it sleeps before retrying on timeout just like the generic Exception handler
does. This sleep should be added after the metrics increment and before checking
if the attempt has exhausted the maximum number of retries (before the if
attempt == attempts check).
🧹 Nitpick comments (9)
osprey_worker/src/osprey/worker/lib/llm/tests/test_base.py (1)

77-84: 💤 Low value

Consider using pytest.raises for consistency.

The try/except/else pattern works correctly, but pytest.raises(TypeError) would be more idiomatic and consistent with typical pytest style. This is purely a style preference.

♻️ Optional refactor
 def test_base_provider_is_abstract_and_subclassable() -> None:
     # BaseLLMProvider cannot be instantiated directly.
-    try:
-        BaseLLMProvider()  # type: ignore[abstract]
-    except TypeError:
-        pass
-    else:
-        raise AssertionError('expected BaseLLMProvider to be abstract')
+    import pytest
+    with pytest.raises(TypeError):
+        BaseLLMProvider()  # type: ignore[abstract]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/worker/lib/llm/tests/test_base.py` around lines 77 -
84, The test function `test_base_provider_is_abstract_and_subclassable` uses a
manual try/except/else pattern to verify that `BaseLLMProvider` raises a
TypeError when instantiated directly. Replace this pattern with the
`pytest.raises(TypeError)` context manager that wraps the instantiation of
`BaseLLMProvider()` to align with pytest conventions and improve readability.
osprey_worker/src/osprey/worker/lib/llm/tools.py (1)

43-57: 💤 Low value

None as sentinel prevents declaring explicit null defaults.

Using None to mean "no default" makes it impossible to emit a JSON Schema default: null for parameters where null is a valid default value. If this is intentional (null defaults not needed), the current approach is fine. Otherwise, consider a sentinel like _UNSET = object().

This is a minor edge case that may never arise in practice.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/worker/lib/llm/tools.py` around lines 43 - 57, The
issue is that using `None` as the sentinel value for "no default" in the
ToolParameter dataclass prevents explicitly setting a JSON Schema default value
of null. Replace the `None` sentinel with a dedicated sentinel object (e.g.,
`_UNSET = object()`) defined at module level. Update the default parameter
initialization in the ToolParameter dataclass to use this new sentinel instead
of `None`, and change the condition in the build_input_schema function that
checks `if parameter.default is not None:` to instead check against the new
sentinel object to determine whether to include a default value in the schema.
osprey_worker/src/osprey/worker/sinks/sink/output_sink.py (1)

190-197: ⚡ Quick win

Defensively copy monitored_labels on assignment.

This currently stores the caller’s set by reference, so later external mutations can change sink behavior unexpectedly.

💡 Proposed fix
 def __init__(self, labels_provider: LabelsProvider, monitored_labels: set[str] | None = None) -> None:
     self._labels_provider = labels_provider
-    self._monitored_labels: set[str] = monitored_labels or set()
+    self._monitored_labels: set[str] = set(monitored_labels or set())

 def set_monitored_labels(self, monitored_labels: set[str]) -> None:
     """Set monitored labels for metrics emission. Can be called post-engine compilation."""
-    self._monitored_labels = monitored_labels
+    self._monitored_labels = set(monitored_labels)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/worker/sinks/sink/output_sink.py` around lines 190 -
197, The __init__ method and set_monitored_labels method both store direct
references to the caller's monitored_labels set instead of making defensive
copies. This allows external mutations to the original set to unexpectedly
change the sink's behavior. Fix this by creating a defensive copy of the
monitored_labels parameter when assigning it to self._monitored_labels in both
locations - use the set() constructor to create a copy of the passed set in the
__init__ method when monitored_labels is not None, and similarly in the
set_monitored_labels method when storing the provided monitored_labels.
osprey_worker/src/osprey/worker/lib/etcd/watcher/watcherd_impl.py (1)

147-152: ⚡ Quick win

Preserve traceback context on retry warnings.

Switching to log.warning drops stack traces here, which makes recurring stream/reset failures harder to root-cause.

Proposed patch
-            log.warning(
+            log.warning(
                 '%r: etcd-watcherd stream raised an error (%r). sleeping for %.2f sec before retrying',
                 self,
                 e,
                 delay,
+                exc_info=True,
             )
@@
-            log.warning('%r: etcd raised an error (%r). sleeping for %.2f sec before retrying', self, e, delay)
+            log.warning(
+                '%r: etcd raised an error (%r). sleeping for %.2f sec before retrying',
+                self,
+                e,
+                delay,
+                exc_info=True,
+            )

Also applies to: 169-170

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/worker/lib/etcd/watcher/watcherd_impl.py` around
lines 147 - 152, The log.warning calls for etcd-watcherd stream errors drop
stack traces, making debugging harder. Replace the log.warning call in the error
handling block (around line 147) with log.exception to preserve the full
traceback context when logging the error. Apply the same fix to the similar
error handling at line 169-170 as noted in the comment. This will ensure that
stack traces are captured and logged, making it easier to root-cause recurring
stream/reset failures.
osprey_worker/Dockerfile (1)

39-42: ⚡ Quick win

Use COPY instead of ADD for these local file copies.

These lines are plain local copies; COPY is the clearer/safer primitive and aligns with lint output.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/Dockerfile` around lines 39 - 42, Replace all four ADD
instructions with COPY instructions for the pyproject.toml file copies. Change
the ADD commands that copy osprey_rpc/pyproject.toml,
osprey_worker/pyproject.toml, osprey_async_worker/pyproject.toml, and
example_plugins/pyproject.toml from ADD to COPY, as COPY is the appropriate
primitive for copying local files and is clearer and safer than ADD for this use
case.

Source: Linters/SAST tools

osprey_async_worker/Dockerfile (1)

38-47: ⚡ Quick win

Prefer COPY over ADD for plain file/directory copies.

These ADD instructions are simple filesystem copies (no archive extraction / remote URL fetch), so COPY is clearer and matches lint expectations.

Also applies to: 63-67

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/Dockerfile` around lines 38 - 47, Replace all ADD
instructions with COPY instructions in the Dockerfile. The ADD commands for
uv.lock, pyproject.toml, README.md, LICENSE.md, and the various project-specific
pyproject.toml files (osprey_rpc, osprey_worker, osprey_async_worker,
example_plugins) are simple filesystem copies and should use COPY instead, as
ADD is only needed when extracting archives or fetching remote URLs. Apply this
change to all ADD instructions including those at lines 63-67 as well.

Source: Linters/SAST tools

osprey_async_worker/src/osprey/async_worker/tests/test_pigeon_client.py (1)

13-25: ⚡ Quick win

Harden contextvar tests against state leakage between tests.

These tests reset skip_rate_limit_context.skip manually, but a mid-test failure can leave shared state behind for subsequent tests. A small fixture (or try/finally) would make this deterministic.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/tests/test_pigeon_client.py`
around lines 13 - 25, The tests test_skip_rate_limit_set_and_get and
test_skip_rate_limit_property_api manually reset skip_rate_limit_context.skip
but lack proper cleanup if a test fails mid-execution, potentially leaving the
context in a dirty state for subsequent tests. Refactor these tests to use
either a pytest fixture that automatically resets skip_rate_limit_context.skip
to its initial state before and after each test, or wrap the test logic in a
try/finally block to ensure cleanup occurs regardless of test outcome.
osprey_async_worker/src/osprey/async_worker/tests/test_no_gevent_imports.py (1)

32-37: ⚡ Quick win

Keep gevent-allowed importers single-sourced.

Line 76 hardcodes the allowlist instead of using _ALLOWED_GEVENT_IMPORTERS, so the test can drift if one side changes.

Suggested fix
-        f'allowed = {{"sentry_sdk", "ddtrace"}}\n'
+        f'allowed = {set(_ALLOWED_GEVENT_IMPORTERS)!r}\n'

Also applies to: 76-76

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/tests/test_no_gevent_imports.py`
around lines 32 - 37, The allowlist for gevent importers is defined once in the
`_ALLOWED_GEVENT_IMPORTERS` constant but is hardcoded again at line 76, creating
a maintenance issue where changes to one location won't sync with the other.
Replace the hardcoded allowlist at line 76 with a reference to the
`_ALLOWED_GEVENT_IMPORTERS` constant to ensure the test uses a single-sourced
allowlist and prevents the test from drifting if the allowlist is updated in the
future.
osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py (1)

59-61: 💤 Low value

Consider parallel stop for faster shutdown.

stop() awaits each sink sequentially. For faster graceful shutdown (especially with many sinks or slow-to-stop sinks), consider using asyncio.gather:

♻️ Proposed refactor
     async def stop(self) -> None:
-        for sink in self._sinks:
-            await sink.stop()
+        await asyncio.gather(*(sink.stop() for sink in self._sinks), return_exceptions=True)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py` around
lines 59 - 61, The stop() method in the OutputSink class currently awaits each
sink's stop operation sequentially, which is inefficient for shutdown
performance. Refactor the stop() method to use asyncio.gather() to run all
sink.stop() calls concurrently instead of sequentially, allowing multiple sinks
to stop in parallel rather than waiting for each one to complete before moving
to the next.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 5ab76602-d02e-4274-ac63-0680a261abed

📥 Commits

Reviewing files that changed from the base of the PR and between 219528a and 134d76c.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (133)
  • docker-compose.yaml
  • docs/DEVELOPMENT.md
  • docs/SUMMARY.md
  • docs/development/workflow.md
  • docs/llm.md
  • entrypoint.sh
  • example_plugins/pyproject.toml
  • example_plugins/src/async_sinks/__init__.py
  • example_plugins/src/async_sinks/example_async_output_sink.py
  • example_plugins/src/llm/__init__.py
  • example_plugins/src/llm/anthropic_provider.py
  • example_plugins/src/llm/tests/__init__.py
  • example_plugins/src/llm/tests/test_anthropic_provider.py
  • example_plugins/src/register_async_plugins.py
  • example_plugins/src/register_plugins.py
  • example_plugins/src/tests/__init__.py
  • example_plugins/src/tests/test_register_async_plugins.py
  • osprey_async_worker/Dockerfile
  • osprey_async_worker/example_rules/config/example_config.yaml
  • osprey_async_worker/example_rules/main.sml
  • osprey_async_worker/example_rules/rules/long_message.sml
  • osprey_async_worker/example_rules/sample_actions.jsonl
  • osprey_async_worker/pyproject.toml
  • osprey_async_worker/src/osprey/async_worker/__init__.py
  • osprey_async_worker/src/osprey/async_worker/adaptor/__init__.py
  • osprey_async_worker/src/osprey/async_worker/adaptor/constants.py
  • osprey_async_worker/src/osprey/async_worker/adaptor/hookspecs.py
  • osprey_async_worker/src/osprey/async_worker/adaptor/interfaces.py
  • osprey_async_worker/src/osprey/async_worker/adaptor/plugin_manager.py
  • osprey_async_worker/src/osprey/async_worker/cli/__init__.py
  • osprey_async_worker/src/osprey/async_worker/cli/main.py
  • osprey_async_worker/src/osprey/async_worker/engine.py
  • osprey_async_worker/src/osprey/async_worker/executor.py
  • osprey_async_worker/src/osprey/async_worker/lib/__init__.py
  • osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py
  • osprey_async_worker/src/osprey/async_worker/lib/discovery/__init__.py
  • osprey_async_worker/src/osprey/async_worker/lib/discovery/async_directory.py
  • osprey_async_worker/src/osprey/async_worker/lib/discovery/hash_ring.py
  • osprey_async_worker/src/osprey/async_worker/lib/etcd/__init__.py
  • osprey_async_worker/src/osprey/async_worker/lib/etcd/sources_provider.py
  • osprey_async_worker/src/osprey/async_worker/lib/external_service.py
  • osprey_async_worker/src/osprey/async_worker/lib/pigeon/__init__.py
  • osprey_async_worker/src/osprey/async_worker/lib/pigeon/client.py
  • osprey_async_worker/src/osprey/async_worker/lib/pigeon/exceptions.py
  • osprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/__init__.py
  • osprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/baggage.py
  • osprey_async_worker/src/osprey/async_worker/lib/pigeon/interceptors/metadata.py
  • osprey_async_worker/src/osprey/async_worker/lib/pigeon/skip_rate_limit.py
  • osprey_async_worker/src/osprey/async_worker/lib/publisher.py
  • osprey_async_worker/src/osprey/async_worker/lib/utils/__init__.py
  • osprey_async_worker/src/osprey/async_worker/singletons.py
  • osprey_async_worker/src/osprey/async_worker/sinks/__init__.py
  • osprey_async_worker/src/osprey/async_worker/sinks/sink/__init__.py
  • osprey_async_worker/src/osprey/async_worker/sinks/sink/input_stream.py
  • osprey_async_worker/src/osprey/async_worker/sinks/sink/output_sink.py
  • osprey_async_worker/src/osprey/async_worker/sinks/sink/rules_sink.py
  • osprey_async_worker/src/osprey/async_worker/stdlib_udfs/__init__.py
  • osprey_async_worker/src/osprey/async_worker/stdlib_udfs/_async_stdlib_plugin.py
  • osprey_async_worker/src/osprey/async_worker/stdlib_udfs/async_mx_lookup.py
  • osprey_async_worker/src/osprey/async_worker/tests/__init__.py
  • osprey_async_worker/src/osprey/async_worker/tests/conftest.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_async_executor.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_async_sinks.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_engine.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_etcd_sources_provider.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_external_service.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_no_gevent_imports.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_pigeon_client.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_plugin_manager.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_publisher.py
  • osprey_async_worker/test_data/input.jsonl
  • osprey_async_worker/test_data/rules/main.sml
  • osprey_async_worker/test_data/rules/rules/test_rule.sml
  • osprey_coordinator/src/consumer/pubsub.rs
  • osprey_coordinator/src/coordinator_metrics.rs
  • osprey_coordinator/src/main.rs
  • osprey_coordinator/src/osprey_bidirectional_stream.rs
  • osprey_coordinator/src/pigeon/mod.rs
  • osprey_coordinator/src/priority_queue.rs
  • osprey_coordinator/src/pub_sub_streaming_pull/flow_control.rs
  • osprey_coordinator/src/shutdown_handler.rs
  • osprey_coordinator/src/sync_action_rpc.rs
  • osprey_ui/src/components/event_stream/FeatureSelectModal.tsx
  • osprey_worker/Dockerfile
  • osprey_worker/src/osprey/engine/ast/grammar.py
  • osprey_worker/src/osprey/engine/executor/execution_context.py
  • osprey_worker/src/osprey/engine/executor/execution_visualizer.py
  • osprey_worker/src/osprey/engine/executor/executor.py
  • osprey_worker/src/osprey/engine/executor/external_service_utils.py
  • osprey_worker/src/osprey/engine/executor/external_service_utils_base.py
  • osprey_worker/src/osprey/engine/executor/node_executor/binary_operation_executor.py
  • osprey_worker/src/osprey/engine/executor/tests/test_get_verdicts_pb2_proto.py
  • osprey_worker/src/osprey/engine/executor/udf_execution_helpers.py
  • osprey_worker/src/osprey/engine/stdlib/configs/feature_flags_config.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/categories.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/count_regex_matches.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/experiments.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/json_data.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/rules.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/string.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_count_regex_matches/test_rejects_invalid_regex.txt
  • osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_entity.py
  • osprey_worker/src/osprey/engine/stdlib/udfs/tests/test_experiments.py
  • osprey_worker/src/osprey/engine/utils/types.py
  • osprey_worker/src/osprey/worker/_stdlibplugin/udf_register.py
  • osprey_worker/src/osprey/worker/adaptor/hookspecs/osprey_hooks.py
  • osprey_worker/src/osprey/worker/adaptor/plugin_manager.py
  • osprey_worker/src/osprey/worker/cli/sinks.py
  • osprey_worker/src/osprey/worker/lib/etcd/tests/test_watcher_mux.py
  • osprey_worker/src/osprey/worker/lib/etcd/watcher/_mux.py
  • osprey_worker/src/osprey/worker/lib/etcd/watcher/watcherd_impl.py
  • osprey_worker/src/osprey/worker/lib/instruments/__init__.py
  • osprey_worker/src/osprey/worker/lib/llm/__init__.py
  • osprey_worker/src/osprey/worker/lib/llm/base.py
  • osprey_worker/src/osprey/worker/lib/llm/loop.py
  • osprey_worker/src/osprey/worker/lib/llm/tests/__init__.py
  • osprey_worker/src/osprey/worker/lib/llm/tests/test_base.py
  • osprey_worker/src/osprey/worker/lib/llm/tests/test_loop.py
  • osprey_worker/src/osprey/worker/lib/llm/tests/test_tools.py
  • osprey_worker/src/osprey/worker/lib/llm/tools.py
  • osprey_worker/src/osprey/worker/lib/osprey_engine.py
  • osprey_worker/src/osprey/worker/lib/sources_provider.py
  • osprey_worker/src/osprey/worker/lib/sources_provider_base.py
  • osprey_worker/src/osprey/worker/lib/tests/test_sources_provider.py
  • osprey_worker/src/osprey/worker/sinks/sink/monitored_rules_metrics.py
  • osprey_worker/src/osprey/worker/sinks/sink/output_sink.py
  • osprey_worker/src/osprey/worker/sinks/sink/rules_sink.py
  • osprey_worker/src/osprey/worker/sinks/utils/acking_contexts.py
  • osprey_worker/src/osprey/worker/sinks/utils/acking_contexts_base.py
  • pyproject.toml
💤 Files with no reviewable changes (1)
  • osprey_worker/src/osprey/engine/stdlib/configs/feature_flags_config.py

Comment on lines +166 to +171
# 4. Store audit state for execute() to consume.
# Safe because WhenRules is synchronous (execute_async=False) and CallExecutor.execute()
# calls resolve_arguments() then execute() without yielding to the event loop.
self._failed_rule_names = failed_rule_names
self._then_failed = then_failed

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "== Locate CallExecutor and UDFBase implementations =="
fd -t f 'call_executor.py|base.py|executor.py|udf/base.py' .

echo
echo "== Inspect CallExecutor execute flow and resolve->execute ordering =="
rg -n -C4 'class CallExecutor|def execute\(|resolve_arguments\(|unresolved_arguments|UDFBase' .

echo
echo "== Inspect UDFBase for shared-state indicators (__slots__, caching, singleton patterns) =="
rg -n -C4 'class UDFBase|__slots__|`@lru_cache`|singleton|execute_async' .

echo
echo "== Inspect worker concurrency model hints (gevent/threading) =="
rg -n -C3 'gevent|greenlet|threading|ThreadPool|concurrent' osprey_worker

Repository: roostorg/osprey

Length of output: 50372


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Locate the rules.py file ==="
find . -name rules.py -type f | grep -v __pycache__

echo
echo "=== Inspect WhenRules class definition and audit state usage ==="
rg -n 'class WhenRules|_failed_rule_names|_then_failed' --max-count=30 osprey_worker/src/osprey/engine/stdlib/udfs/

echo
echo "=== Inspect CallExecutor.execute() method signature and flow ==="
rg -n -A20 'class CallExecutor|def execute\(' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.py | head -80

echo
echo "=== Search for WhenRules instantiation patterns (new/creation) ==="
rg -n 'WhenRules\(' osprey_worker --max-count=20

echo
echo "=== Check if UDFs are instantiated per-request or cached ==="
rg -n 'instantiate|create.*udf|_udf.*=' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.py | head -20

Repository: roostorg/osprey

Length of output: 5831


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check CallExecutor instantiation and lifecycle ==="
rg -n 'CallExecutor(' osprey_worker/src/osprey/engine/executor --max-count=20

echo
echo "=== Understand how udf_map is built and where UDFs come from ==="
rg -n 'ValidateCallKwargs|udf_map' osprey_worker/src/osprey/engine --max-count=15

echo
echo "=== Check if UDF instances are created per execution or reused ==="
rg -n 'class.*Executor.*execute\(|_udf.*=' osprey_worker/src/osprey/engine/executor/executor.py | head -30

echo
echo "=== Inspect the validation context to see if UDFs are instantiated ==="
fd -t f -e py 'validator' osprey_worker/src/osprey/engine/ast_validator | head -10

Repository: roostorg/osprey

Length of output: 209


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check CallExecutor instantiation and lifecycle ==="
rg -n 'CallExecutor\(' osprey_worker/src/osprey/engine/executor --max-count=20

echo
echo "=== Understand how udf_map is built and where UDFs come from ==="
rg -n 'ValidateCallKwargs|udf_map' osprey_worker/src/osprey/engine --max-count=15

echo
echo "=== Check if UDF instances are created per execution or reused ==="
rg -n 'class.*Executor|def execute' osprey_worker/src/osprey/engine/executor/executor.py | head -30

echo
echo "=== Inspect the validation context to see if UDFs are instantiated ==="
fd -t f -e py 'validator' osprey_worker/src/osprey/engine/ast_validator | head -10

echo
echo "=== Search for where _udf is instantiated (created) ==="
rg -n '_udf\s*=' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.py

Repository: roostorg/osprey

Length of output: 16335


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Inspect ValidateCallKwargs validator to see where UDF instances are created ==="
fd -t f 'validate_call_kwargs.py' osprey_worker/src/osprey/engine/ast_validator/validators/

echo
echo "=== Read the ValidateCallKwargs file ==="
wc -l osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py

echo
echo "=== Look at UDF instantiation in ValidateCallKwargs ==="
rg -n 'def visit|UDFBase|instantiat|registry\.get' osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py | head -40

Repository: roostorg/osprey

Length of output: 685


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Read ValidateCallKwargs to understand UDF instance creation ==="
cat -n osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py | head -100

Repository: roostorg/osprey

Length of output: 5162


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Continue reading ValidateCallKwargs (lines 100-211) ==="
cat -n osprey_worker/src/osprey/engine/ast_validator/validators/validate_call_kwargs.py | sed -n '100,211p'

Repository: roostorg/osprey

Length of output: 7003


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check when ValidateCallKwargs runs and if ValidatedSources are reused ==="
rg -n 'ValidatedSources|validate\(' osprey_worker/src/osprey/engine/executor/executor.py | head -30

echo
echo "=== Search for where ASTs are compiled/cached ==="
rg -n 'parse|compile|ValidatedSources\(' osprey_worker/src/osprey/worker/lib/osprey_engine.py | head -20

echo
echo "=== Check concurrency model in executor ==="
rg -n 'gevent|threading|asyncio|concurrent|Thread' osprey_worker/src/osprey/engine/executor/executor.py | head -20

echo
echo "=== Inspect the actual worker execution flow ==="
fd -t f 'osprey_engine.py' osprey_worker/src/osprey/worker/lib/ | xargs wc -l

Repository: roostorg/osprey

Length of output: 137


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Look at where execute() is called in the main executor ==="
rg -n 'def execute\(' osprey_worker/src/osprey/engine/executor/executor.py

echo
echo "=== Check how execution graph is built and sources are validated ==="
rg -n 'validated_sources|execution_graph|ExecutionGraph' osprey_worker/src/osprey/engine/executor/executor.py | head -30

echo
echo "=== Check the concurrency model with gevent hints ==="
rg -n 'gevent|Greenlet|Pool' osprey_worker/src/osprey/engine/executor/executor.py | head -20

echo
echo "=== Inspect worker lifecycle - when parsing/validation happens ==="
rg -n 'def execute|validated_sources|parse' osprey_worker/src/osprey/worker/lib/osprey_engine.py | head -40

Repository: roostorg/osprey

Length of output: 2916


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Read the complete WhenRules class around lines 166-193 ==="
sed -n '116,200p' osprey_worker/src/osprey/engine/stdlib/udfs/rules.py | cat -n

echo
echo "=== Check if there are any synchronization primitives (locks) used ==="
rg -n 'Lock|RLock|Semaphore|Event' osprey_worker/src/osprey/engine/stdlib/udfs/rules.py

Repository: roostorg/osprey

Length of output: 5033


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Check what execution_context.resolved() does ==="
rg -n 'def resolved\(' osprey_worker/src/osprey/engine/executor/execution_context.py

echo
echo "=== Check if execution_context has any gevent-aware operations ==="
fd -t f 'execution_context.py' osprey_worker/src/osprey/engine/executor/ | xargs wc -l

echo
echo "=== Check for any gevent-aware calls in resolve_arguments or execution context ==="
rg -n 'gevent|sleep|join|wait' osprey_worker/src/osprey/engine/executor/execution_context.py

Repository: roostorg/osprey

Length of output: 913


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Read execution_context.resolved() implementation ==="
sed -n '178,220p' osprey_worker/src/osprey/engine/executor/execution_context.py | cat -n

echo
echo "=== Check how CallExecutor fits into the executor flow and when greenlets yield ==="
rg -n -B5 -A5 'call_executor.execute\(\)|resolved_arguments.*=.*_udf.resolve' osprey_worker/src/osprey/engine/executor/executor.py | head -60

echo
echo "=== Look for gevent.sleep or other yields in the executor loop ==="
rg -n 'gevent.sleep\|greenlet.*spawn\|gevent.wait' osprey_worker/src/osprey/engine/executor/executor.py | head -20

Repository: roostorg/osprey

Length of output: 2660


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Verify if CallExecutor instances are created fresh each execution or reused ==="
rg -n 'BaseNodeExecutor\|class.*Executor.*__init__' osprey_worker/src/osprey/engine/executor/node_executor/_base_node_executor.py | head -20

echo
echo "=== Check how execution graph creates executors (fresh vs reused) ==="
rg -n 'ExecutionGraph\|_executors\|executor.*=' osprey_worker/src/osprey/engine/executor/execution_graph.py | head -30

echo
echo "=== Confirm: same AST node = same executor = same UDF instance ==="
rg -n 'executor.node\|id\(node\)' osprey_worker/src/osprey/engine/executor/executor.py | head -20

Repository: roostorg/osprey

Length of output: 143


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "=== Search for execution graph and executor caching ==="
fd -t f 'execution_graph.py' osprey_worker/src/osprey/engine/executor/

echo
echo "=== Read how executors are managed in execution_graph ==="
head -100 osprey_worker/src/osprey/engine/executor/execution_graph.py | cat -n

echo
echo "=== Verify that CallExecutor uses SAME UDF instance across multiple rule executions ==="
grep -n 'class CallExecutor\|_udf.*=' osprey_worker/src/osprey/engine/executor/node_executor/call_executor.py

Repository: roostorg/osprey

Length of output: 5720


Move per-invocation audit state off self to prevent cross-contamination under gevent concurrency.

self._failed_rule_names and self._then_failed are set during resolve_arguments() and read during execute(). While a single CallExecutor.execute() call sequences these atomically, the same WhenRules instance is reused across multiple concurrent gevent greenlet executions. Greenlets can interleave between different invocations—if greenlet A calls resolve_arguments() (setting state), then greenlet B overwrites the state, greenlet A will read wrong values in execute().

Store audit state in the resolved arguments object instead. For example, extend WhenRulesArguments with failed_rule_names and then_failed fields, set them in resolve_arguments() alongside the other fields, and read them from arguments in execute() rather than from self.

Also applies to: lines 190–193

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_worker/src/osprey/engine/stdlib/udfs/rules.py` around lines 166 - 171,
Move the audit state variables from instance attributes to the arguments object
to avoid cross-contamination under gevent concurrency. Extend the
WhenRulesArguments class with new fields for failed_rule_names and then_failed,
then update the resolve_arguments method to assign these values to the arguments
object (e.g., arguments.failed_rule_names = failed_rule_names) instead of to
self._failed_rule_names and self._then_failed. Finally, update the execute
method (lines 190-193) to read these values from the arguments parameter
(arguments.failed_rule_names and arguments.then_failed) instead of from self.

haileyok added 5 commits June 17, 2026 06:24
Drops the pluggable LLM provider surface entirely: the worker lib/llm package
(base, tool-calling loop, tools + tests), the register_llm_provider hookspec and
bootstrap_llm_provider, the example Anthropic provider, docs/llm.md and its doc
references, and the anthropic dependency (re-locked out of uv.lock).
…put)

- etcd sources provider: dedup on the APPLIED hash and override
  mark_sources_applied so a failed compile self-heals on the next identical
  etcd re-delivery instead of wedging on stale rules (+ regression test).
- Kafka input: manual offset commit after each processed batch (at-least-once)
  instead of auto-commit-before-processing that could lose actions.
- coordinator stream: send a Nack when the context was marked nack instead of
  always acking; add BaseAckingContext.should_nack accessor.
- async CLI: surface a fatal sink-task error instead of reporting clean
  shutdown; fall back to the stdout sink (with a warning) when --with-plugins
  finds no registered async output sink.
The normal ack path consulted should_nack, but the shutdown / rule-reload /
uptime graceful-disconnect branches still always-acked. Compute ack/verdicts
once after the yield and use it on every finalize path, so a nacked action is
nacked regardless of which path finalizes it. Adds a graceful-disconnect nack test.
- AsyncKafkaInputStream funnels poll/commit/close through a single worker
  thread (ThreadPoolExecutor max_workers=1) so stop()'s close() can never race
  an in-flight poll() on the thread-unsafe kafka-python consumer (+ regression
  test that blocks in poll while stop() is requested).
- test_discovery_pool_creates_channels: make async so grpc.aio.insecure_channel
  binds to a running loop (was order-dependent 'no running event loop').
- coordinator stop() tests: provide an AsyncMock channel pool so stop()'s
  channel_pool.close() is awaited (and assert it).
On SIGTERM/SIGINT, stop the input stream first so the in-flight action can
finalize through the stream's own shutdown path (the coordinator stream
acks/nacks and graceful-disconnects after the current yield), then drain the
sink task with a bounded timeout, cancelling only as a fallback. Previously the
sink task was cancelled outright, which could skip the finalize block and leave
an in-flight action unacked.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py`:
- Line 158: The call to poll_entered.wait() in the run_in_executor block at line
158 does not check or assert the return value, which means if the wait times out
the test will continue without verifying that the poll actually started. Capture
the boolean return value from the poll_entered.wait(2.0) call and assert that it
is True to ensure the synchronization point was reached before the timeout,
confirming an in-flight poll existed before proceeding with the stop-path
assertion.
- Around line 34-40: Replace the type: ignore comment and type violations with a
Protocol-based approach. Create a Protocol class that defines the interface for
the stream object with a _stopped attribute, then change the self.stream type
annotation from object to use this Protocol instead of object. Remove the #
type: ignore[attr-defined] comment from the poll method where
self.stream._stopped is accessed. Additionally, locate any Sequence[Any] type
annotations in the file (specifically around line 139) and replace Any with the
appropriate concrete type or a Protocol definition to comply with the
disallow_any_generics mypy configuration.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: cd8bc8bb-dd6b-4fc0-81dc-2c2e8ec0cfa8

📥 Commits

Reviewing files that changed from the base of the PR and between 134d76c and 2fc6b4d.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (13)
  • docker-compose.yaml
  • docs/development/README.md
  • example_plugins/pyproject.toml
  • osprey_async_worker/src/osprey/async_worker/cli/main.py
  • osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py
  • osprey_async_worker/src/osprey/async_worker/lib/etcd/sources_provider.py
  • osprey_async_worker/src/osprey/async_worker/sinks/sink/input_stream.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_coordinator_input_stream.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_etcd_sources_provider.py
  • osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py
  • osprey_worker/src/osprey/worker/adaptor/hookspecs/osprey_hooks.py
  • osprey_worker/src/osprey/worker/adaptor/plugin_manager.py
  • osprey_worker/src/osprey/worker/sinks/utils/acking_contexts_base.py
💤 Files with no reviewable changes (3)
  • osprey_worker/src/osprey/worker/adaptor/hookspecs/osprey_hooks.py
  • example_plugins/pyproject.toml
  • osprey_worker/src/osprey/worker/adaptor/plugin_manager.py
✅ Files skipped from review due to trivial changes (1)
  • docs/development/README.md
🚧 Files skipped from review as they are similar to previous changes (6)
  • osprey_worker/src/osprey/worker/sinks/utils/acking_contexts_base.py
  • osprey_async_worker/src/osprey/async_worker/sinks/sink/input_stream.py
  • docker-compose.yaml
  • osprey_async_worker/src/osprey/async_worker/cli/main.py
  • osprey_async_worker/src/osprey/async_worker/lib/coordinator_input_stream.py
  • osprey_async_worker/src/osprey/async_worker/lib/etcd/sources_provider.py

Comment on lines +34 to +40
self.stream: object = None # set by the test to enable auto-stop

def poll(self, timeout_ms: int = 0, max_records: int = 0) -> Mapping[str, Sequence[_FakeRecord]]:
if self._batches:
return {'tp-0': self._batches.pop(0)}
if self.stream is not None:
self.stream._stopped = True # type: ignore[attr-defined]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Verify repository mypy typing constraints relevant to Any / ignores.
fd -a '^pyproject.toml$' | xargs -r rg -n '^\[tool\.mypy\]|disallow.*any|warn.*ignore|strict'

# Verify introduced Any / type-ignore occurrences in this changed test file.
rg -n --type=py '\bAny\b|#\s*type:\s*ignore' osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py

Repository: roostorg/osprey

Length of output: 990


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Read the test file to see context around lines 34-40 and 139
echo "=== Context around lines 34-40 ==="
sed -n '25,50p' osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py

echo -e "\n=== Context around lines 139 ==="
sed -n '130,150p' osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py

echo -e "\n=== Full imports and class definitions ==="
head -n 50 osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py

Repository: roostorg/osprey

Length of output: 3323


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check git status and recent changes to the test file
echo "=== Git diff for the test file ==="
git diff HEAD -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py | head -200

echo -e "\n=== Check if there are staged changes ==="
git diff --cached -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py | head -200

Repository: roostorg/osprey

Length of output: 137


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Check git log for recent changes to this file
echo "=== Recent commits touching the test file ==="
git log --oneline -20 -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py

echo -e "\n=== Show the last commit that touched this file ==="
git log -1 -p -- osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py | head -300

Repository: roostorg/osprey

Length of output: 7953


Address mypy violations: replace Sequence[Any] and # type: ignore with proper typing

This new test file introduces typing escapes at lines 40 and 139 that conflict with the repository's mypy configuration (disallow_any_generics = true):

  • Line 40: # type: ignore[attr-defined] suppresses a valid type error instead of using proper typing
  • Line 139: Sequence[Any] violates disallow_any_generics since Any is used in a generic type signature

Follow the coding guideline to avoid new # type: ignore and Any. Replace these with a Protocol-based approach and object type:

Suggested cleanup
-from typing import Any, List, Mapping, Sequence
+from typing import List, Mapping, Protocol, Sequence
+
+
+class _SupportsStoppedFlag(Protocol):
+    _stopped: bool
@@
-        self.stream: object = None  # set by the test to enable auto-stop
+        self.stream: _SupportsStoppedFlag | None = None  # set by the test to enable auto-stop
@@
-            self.stream._stopped = True  # type: ignore[attr-defined]
+            self.stream._stopped = True
@@
-        def poll(self, timeout_ms: int = 0, max_records: int = 0) -> Mapping[str, Sequence[Any]]:
+        def poll(self, timeout_ms: int = 0, max_records: int = 0) -> Mapping[str, Sequence[object]]:
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py`
around lines 34 - 40, Replace the type: ignore comment and type violations with
a Protocol-based approach. Create a Protocol class that defines the interface
for the stream object with a _stopped attribute, then change the self.stream
type annotation from object to use this Protocol instead of object. Remove the #
type: ignore[attr-defined] comment from the poll method where
self.stream._stopped is accessed. Additionally, locate any Sequence[Any] type
annotations in the file (specifically around line 139) and replace Any with the
appropriate concrete type or a Protocol definition to comply with the
disallow_any_generics mypy configuration.

Source: Coding guidelines

anext_task = asyncio.create_task(stream.__aiter__().__anext__())

# Wait until poll() is actually executing on the consumer's worker thread.
await asyncio.get_running_loop().run_in_executor(None, poll_entered.wait, 2.0)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert poll-start synchronization explicitly to avoid false-pass behavior

Line 158 drops the boolean result of poll_entered.wait(...). If the wait times out, the test can continue without proving an in-flight poll existed before the stop-path assertion.

Suggested fix
-    await asyncio.get_running_loop().run_in_executor(None, poll_entered.wait, 2.0)
+    started = await asyncio.get_running_loop().run_in_executor(None, poll_entered.wait, 2.0)
+    assert started, 'poll() did not start within timeout'
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@osprey_async_worker/src/osprey/async_worker/tests/test_kafka_input_stream.py`
at line 158, The call to poll_entered.wait() in the run_in_executor block at
line 158 does not check or assert the return value, which means if the wait
times out the test will continue without verifying that the poll actually
started. Capture the boolean return value from the poll_entered.wait(2.0) call
and assert that it is True to ensure the synchronization point was reached
before the timeout, confirming an in-flight poll existed before proceeding with
the stop-path assertion.

haileyok added 5 commits June 17, 2026 13:35
The graceful drain timeout only covered awaiting the sink task; rules_sink.stop()
ran outside it, so a slow/stuck consumer.close() (Kafka) could hang shutdown
indefinitely. Run stop() + drain together under one wait_for budget, cancelling
both the drain and the sink task if it overruns.
4.13+ switched its metadata to the SPDX id PSF-2.0, which the allow-list
doesn't recognize (it lists the equivalent Python-2.0 / Python Software
Foundation License classifier). 4.12.2 still reports the allowed classifier
and satisfies pytest-asyncio 1.4.0's typing-extensions>=4.12 floor, so
pytest-asyncio stays at 1.4.0 (0.23.x is incompatible with pytest 9).
example_plugins/src/tests/test_register_async_plugins.py imports
osprey.async_worker, which the worker image deliberately excludes
(--no-install-package osprey-async-worker). In the integration-tests
container that import is unresolvable, so it aborted collection of the
whole suite. Relocate it to the async worker's own test package, where
osprey.async_worker is available.

@haileyok haileyok left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i most closely looked at things that touch files still relevant to gevent, and those changes seem fine. the async code runs well against bluesky firehose (ran for ~24 hours without problems) and is quite a bit more efficient than gevnt which is cool.

we'll want to add some docs about what this is and how to use it, but esp. since this is still experimental we can do that in a followup.

@cmttt cmttt merged commit 74c8c85 into roostorg:main Jun 25, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants