Skip to content

Streaming join key-domain pushdown: prune left backfill by a snapshot of right-side join keys#1187

Open
sofiane-soufi wants to merge 4 commits into
timeplus-io:developfrom
sofiane-soufi:feature/streaming-join-key-domain-pushdown
Open

Streaming join key-domain pushdown: prune left backfill by a snapshot of right-side join keys#1187
sofiane-soufi wants to merge 4 commits into
timeplus-io:developfrom
sofiane-soufi:feature/streaming-join-key-domain-pushdown

Conversation

@sofiane-soufi

@sofiane-soufi sofiane-soufi commented Jun 8, 2026

Copy link
Copy Markdown
Contributor

Summary

This PR adds an opt-in query optimization for streaming INNER LATEST JOINs whose left side backfills from the historical store: at query start, a bounded snapshot of the right side's join-key domain is captured and pushed down into the left side's historical backfill scan as a key filter — enabling storage-level filtering, primary-key mark-range pruning, and (optionally) shard pruning. A boundary-gating protocol inside the join keeps results exactly equivalent to the unoptimized execution. Live (post-backfill) streaming reads are never pruned, so new right-side keys arriving after query start are matched normally going forward.

Everything is gated behind new settings that default off; with the settings off, planning and execution are unchanged.

Motivation

A common streaming pattern is a fact stream joined through a mapping (pivot) stream:

  • a high-volume, long-retention fact stream, partitioned/keyed by some partition key (e.g. partition_key), and
  • a small mapping stream that maps a user-facing entity key to the fact stream's partition keys (entity_key → partition_key, one-to-many, with new mappings arriving over time).

"Give me all facts for one entity, as a live stream" then requires resolving the entity through the mapping stream. There are two natural ways to write this today, and both have problems:

Option A — IN over a table-mode subquery:

SELECT ...
FROM facts
WHERE partition_key IN (SELECT partition_key FROM table(mapping) WHERE entity_key = {entity})

The table(mapping) subquery is evaluated once, in historical mode, at query start. A new mapping row arriving while the query is running is never picked up — facts for newly mapped partition keys are silently missing from the live stream. Functionally wrong for this use case.

Option B — streaming INNER LATEST JOIN against the mapping stream:

SELECT ...
FROM facts AS f
INNER LATEST JOIN
(
    SELECT partition_key FROM mapping WHERE entity_key = {entity}
) AS m
ON f.partition_key = m.partition_key
SETTINGS seek_to = 'earliest'

This is semantically correct — new mappings are picked up live. But the left backfill (seek_to = 'earliest') scans the entire history of the fact stream across all partition keys, only for the join to immediately discard everything that doesn't match the handful of mapping keys. On large fact streams this makes query startup take a very long time and burn I/O and CPU proportional to total history rather than to the entity's slice of it.

This PR makes Option B fast: since the join will only ever emit left backfill rows whose keys exist in the right side's snapshot (see the correctness model below), those keys can be pushed into the left scan and everything else skipped at the storage layer.

How it works

Planning

For an eligible join (streaming INNER LATEST JOIN, single equi-clause, non-bidirectional hash join, left side backfilling from the historical store via seek_to, exec_mode = normal), the analyzer:

  1. Builds a bounded snapshot of the right side's prefix: a separate, capped execution of the right-side plan up to per-shard stop sequence numbers, collecting the distinct join-key values (the key domain). Caps: streaming_join_key_domain_pushdown_max_rows / _max_bytes; exceeding a cap disables the optimization for the query (it falls back to a plain unfiltered backfill — never to partial filtering).
  2. Validates that the right prefix is deterministic and replayable (no stateful/non-deterministic functions, no aggregation/limit/sort/set-building steps, source identities matching across plan builds). Any doubt → fall back.
  3. Rebuilds the right-side plan bounded to the same snapshot sequence numbers, so the join's hash table at the gating boundary contains exactly the snapshotted keys.

Pruning layers (historical backfill only)

