Streaming join key-domain pushdown: prune left backfill by a snapshot of right-side join keys #1187
…in keys

Add an opt-in optimization for streaming INNER LATEST JOIN whose left side backfills from the historical store. At query start a bounded snapshot of the right side's join-key domain is captured and pushed into the left backfill scan as a storage filter, primary-key mark pruning, and optional shard pruning. A two-boundary gating protocol inside the join keeps results exactly equivalent to the unoptimized execution; live (post-backfill) reads are never pruned, so right-side keys arriving after query start are matched normally.

Composite join keys are supported (the key domain is tracked as key tuples), including composite sharding expressions such as city_hash64(key_a, key_b). Floating-point join keys are rejected before pushdown engages and degrade to the unoptimized plan.

All behavior is gated behind default-off settings:
- enable_streaming_join_key_domain_pushdown
- enable_streaming_join_key_domain_shard_pruning
- streaming_join_key_domain_pushdown_max_rows / _max_bytes
- streaming_join_key_domain_pushdown_disjunctive_filter_max_rows

One checkpointed component (WatermarkStamper) gains a version-gated field; old checkpoints recover cleanly.

Also fixes a pre-existing ConcurrentHashJoin::joinLeftBlock bug that dispatched left probe blocks by right-side key column positions, routing rows to the wrong shard when left/right key ordinals differed in a parallel_hash join.

Adds smoke suite tests/stream/test_stream_smoke/0035_streaming_join_key_domain_pushdown.yaml (24 cases).
Bring the branch up to date with develop (11 commits) and resolve conflicts:
- cmake/autogenerated_versions.txt: bump VERSION_REVISION to 159. Upstream released 3.0.22 [157] and 3.0.23 [158] while this branch was open, so 157 is no longer free; keep upstream's 3.0.23 patch line.
- WatermarkStamper: move the checkpoint gate PENDING_UNMUTE_WATERMARK_MIN_VERSION to 159 in lockstep, so the new field is introduced strictly above the highest shipped revision and checkpoints from 3.0.22/3.0.23 recover correctly as "no field".
- StorageStream.cpp / StreamShardStore.cpp: keep both sides' includes and ErrorCodes declarations. The key-domain join shard pruning coexists with upstream's IN-subquery / literal tuple-IN shard pruning (timeplus-io#1175); the two are complementary mechanisms.

Verified the three merged translation units compile.
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1702f00fd9
Some suggestions:
A broad clang-format pass had reformatted code this PR does not change — reflowed throw/exception calls, signatures collapsed onto one line, re-sorted #include blocks, and brace-placement changes — adding noise to the diff in ProfileEvents, QueryPipelineBuilder, ExpressionAnalyzer, and StorageStream. Restore develop's formatting on all unchanged lines so the diff shows only genuine changes. Each reverted hunk is whitespace-only (token-identical before/after the revert) and the four files still compile; no behavior change.
Avoid formatting unchanged code:

Good catch, fixed in d8d6606. One note for transparency: some of develop's existing formatting is itself inconsistent, for example the doc-comment block in …
Reduce the optimization's footprint inside existing functions by moving the inlined logic into clearly-named helpers, leaving a single guarded call behind (addresses review feedback to keep the optimization as independent as possible). Behaviour-preserving (pure relocation), full build verified.
- QueryPipelineBuilder::joinPipelinesStreaming: the 56-line right-boundary source matching block -> matchRightSourcesForStreamingJoinBoundary().
- StorageStream::getShardsToRead: the shard-pruning block -> updateShardsWithStreamingJoinKeyDomainPruning().
- MergeTreeDataSelectExecutor: per-part PK mark filtering -> applyStreamingJoinPrimaryKeyPrefixFilter().
- StreamShardStore::readConcat: bounded snapshot-prefix plan construction -> StreamShardStore::buildBoundedPrefixPlanForSnapshotBoundary().
The optimized code should be as independent as possible; reduce changes to existing code branches.

Agreed, done in 135e052. I pulled the optimization logic that was inlined into existing functions out into clearly-named helpers, so each existing function now shows only a single guarded call where the feature hooks in. It's a pure relocation (behaviour-preserving), and the full build was verified.

What moved out of existing functions:
A few hooks are inherently interleaved with existing control flow and can't be cleanly extracted without a redesign: the boundary state machine in …

Happy to revisit any specific spot if you'd like it handled differently.
Summary
This PR adds an opt-in query optimization for streaming INNER LATEST JOINs whose left side backfills from the historical store: at query start, a bounded snapshot of the right side's join-key domain is captured and pushed down into the left side's historical backfill scan as a key filter, enabling storage-level filtering, primary-key mark-range pruning, and (optionally) shard pruning. A boundary-gating protocol inside the join keeps results exactly equivalent to the unoptimized execution. Live (post-backfill) streaming reads are never pruned, so new right-side keys arriving after query start are matched normally going forward.

Everything is gated behind new settings that default off; with the settings off, planning and execution are unchanged.
Motivation
A common streaming pattern is a fact stream joined through a mapping (pivot) stream:
- a fact stream (keyed by partition_key), and
- a mapping stream (entity_key → partition_key, one-to-many, with new mappings arriving over time).

"Give me all facts for one entity, as a live stream" then requires resolving the entity through the mapping stream. There are two natural ways to write this today, and both have problems:

Option A, IN over a table-mode subquery:

The table(mapping) subquery is evaluated once, in historical mode, at query start. A new mapping row arriving while the query is running is never picked up, so facts for newly mapped partition keys are silently missing from the live stream. Functionally wrong for this use case.

Option B, streaming INNER LATEST JOIN against the mapping stream:

This is semantically correct: new mappings are picked up live. But the left backfill (seek_to = 'earliest') scans the entire history of the fact stream across all partition keys, only for the join to immediately discard everything that doesn't match the handful of mapping keys. On large fact streams this makes query startup take a very long time and burn I/O and CPU proportional to total history rather than to the entity's slice of it.

This PR makes Option B fast: since the join will only ever emit left backfill rows whose keys exist in the right side's snapshot (see the correctness model below), those keys can be pushed into the left scan and everything else skipped at the storage layer.
How it works
Planning
For an eligible join (streaming INNER LATEST JOIN, single equi-clause, non-bidirectional hash join, left side backfilling from the historical store via seek_to, exec_mode = normal), the analyzer:
- captures a snapshot of the right side's join-key domain, bounded by streaming_join_key_domain_pushdown_max_rows / _max_bytes; exceeding a cap disables the optimization for the query (it falls back to a plain unfiltered backfill, never to partial filtering).

Pruning layers (historical backfill only)
The key domain is applied to the left backfill in up to three independent, individually-safe layers:
- Storage filter: the domain is rendered as an IN (...) predicate (a tuple-IN for composite keys; or, for small domains ≤ streaming_join_key_domain_pushdown_disjunctive_filter_max_rows, an index-friendlier OR-of-equalities form), feeding index analysis; a defensive post-read filter step guarantees exactness even if the storage push is not applicable.
- Primary-key mark pruning: per-part filtering of primary-key mark ranges.
- Shard pruning (optional, enable_streaming_join_key_domain_shard_pruning): when the left stream has a deterministic sharding key over the join keys, including composite sharding expressions such as city_hash64(key_a, key_b), historical reads skip shards that no domain key hashes to. Live reads remain on all shards.

Each layer requires an exact type match against physical columns and degrades to "no pruning" (never to wrong pruning) when anything doesn't line up.
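The choice between the two filter forms and the all-or-nothing fallback can be illustrated with a small sketch. This is not the PR's implementation (which lives in the C++ analyzer); the function name render_key_domain_filter, the naive literal quoting, and the handling of an empty domain are assumptions made purely for illustration. Only the setting semantics (row cap, disjunctive threshold with 0 disabling that form, float keys rejected) come from the description above.

```python
from typing import Optional, Sequence, Tuple

Key = Tuple[object, ...]  # one join-key tuple; length 1 for a single-column key


def _literal(value: object) -> str:
    """Render a value as a SQL-like literal (illustrative, naive quoting)."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def render_key_domain_filter(
    columns: Sequence[str],
    domain: Sequence[Key],
    max_rows: int = 10000,
    disjunctive_max_rows: int = 256,
) -> Optional[str]:
    """Return a filter for the left backfill, or None meaning 'no pruning'."""
    if any(len(key) != len(columns) for key in domain):
        raise ValueError("every key tuple must have one value per join column")
    # Floating-point keys are rejected: fall back to the unoptimized plan.
    if any(isinstance(v, float) for key in domain for v in key):
        return None
    # Exceeding the cap disables pruning entirely, never partial filtering.
    if len(domain) > max_rows:
        return None
    # Assumption: an empty snapshot means no backfill row can match.
    if not domain:
        return "0 = 1"
    # Small domains: OR-of-equalities form (0 disables this form).
    if 0 < disjunctive_max_rows and len(domain) <= disjunctive_max_rows:
        terms = []
        for key in domain:
            eqs = [f"{c} = {_literal(v)}" for c, v in zip(columns, key)]
            terms.append(eqs[0] if len(eqs) == 1 else "(" + " AND ".join(eqs) + ")")
        return " OR ".join(terms)
    # Otherwise: IN for a single key column, tuple-IN for composite keys.
    if len(columns) == 1:
        return f"{columns[0]} IN ({', '.join(_literal(k[0]) for k in domain)})"
    lhs = "(" + ", ".join(columns) + ")"
    rows = ", ".join("(" + ", ".join(_literal(v) for v in k) + ")" for k in domain)
    return f"{lhs} IN ({rows})"
```

Returning None rather than a truncated predicate mirrors the "never to partial filtering" rule: a filter built from only part of the domain would drop rows the join should emit.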
Correctness model: boundary gating
Pruning the left backfill is only sound if the join is guaranteed to evaluate that backfill against exactly the snapshotted right side. The PR introduces a two-boundary protocol inside the streaming join:
The result is a deterministic linearization — left backfill joins the right snapshot; live data joins everything thereafter — which produces the same rows as the unoptimized execution. Pushdown is only activated when the boundary installation is confirmed end-to-end; any bail-out at any stage disables both the gating and all pruning together.
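A toy model may help make the linearization concrete. The individual protocol steps are not spelled out in the text above, so the two boundaries modelled here (a right-snapshot boundary and a left-backfill boundary) and the class GatedLatestJoin are assumptions for illustration only, not the PR's state machine. The sketch holds left rows until the right snapshot is complete, and holds post-snapshot right rows until the left backfill is drained.

```python
from typing import Dict, List, Tuple


class GatedLatestJoin:
    """Toy INNER LATEST JOIN with two gates (hypothetical model)."""

    def __init__(self) -> None:
        self.right_latest: Dict[object, object] = {}  # latest right row per key
        self.right_snapshot_done = False
        self.left_backfill_done = False
        self.held_left: List[Tuple[object, object]] = []
        self.held_right: List[Tuple[object, object]] = []
        self.output: List[Tuple[object, object, object]] = []

    def _probe(self, key: object, payload: object) -> None:
        # INNER semantics: emit only when the key currently has a right row.
        if key in self.right_latest:
            self.output.append((key, payload, self.right_latest[key]))

    def on_right(self, key: object, payload: object) -> None:
        if self.right_snapshot_done and not self.left_backfill_done:
            # Post-snapshot right row: invisible to the left backfill.
            self.held_right.append((key, payload))
        else:
            self.right_latest[key] = payload

    def on_right_snapshot_boundary(self) -> None:
        self.right_snapshot_done = True
        # Left rows that arrived early now probe the complete snapshot.
        for key, payload in self.held_left:
            self._probe(key, payload)
        self.held_left.clear()

    def on_left(self, key: object, payload: object) -> None:
        if not self.right_snapshot_done:
            self.held_left.append((key, payload))
        else:
            self._probe(key, payload)

    def on_left_backfill_boundary(self) -> None:
        self.left_backfill_done = True
        # Release held right rows so live left rows see them.
        for key, payload in self.held_right:
            self.right_latest[key] = payload
        self.held_right.clear()
```

In this model a left backfill row that arrives before its matching right snapshot row is still matched, and a backfill row whose key only appears on the right after the snapshot is not, which is exactly the set of rows a snapshot-based filter would keep.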
Out-of-scope modes are excluded at plan time: materialized views and subscribe/recover (checkpointed) executions never engage the optimization (the execution mode signal is carried via query options and cannot be overridden by nested SETTINGS clauses), table(...) historical queries are unaffected, and join shapes other than the supported one are rejected conservatively.

Observability
New ProfileEvents:
- StreamingJoinKeyDomainBuilt
- StreamingJoinKeyDomainStorageFilterPushed
- StreamingJoinKeyDomainPostFilterApplied
- StreamingJoinKeyDomainShardPruned
- StreamingJoinKeyDomainPrimaryKeyPruned

Settings

| Setting | Default | Notes |
|---|---|---|
| enable_streaming_join_key_domain_pushdown | false | |
| enable_streaming_join_key_domain_shard_pruning | false | |
| streaming_join_key_domain_pushdown_max_rows | 10000 | |
| streaming_join_key_domain_pushdown_max_bytes | 16 MiB | 0 = no byte limit. |
| streaming_join_key_domain_pushdown_disjunctive_filter_max_rows | 256 | 0 disables that form. |

Scope and limitations
- INNER LATEST JOIN with a single conjunctive equi-clause: one or more AND'ed equality conditions, so composite join keys (e.g. ON l.key_a = r.key_a AND l.key_b = r.key_b) are fully supported, with the key domain tracked as a set of key tuples; other join types/strictnesses, OR'd join conditions, and joins with ON-filter conditions are unaffected.
- seek_to on the left side and exec_mode = normal (interactive queries). Materialized views and subscribe/recover executions run unoptimized.

Future considerations
The core mechanism here — snapshot one input's bounded key domain, push it into the other input's historical scan as storage / primary-key / shard pruning, and keep results exact via boundary gating — generalizes well beyond the single join shape this PR enables. Natural extensions, roughly in increasing order of effort:
- … WHERE/PREWHERE, key-preserving expressions, and DISTINCT/GROUP BY-on-the-key would cover common analytic shapes (e.g. a fact subquery that pre-filters on a non-key column before the join) that currently fall back to a full scan.
- … INNER LATEST JOIN to (a) ASOF / range joins by pushing the domain's [min, max] bound instead of an exact set, (b) OR'd equi-conditions by pushing the union of per-branch domains, and (c) the historical-backfill portions of regular stream-stream joins. Outer joins on the prunable side stay ineligible by construction (unmatched rows must still emit).
- … streaming_join_key_domain_pushdown_max_rows, instead of disabling the optimization, fall back to an approximate-but-safe representation: a Bloom filter or a min/max range pushed as the storage filter, with the existing exact post-read filter still guaranteeing correctness. This lifts the optimization into high-cardinality mappings.

Compatibility
- One checkpointed component (WatermarkStamper) gained a new field, version-gated behind a min-version constant: old checkpoints recover cleanly on new binaries (verified empirically, including multi-substream PARTITION BY state written by a previous release), and post-recovery re-serialization stays on the recovered version for symmetric round-trips.
- ConcurrentHashJoin::joinLeftBlock dispatched left probe blocks by right-side key column positions; whenever left/right key ordinals differed in a parallel_hash join, rows were routed to the wrong shard (missed matches). Now dispatches by left positions. (Candidate for a separate backport.)

Benchmark
Environment. Apple M4 Max (16 cores, 128 GB RAM), macOS; release build of this branch (proton 3.0.21); fresh single-node server on a fresh data directory. Every variant was run 3 times; I/O and engagement metrics are server-side ProfileEvents deltas captured around each run, output rows are counted exactly, and the tables below report per-run medians. Repetitions agreed to within ~0.001 % on rows/bytes read; wall-clock figures are limited by the 0.2 s output-polling resolution of the harness.

Scenario (the motivation pattern above): fact stream of 100,000,000 rows over 10,000 partition keys (10,000 rows/key), 4 shards, sorting key = partition_key; mapping stream of 10,000 rows (every partition key mapped). Two selectivities are measured:

Setup:

Benchmark query (Option B from the motivation; the before runs set enable_streaming_join_key_domain_pushdown = 0):

S1: 5 keys, 0.05 % selectivity (50,000 matching rows):

With enable_streaming_join_key_domain_shard_pruning = 1, additionally one of the four shards was skipped entirely (ShardPruned = 1, storage filter pushed to the remaining 3 shards), with the same row/byte profile and identical output across all 3 repetitions.

S2: 100 scattered keys, 1 % selectivity (1,000,000 matching rows):

Reference floor. A one-shot historical table() query over the same 5-key set (SELECT count() FROM table(bench_facts) WHERE partition_key IN (...)) completes in 0.12 s. The pushdown brings the streaming backfill (0.22 s) within ~2× of the pure historical floor while keeping full live-join semantics, versus 0.57 s and a 100 M-row scan without it.

Engagement was verified per run via ProfileEvents deltas: baseline runs show all StreamingJoinKeyDomain* counters at 0; pushdown runs show Built = 1 and StorageFilterPushed = 4 (or 3 + ShardPruned = 1 with shard pruning).

Testing
- tests/stream/test_stream_smoke/0035_streaming_join_key_domain_pushdown.yaml: 24 cases, ids 0-23. Coverage includes positive pushdown with single/composite/aliased keys, empty right snapshots with live continuation, outer-join exclusion, row/byte/set-limit fallbacks, disjunctive and tuple-IN filter rendering, CTE/view/nested-subquery propagation, shard pruning, deterministic fallback for process-time and floating-point right prefixes, parallel_hash with differing key ordinals and max_threads=4, tumble-window-over-join with the 1-to-N expand topology, idle-after-backfill watermark delivery, versioned-kv right updates after snapshot, and empty-right-shard gating during live release.
- … SETTINGS clauses checkpointing healthily with the optimization correctly disabled; sustained-ingest emission cadence through expand resize topologies; feature-off behavior parity against the previous release binary.

Out-of-scope: float-key rejection and a pre-existing backfill race
Float-key rejection (a feature limitation). Floating-point join keys are rejected before pushdown engages: float/double equality is not a sound basis for set-membership filtering (NaN never equals itself, +0.0 and -0.0 compare equal yet hash differently, and rounding lets "the same" value be represented more than one way), so an IN-style key filter could silently drop rows the join should match. The optimization therefore degrades to the unoptimized plan whenever a join key is floating-point. A smoke case ("falls back for floating-point join keys") asserts both that pushdown stays off and that the normal streaming join still returns the correct live match.

A separate, pre-existing backfill race (to be filed independently). While building that test we found a correctness bug in the unoptimized streaming INNER LATEST JOIN historical backfill that is independent of this feature and not float-specific: with seek_to = 'earliest', both sides backfill, and a left historical row can probe the right hash table before the matching right row has been loaded, so it is silently dropped (INNER), even though the same join in table() mode matches. The outcome is a non-deterministic race: across repeated runs of an identical single-row repro on the unoptimized path, roughly 40–60 % of runs drop the row, for both floating-point and integer keys. This predates the PR and reproduces with the optimization disabled (i.e. on develop); it is being tracked as a separate issue. The float fallback test deliberately exercises the live path (a left row inserted after query start) rather than asserting on the racy historical-backfill path. As an aside, the boundary-gating protocol introduced here happens to make the backfill deterministic for eligible joins (the optimized path matched every run), so the optimization sidesteps the race rather than relying on it.

Notes for reviewers
- … LOGICAL_ERROR invariants that are unreachable under the exec-mode exclusion), and recovery force-releases boundaries.
- … VERSION_REVISION and PENDING_UNMUTE_WATERMARK_MIN_VERSION in lockstep at 157, and recheck that no release/develop revision already claimed 157.
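Why the revision and the gate constant must move together can be seen in a minimal sketch of a version-gated checkpoint field. The byte layout, the StamperState type, and the field name pending_unmute_watermark are hypothetical (the real WatermarkStamper serialization is C++ and is not shown on this page); the gate value 159 is the one named in the merge commit message earlier in the conversation and is used here only as an example.

```python
import struct
from dataclasses import dataclass
from typing import Optional

# Example gate value; the new field exists only at or above this revision.
FIELD_MIN_VERSION = 159


@dataclass
class StamperState:
    version: int                                    # revision that wrote the checkpoint
    watermark: int
    pending_unmute_watermark: Optional[int] = None  # hypothetical new field


def serialize(state: StamperState) -> bytes:
    """Write the state using the layout of state.version."""
    out = struct.pack("<Hq", state.version, state.watermark)
    if state.version >= FIELD_MIN_VERSION:
        out += struct.pack("<q", state.pending_unmute_watermark or 0)
    return out


def deserialize(data: bytes) -> StamperState:
    """Read a checkpoint; older versions simply have no new field."""
    version, watermark = struct.unpack_from("<Hq", data, 0)
    pending = None
    if version >= FIELD_MIN_VERSION:
        (pending,) = struct.unpack_from("<q", data, struct.calcsize("<Hq"))
    return StamperState(version, watermark, pending)
```

Because serialize writes with the recovered version rather than the current one, a checkpoint read from an older release round-trips byte for byte. If the gate were at or below a revision that had already shipped without the field, deserialize would try to read bytes that an older writer never produced.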