feat: improve query lifecycle queueing and admin configuration#80
feat: improve query lifecycle queueing and admin configuration#80amitgilad3 wants to merge 1 commit into
Conversation
…tocol queue timeouts, and expose queue timeout settings through the admin API and Studio UI. Also includes query lifecycle cleanup, persistence wiring, focused queue-timeout tests, and backlog documentation for distributed queueing and routing work.
📝 WalkthroughWalkthroughThis PR implements configurable queue wait timeouts for cluster groups and refactors the query dispatch system to centralize routing, translation, and parameter handling. It moves ChangesQueue timeout feature and dispatch refactoring
🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR extends QueryFlux’s query lifecycle tracking and configuration by adding per-cluster-group queue timeout support (wired/sync execution paths) and improving audit/metrics fidelity, while also refactoring routing/dispatch plumbing and moving GuardAction to queryflux-core for reuse.
Changes:
- Add per-cluster-group
queueTimeoutMsconfiguration end-to-end (DB migration + persistence + admin UI/types + live config wiring). - Refactor frontend routing/dispatch to centralize route resolution and propagate routing trace through execution paths.
- Introduce cancellation/slot-leak safety improvements for sync execution and re-home
GuardActionintoqueryflux-core.
Reviewed changes
Copilot reviewed 30 out of 31 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| queryflux-studio/lib/group-config-helpers.ts | Parse/serialize new queueTimeoutMs field for group upserts. |
| queryflux-studio/lib/api-types.ts | Add queueTimeoutMs to API types with docs. |
| queryflux-studio/components/groups-config-panel.tsx | Display group queue timeout in the groups table. |
| queryflux-studio/components/group-form-dialog.tsx | Add form field + validation for queue timeout. |
| crates/queryflux/src/main.rs | Populate live config map for per-group queue timeouts. |
| crates/queryflux-persistence/src/postgres/mod.rs | Read/write queue_timeout_ms in group config queries and upsert. |
| crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql | Add queue_timeout_ms column to cluster_group_configs. |
| crates/queryflux-persistence/src/metrics_store.rs | Remove local GuardAction definition (now in core). |
| crates/queryflux-persistence/src/lib.rs | Re-export GuardAction from core for backwards compatibility. |
| crates/queryflux-persistence/src/in_memory.rs | Persist/roundtrip queue_timeout_ms in in-memory store + test. |
| crates/queryflux-persistence/src/cluster_config.rs | Add queue_timeout_ms to persistence config structs + conversions + tests. |
| crates/queryflux-metrics/src/lib.rs | Re-export GuardAction from core (keep public API stable). |
| crates/queryflux-guardrails/src/lib.rs | Switch guard action type import to core. |
| crates/queryflux-guardrails/src/chain.rs | Switch guard action type import to core. |
| crates/queryflux-guardrails/Cargo.toml | Drop unnecessary dependency on persistence crate. |
| crates/queryflux-frontend/src/trino_http/trino_dispatch.rs | New Trino dispatch helpers for URI rewrite + async terminal finalization. |
| crates/queryflux-frontend/src/trino_http/mod.rs | Export new trino_dispatch module. |
| crates/queryflux-frontend/src/trino_http/handlers.rs | Use centralized routing + pass routing trace into dispatch/execution. |
| crates/queryflux-frontend/src/state.rs | Add group_queue_timeouts and queue_duration_ms to recorded outcomes. |
| crates/queryflux-frontend/src/snowflake/tests.rs | Update test live config construction for new field. |
| crates/queryflux-frontend/src/snowflake/sql_api/handlers.rs | Switch to route_and_execute helper. |
| crates/queryflux-frontend/src/snowflake/http/handlers/session.rs | Use centralized resolve_route for login routing. |
| crates/queryflux-frontend/src/snowflake/http/handlers/query.rs | Pass routing trace arg into execute_to_sink. |
| crates/queryflux-frontend/src/postgres_wire/mod.rs | Switch to route_and_execute (central routing). |
| crates/queryflux-frontend/src/mysql_wire/mod.rs | Switch to route_and_execute (central routing). |
| crates/queryflux-frontend/src/flight_sql/mod.rs | Switch to route_and_execute (central routing). |
| crates/queryflux-frontend/src/dispatch.rs | Centralize routing, add sync queue timeout loop, propagate routing trace, add cancellation/slot guards. |
| crates/queryflux-e2e-tests/src/harness.rs | Update test live config construction for new field. |
| crates/queryflux-core/src/query.rs | Move GuardAction into core query types. |
| crates/queryflux-core/src/config.rs | Add queue_timeout_ms to cluster group config schema/docs. |
| Cargo.lock | Remove unused dependency edge from guardrails. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| sql: executing | ||
| .translated_sql | ||
| .as_deref() | ||
| .unwrap_or(&executing.sql) | ||
| .to_string(), |
| translated_sql: if was_translated { | ||
| Some(executing.sql.clone()) | ||
| } else { | ||
| None | ||
| }, |
| backend_query_id: None, | ||
| status: QueryStatus::Failed, | ||
| queue_duration_ms: 0, | ||
| execution_ms: start.elapsed().as_millis() as u64, | ||
| rows: None, |
| QueryOutcome { | ||
| backend_query_id: None, | ||
| status: QueryStatus::Cancelled, | ||
| queue_duration_ms: 0, | ||
| execution_ms: elapsed, | ||
| rows: None, | ||
| error: Some("Client disconnected".to_string()), | ||
| routing_trace: None, | ||
| engine_stats: None, |
| /// Maximum time (ms) a wire-protocol query will wait for a free cluster slot before | ||
| /// returning an error. `None` → wait indefinitely (legacy behavior). | ||
| /// Only affects MySQL wire, Postgres wire, FlightSQL, and Snowflake HTTP. | ||
| /// Trino HTTP uses its own persistent queue with client-driven polling. | ||
| #[serde(default)] |
| queue_duration_ms: 0, | ||
| execution_ms: elapsed, | ||
| rows: None, | ||
| error: Some("Client disconnected".to_string()), | ||
| routing_trace: None, |
There was a problem hiding this comment.
CancellationGuard records Cancelled with hardcoded queue_duration_ms: 0 and routing_trace: None; can we thread setup.queue_duration_ms and routing_trace into it when constructed?
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 759-812, update the
CancellationGuard struct/impl so that its Drop uses the real queue and routing
provenance instead of hardcoded values. Specifically, when CancellationGuard is
constructed in execute_to_sink (around lines 1475-1479), pass setup.queue_duration_ms
and routing_trace into CancellationGuard, store them on the guard, and then use them in
the QueryOutcome fields queue_duration_ms and routing_trace during Drop. Ensure
routing_trace is cloned (or otherwise owned) so it remains valid inside the tokio::spawn
closure.
Heads up!
Your free trial ends in 3 days.
To keep getting your PRs reviewed by Baz, update your team's subscription
| } | ||
| } | ||
|
|
||
| queued_backoff_delay(seq).await; |
There was a problem hiding this comment.
queue_timeout_ms can be exceeded by queued_backoff_delay(seq) since the timeout check runs before the sleep; can we cap it to the remaining time or re-check right after, so sync wire clients fail on time?
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 936-994, inside
`setup_sync_query`’s cluster-slot acquisition loop, the timeout check happens before
`queued_backoff_delay(seq).await`, so even a tiny `queue_timeout_ms` can exceed the
limit by one full backoff sleep. Refactor the loop to cap the backoff sleep by the
remaining timeout (or skip/sleep-until using a deadline), and re-check
`queue_start.elapsed()` immediately after sleeping before continuing the loop. Ensure
that when the timeout is exceeded, the function returns the same timeout error without
waiting for the entire backoff tick.
Heads up!
Your free trial ends in 3 days.
To keep getting your PRs reviewed by Baz, update your team's subscription
| QueryOutcome { | ||
| backend_query_id: None, | ||
| status: QueryStatus::Failed, | ||
| queue_duration_ms: 0, | ||
| execution_ms: start.elapsed().as_millis() as u64, |
There was a problem hiding this comment.
queue_duration_ms is already measured above, but the translation-failure path still records 0, so queued queries that fail translation lose their wait time in query_history; should we pass the measured queue_duration_ms into that failure outcome?
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 1056-1072 inside `async fn
setup_sync_query`’s `translate_and_prepare(...).await` Err(e) branch, the
`QueryOutcome` being recorded sets `queue_duration_ms: 0` despite `queue_duration_ms`
being computed earlier after the cluster slot acquisition loop. Change this failure path
to use the real measured `queue_duration_ms` value (the one derived from
`queue_start.elapsed()`) instead of 0, and make sure the variable is accessible in that
scope. This ensures translation-failed queries that queued still persist the correct
wait time in query_history.
Heads up!
Your free trial ends in 3 days.
To keep getting your PRs reviewed by Baz, update your team's subscription
| // Cancellation safety: if the future is dropped from here on (client disconnect), | ||
| // the guard records the query as Cancelled so it appears in query history. | ||
| // Disarmed below after the normal-path record_query call. | ||
| let mut cancel_guard = CancellationGuard::new(state.clone(), setup.ctx.clone(), setup.start); |
There was a problem hiding this comment.
Can we move CancellationGuard before setup_sync_query(...).await, otherwise disconnects during setup can drop the future before the Cancelled history row is written?
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 1350-1506 (the execute_to_sink
function), the CancellationGuard is currently created after setup_sync_query(...).await,
so client disconnects during the sync queue loop or translation/preparation won’t be
recorded as QueryStatus::Cancelled. Refactor so cancellation tracking starts before
awaiting setup_sync_query (wrap the entire setup+execution future in an RAII/drop guard,
or construct the guard earlier and let it take ownership of recording once QueryContext
is available). Ensure the guard is disarmed on the normal success path after
state.record_query, and on setup_sync_query error/early returns make sure the guard
doesn’t incorrectly double-record or miss queue_duration_ms (record a Cancelled entry
with whatever fields are known, or postpone recording until ctx is constructed but still
keep the guard alive across the await).
Heads up!
Your free trial ends in 3 days.
To keep getting your PRs reviewed by Baz, update your team's subscription
| max_running_queries: cfg.max_running_queries as i64, | ||
| max_queued_queries: cfg.max_queued_queries.map(|v| v as i64), | ||
| queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64), | ||
| strategy, |
There was a problem hiding this comment.
queue_timeout_ms is cast with as i64 before the upsert and as u64 on reload, so values above i64::MAX or negative DB values can wrap; should we reject out-of-range queueTimeoutMs with a checked conversion before writing to Postgres?
Want Baz to fix this for you? Activate Fixer
Other fix methods
Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-persistence/src/cluster_config.rs around lines 230-250, inside
UpsertClusterGroupConfig::from_core, the assignment queue_timeout_ms:
cfg.queue_timeout_ms.map(|v| v as i64) performs an unchecked cast that can overflow or
wrap. Refactor this to use a checked conversion (or explicit validation) before building
the upsert payload: if the value doesn’t fit i64, reject the conversion (e.g., return
an error) or log and set it to None per the project’s error-handling conventions.
Also, in crates/queryflux-persistence/src/cluster_config.rs around lines 264-297, inside
ClusterGroupConfigRecord::to_core, replace queue_timeout_ms:
self.queue_timeout_ms.map(|v| v as u64) with a safe conversion that handles negative DB
values (log + None or error) to prevent silent wrapping back into u64. Add/extend tests
around the existing queue_timeout_ms tests (lines ~418-436) to cover an out-of-range
value (above i64::MAX) and a negative DB value.
Heads up!
Your free trial ends in 3 days.
To keep getting your PRs reviewed by Baz, update your team's subscription
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
crates/queryflux-frontend/src/dispatch.rs (1)
521-643:⚠️ Potential issue | 🟠 Major | ⚡ Quick winSync dispatch via Trino HTTP still drops cancelled requests from history.
This branch only uses
ClusterSlotGuard. If the axum request future is dropped duringexecute_as_arrow, the slot is released, but no terminalrecord_querycall happens.execute_to_sinkalready fixed the same problem withCancellationGuard; the sync path here needs the same protection or cancelled Trino-HTTP requests silently disappear from query history.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/queryflux-frontend/src/dispatch.rs` around lines 521 - 643, The sync Trino-HTTP path uses only ClusterSlotGuard so if the axum request future is dropped during sync_adapter.execute_as_arrow the slot is released but record_query is never called; wrap the sync dispatch (before calling execute_as_arrow) with the same cancellation protection used by execute_to_sink (i.e. create a CancellationGuard instance around the QueryContext/QueryOutcome) so that a cancellation will run the terminal record_query logic, then drop the CancellationGuard only after record_query and slot.release() complete; reference ClusterSlotGuard, execute_as_arrow, CancellationGuard, execute_to_sink, record_query and slot.release() when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/queryflux-frontend/src/dispatch.rs`:
- Around line 788-809: The CancellationGuard drop currently writes a terminal
QueryOutcome with hardcoded queue_duration_ms: 0, routing_trace: None, and an
empty guard_actions list; update the drop task (impl Drop for CancellationGuard
-> drop) to preserve the measured metadata: pass the actual queue_duration_ms
that was computed earlier, include the resolved routing_trace produced by
execute_to_sink, and propagate any pre-execution guard actions and
was_guard_blocked flag from setup.guard_actions/guard state into QueryOutcome so
cancelled records retain the same queue/routing/guard metadata; apply the same
fix to the translation-failure helper-created records (the other block noted
around the 1056–1068 region) so they also use the measured queue_duration_ms,
routing_trace, and setup.guard_actions/was_guard_blocked when calling
state.record_query.
In `@crates/queryflux-frontend/src/trino_http/handlers.rs`:
- Around line 831-832: The final QueryOutcome currently hardcodes
queue_duration_ms: 0 in get_executing_statement (both success and failure
branches), which loses real queued time; update the QueryOutcome construction to
use the actual captured queue duration from the executing query
context/persistence record (e.g., use the executing query's stored queue
duration field or the persistence record returned to state.record_query) instead
of 0—locate the QueryOutcome creations in get_executing_statement and replace
queue_duration_ms: 0 with the appropriate value (for example
executing_query.queue_duration_ms or record.queue_duration_ms), preserving the
existing execution_ms/elapsed_ms handling.
In `@crates/queryflux-frontend/src/trino_http/trino_dispatch.rs`:
- Around line 127-197: finalize_trino_async_terminal_on_submit currently records
outcomes with routing_trace = None and queue_duration_ms = 0, losing
routing/dequeue metadata added on the fast-path submit; propagate those values
into the outcome before calling state.record_query. Modify dispatch_query (or
add fields on ExecutingQuery) so the routing_trace and queue_duration_ms
computed at dispatch are stored on ExecutingQuery (e.g., new fields
routing_trace: Option<_> and queue_duration_ms: u64) or passed into
finalize_trino_async_terminal_on_submit, then read those values and set
outcome.routing_trace and outcome.queue_duration_ms accordingly before
state.record_query(&ctx, outcome) in finalize_trino_async_terminal_on_submit.
In `@crates/queryflux-persistence/src/cluster_config.rs`:
- Line 244: The current unchecked casts for queue_timeout_ms (in the code that
sets queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64) and the reciprocal
self.queue_timeout_ms.map(|v| v as u64)) can overflow; replace both with checked
conversions using i64::try_from(...) and u64::try_from(...), propagate or return
an explicit error when conversion fails (do not use as), and update the
surrounding functions/constructors (the places building the cluster config and
the reverse conversion) to handle the Result/Option properly so overflow is
detected and reported instead of silently wrapping.
In
`@crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql`:
- Around line 3-4: The migration currently adds queue_timeout_ms to
cluster_group_configs but allows negative values; update the migration to
enforce non-negative values by first neutralizing existing negatives (e.g.,
UPDATE cluster_group_configs SET queue_timeout_ms = 0 WHERE queue_timeout_ms <
0;) and then add an explicit CHECK constraint (e.g., ADD CONSTRAINT
chk_cluster_group_configs_queue_timeout_ms_nonneg CHECK (queue_timeout_ms >= 0))
so the database rejects future negative values for column queue_timeout_ms on
table cluster_group_configs.
In `@queryflux-studio/components/group-form-dialog.tsx`:
- Around line 212-221: The validation for queue timeout (variables
queueTimeout/qt and qTimeout) uses parseInt which truncates values like "1.5" or
"1e3"; change the check to reject any non-integer strings by parsing with Number
(or using a strict /^\d+$/ test) and then using Number.isInteger on the parsed
value and ensuring it's >= 1 before assigning qTimeout; update the code path
that currently calls parseInt and the setError message (via setError) so inputs
like "1.5", "1e3", "-2", or non-numeric strings are rejected as invalid.
---
Outside diff comments:
In `@crates/queryflux-frontend/src/dispatch.rs`:
- Around line 521-643: The sync Trino-HTTP path uses only ClusterSlotGuard so if
the axum request future is dropped during sync_adapter.execute_as_arrow the slot
is released but record_query is never called; wrap the sync dispatch (before
calling execute_as_arrow) with the same cancellation protection used by
execute_to_sink (i.e. create a CancellationGuard instance around the
QueryContext/QueryOutcome) so that a cancellation will run the terminal
record_query logic, then drop the CancellationGuard only after record_query and
slot.release() complete; reference ClusterSlotGuard, execute_as_arrow,
CancellationGuard, execute_to_sink, record_query and slot.release() when making
the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7da6194e-ee12-47ad-b9b6-b7c57feb2bdd
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (30)
crates/queryflux-core/src/config.rscrates/queryflux-core/src/query.rscrates/queryflux-e2e-tests/src/harness.rscrates/queryflux-frontend/src/dispatch.rscrates/queryflux-frontend/src/flight_sql/mod.rscrates/queryflux-frontend/src/mysql_wire/mod.rscrates/queryflux-frontend/src/postgres_wire/mod.rscrates/queryflux-frontend/src/snowflake/http/handlers/query.rscrates/queryflux-frontend/src/snowflake/http/handlers/session.rscrates/queryflux-frontend/src/snowflake/sql_api/handlers.rscrates/queryflux-frontend/src/snowflake/tests.rscrates/queryflux-frontend/src/state.rscrates/queryflux-frontend/src/trino_http/handlers.rscrates/queryflux-frontend/src/trino_http/mod.rscrates/queryflux-frontend/src/trino_http/trino_dispatch.rscrates/queryflux-guardrails/Cargo.tomlcrates/queryflux-guardrails/src/chain.rscrates/queryflux-guardrails/src/lib.rscrates/queryflux-metrics/src/lib.rscrates/queryflux-persistence/src/cluster_config.rscrates/queryflux-persistence/src/in_memory.rscrates/queryflux-persistence/src/lib.rscrates/queryflux-persistence/src/metrics_store.rscrates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sqlcrates/queryflux-persistence/src/postgres/mod.rscrates/queryflux/src/main.rsqueryflux-studio/components/group-form-dialog.tsxqueryflux-studio/components/groups-config-panel.tsxqueryflux-studio/lib/api-types.tsqueryflux-studio/lib/group-config-helpers.ts
💤 Files with no reviewable changes (1)
- crates/queryflux-guardrails/Cargo.toml
| impl Drop for CancellationGuard { | ||
| fn drop(&mut self) { | ||
| if let Some(ctx) = self.ctx.take() { | ||
| let state = self.state.clone(); | ||
| let elapsed = self.start.elapsed().as_millis() as u64; | ||
| tokio::spawn(async move { | ||
| state.record_query( | ||
| &ctx, | ||
| QueryOutcome { | ||
| backend_query_id: None, | ||
| status: QueryStatus::Cancelled, | ||
| queue_duration_ms: 0, | ||
| execution_ms: elapsed, | ||
| rows: None, | ||
| error: Some("Client disconnected".to_string()), | ||
| routing_trace: None, | ||
| engine_stats: None, | ||
| guard_actions: vec![], | ||
| was_guard_blocked: false, | ||
| }, | ||
| ); | ||
| }); |
There was a problem hiding this comment.
Preserve routing and queue metadata on helper-owned terminal records.
Both the translation-failure record and CancellationGuard hardcode queue_duration_ms: 0 / routing_trace: None, even though this code already measured queue_duration_ms and execute_to_sink already resolved a routing_trace. Cancelled/failed sync queries will therefore lose the new history fields on exactly the branches that should still be persisted. The cancel path also drops any pre-execution guard actions collected into setup.guard_actions.
Also applies to: 1056-1068
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/queryflux-frontend/src/dispatch.rs` around lines 788 - 809, The
CancellationGuard drop currently writes a terminal QueryOutcome with hardcoded
queue_duration_ms: 0, routing_trace: None, and an empty guard_actions list;
update the drop task (impl Drop for CancellationGuard -> drop) to preserve the
measured metadata: pass the actual queue_duration_ms that was computed earlier,
include the resolved routing_trace produced by execute_to_sink, and propagate
any pre-execution guard actions and was_guard_blocked flag from
setup.guard_actions/guard state into QueryOutcome so cancelled records retain
the same queue/routing/guard metadata; apply the same fix to the
translation-failure helper-created records (the other block noted around the
1056–1068 region) so they also use the measured queue_duration_ms,
routing_trace, and setup.guard_actions/was_guard_blocked when calling
state.record_query.
| queue_duration_ms: 0, | ||
| execution_ms: elapsed_ms, |
There was a problem hiding this comment.
Propagate actual queue wait in final outcomes instead of hardcoding zero.
Line 831 and Line 867 set queue_duration_ms: 0 for both success and failure in get_executing_statement. That value is persisted by state.record_query and surfaced as Trino queued_time_millis, so queued queries will be recorded/reported as if they never waited. Use the captured queue duration from the executing query context/persistence record when constructing QueryOutcome.
Also applies to: 867-868
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/queryflux-frontend/src/trino_http/handlers.rs` around lines 831 - 832,
The final QueryOutcome currently hardcodes queue_duration_ms: 0 in
get_executing_statement (both success and failure branches), which loses real
queued time; update the QueryOutcome construction to use the actual captured
queue duration from the executing query context/persistence record (e.g., use
the executing query's stored queue duration field or the persistence record
returned to state.record_query) instead of 0—locate the QueryOutcome creations
in get_executing_statement and replace queue_duration_ms: 0 with the appropriate
value (for example executing_query.queue_duration_ms or
record.queue_duration_ms), preserving the existing execution_ms/elapsed_ms
handling.
| pub(crate) async fn finalize_trino_async_terminal_on_submit( | ||
| state: &Arc<AppState>, | ||
| cluster_manager: &Arc<dyn ClusterGroupManager>, | ||
| executing: &ExecutingQuery, | ||
| adapter: &Arc<dyn AsyncAdapter>, | ||
| session: &SessionContext, | ||
| protocol: FrontendProtocol, | ||
| body: &Bytes, | ||
| ) { | ||
| let elapsed_ms = (Utc::now() - executing.creation_time) | ||
| .num_milliseconds() | ||
| .max(0) as u64; | ||
|
|
||
| let was_translated = executing.translated_sql.is_some(); | ||
| let src_dialect = protocol.default_dialect(); | ||
| let ctx = QueryContext { | ||
| query_id: executing.id.clone(), | ||
| sql: executing | ||
| .translated_sql | ||
| .as_deref() | ||
| .unwrap_or(&executing.sql) | ||
| .to_string(), | ||
| session: session.clone(), | ||
| protocol, | ||
| group: executing.cluster_group.clone(), | ||
| cluster: executing.cluster_name.clone(), | ||
| cluster_group_config_id: executing.cluster_group_config_id, | ||
| cluster_config_id: executing.cluster_config_id, | ||
| engine_type: adapter.engine_type(), | ||
| src_dialect, | ||
| tgt_dialect: adapter.translation_target_dialect(), | ||
| was_translated, | ||
| translated_sql: if was_translated { | ||
| Some(executing.sql.clone()) | ||
| } else { | ||
| None | ||
| }, | ||
| query_tags: executing.query_tags.clone(), | ||
| query_params: vec![], | ||
| agent_context: executing.agent_context.clone(), | ||
| }; | ||
|
|
||
| let engine_stats = adapter.terminal_stats_from_body(body); | ||
| let (mut outcome, warn_msg) = trino_submit_terminal_outcome( | ||
| body, | ||
| elapsed_ms, | ||
| executing.backend_query_id.0.clone(), | ||
| engine_stats, | ||
| ); | ||
|
|
||
| let stored_actions: Vec<queryflux_core::query::GuardAction> = serde_json::from_value( | ||
| serde_json::Value::Array(executing.submitted_guard_actions.clone()), | ||
| ) | ||
| .unwrap_or_default(); | ||
| if !stored_actions.is_empty() { | ||
| outcome.guard_actions = stored_actions; | ||
| outcome.was_guard_blocked = executing.was_guard_blocked; | ||
| } | ||
|
|
||
| if let Some(msg) = warn_msg { | ||
| warn!(proxy_id = %executing.id, "{msg}"); | ||
| } | ||
|
|
||
| state | ||
| .metrics | ||
| .on_query_finished(&executing.cluster_group.0, &executing.cluster_name.0); | ||
| state.record_query(&ctx, outcome); | ||
| let _ = cluster_manager | ||
| .release_cluster(&executing.cluster_group, &executing.cluster_name) | ||
| .await; | ||
| let _ = state.persistence.delete(&executing.backend_query_id).await; |
There was a problem hiding this comment.
Immediate Trino terminalization drops the new outcome metadata.
finalize_trino_async_terminal_on_submit is the only terminal path when POST /v1/statement finishes without a nextUri, but it records the outcome with routing_trace: None and queue_duration_ms: 0. That means routed/dequeued queries lose the exact fields this PR is adding on the fast-path submit completion case. Thread those values in from dispatch_query (or persist them on ExecutingQuery) before calling record_query.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/queryflux-frontend/src/trino_http/trino_dispatch.rs` around lines 127
- 197, finalize_trino_async_terminal_on_submit currently records outcomes with
routing_trace = None and queue_duration_ms = 0, losing routing/dequeue metadata
added on the fast-path submit; propagate those values into the outcome before
calling state.record_query. Modify dispatch_query (or add fields on
ExecutingQuery) so the routing_trace and queue_duration_ms computed at dispatch
are stored on ExecutingQuery (e.g., new fields routing_trace: Option<_> and
queue_duration_ms: u64) or passed into finalize_trino_async_terminal_on_submit,
then read those values and set outcome.routing_trace and
outcome.queue_duration_ms accordingly before state.record_query(&ctx, outcome)
in finalize_trino_async_terminal_on_submit.
| members: cfg.members.clone(), | ||
| max_running_queries: cfg.max_running_queries as i64, | ||
| max_queued_queries: cfg.max_queued_queries.map(|v| v as i64), | ||
| queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64), |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify current unsafe casts and absence/presence of checked conversion for queue timeout paths.
rg -n 'queue_timeout_ms: .* as (i64|u64)' crates/queryflux-persistence/src/cluster_config.rs -C2
rg -n 'queue_timeout_ms: .*try_from' crates/queryflux-persistence/src/cluster_config.rs -C2Repository: lakeops-org/queryflux
Length of output: 759
Avoid lossy signed/unsigned casts for queue_timeout_ms.
crates/queryflux-persistence/src/cluster_config.rs uses unchecked casts that can overflow/wrap and silently corrupt timeout semantics:
queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64),(line 244)queue_timeout_ms: self.queue_timeout_ms.map(|v| v as u64),(line 290)
Replace these with checked conversions (i64::try_from / u64::try_from) and handle failures explicitly (no as).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/queryflux-persistence/src/cluster_config.rs` at line 244, The current
unchecked casts for queue_timeout_ms (in the code that sets queue_timeout_ms:
cfg.queue_timeout_ms.map(|v| v as i64) and the reciprocal
self.queue_timeout_ms.map(|v| v as u64)) can overflow; replace both with checked
conversions using i64::try_from(...) and u64::try_from(...), propagate or return
an explicit error when conversion fails (do not use as), and update the
surrounding functions/constructors (the places building the cluster config and
the reverse conversion) to handle the Result/Option properly so overflow is
detected and reported instead of silently wrapping.
| ALTER TABLE cluster_group_configs | ||
| ADD COLUMN queue_timeout_ms BIGINT; |
There was a problem hiding this comment.
Enforce non-negative queue_timeout_ms at the database boundary.
queue_timeout_ms is modeled as unsigned in core config, but this column currently accepts negative values. Add a CHECK constraint so invalid values cannot be persisted.
Suggested migration update
ALTER TABLE cluster_group_configs
- ADD COLUMN queue_timeout_ms BIGINT;
+ ADD COLUMN queue_timeout_ms BIGINT;
+
+ALTER TABLE cluster_group_configs
+ ADD CONSTRAINT cluster_group_configs_queue_timeout_ms_non_negative
+ CHECK (queue_timeout_ms IS NULL OR queue_timeout_ms >= 0);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ALTER TABLE cluster_group_configs | |
| ADD COLUMN queue_timeout_ms BIGINT; | |
| ALTER TABLE cluster_group_configs | |
| ADD COLUMN queue_timeout_ms BIGINT; | |
| ALTER TABLE cluster_group_configs | |
| ADD CONSTRAINT cluster_group_configs_queue_timeout_ms_non_negative | |
| CHECK (queue_timeout_ms IS NULL OR queue_timeout_ms >= 0); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql`
around lines 3 - 4, The migration currently adds queue_timeout_ms to
cluster_group_configs but allows negative values; update the migration to
enforce non-negative values by first neutralizing existing negatives (e.g.,
UPDATE cluster_group_configs SET queue_timeout_ms = 0 WHERE queue_timeout_ms <
0;) and then add an explicit CHECK constraint (e.g., ADD CONSTRAINT
chk_cluster_group_configs_queue_timeout_ms_nonneg CHECK (queue_timeout_ms >= 0))
so the database rejects future negative values for column queue_timeout_ms on
table cluster_group_configs.
| let qTimeout: number | null = null; | ||
| const qt = queueTimeout.trim(); | ||
| if (qt !== "") { | ||
| const n = parseInt(qt, 10); | ||
| if (!Number.isFinite(n) || n < 1) { | ||
| setError("Queue timeout must be empty or a positive integer (ms)."); | ||
| return; | ||
| } | ||
| qTimeout = n; | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify the current parser and demonstrate parseInt truncation behavior.
rg -n -C2 'const n = parseInt\(qt, 10\)|Queue timeout must be empty or a positive integer' queryflux-studio/components/group-form-dialog.tsx
node -e 'console.log("parseInt(1.5) =", parseInt("1.5",10), "; parseInt(1e3) =", parseInt("1e3",10))'Repository: lakeops-org/queryflux
Length of output: 365
Fix queue-timeout validation to reject non-integers (avoid parseInt truncation)
parseInt(qt, 10) truncates inputs like "1.5" and "1e3" to 1, so non-integer values can pass the “positive integer” requirement in queryflux-studio/components/group-form-dialog.tsx (around lines 212-221).
Suggested fix
- if (qt !== "") {
- const n = parseInt(qt, 10);
- if (!Number.isFinite(n) || n < 1) {
+ if (qt !== "") {
+ const n = Number(qt);
+ if (!Number.isInteger(n) || n < 1) {
setError("Queue timeout must be empty or a positive integer (ms).");
return;
}
qTimeout = n;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let qTimeout: number | null = null; | |
| const qt = queueTimeout.trim(); | |
| if (qt !== "") { | |
| const n = parseInt(qt, 10); | |
| if (!Number.isFinite(n) || n < 1) { | |
| setError("Queue timeout must be empty or a positive integer (ms)."); | |
| return; | |
| } | |
| qTimeout = n; | |
| } | |
| let qTimeout: number | null = null; | |
| const qt = queueTimeout.trim(); | |
| if (qt !== "") { | |
| const n = Number(qt); | |
| if (!Number.isInteger(n) || n < 1) { | |
| setError("Queue timeout must be empty or a positive integer (ms)."); | |
| return; | |
| } | |
| qTimeout = n; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@queryflux-studio/components/group-form-dialog.tsx` around lines 212 - 221,
The validation for queue timeout (variables queueTimeout/qt and qTimeout) uses
parseInt which truncates values like "1.5" or "1e3"; change the check to reject
any non-integer strings by parsing with Number (or using a strict /^\d+$/ test)
and then using Number.isInteger on the parsed value and ensuring it's >= 1
before assigning qTimeout; update the code path that currently calls parseInt
and the setError message (via setError) so inputs like "1.5", "1e3", "-2", or
non-numeric strings are rejected as invalid.
User description
Summary
Test plan
cargo test --workspace --exclude queryflux-translationcargo clippy --workspace --exclude queryflux-translation --all-targets -- -D warningscargo test -p queryflux-persistence queue_timeoutnpm --prefix queryflux-studio exec -- tsc -p queryflux-studio/tsconfig.json --noEmitnpm run lintinqueryflux-studioNote:
queryflux-translationtests are excluded because they require a local Python/sqlglot environment.Generated description
Below is a concise technical summary of the changes proposed in this PR:
Track wire-protocol queue lifecycles by centralizing routing/translation helpers (
resolve_route,route_and_execute) so every frontend flow recordsqueue_duration_ms, guard actions, and cancellation metadata onQueryOutcome. Harmonize per-groupqueue_timeout_msfrom config through persistence, live state, and Studio forms so admins can limit how long sync protocols wait for cluster slots.queue_timeout_msfrom cluster config through persistence stores, live state, and Studio admin UI so wire protocols can cap how long they wait for cluster slots while tests and harnesses carry the new metadata.Modified files (13)
Latest Contributors(0)
translate_and_prepare,resolve_route, androute_and_execute, carryrouting_trace/queue_duration_ms, reuse the sharedGuardAction, and guard against client cancellation before recording query outcomes.Modified files (12)
Latest Contributors(0)
GuardActionownership intoqueryflux-core, re-export it via metrics/persistence, and update guardrails so telemetry/metrics layers no longer depend onqueryflux-persistence.Modified files (8)
Latest Contributors(0)
Summary by CodeRabbit
New Features
nullmeans unlimited.User Interface