The key domain is applied to the left backfill in up to three independent, individually-safe layers:

  • Storage filter — the key set is pushed into the historical read as an IN (...) predicate (a tuple-IN for composite keys; or, for small domains ≤ streaming_join_key_domain_pushdown_disjunctive_filter_max_rows, an index-friendlier OR-of-equalities form), feeding index analysis; a defensive post-read filter step guarantees exactness even if the storage push is not applicable.
  • Primary-key mark pruning — when the join keys form a prefix of the left stream's sorting key, mark ranges that cannot contain any domain key are dropped.
  • Shard pruning (separate opt-in: enable_streaming_join_key_domain_shard_pruning) — when the left stream has a deterministic sharding key over the join keys, including composite sharding expressions such as city_hash64(key_a, key_b), historical reads skip shards that no domain key hashes to. Live reads remain on all shards.

Each layer requires an exact type match against physical columns and degrades to "no pruning" (never to wrong pruning) when anything doesn't line up.

Correctness model: boundary gating

Pruning the left backfill is only sound if the join is guaranteed to evaluate that backfill against exactly the snapshotted right side. The PR introduces a two-boundary protocol inside the streaming join:

  • a right boundary pauses right-side live sources at the snapshot stop sequence numbers until every join transform has consumed the snapshot (so the hash table is exactly the snapshot when backfill rows probe it), and
  • a left boundary delays right-side live data until the left historical backfill has fully completed (signalled by historical-end markers propagated through the pipeline, aligned across resize processors and parallel join lanes).

The result is a deterministic linearization — left backfill joins the right snapshot; live data joins everything thereafter — which produces the same rows as the unoptimized execution. Pushdown is only activated when the boundary installation is confirmed end-to-end; any bail-out at any stage disables both the gating and all pruning together.

Out-of-scope modes are excluded at plan time: materialized views and subscribe/recover (checkpointed) executions never engage the optimization (the execution mode signal is carried via query options and cannot be overridden by nested SETTINGS clauses), table(...) historical queries are unaffected, and join shapes other than the supported one are rejected conservatively.

Observability

New ProfileEvents:

Event Meaning
StreamingJoinKeyDomainBuilt A right-side key domain was built for a query
StreamingJoinKeyDomainStorageFilterPushed Key-domain filter pushed into a historical storage read
StreamingJoinKeyDomainPostFilterApplied Defensive post-read filter added to a backfill plan
StreamingJoinKeyDomainShardPruned A backfill read's shard set was reduced
StreamingJoinKeyDomainPrimaryKeyPruned A backfill read's PK mark ranges were reduced

Settings

Setting Default Description
enable_streaming_join_key_domain_pushdown false Master switch. Filters the streaming join left historical backfill from a bounded snapshot of right join keys. Live streaming reads are not pruned.
enable_streaming_join_key_domain_shard_pruning false Historical-only shard pruning on top of the pushdown. Live reads remain on all query shards.
streaming_join_key_domain_pushdown_max_rows 10000 Max right-side rows to snapshot. Exceeded → optimization disabled for the query.
streaming_join_key_domain_pushdown_max_bytes 16 MiB Max right-side bytes to snapshot. Exceeded → disabled. 0 = no byte limit.
streaming_join_key_domain_pushdown_disjunctive_filter_max_rows 256 Below this domain size, render the historical filter as an OR of key equalities (more index-friendly). 0 disables that form.

Scope and limitations

  • Applies to streaming INNER LATEST JOIN with a single conjunctive equi-clause — one or more AND'ed equality conditions, so composite join keys (e.g. ON l.key_a = r.key_a AND l.key_b = r.key_b) are fully supported, with the key domain tracked as a set of key tuples; other join types/strictnesses, OR'd join conditions, and joins with ON-filter conditions are unaffected.
  • Requires a backfill seek_to on the left side and exec_mode = normal (interactive queries). Materialized views and subscribe/recover executions run unoptimized.
  • The right-side prefix must be deterministic and bounded (caps above); aggregations, limits, set-building subqueries, stateful or non-deterministic functions on the right prefix disable the optimization.
  • Left-side nested subqueries propagate the pushdown only for plain identity projections (no WHERE/PREWHERE, expressions, DISTINCT, GROUP BY, etc. in the nested select) — anything else degrades safely to the unoptimized plan.
  • Pruning applies to the historical backfill only; live reads are never filtered.

Future considerations

