
feat: improve query lifecycle queueing and admin configuration#80

Open
amitgilad3 wants to merge 1 commit into main from feat/query-lifecycle-queueing

Conversation

@amitgilad3

@amitgilad3 amitgilad3 commented Jun 10, 2026

Contributor

User description

Summary

  • Track query queue duration through persisted query outcomes and history.
  • Add configurable wire-protocol queue timeouts, including admin API, DB migration, and Studio UI support.
  • Refactor Trino dispatch/query lifecycle handling and document follow-up tasks for distributed queueing and custom group routing.

Test plan

  • cargo test --workspace --exclude queryflux-translation
  • cargo clippy --workspace --exclude queryflux-translation --all-targets -- -D warnings
  • cargo test -p queryflux-persistence queue_timeout
  • npm --prefix queryflux-studio exec -- tsc -p queryflux-studio/tsconfig.json --noEmit
  • npm run lint in queryflux-studio

Note: queryflux-translation tests are excluded because they require a local Python/sqlglot environment.


Generated description

Below is a concise technical summary of the changes proposed in this PR:
Track wire-protocol queue lifecycles by centralizing routing/translation helpers (resolve_route, route_and_execute) so every frontend flow records queue_duration_ms, guard actions, and cancellation metadata on QueryOutcome. Harmonize per-group queue_timeout_ms from config through persistence, live state, and Studio forms so admins can limit how long sync protocols wait for cluster slots.

Topic Details
Queue timeout config Expose per-group queue_timeout_ms from cluster config through persistence stores, live state, and Studio admin UI so wire protocols can cap how long they wait for cluster slots while tests and harnesses carry the new metadata.
Modified files (13)
  • crates/queryflux-core/src/config.rs
  • crates/queryflux-e2e-tests/src/harness.rs
  • crates/queryflux-frontend/src/snowflake/tests.rs
  • crates/queryflux-frontend/src/state.rs
  • crates/queryflux-persistence/src/cluster_config.rs
  • crates/queryflux-persistence/src/in_memory.rs
  • crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql
  • crates/queryflux-persistence/src/postgres/mod.rs
  • crates/queryflux/src/main.rs
  • queryflux-studio/components/group-form-dialog.tsx
  • queryflux-studio/components/groups-config-panel.tsx
  • queryflux-studio/lib/api-types.ts
  • queryflux-studio/lib/group-config-helpers.ts
Dispatch lifecycle Unify dispatch helpers so all frontends reuse translate_and_prepare, resolve_route, and route_and_execute, carry routing_trace/queue_duration_ms, reuse the shared GuardAction, and guard against client cancellation before recording query outcomes.
Modified files (12)
  • crates/queryflux-frontend/src/dispatch.rs
  • crates/queryflux-frontend/src/flight_sql/mod.rs
  • crates/queryflux-frontend/src/mysql_wire/mod.rs
  • crates/queryflux-frontend/src/postgres_wire/mod.rs
  • crates/queryflux-frontend/src/snowflake/http/handlers/query.rs
  • crates/queryflux-frontend/src/snowflake/http/handlers/session.rs
  • crates/queryflux-frontend/src/snowflake/sql_api/handlers.rs
  • crates/queryflux-frontend/src/snowflake/tests.rs
  • crates/queryflux-frontend/src/state.rs
  • crates/queryflux-frontend/src/trino_http/handlers.rs
  • crates/queryflux-frontend/src/trino_http/mod.rs
  • crates/queryflux-frontend/src/trino_http/trino_dispatch.rs
Guard action model Move GuardAction ownership into queryflux-core, re-export it via metrics/persistence, and update guardrails so telemetry/metrics layers no longer depend on queryflux-persistence.
Modified files (8)
  • Cargo.lock
  • crates/queryflux-core/src/query.rs
  • crates/queryflux-guardrails/Cargo.toml
  • crates/queryflux-guardrails/src/chain.rs
  • crates/queryflux-guardrails/src/lib.rs
  • crates/queryflux-metrics/src/lib.rs
  • crates/queryflux-persistence/src/lib.rs
  • crates/queryflux-persistence/src/metrics_store.rs

Summary by CodeRabbit

  • New Features

    • Added configurable queue timeout per cluster group to limit how long wire-protocol queries wait for available cluster slots; null means unlimited.
    • Introduced queue duration tracking to measure how long queries wait for cluster resources.
  • User Interface

    • Studio now supports viewing and configuring queue timeout settings for each cluster group.

…tocol queue timeouts, and expose queue timeout settings through the admin API and Studio UI.

Also includes query lifecycle cleanup, persistence wiring, focused queue-timeout tests, and backlog documentation for distributed queueing and routing work.
@coderabbitai

coderabbitai Bot commented Jun 10, 2026


Walkthrough

This PR implements configurable queue wait timeouts for cluster groups and refactors the query dispatch system to centralize routing, translation, and parameter handling. It moves GuardAction to core types, extracts Trino HTTP dispatch logic into a dedicated module, and updates all wire protocol frontends to use unified helpers.

Changes

Queue timeout feature and dispatch refactoring

Layer / File(s) Summary
Core types: GuardAction migration and queue timeout config
crates/queryflux-core/src/query.rs, crates/queryflux-core/src/config.rs, crates/queryflux-guardrails/src/chain.rs, crates/queryflux-guardrails/src/lib.rs, crates/queryflux-metrics/src/lib.rs, crates/queryflux-persistence/src/metrics_store.rs, crates/queryflux-persistence/src/lib.rs
Moves GuardAction from persistence to core types with metadata fields, adds queue_timeout_ms: Option<u64> to ClusterGroupConfig, and updates imports across the codebase.
State and outcome models
crates/queryflux-frontend/src/state.rs
LiveConfig gains group_queue_timeouts map, QueryOutcome adds queue_duration_ms, and QueryContext becomes Clone-able. Record query now uses actual queue duration instead of constant zero.
Dispatch refactoring: routing, translation, parameter prep helpers
crates/queryflux-frontend/src/dispatch.rs
Introduces shared helpers for live-config snapshotting, centralized SQL translation with conditional parameter interpolation, and public resolve_route function. Adds route_and_execute entrypoint combining routing and sink execution. Updates dispatch_query to accept routing_trace and use centralized translation.
Sync execution: queue timeout loop, slot/cancellation guards
crates/queryflux-frontend/src/dispatch.rs
Adds explicit queue timeout handling in setup_sync_query with exponential backoff. Introduces ClusterSlotGuard for proper resource cleanup and CancellationGuard RAII type that records Cancelled outcome when sync futures drop before completion.
Trino HTTP: extract dispatch module
crates/queryflux-frontend/src/trino_http/trino_dispatch.rs, crates/queryflux-frontend/src/trino_http/mod.rs
New module with rewrite_trino_uri, trino_submit_terminal_outcome, and finalize_trino_async_terminal_on_submit functions handling Trino-specific dispatch logic, response parsing, and resource cleanup.
Wire protocol frontends: route_and_execute adoption
crates/queryflux-frontend/src/flight_sql/mod.rs, crates/queryflux-frontend/src/mysql_wire/mod.rs, crates/queryflux-frontend/src/postgres_wire/mod.rs, crates/queryflux-frontend/src/snowflake/sql_api/handlers.rs
Flight SQL, MySQL, and Postgres wire protocols replace manual routing + execute_to_sink calls with unified route_and_execute helper. Snowflake SQL API similarly consolidates routing and execution.
Trino HTTP handlers: routing refactor and trace propagation
crates/queryflux-frontend/src/trino_http/handlers.rs
post_statement uses resolve_route for auth-aware routing and propagates routing_trace to dispatch/execution. Removes inline fallback group logic. Updates get_queued_statement and get_executing_statement to use core GuardAction and include queue_duration_ms in outcome records.
execute_to_sink updates: routing trace and queue duration
crates/queryflux-frontend/src/dispatch.rs, crates/queryflux-frontend/src/snowflake/http/handlers/query.rs, crates/queryflux-frontend/src/snowflake/http/handlers/session.rs
Updates execute_to_sink signature to accept routing_trace parameter, switches guard accumulator to core GuardAction, and includes routing_trace and queue_duration_ms in final outcome records. Introduces proper CancellationGuard lifecycle management.
Persistence: database schema and ORM
crates/queryflux-persistence/src/cluster_config.rs, crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql, crates/queryflux-persistence/src/postgres/mod.rs
Adds queue_timeout_ms field to ClusterGroupConfigRecord and UpsertClusterGroupConfig. Implements from_core and to_core conversions. Adds Postgres migration and updates SELECT/upsert SQL to persist/retrieve timeout values.
Persistence: in-memory implementation and tests
crates/queryflux-persistence/src/in_memory.rs
In-memory persistence now propagates queue_timeout_ms and includes tests verifying round-trip storage/retrieval.
Server initialization: startup and hot-reload
crates/queryflux/src/main.rs
main.rs builds the group_queue_timeouts HashMap from each cluster group's queue_timeout_ms at startup and on every hot-reload, so queue-timeout settings stay in sync with configuration.
Test harness initialization
crates/queryflux-e2e-tests/src/harness.rs, crates/queryflux-frontend/src/snowflake/tests.rs
Test environments initialize LiveConfig.group_queue_timeouts with empty HashMap.
Studio admin UI: queue timeout configuration
queryflux-studio/lib/api-types.ts, queryflux-studio/components/group-form-dialog.tsx, queryflux-studio/components/groups-config-panel.tsx, queryflux-studio/lib/group-config-helpers.ts
TypeScript types add queueTimeoutMs field. GroupFormDialog adds input for queue timeout with validation. GroupsConfigPanel displays timeout column with ms formatting. Helpers normalize and convert queueTimeoutMs between camelCase and snake_case.
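
The timeout wiring summarized in the table above (config, then LiveConfig.group_queue_timeouts, then the sync dispatch path) can be pictured with a minimal sketch. Only `queue_timeout_ms: Option<u64>` and the map name `group_queue_timeouts` come from the walkthrough. The trimmed struct and the helpers `build_group_queue_timeouts` and `timeout_for` are hypothetical names used for illustration, and the real map may be shaped differently (for example, it could store `Option` values rather than omitting groups).

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical, trimmed-down stand-in for the real `ClusterGroupConfig`.
struct ClusterGroupConfig {
    name: String,
    /// `None` means "wait indefinitely" (the legacy behavior).
    queue_timeout_ms: Option<u64>,
}

/// Build the map a `LiveConfig`-like snapshot could carry.
/// Groups without a timeout are simply absent from the map (an assumption).
fn build_group_queue_timeouts(groups: &[ClusterGroupConfig]) -> HashMap<String, u64> {
    groups
        .iter()
        .filter_map(|g| g.queue_timeout_ms.map(|ms| (g.name.clone(), ms)))
        .collect()
}

/// Look up the queue timeout for a group; `None` means no limit.
fn timeout_for(timeouts: &HashMap<String, u64>, group: &str) -> Option<Duration> {
    timeouts.get(group).copied().map(Duration::from_millis)
}

fn main() {
    let groups = vec![
        ClusterGroupConfig { name: "adhoc".into(), queue_timeout_ms: Some(5_000) },
        ClusterGroupConfig { name: "etl".into(), queue_timeout_ms: None },
    ];
    let timeouts = build_group_queue_timeouts(&groups);
    assert_eq!(timeout_for(&timeouts, "adhoc"), Some(Duration::from_secs(5)));
    assert_eq!(timeout_for(&timeouts, "etl"), None);
}
```

Rebuilding the whole map on each hot-reload, rather than patching it in place, keeps a removed timeout from lingering after a config change.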

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • lakeops-org/queryflux#53: Both PRs modify crates/queryflux-frontend/src/dispatch.rs around the execute_to_sink flow and query execution paths, so the changes are directly related at the code level.

Suggested reviewers

  • jonisar

Poem

A rabbit hops through queues so fine,
Timeout defaults to "wait in line,"
Guards now live in core so bright,
Routes and traces flow just right! 🐰✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main changes: adding queue timeout configuration and tracking, plus related lifecycle improvements. It is concise, clear, and directly related to the substantial changeset.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.


Copilot AI left a comment


Pull request overview

This PR extends QueryFlux’s query lifecycle tracking and configuration by adding per-cluster-group queue timeout support (wire-protocol/sync execution paths) and improving audit/metrics fidelity. It also refactors the routing/dispatch plumbing and moves GuardAction to queryflux-core for reuse.

Changes:

  • Add per-cluster-group queueTimeoutMs configuration end-to-end (DB migration + persistence + admin UI/types + live config wiring).
  • Refactor frontend routing/dispatch to centralize route resolution and propagate routing trace through execution paths.
  • Introduce cancellation/slot-leak safety improvements for sync execution and re-home GuardAction into queryflux-core.

Reviewed changes

Copilot reviewed 30 out of 31 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
queryflux-studio/lib/group-config-helpers.ts Parse/serialize new queueTimeoutMs field for group upserts.
queryflux-studio/lib/api-types.ts Add queueTimeoutMs to API types with docs.
queryflux-studio/components/groups-config-panel.tsx Display group queue timeout in the groups table.
queryflux-studio/components/group-form-dialog.tsx Add form field + validation for queue timeout.
crates/queryflux/src/main.rs Populate live config map for per-group queue timeouts.
crates/queryflux-persistence/src/postgres/mod.rs Read/write queue_timeout_ms in group config queries and upsert.
crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql Add queue_timeout_ms column to cluster_group_configs.
crates/queryflux-persistence/src/metrics_store.rs Remove local GuardAction definition (now in core).
crates/queryflux-persistence/src/lib.rs Re-export GuardAction from core for backwards compatibility.
crates/queryflux-persistence/src/in_memory.rs Persist/roundtrip queue_timeout_ms in in-memory store + test.
crates/queryflux-persistence/src/cluster_config.rs Add queue_timeout_ms to persistence config structs + conversions + tests.
crates/queryflux-metrics/src/lib.rs Re-export GuardAction from core (keep public API stable).
crates/queryflux-guardrails/src/lib.rs Switch guard action type import to core.
crates/queryflux-guardrails/src/chain.rs Switch guard action type import to core.
crates/queryflux-guardrails/Cargo.toml Drop unnecessary dependency on persistence crate.
crates/queryflux-frontend/src/trino_http/trino_dispatch.rs New Trino dispatch helpers for URI rewrite + async terminal finalization.
crates/queryflux-frontend/src/trino_http/mod.rs Export new trino_dispatch module.
crates/queryflux-frontend/src/trino_http/handlers.rs Use centralized routing + pass routing trace into dispatch/execution.
crates/queryflux-frontend/src/state.rs Add group_queue_timeouts and queue_duration_ms to recorded outcomes.
crates/queryflux-frontend/src/snowflake/tests.rs Update test live config construction for new field.
crates/queryflux-frontend/src/snowflake/sql_api/handlers.rs Switch to route_and_execute helper.
crates/queryflux-frontend/src/snowflake/http/handlers/session.rs Use centralized resolve_route for login routing.
crates/queryflux-frontend/src/snowflake/http/handlers/query.rs Pass routing trace arg into execute_to_sink.
crates/queryflux-frontend/src/postgres_wire/mod.rs Switch to route_and_execute (central routing).
crates/queryflux-frontend/src/mysql_wire/mod.rs Switch to route_and_execute (central routing).
crates/queryflux-frontend/src/flight_sql/mod.rs Switch to route_and_execute (central routing).
crates/queryflux-frontend/src/dispatch.rs Centralize routing, add sync queue timeout loop, propagate routing trace, add cancellation/slot guards.
crates/queryflux-e2e-tests/src/harness.rs Update test live config construction for new field.
crates/queryflux-core/src/query.rs Move GuardAction into core query types.
crates/queryflux-core/src/config.rs Add queue_timeout_ms to cluster group config schema/docs.
Cargo.lock Remove unused dependency edge from guardrails.


Comment on lines +144 to +148
sql: executing
.translated_sql
.as_deref()
.unwrap_or(&executing.sql)
.to_string(),
Comment on lines +159 to +163
translated_sql: if was_translated {
Some(executing.sql.clone())
} else {
None
},
Comment on lines 1059 to 1063
backend_query_id: None,
status: QueryStatus::Failed,
queue_duration_ms: 0,
execution_ms: start.elapsed().as_millis() as u64,
rows: None,
Comment on lines +796 to +804
QueryOutcome {
backend_query_id: None,
status: QueryStatus::Cancelled,
queue_duration_ms: 0,
execution_ms: elapsed,
rows: None,
error: Some("Client disconnected".to_string()),
routing_trace: None,
engine_stats: None,
Comment on lines +558 to +562
/// Maximum time (ms) a wire-protocol query will wait for a free cluster slot before
/// returning an error. `None` → wait indefinitely (legacy behavior).
/// Only affects MySQL wire, Postgres wire, FlightSQL, and Snowflake HTTP.
/// Trino HTTP uses its own persistent queue with client-driven polling.
#[serde(default)]
Comment on lines +799 to +803
queue_duration_ms: 0,
execution_ms: elapsed,
rows: None,
error: Some("Client disconnected".to_string()),
routing_trace: None,


CancellationGuard records Cancelled with hardcoded queue_duration_ms: 0 and routing_trace: None; can we thread setup.queue_duration_ms and routing_trace into it when constructed?


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 759-812, update the
CancellationGuard struct/impl so that its Drop uses the real queue and routing
provenance instead of hardcoded values. Specifically, when CancellationGuard is
constructed in execute_to_sink (around lines 1475-1479), pass setup.queue_duration_ms
and routing_trace into CancellationGuard, store them on the guard, and then use them in
the QueryOutcome fields queue_duration_ms and routing_trace during Drop. Ensure
routing_trace is cloned (or otherwise owned) so it remains valid inside the tokio::spawn
closure.
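
A minimal sketch of the shape this suggestion points at: a drop guard that owns the queue duration and routing trace it will report, so the Cancelled record does not fall back to zero and None. Everything here is a simplified assumption rather than the PR's code. `Outcome` and `History` are hypothetical stand-ins for `QueryOutcome` and `state.record_query`, the routing trace is reduced to a `String`, and the record is written synchronously, whereas the comment above implies the real guard spawns a tokio task in `Drop`.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical, reduced outcome record; the real `QueryOutcome` has more fields.
#[derive(Debug, Clone, PartialEq)]
struct Outcome {
    status: &'static str,
    queue_duration_ms: u64,
    routing_trace: Option<String>,
}

/// Stand-in for the query-history sink.
type History = Arc<Mutex<Vec<Outcome>>>;

/// RAII guard: records a Cancelled outcome on drop unless disarmed.
struct CancellationGuard {
    history: History,
    queue_duration_ms: u64,
    routing_trace: Option<String>,
    armed: bool,
}

impl CancellationGuard {
    fn new(history: History, queue_duration_ms: u64, routing_trace: Option<String>) -> Self {
        Self { history, queue_duration_ms, routing_trace, armed: true }
    }

    /// Call after the normal-path outcome has been recorded.
    fn disarm(&mut self) {
        self.armed = false;
    }
}

impl Drop for CancellationGuard {
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        // The guard owns its metadata, so nothing borrowed from the
        // dropped future is needed to build the record.
        let outcome = Outcome {
            status: "Cancelled",
            queue_duration_ms: self.queue_duration_ms,
            routing_trace: self.routing_trace.take(),
        };
        if let Ok(mut history) = self.history.lock() {
            history.push(outcome);
        }
    }
}

fn main() {
    let history: History = Arc::new(Mutex::new(Vec::new()));

    // Simulated client disconnect: the guard is dropped while still armed.
    {
        let _guard = CancellationGuard::new(history.clone(), 120, Some("group=adhoc".into()));
    }

    // Normal path: the guard is disarmed, so dropping it records nothing.
    {
        let mut guard = CancellationGuard::new(history.clone(), 5, None);
        guard.disarm();
    }

    let recorded = history.lock().unwrap();
    assert_eq!(recorded.len(), 1);
    assert_eq!(recorded[0].queue_duration_ms, 120);
}
```

Storing owned values on the guard is what makes the "clone so it remains valid" requirement in the prompt straightforward to satisfy.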


}
}

queued_backoff_delay(seq).await;


queue_timeout_ms can be exceeded by queued_backoff_delay(seq) since the timeout check runs before the sleep; can we cap it to the remaining time or re-check right after, so sync wire clients fail on time?


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 936-994, inside
`setup_sync_query`’s cluster-slot acquisition loop, the timeout check happens before
`queued_backoff_delay(seq).await`, so even a tiny `queue_timeout_ms` can exceed the
limit by one full backoff sleep. Refactor the loop to cap the backoff sleep by the
remaining timeout (or skip/sleep-until using a deadline), and re-check
`queue_start.elapsed()` immediately after sleeping before continuing the loop. Ensure
that when the timeout is exceeded, the function returns the same timeout error without
waiting for the entire backoff tick.
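
One way to express the "cap the sleep by the remaining time" idea is a small pure function that the acquisition loop could call before each sleep. This is a sketch under assumptions: `capped_backoff` and `backoff_for_attempt` are hypothetical names, and the 10 ms base and 1 s ceiling are illustrative values, not the actual behavior of `queued_backoff_delay`.

```rust
use std::time::Duration;

/// Returns how long to sleep before the next slot-acquisition attempt,
/// or `None` if the queue timeout has already been reached.
///
/// `timeout == None` means "no limit", so the full backoff is used.
fn capped_backoff(
    elapsed: Duration,
    timeout: Option<Duration>,
    backoff: Duration,
) -> Option<Duration> {
    match timeout {
        None => Some(backoff),
        Some(limit) => {
            // `checked_sub` yields `None` once `elapsed` exceeds `limit`.
            let remaining = limit.checked_sub(elapsed)?;
            if remaining.is_zero() {
                None
            } else {
                Some(backoff.min(remaining))
            }
        }
    }
}

/// Hypothetical exponential backoff: 10ms, 20ms, 40ms, ... capped at 1s.
fn backoff_for_attempt(attempt: u32) -> Duration {
    let ms = 10u64.saturating_mul(1u64 << attempt.min(10));
    Duration::from_millis(ms.min(1_000))
}

fn main() {
    let timeout = Some(Duration::from_millis(50));

    // 45ms already spent, 80ms backoff requested: sleep only the remaining 5ms.
    assert_eq!(
        capped_backoff(Duration::from_millis(45), timeout, backoff_for_attempt(3)),
        Some(Duration::from_millis(5))
    );

    // Deadline reached: the caller should return the timeout error now.
    assert_eq!(
        capped_backoff(Duration::from_millis(50), timeout, backoff_for_attempt(0)),
        None
    );

    // No timeout configured: the backoff is left untouched.
    assert_eq!(
        capped_backoff(Duration::from_millis(45), None, backoff_for_attempt(3)),
        Some(Duration::from_millis(80))
    );
}
```

With this shape the loop overshoots the configured limit by at most scheduler jitter rather than by a full backoff tick, and a `None` result maps directly onto the existing timeout error path.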


Comment on lines 1058 to 1062
QueryOutcome {
backend_query_id: None,
status: QueryStatus::Failed,
queue_duration_ms: 0,
execution_ms: start.elapsed().as_millis() as u64,


queue_duration_ms is already measured above, but the translation-failure path still records 0, so queued queries that fail translation lose their wait time in query_history; should we pass the measured queue_duration_ms into that failure outcome?


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 1056-1072 inside `async fn
setup_sync_query`’s `translate_and_prepare(...).await` Err(e) branch, the
`QueryOutcome` being recorded sets `queue_duration_ms: 0` despite `queue_duration_ms`
being computed earlier after the cluster slot acquisition loop. Change this failure path
to use the real measured `queue_duration_ms` value (the one derived from
`queue_start.elapsed()`) instead of 0, and make sure the variable is accessible in that
scope. This ensures translation-failed queries that queued still persist the correct
wait time in query_history.


Comment on lines +1475 to +1478
// Cancellation safety: if the future is dropped from here on (client disconnect),
// the guard records the query as Cancelled so it appears in query history.
// Disarmed below after the normal-path record_query call.
let mut cancel_guard = CancellationGuard::new(state.clone(), setup.ctx.clone(), setup.start);


Can we move CancellationGuard before setup_sync_query(...).await, otherwise disconnects during setup can drop the future before the Cancelled history row is written?


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-frontend/src/dispatch.rs around lines 1350-1506 (the execute_to_sink
function), the CancellationGuard is currently created after setup_sync_query(...).await,
so client disconnects during the sync queue loop or translation/preparation won’t be
recorded as QueryStatus::Cancelled. Refactor so cancellation tracking starts before
awaiting setup_sync_query (wrap the entire setup+execution future in an RAII/drop guard,
or construct the guard earlier and let it take ownership of recording once QueryContext
is available). Ensure the guard is disarmed on the normal success path after
state.record_query, and on setup_sync_query error/early returns make sure the guard
doesn’t incorrectly double-record or miss queue_duration_ms (record a Cancelled entry
with whatever fields are known, or postpone recording until ctx is constructed but still
keep the guard alive across the await).


Comment on lines 242 to 245
max_running_queries: cfg.max_running_queries as i64,
max_queued_queries: cfg.max_queued_queries.map(|v| v as i64),
queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64),
strategy,


queue_timeout_ms is cast with as i64 before the upsert and as u64 on reload, so values above i64::MAX or negative DB values can wrap; should we reject out-of-range queueTimeoutMs with a checked conversion before writing to Postgres?


Prompt for AI Agents
Before applying, verify this suggestion against the current code. In
crates/queryflux-persistence/src/cluster_config.rs around lines 230-250, inside
UpsertClusterGroupConfig::from_core, the assignment queue_timeout_ms:
cfg.queue_timeout_ms.map(|v| v as i64) performs an unchecked cast that can overflow or
wrap. Refactor this to use a checked conversion (or explicit validation) before building
the upsert payload: if the value doesn’t fit i64, reject the conversion (e.g., return
an error) or log and set it to None per the project’s error-handling conventions.
Also, in crates/queryflux-persistence/src/cluster_config.rs around lines 264-297, inside
ClusterGroupConfigRecord::to_core, replace queue_timeout_ms:
self.queue_timeout_ms.map(|v| v as u64) with a safe conversion that handles negative DB
values (log + None or error) to prevent silent wrapping back into u64. Add/extend tests
around the existing queue_timeout_ms tests (lines ~418-436) to cover an out-of-range
value (above i64::MAX) and a negative DB value.
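
The checked conversion asked for here could look roughly like the following. The helper names `queue_timeout_to_db` and `queue_timeout_from_db` and the `QueueTimeoutError` type are hypothetical, and returning an error (rather than logging and falling back to `None`) is just one of the two options the prompt leaves open. `i64::try_from` and `u64::try_from` are the standard-library checked conversions.

```rust
use std::convert::TryFrom;

/// Hypothetical error type for out-of-range queue timeouts.
#[derive(Debug, PartialEq)]
enum QueueTimeoutError {
    /// Value does not fit in a signed 64-bit database column.
    TooLarge(u64),
    /// A negative value was read back from the database.
    Negative(i64),
}

/// Config -> DB: reject values above `i64::MAX` instead of wrapping.
fn queue_timeout_to_db(value: Option<u64>) -> Result<Option<i64>, QueueTimeoutError> {
    value
        .map(|ms| i64::try_from(ms).map_err(|_| QueueTimeoutError::TooLarge(ms)))
        .transpose()
}

/// DB -> config: reject negative values instead of wrapping into a huge `u64`.
fn queue_timeout_from_db(value: Option<i64>) -> Result<Option<u64>, QueueTimeoutError> {
    value
        .map(|ms| u64::try_from(ms).map_err(|_| QueueTimeoutError::Negative(ms)))
        .transpose()
}

fn main() {
    // In-range values round-trip unchanged.
    assert_eq!(queue_timeout_to_db(Some(5_000)), Ok(Some(5_000)));
    assert_eq!(queue_timeout_from_db(Some(5_000)), Ok(Some(5_000)));

    // `None` (no limit) passes straight through.
    assert_eq!(queue_timeout_to_db(None), Ok(None));

    // With `as`, `u64::MAX as i64` silently becomes -1; here it is an error.
    assert_eq!(
        queue_timeout_to_db(Some(u64::MAX)),
        Err(QueueTimeoutError::TooLarge(u64::MAX))
    );
    assert_eq!(queue_timeout_from_db(Some(-1)), Err(QueueTimeoutError::Negative(-1)));
}
```

Using `transpose` keeps the `Option` ("no limit") and the `Result` (range check) separate, so callers can propagate the failure with `?` if `from_core` and `to_core` are made fallible.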


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
crates/queryflux-frontend/src/dispatch.rs (1)

521-643: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Sync dispatch via Trino HTTP still drops cancelled requests from history.

This branch only uses ClusterSlotGuard. If the axum request future is dropped during execute_as_arrow, the slot is released, but no terminal record_query call happens. execute_to_sink already fixed the same problem with CancellationGuard; the sync path here needs the same protection or cancelled Trino-HTTP requests silently disappear from query history.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/queryflux-frontend/src/dispatch.rs` around lines 521 - 643, The sync
Trino-HTTP path uses only ClusterSlotGuard so if the axum request future is
dropped during sync_adapter.execute_as_arrow the slot is released but
record_query is never called; wrap the sync dispatch (before calling
execute_as_arrow) with the same cancellation protection used by execute_to_sink
(i.e. create a CancellationGuard instance around the QueryContext/QueryOutcome)
so that a cancellation will run the terminal record_query logic, then drop the
CancellationGuard only after record_query and slot.release() complete; reference
ClusterSlotGuard, execute_as_arrow, CancellationGuard, execute_to_sink,
record_query and slot.release() when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@crates/queryflux-frontend/src/dispatch.rs`:
- Around line 788-809: The CancellationGuard drop currently writes a terminal
QueryOutcome with hardcoded queue_duration_ms: 0, routing_trace: None, and an
empty guard_actions list; update the drop task (impl Drop for CancellationGuard
-> drop) to preserve the measured metadata: pass the actual queue_duration_ms
that was computed earlier, include the resolved routing_trace produced by
execute_to_sink, and propagate any pre-execution guard actions and
was_guard_blocked flag from setup.guard_actions/guard state into QueryOutcome so
cancelled records retain the same queue/routing/guard metadata; apply the same
fix to the translation-failure helper-created records (the other block noted
around the 1056–1068 region) so they also use the measured queue_duration_ms,
routing_trace, and setup.guard_actions/was_guard_blocked when calling
state.record_query.

In `@crates/queryflux-frontend/src/trino_http/handlers.rs`:
- Around line 831-832: The final QueryOutcome currently hardcodes
queue_duration_ms: 0 in get_executing_statement (both success and failure
branches), which loses real queued time; update the QueryOutcome construction to
use the actual captured queue duration from the executing query
context/persistence record (e.g., use the executing query's stored queue
duration field or the persistence record returned to state.record_query) instead
of 0—locate the QueryOutcome creations in get_executing_statement and replace
queue_duration_ms: 0 with the appropriate value (for example
executing_query.queue_duration_ms or record.queue_duration_ms), preserving the
existing execution_ms/elapsed_ms handling.

In `@crates/queryflux-frontend/src/trino_http/trino_dispatch.rs`:
- Around line 127-197: finalize_trino_async_terminal_on_submit currently records
outcomes with routing_trace = None and queue_duration_ms = 0, losing
routing/dequeue metadata added on the fast-path submit; propagate those values
into the outcome before calling state.record_query. Modify dispatch_query (or
add fields on ExecutingQuery) so the routing_trace and queue_duration_ms
computed at dispatch are stored on ExecutingQuery (e.g., new fields
routing_trace: Option<_> and queue_duration_ms: u64) or passed into
finalize_trino_async_terminal_on_submit, then read those values and set
outcome.routing_trace and outcome.queue_duration_ms accordingly before
state.record_query(&ctx, outcome) in finalize_trino_async_terminal_on_submit.

In `@crates/queryflux-persistence/src/cluster_config.rs`:
- Line 244: The current unchecked casts for queue_timeout_ms (in the code that
sets queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64) and the reciprocal
self.queue_timeout_ms.map(|v| v as u64)) can overflow; replace both with checked
conversions using i64::try_from(...) and u64::try_from(...), propagate or return
an explicit error when conversion fails (do not use as), and update the
surrounding functions/constructors (the places building the cluster config and
the reverse conversion) to handle the Result/Option properly so overflow is
detected and reported instead of silently wrapping.

In
`@crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql`:
- Around line 3-4: The migration currently adds queue_timeout_ms to
cluster_group_configs but allows negative values; update the migration to
enforce non-negative values by first neutralizing existing negatives (e.g.,
UPDATE cluster_group_configs SET queue_timeout_ms = 0 WHERE queue_timeout_ms <
0;) and then add an explicit CHECK constraint (e.g., ADD CONSTRAINT
chk_cluster_group_configs_queue_timeout_ms_nonneg CHECK (queue_timeout_ms >= 0))
so the database rejects future negative values for column queue_timeout_ms on
table cluster_group_configs.

In `@queryflux-studio/components/group-form-dialog.tsx`:
- Around line 212-221: The validation for queue timeout (variables
queueTimeout/qt and qTimeout) uses parseInt which truncates values like "1.5" or
"1e3"; change the check to reject any non-integer strings by parsing with Number
(or using a strict /^\d+$/ test) and then using Number.isInteger on the parsed
value and ensuring it's >= 1 before assigning qTimeout; update the code path
that currently calls parseInt and the setError message (via setError) so inputs
like "1.5", "1e3", "-2", or non-numeric strings are rejected as invalid.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 7da6194e-ee12-47ad-b9b6-b7c57feb2bdd

📥 Commits

Reviewing files that changed from the base of the PR and between 4a97a25 and d7b3054.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (30)
  • crates/queryflux-core/src/config.rs
  • crates/queryflux-core/src/query.rs
  • crates/queryflux-e2e-tests/src/harness.rs
  • crates/queryflux-frontend/src/dispatch.rs
  • crates/queryflux-frontend/src/flight_sql/mod.rs
  • crates/queryflux-frontend/src/mysql_wire/mod.rs
  • crates/queryflux-frontend/src/postgres_wire/mod.rs
  • crates/queryflux-frontend/src/snowflake/http/handlers/query.rs
  • crates/queryflux-frontend/src/snowflake/http/handlers/session.rs
  • crates/queryflux-frontend/src/snowflake/sql_api/handlers.rs
  • crates/queryflux-frontend/src/snowflake/tests.rs
  • crates/queryflux-frontend/src/state.rs
  • crates/queryflux-frontend/src/trino_http/handlers.rs
  • crates/queryflux-frontend/src/trino_http/mod.rs
  • crates/queryflux-frontend/src/trino_http/trino_dispatch.rs
  • crates/queryflux-guardrails/Cargo.toml
  • crates/queryflux-guardrails/src/chain.rs
  • crates/queryflux-guardrails/src/lib.rs
  • crates/queryflux-metrics/src/lib.rs
  • crates/queryflux-persistence/src/cluster_config.rs
  • crates/queryflux-persistence/src/in_memory.rs
  • crates/queryflux-persistence/src/lib.rs
  • crates/queryflux-persistence/src/metrics_store.rs
  • crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql
  • crates/queryflux-persistence/src/postgres/mod.rs
  • crates/queryflux/src/main.rs
  • queryflux-studio/components/group-form-dialog.tsx
  • queryflux-studio/components/groups-config-panel.tsx
  • queryflux-studio/lib/api-types.ts
  • queryflux-studio/lib/group-config-helpers.ts
💤 Files with no reviewable changes (1)
  • crates/queryflux-guardrails/Cargo.toml

Comment on lines +788 to +809
impl Drop for CancellationGuard {
    fn drop(&mut self) {
        if let Some(ctx) = self.ctx.take() {
            let state = self.state.clone();
            let elapsed = self.start.elapsed().as_millis() as u64;
            tokio::spawn(async move {
                state.record_query(
                    &ctx,
                    QueryOutcome {
                        backend_query_id: None,
                        status: QueryStatus::Cancelled,
                        queue_duration_ms: 0,
                        execution_ms: elapsed,
                        rows: None,
                        error: Some("Client disconnected".to_string()),
                        routing_trace: None,
                        engine_stats: None,
                        guard_actions: vec![],
                        was_guard_blocked: false,
                    },
                );
            });

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Preserve routing and queue metadata on helper-owned terminal records.

Both the translation-failure record and CancellationGuard hardcode queue_duration_ms: 0 / routing_trace: None, even though this code already measured queue_duration_ms and execute_to_sink already resolved a routing_trace. Cancelled/failed sync queries will therefore lose the new history fields on exactly the branches that should still be persisted. The cancel path also drops any pre-execution guard actions collected into setup.guard_actions.

Also applies to: 1056-1068

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/queryflux-frontend/src/dispatch.rs` around lines 788 - 809, The
CancellationGuard drop currently writes a terminal QueryOutcome with hardcoded
queue_duration_ms: 0, routing_trace: None, and an empty guard_actions list;
update the drop task (impl Drop for CancellationGuard -> drop) to preserve the
measured metadata: pass the actual queue_duration_ms that was computed earlier,
include the resolved routing_trace produced by execute_to_sink, and propagate
any pre-execution guard actions and was_guard_blocked flag from
setup.guard_actions/guard state into QueryOutcome so cancelled records retain
the same queue/routing/guard metadata; apply the same fix to the
translation-failure helper-created records (the other block noted around the
1056–1068 region) so they also use the measured queue_duration_ms,
routing_trace, and setup.guard_actions/was_guard_blocked when calling
state.record_query.
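
As a rough illustration of the suggestion above, the sketch below shows a drop guard that carries the pre-execution metadata with it, so a cancelled record keeps the measured queue time, routing trace, and guard actions. It is a minimal sketch under stated assumptions, not the PR's implementation: `Outcome`, `PreExecMetadata`, `Recorder`, and `disarm` are hypothetical stand-ins (the real code uses `QueryOutcome`, `state.record_query`, and `tokio::spawn`), and the record is written synchronously into an in-memory list to keep the example self-contained.

```rust
use std::sync::{Arc, Mutex};
use std::time::Instant;

/// Hypothetical, trimmed-down stand-in for the PR's `QueryOutcome`.
#[derive(Debug, Clone, PartialEq)]
pub struct Outcome {
    pub status: &'static str,
    pub queue_duration_ms: u64,
    pub execution_ms: u64,
    pub routing_trace: Option<String>,
    pub guard_actions: Vec<String>,
    pub was_guard_blocked: bool,
}

/// Hypothetical stand-in for `state.record_query`: outcomes are collected in memory.
pub type Recorder = Arc<Mutex<Vec<Outcome>>>;

/// Metadata that is already known before execution starts.
#[derive(Debug, Clone, Default)]
pub struct PreExecMetadata {
    pub queue_duration_ms: u64,
    pub routing_trace: Option<String>,
    pub guard_actions: Vec<String>,
    pub was_guard_blocked: bool,
}

pub struct CancellationGuard {
    recorder: Recorder,
    meta: Option<PreExecMetadata>,
    start: Instant,
}

impl CancellationGuard {
    pub fn new(recorder: Recorder, meta: PreExecMetadata) -> Self {
        Self { recorder, meta: Some(meta), start: Instant::now() }
    }

    /// Hypothetical helper: call once the normal terminal record has been
    /// written, so that dropping the guard records nothing.
    pub fn disarm(&mut self) {
        self.meta = None;
    }
}

impl Drop for CancellationGuard {
    fn drop(&mut self) {
        // Only an armed guard (metadata still present) writes a cancelled record.
        if let Some(meta) = self.meta.take() {
            let outcome = Outcome {
                status: "Cancelled",
                // Reuse the measured values instead of hardcoding 0 / None / vec![].
                queue_duration_ms: meta.queue_duration_ms,
                execution_ms: self.start.elapsed().as_millis() as u64,
                routing_trace: meta.routing_trace,
                guard_actions: meta.guard_actions,
                was_guard_blocked: meta.was_guard_blocked,
            };
            if let Ok(mut records) = self.recorder.lock() {
                records.push(outcome);
            }
        }
    }
}
```

The design point is that the guard owns whatever the terminal record needs at construction time, so the drop path never has to reach back into state that may already be gone.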

Comment on lines +831 to 832
queue_duration_ms: 0,
execution_ms: elapsed_ms,

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Propagate actual queue wait in final outcomes instead of hardcoding zero.

Line 831 and Line 867 set queue_duration_ms: 0 for both success and failure in get_executing_statement. That value is persisted by state.record_query and surfaced as Trino queued_time_millis, so queued queries will be recorded/reported as if they never waited. Use the captured queue duration from the executing query context/persistence record when constructing QueryOutcome.

Also applies to: 867-868

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/queryflux-frontend/src/trino_http/handlers.rs` around lines 831 - 832,
The final QueryOutcome currently hardcodes queue_duration_ms: 0 in
get_executing_statement (both success and failure branches), which loses real
queued time; update the QueryOutcome construction to use the actual captured
queue duration from the executing query context/persistence record (e.g., use
the executing query's stored queue duration field or the persistence record
returned to state.record_query) instead of 0—locate the QueryOutcome creations
in get_executing_statement and replace queue_duration_ms: 0 with the appropriate
value (for example executing_query.queue_duration_ms or
record.queue_duration_ms), preserving the existing execution_ms/elapsed_ms
handling.

Comment on lines +127 to +197
pub(crate) async fn finalize_trino_async_terminal_on_submit(
    state: &Arc<AppState>,
    cluster_manager: &Arc<dyn ClusterGroupManager>,
    executing: &ExecutingQuery,
    adapter: &Arc<dyn AsyncAdapter>,
    session: &SessionContext,
    protocol: FrontendProtocol,
    body: &Bytes,
) {
    let elapsed_ms = (Utc::now() - executing.creation_time)
        .num_milliseconds()
        .max(0) as u64;

    let was_translated = executing.translated_sql.is_some();
    let src_dialect = protocol.default_dialect();
    let ctx = QueryContext {
        query_id: executing.id.clone(),
        sql: executing
            .translated_sql
            .as_deref()
            .unwrap_or(&executing.sql)
            .to_string(),
        session: session.clone(),
        protocol,
        group: executing.cluster_group.clone(),
        cluster: executing.cluster_name.clone(),
        cluster_group_config_id: executing.cluster_group_config_id,
        cluster_config_id: executing.cluster_config_id,
        engine_type: adapter.engine_type(),
        src_dialect,
        tgt_dialect: adapter.translation_target_dialect(),
        was_translated,
        translated_sql: if was_translated {
            Some(executing.sql.clone())
        } else {
            None
        },
        query_tags: executing.query_tags.clone(),
        query_params: vec![],
        agent_context: executing.agent_context.clone(),
    };

    let engine_stats = adapter.terminal_stats_from_body(body);
    let (mut outcome, warn_msg) = trino_submit_terminal_outcome(
        body,
        elapsed_ms,
        executing.backend_query_id.0.clone(),
        engine_stats,
    );

    let stored_actions: Vec<queryflux_core::query::GuardAction> = serde_json::from_value(
        serde_json::Value::Array(executing.submitted_guard_actions.clone()),
    )
    .unwrap_or_default();
    if !stored_actions.is_empty() {
        outcome.guard_actions = stored_actions;
        outcome.was_guard_blocked = executing.was_guard_blocked;
    }

    if let Some(msg) = warn_msg {
        warn!(proxy_id = %executing.id, "{msg}");
    }

    state
        .metrics
        .on_query_finished(&executing.cluster_group.0, &executing.cluster_name.0);
    state.record_query(&ctx, outcome);
    let _ = cluster_manager
        .release_cluster(&executing.cluster_group, &executing.cluster_name)
        .await;
    let _ = state.persistence.delete(&executing.backend_query_id).await;

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Immediate Trino terminalization drops the new outcome metadata.

finalize_trino_async_terminal_on_submit is the only terminal path when POST /v1/statement finishes without a nextUri, but it records the outcome with routing_trace: None and queue_duration_ms: 0. That means routed/dequeued queries lose the exact fields this PR is adding on the fast-path submit completion case. Thread those values in from dispatch_query (or persist them on ExecutingQuery) before calling record_query.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/queryflux-frontend/src/trino_http/trino_dispatch.rs` around lines 127
- 197, finalize_trino_async_terminal_on_submit currently records outcomes with
routing_trace = None and queue_duration_ms = 0, losing routing/dequeue metadata
added on the fast-path submit; propagate those values into the outcome before
calling state.record_query. Modify dispatch_query (or add fields on
ExecutingQuery) so the routing_trace and queue_duration_ms computed at dispatch
are stored on ExecutingQuery (e.g., new fields routing_trace: Option<_> and
queue_duration_ms: u64) or passed into finalize_trino_async_terminal_on_submit,
then read those values and set outcome.routing_trace and
outcome.queue_duration_ms accordingly before state.record_query(&ctx, outcome)
in finalize_trino_async_terminal_on_submit.
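
One way to picture the "persist them on ExecutingQuery" option is sketched below. This is only an illustration under assumptions: `ExecutingQuerySketch`, `OutcomeSketch`, `queue_wait_ms`, and `apply_dispatch_metadata` are hypothetical names, the routing trace is modelled as a plain string, and the PR's real `ExecutingQuery` and `QueryOutcome` types carry many more fields.

```rust
use std::time::Instant;

/// Hypothetical slice of the executing-query record, extended with the two
/// dispatch-time fields the review asks to carry forward.
pub struct ExecutingQuerySketch {
    pub id: String,
    pub routing_trace: Option<String>,
    pub queue_duration_ms: u64,
}

/// Hypothetical slice of the terminal outcome.
pub struct OutcomeSketch {
    pub queue_duration_ms: u64,
    pub routing_trace: Option<String>,
}

/// Queue wait measured at dispatch time. Saturates to zero rather than
/// panicking if the two instants are ever passed in the wrong order.
pub fn queue_wait_ms(enqueued_at: Instant, dequeued_at: Instant) -> u64 {
    dequeued_at
        .saturating_duration_since(enqueued_at)
        .as_millis() as u64
}

/// At terminalization, copy the stored dispatch metadata onto the outcome
/// before it is recorded, instead of leaving the defaults (0 / None).
pub fn apply_dispatch_metadata(executing: &ExecutingQuerySketch, outcome: &mut OutcomeSketch) {
    outcome.queue_duration_ms = executing.queue_duration_ms;
    outcome.routing_trace = executing.routing_trace.clone();
}
```

Storing the values on the executing record, rather than passing them as extra arguments, would also make them available to the later polling path that builds its own terminal outcome.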

members: cfg.members.clone(),
max_running_queries: cfg.max_running_queries as i64,
max_queued_queries: cfg.max_queued_queries.map(|v| v as i64),
queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64),

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify current unsafe casts and absence/presence of checked conversion for queue timeout paths.
rg -n 'queue_timeout_ms: .* as (i64|u64)' crates/queryflux-persistence/src/cluster_config.rs -C2
rg -n 'queue_timeout_ms: .*try_from' crates/queryflux-persistence/src/cluster_config.rs -C2

Repository: lakeops-org/queryflux

Length of output: 759


Avoid lossy signed/unsigned casts for queue_timeout_ms.

crates/queryflux-persistence/src/cluster_config.rs uses unchecked casts that can overflow/wrap and silently corrupt timeout semantics:

  • queue_timeout_ms: cfg.queue_timeout_ms.map(|v| v as i64), (line 244)
  • queue_timeout_ms: self.queue_timeout_ms.map(|v| v as u64), (line 290)

Replace these with checked conversions (i64::try_from / u64::try_from) and handle failures explicitly (no as).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@crates/queryflux-persistence/src/cluster_config.rs` at line 244, The current
unchecked casts for queue_timeout_ms (in the code that sets queue_timeout_ms:
cfg.queue_timeout_ms.map(|v| v as i64) and the reciprocal
self.queue_timeout_ms.map(|v| v as u64)) can overflow; replace both with checked
conversions using i64::try_from(...) and u64::try_from(...), propagate or return
an explicit error when conversion fails (do not use as), and update the
surrounding functions/constructors (the places building the cluster config and
the reverse conversion) to handle the Result/Option properly so overflow is
detected and reported instead of silently wrapping.
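
A minimal sketch of the checked conversions described above, using only the standard library. The error type and function names (`TimeoutConversionError`, `queue_timeout_to_db`, `queue_timeout_from_db`) are assumptions made for illustration; the PR's actual error handling and call sites are not shown on this page.

```rust
use std::convert::TryFrom;
use std::fmt;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum TimeoutConversionError {
    /// The configured value does not fit in a signed 64-bit column.
    TooLargeForDb(u64),
    /// The stored value is negative and cannot be an unsigned timeout.
    NegativeInDb(i64),
}

impl fmt::Display for TimeoutConversionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::TooLargeForDb(v) => write!(f, "queue_timeout_ms {v} exceeds i64::MAX"),
            Self::NegativeInDb(v) => write!(f, "queue_timeout_ms {v} is negative"),
        }
    }
}

impl std::error::Error for TimeoutConversionError {}

/// Config (u64) -> database row (BIGINT, i64). Fails instead of wrapping.
pub fn queue_timeout_to_db(v: Option<u64>) -> Result<Option<i64>, TimeoutConversionError> {
    v.map(|ms| i64::try_from(ms).map_err(|_| TimeoutConversionError::TooLargeForDb(ms)))
        .transpose()
}

/// Database row (i64) -> config (u64). Fails on negative values.
pub fn queue_timeout_from_db(v: Option<i64>) -> Result<Option<u64>, TimeoutConversionError> {
    v.map(|ms| u64::try_from(ms).map_err(|_| TimeoutConversionError::NegativeInDb(ms)))
        .transpose()
}
```

With `as`, `u64::MAX as i64` evaluates to `-1` and `-1i64 as u64` evaluates to `u64::MAX`; the checked versions turn both cases into explicit errors the caller has to handle.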

Comment on lines +3 to +4
ALTER TABLE cluster_group_configs
    ADD COLUMN queue_timeout_ms BIGINT;

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Enforce non-negative queue_timeout_ms at the database boundary.

queue_timeout_ms is modeled as unsigned in core config, but this column currently accepts negative values. Add a CHECK constraint so invalid values cannot be persisted.

Suggested migration update
 ALTER TABLE cluster_group_configs
-    ADD COLUMN queue_timeout_ms BIGINT;
+    ADD COLUMN queue_timeout_ms BIGINT;
+
+ALTER TABLE cluster_group_configs
+    ADD CONSTRAINT cluster_group_configs_queue_timeout_ms_non_negative
+    CHECK (queue_timeout_ms IS NULL OR queue_timeout_ms >= 0);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@crates/queryflux-persistence/src/postgres/migrations/20260610000001_group_queue_timeout.sql`
around lines 3 - 4, The migration currently adds queue_timeout_ms to
cluster_group_configs but allows negative values; update the migration to
enforce non-negative values by first neutralizing existing negatives (e.g.,
UPDATE cluster_group_configs SET queue_timeout_ms = 0 WHERE queue_timeout_ms <
0;) and then add an explicit CHECK constraint (e.g., ADD CONSTRAINT
chk_cluster_group_configs_queue_timeout_ms_nonneg CHECK (queue_timeout_ms >= 0))
so the database rejects future negative values for column queue_timeout_ms on
table cluster_group_configs.

Comment on lines +212 to +221
let qTimeout: number | null = null;
const qt = queueTimeout.trim();
if (qt !== "") {
  const n = parseInt(qt, 10);
  if (!Number.isFinite(n) || n < 1) {
    setError("Queue timeout must be empty or a positive integer (ms).");
    return;
  }
  qTimeout = n;
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the current parser and demonstrate parseInt truncation behavior.
rg -n -C2 'const n = parseInt\(qt, 10\)|Queue timeout must be empty or a positive integer' queryflux-studio/components/group-form-dialog.tsx
node -e 'console.log("parseInt(1.5) =", parseInt("1.5",10), "; parseInt(1e3) =", parseInt("1e3",10))'

Repository: lakeops-org/queryflux

Length of output: 365


Fix queue-timeout validation to reject non-integers (avoid parseInt truncation)

parseInt(qt, 10) truncates inputs like "1.5" and "1e3" to 1, so non-integer values can pass the “positive integer” requirement in queryflux-studio/components/group-form-dialog.tsx (around lines 212-221).

Suggested fix
-    if (qt !== "") {
-      const n = parseInt(qt, 10);
-      if (!Number.isFinite(n) || n < 1) {
+    if (qt !== "") {
+      const n = Number(qt);
+      if (!Number.isInteger(n) || n < 1) {
         setError("Queue timeout must be empty or a positive integer (ms).");
         return;
       }
       qTimeout = n;
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@queryflux-studio/components/group-form-dialog.tsx` around lines 212 - 221,
The validation for queue timeout (variables queueTimeout/qt and qTimeout) uses
parseInt, which truncates values like "1.5" or "1e3" to 1; change the check to
reject any string that is not a plain decimal integer, for example by requiring
a strict /^\d+$/ match before parsing (Number alone is not enough, because
Number("1e3") is 1000 and passes Number.isInteger), then ensure the parsed value
is >= 1 before assigning qTimeout; update the code path that currently calls
parseInt and the setError message (via setError) so inputs like "1.5", "1e3",
"-2", or non-numeric strings are rejected as invalid.

2 participants