The core mechanism here — snapshot one input's bounded key domain, push it into the other input's historical scan as storage / primary-key / shard pruning, and keep results exact via boundary gating — generalizes well beyond the single join shape this PR enables. Natural extensions, roughly in increasing order of effort:

  • Wider left-side propagation. Today the pushdown only threads through plain identity projections on the left. Teaching it to see through left WHERE/PREWHERE, key-preserving expressions, and DISTINCT/GROUP BY-on-the-key would cover common analytic shapes — e.g. a fact subquery that pre-filters on a non-key column before the join — that currently fall back to a full scan.
  • More join shapes. Extend from INNER LATEST JOIN to (a) ASOF / range joins by pushing the domain's [min, max] bound instead of an exact set, (b) OR'd equi-conditions by pushing the union of per-branch domains, and (c) the historical-backfill portions of regular stream-stream joins. Outer joins on the prunable side stay ineligible by construction (unmatched rows must still emit).
  • Bidirectional / mutual pruning. When both sides backfill from history, snapshot both key domains and prune each side by the other's keys (intersection), rather than the current one-directional right → left push.
  • Domains larger than the caps. Above streaming_join_key_domain_pushdown_max_rows, instead of disabling the optimization, fall back to an approximate-but-safe representation — a Bloom filter or a min/max range pushed as the storage filter, with the existing exact post-read filter still guaranteeing correctness. This lifts the optimization into high-cardinality mappings.
  • Materialized views and subscribe/recover. The biggest practical lever: today MV and checkpointed executions are excluded. Deriving the snapshot from checkpoint state (or re-deriving it deterministically on recover) would let long-running MV pipelines prune their initial historical catch-up — exactly the shape most production rollups take.
  • Snapshot reuse. Cache a right-side key-domain snapshot so multiple queries or MVs over the same mapping stream share one bounded scan instead of each rebuilding it.
  • Superset-safe live pre-filtering. Live reads must stay unfiltered for correctness (new keys arrive over time), but a continuously-grown Bloom / key-set could skip live blocks that provably contain no matchable key for very narrow joins — a CPU win on the hot path, as long as it only ever prunes provable non-matches.

Compatibility

  • All behavior is gated behind default-off settings; with them off, plans and pipelines are unchanged.
  • No checkpoint or wire-format breaks. One checkpointed component (WatermarkStamper) gained a new field, version-gated behind a min-version constant: old checkpoints recover cleanly on new binaries (verified empirically, including multi-substream PARTITION BY state written by a previous release), and post-recovery re-serialization stays on the recovered version for symmetric round-trips.
  • A pre-existing bug fixed in passing: ConcurrentHashJoin::joinLeftBlock dispatched left probe blocks by right-side key column positions; whenever left/right key ordinals differed in a parallel_hash join, rows were routed to the wrong shard (missed matches). Now dispatches by left positions. (Candidate for a separate backport.)

Benchmark

Environment. Apple M4 Max (16 cores, 128 GB RAM), macOS; release build of this branch (proton 3.0.21); fresh single-node server on a fresh data directory. Every variant was run 3 times; I/O and engagement metrics are server-side ProfileEvents deltas captured around each run, output rows are counted exactly, and the tables below report per-run medians. Repetitions agreed to within ~0.001 % on rows/bytes read; wall-clock figures are limited by the 0.2 s output-polling resolution of the harness.

Scenario (the motivation pattern above): fact stream of 100,000,000 rows over 10,000 partition keys (10,000 rows/key), 4 shards, sorting key = partition_key; mapping stream of 10,000 rows (every partition key mapped). Two selectivities are measured:

  • S1 — 0.05 % selectivity: one entity resolving to 5 partition keys, deliberately placed at the end of the sort order (worst-case placement for the baseline's first match).
  • S2 — 1 % selectivity: one entity resolving to 100 partition keys scattered uniformly across the key space (every 100th key — worst-case placement for granule pruning).

Setup:

CREATE STREAM bench_facts (partition_key uint64, payload uint64)
ENGINE = Stream(4, city_hash64(partition_key))
PRIMARY KEY partition_key;

CREATE STREAM bench_mapping (entity_key uint64, partition_key uint64);

-- 100M facts over 10,000 partition keys (10k rows/key)
INSERT INTO bench_facts (partition_key, payload)
SELECT number % 10000, number FROM numbers(100000000);

-- every partition key mapped; entity 42 owns the 5 keys at the end of the
-- sort order, entity 77 owns 100 keys scattered across the key space
INSERT INTO bench_mapping (entity_key, partition_key)
SELECT multi_if(number >= 9995, 42, number % 100 = 50, 77, 100000 + number), number
FROM numbers(10000);

Benchmark query (Option B from the motivation; the before runs set enable_streaming_join_key_domain_pushdown = 0):

SELECT f.partition_key, f.payload
FROM bench_facts AS f
INNER LATEST JOIN
(
    SELECT partition_key FROM bench_mapping WHERE entity_key = 42   -- 77 for S2
) AS m
ON f.partition_key = m.partition_key
SETTINGS
    seek_to = 'earliest',
    enable_streaming_join_key_domain_pushdown = 1,
    join_algorithm = 'parallel_hash',
    max_threads = 4,
    backfill_max_threads = 1;

S1 — 5 keys, 0.05 % selectivity (50,000 matching rows):

Metric Before (pushdown off) After (pushdown on) Gain
Rows read (backfill) 100,010,211 158,898 ~629× less
Bytes read (backfill) 1.60 GB 3.89 MB ~411× less
Backfill wall-clock (query start → backfill complete) 0.57 s 0.22 s ~2.6×
Time to first result 0.22 s 0.22 s unchanged
Peak memory (server, query) ~30 MB below sampling resolution
Rows emitted (correctness check) 50,000 50,000 identical output

With enable_streaming_join_key_domain_shard_pruning = 1, additionally one of the four shards was skipped entirely (ShardPruned = 1, storage filter pushed to the remaining 3 shards), with the same row/byte profile and identical output across all 3 repetitions.

S2 — 100 scattered keys, 1 % selectivity (1,000,000 matching rows):

Metric Before (pushdown off) After (pushdown on) Gain
Rows read (backfill) 100,010,210 25,548,639 ~3.9× less
Bytes read (backfill) 1.60 GB 613 MB ~2.6× less
Backfill wall-clock 0.59 s 0.59 s emission-bound (1 M output rows)
Rows emitted (correctness check) 1,000,000 1,000,000 identical output
Peak memory (server, query) ~31 MB ~46 MB both trivial

Reference floor. A one-shot historical table() query over the same 5-key set (SELECT count() FROM table(bench_facts) WHERE partition_key IN (...)) completes in 0.12 s — the pushdown brings the streaming backfill (0.22 s) within ~2× of the pure historical floor while keeping full live-join semantics, versus 0.57 s and a 100 M-row scan without it.

Engagement was verified per run via ProfileEvents deltas: baseline runs show all StreamingJoinKeyDomain* counters at 0; pushdown runs show Built = 1 and StorageFilterPushed = 4 (or 3 + ShardPruned = 1 with shard pruning).

Notes. The rows/bytes-read ratios are hardware-independent and are the durable result: baseline backfill I/O grows linearly with retained history, while pushdown I/O grows only with the matched slice (plus mark-granularity rounding), so the wall-clock gap widens on I/O-constrained deployments and longer retention. PrimaryKeyPruned stayed 0 because the pushed storage filter already pruned the same granules at index-analysis level; the dedicated PK pass adds value when the storage filter is not applicable. Wall-clock and TTFR figures carry ±0.2 s measurement resolution; rows/bytes/engagement counters are exact.

Testing

  • New smoke suite tests/stream/test_stream_smoke/0035_streaming_join_key_domain_pushdown.yaml — 24 cases, ids 0-23. Coverage includes positive pushdown with single/composite/aliased keys, empty right snapshots with live continuation, outer-join exclusion, row/byte/set-limit fallbacks, disjunctive and tuple-IN filter rendering, CTE/view/nested-subquery propagation, shard pruning, deterministic fallback for process-time and floating-point right prefixes, parallel_hash with differing key ordinals and max_threads=4, tumble-window-over-join with the 1-to-N expand topology, idle-after-backfill watermark delivery, versioned-kv right updates after snapshot, and empty-right-shard gating during live release.
  • Full smoke-suite run on the final branch state: 24/24 passed in 510 s, no retries, on a fresh server with a freshly rebuilt binary (all sources current). Prior milestones during development: 18/18 → 19/19 (×4) → 23/23 as coverage grew.
  • Verified empirically in earlier audit rounds beyond the suite: checkpoint recovery of multi-substream windowed state across binary versions; materialized-view-over-view configurations with nested SETTINGS clauses checkpointing healthily with the optimization correctly disabled; sustained-ingest emission cadence through expand resize topologies; feature-off behavior parity against the previous release binary.

Out-of-scope: float-key rejection and a pre-existing backfill race

Float-key rejection (a feature limitation). Floating-point join keys are rejected before pushdown engages — float/double equality is not a sound basis for set-membership filtering (NaN never equals itself, +0.0 and -0.0 compare equal yet hash differently, and rounding lets "the same" value be represented more than one way), so an IN-style key filter could silently drop rows the join should match. The optimization therefore degrades to the unoptimized plan whenever a join key is floating-point. A smoke case ("falls back for floating-point join keys") asserts both that pushdown stays off and that the normal streaming join still returns the correct live match.

A separate, pre-existing backfill race (to be filed independently). While building that test we found a correctness bug in the unoptimized streaming INNER LATEST JOIN historical backfill that is independent of this feature and not float-specific: with seek_to = 'earliest', both sides backfill, and a left historical row can probe the right hash table before the matching right row has been loaded — so it is silently dropped (INNER), even though the same join in table() mode matches. The outcome is a non-deterministic race: across repeated runs of an identical single-row repro on the unoptimized path, roughly 40–60 % of runs drop the row, for both floating-point and integer keys. This predates the PR and reproduces with the optimization disabled (i.e. on develop); it is being tracked as a separate issue. The float fallback test deliberately exercises the live path (a left row inserted after query start) rather than asserting on the racy historical-backfill path. As an aside, the boundary-gating protocol introduced here happens to make the backfill deterministic for eligible joins (the optimized path matched every run), so the optimization sidesteps the race rather than relying on it.

Notes for reviewers

  • The boundary protocol and its checkpoint interaction are deliberately conservative: barriers cannot be acknowledged while gating state is pending (guarded by LOGICAL_ERROR invariants that are unreachable under the exec-mode exclusion), and recovery force-releases boundaries.
  • All pruning consumers independently re-check the master setting, domain exactness, and boundary confirmation — partial application is impossible by construction; every bail-out path disables pruning and gating together.
  • On merge/rebase, keep VERSION_REVISION and PENDING_UNMUTE_WATERMARK_MIN_VERSION in lockstep at 157, and recheck that no release/develop revision already claimed 157.

…in keys

Add an opt-in optimization for streaming INNER LATEST JOIN whose left side
backfills from the historical store. At query start a bounded snapshot of the
right side's join-key domain is captured and pushed into the left backfill
scan as a storage filter, primary-key mark pruning, and optional shard
pruning. A two-boundary gating protocol inside the join keeps results exactly
equivalent to the unoptimized execution; live (post-backfill) reads are never
pruned, so right-side keys arriving after query start are matched normally.

Composite join keys are supported (the key domain is tracked as key tuples),
including composite sharding expressions such as city_hash64(key_a, key_b).
Floating-point join keys are rejected before pushdown engages and degrade to
the unoptimized plan.

All behavior is gated behind default-off settings:
  - enable_streaming_join_key_domain_pushdown
  - enable_streaming_join_key_domain_shard_pruning
  - streaming_join_key_domain_pushdown_max_rows / _max_bytes
  - streaming_join_key_domain_pushdown_disjunctive_filter_max_rows

One checkpointed component (WatermarkStamper) gains a version-gated field;
old checkpoints recover cleanly. Also fixes a pre-existing
ConcurrentHashJoin::joinLeftBlock bug that dispatched left probe blocks by
right-side key column positions, routing rows to the wrong shard when
left/right key ordinals differed in a parallel_hash join.

Adds smoke suite tests/stream/test_stream_smoke/0035_streaming_join_key_domain_pushdown.yaml (24 cases).
Bring the branch up to date with develop (11 commits) and resolve conflicts:

- cmake/autogenerated_versions.txt: bump VERSION_REVISION to 159. Upstream
  released 3.0.22 [157] and 3.0.23 [158] while this branch was open, so 157
  is no longer free; keep upstream's 3.0.23 patch line.
- WatermarkStamper: move the checkpoint gate
  PENDING_UNMUTE_WATERMARK_MIN_VERSION to 159 in lockstep, so the new field
  is introduced strictly above the highest shipped revision and checkpoints
  from 3.0.22/3.0.23 recover correctly as "no field".
- StorageStream.cpp / StreamShardStore.cpp: keep both sides' includes and
  ErrorCodes declarations. The key-domain join shard pruning coexists with
  upstream's IN-subquery / literal tuple-IN shard pruning (timeplus-io#1175); the two
  are complementary mechanisms.

Verified the three merged translation units compile.
@yokofly

yokofly commented Jun 8, 2026

Copy link
Copy Markdown
Collaborator

@codex review

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1702f00fd9

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/Processors/Transforms/Streaming/JoinTransform.cpp
@yl-lisen

yl-lisen commented Jun 8, 2026

Copy link
Copy Markdown
Collaborator

Some suggestions:

  • Avoid formatting unchanged code
  • The optimized code should be as independent as possible, reduce changes to existing code branches

A broad clang-format pass had reformatted code this PR does not change —
reflowed throw/exception calls, signatures collapsed onto one line, re-sorted
#include blocks, and brace-placement changes — adding noise to the diff in
ProfileEvents, QueryPipelineBuilder, ExpressionAnalyzer, and StorageStream.

Restore develop's formatting on all unchanged lines so the diff shows only
genuine changes. Each reverted hunk is whitespace-only (token-identical
before/after the revert) and the four files still compile; no behavior change.
@sofiane-soufi

sofiane-soufi commented Jun 8, 2026

Copy link
Copy Markdown
Contributor Author

Some suggestions:

  • Avoid formatting unchanged code
  • The optimized code should be as independent as possible, reduce changes to existing code branches

Avoid formatting unchanged code:

Good catch, fixed in d8d6606.

One note for transparency: some of develop's existing formatting is itself inconsistent, for example the doc-comment block in tryKeyValueJoin is mis-indented on develop (the comment sits at 8 spaces inside a 12-space block). Restoring develop's formatting on unchanged lines preserves that inconsistency rather than "fixing" unrelated code in this PR. It's probably worth a separate PR to run clang-format across develop so the baseline is actually clean.

Reduce the optimization's footprint inside existing functions by moving the
inlined logic into clearly-named helpers, leaving a single guarded call behind
(addresses review feedback to keep the optimization as independent as possible).
Behaviour-preserving — pure relocation, full build verified.

- QueryPipelineBuilder::joinPipelinesStreaming: the 56-line right-boundary
  source matching block -> matchRightSourcesForStreamingJoinBoundary().
- StorageStream::getShardsToRead: the shard-pruning block ->
  updateShardsWithStreamingJoinKeyDomainPruning().
- MergeTreeDataSelectExecutor: per-part PK mark filtering ->
  applyStreamingJoinPrimaryKeyPrefixFilter().
- StreamShardStore::readConcat: bounded snapshot-prefix plan construction ->
  StreamShardStore::buildBoundedPrefixPlanForSnapshotBoundary().
@sofiane-soufi

Copy link
Copy Markdown
Contributor Author

Some suggestions:

  • Avoid formatting unchanged code
  • The optimized code should be as independent as possible, reduce changes to existing code branches

The optimized code should be as independent as possible, reduce changes to existing code branches

Agreed, done in 135e052. I pulled the optimization logic that was inlined into existing functions out into clearly-named helpers, so each existing function now shows only a single guarded call where the feature hooks in. It's a pure relocation (behaviour-preserving), and the full proton build links clean.

What moved out of existing functions:

  • QueryPipelineBuilder::joinPipelinesStreaming: the 56-line right-boundary source-matching block → matchRightSourcesForStreamingJoinBoundary().
  • StorageStream::getShardsToRead: the shard-pruning block → updateShardsWithStreamingJoinKeyDomainPruning().
  • MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndices: the per-part PK mark filtering → applyStreamingJoinPrimaryKeyPrefixFilter().
  • StreamShardStore::readConcat: the bounded snapshot-prefix plan construction → StreamShardStore::buildBoundedPrefixPlanForSnapshotBoundary().

A few hooks are inherently interleaved with existing control flow and can't be cleanly extracted without a redesign: the boundary state machine in JoinTransform::work(), the watermark deferral in WatermarkStamper, and the marker propagation in ResizeProcessor::prepare(). Those already route through isolated helper methods on their own classes; what's left inline is genuine control-flow integration, so I left it as-is rather than hurt readability by forcing it out.

Happy to revisit any specific spot if you'd like it handled differently.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants