diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bf004cd..f4176e2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,3 +58,29 @@ jobs: - name: Run tests run: uv run pytest tests/ -v --cov=src/clgraph --cov-report=term-missing + + notebooks: + name: Example Notebooks + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.12' + + - name: Install Graphviz + run: sudo apt-get update && sudo apt-get install -y graphviz + + - name: Install uv + run: pip install uv + + - name: Install dependencies + run: | + uv sync + uv pip install -e ".[dev]" + + - name: Run example notebooks (skip LLM) + run: uv run python run_all_notebooks.py --skip-llm diff --git a/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md b/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md index 2572151..555c338 100644 --- a/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md +++ b/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md @@ -1,9 +1,29 @@ # CDC/SCD Pipeline: Gap Analysis & Example Notebook -**Date:** 2026-04-13 +**Date:** 2026-04-13 (updated 2026-04-14) **TODO item:** B — `examples/cdc_scd_pipeline.ipynb` **Goal:** Stress-test clgraph with a realistic CDC/SCD Type 2 pipeline, surface gaps in column-lineage capture, fix what's practical, and deliver a showcase notebook that honestly documents what works and what doesn't. +## Progress Summary + +**All 10 gaps closed.** No remaining open issues. 
+ +| Closed | Fix | +|--------|-----| +| Gap 1 | Struct dot-access fallback for unresolvable table refs with `nested_path`/`access_type="struct"` (67859c7) | +| Gap 2 | Promote qualify metadata from subquery-based dedup `WHERE rn = 1` pattern (622e651) | +| Gap 3/10 | MERGE condition-gating edges with `merge_column_role='condition'` (778f918) | +| Gap 4 | Statement-scoped self-read nodes for self-referencing targets (5ef8dad) | +| Gap 5 | Verified: literal-only columns appear as terminal nodes with zero upstream edges | +| Gap 6 | Verified: `current_timestamp()` columns appear as output nodes, no incoming edges | +| Gap 7 | Tagged predicate edges for JOIN ON columns with `is_join_predicate=True` (12f2b62) | +| Gap 8 | WHERE filter lineage with `is_where_filter=True` and `where_condition` edges (bf6972d) | +| Gap 9 | Literal-bound MERGE ON predicate extraction as `merge_match_filter` edges (dfbd6b7) | + +Design docs: [Gap 4](2026-04-13-gap4-self-referencing-target-design.md), [Gap 7](2026-04-13-gap7-join-predicate-columns-design.md), [Gaps 1+2+8](2026-04-14-gaps-1-2-8-design.md) + +Test suites: `test_struct_dot_access.py` (Gap 1), `test_subquery_dedup_qualify.py` (Gap 2), `test_where_filter_lineage.py` (Gap 8), `test_cdc_scd_pipeline.py` (integration), `test_join_predicate_columns.py` (Gap 7) + ## Background clgraph already has MERGE parsing (`query_parser._parse_merge_statement`) that captures target, source, match columns, and matched/not-matched actions with per-column mappings. An example exists at `examples/merge_lineage.ipynb`. What is *not* yet proven is whether clgraph produces **correct and complete** column lineage for the specific MERGE patterns that appear in real-world CDC/SCD Type 2 pipelines. 
@@ -95,18 +115,18 @@ Each gap will be tested by running the fixture through `Pipeline` and inspecting - **I** — *Parses but lineage incomplete* (missing edges) - **F** — *Fails to parse* -| # | Gap | Where | Expected classification | -|---|-----|-------|------------------------| -| 1 | Struct field access on CDC envelope: `after.id` → `staging.id` | L1→L2 | I (may work via existing struct support, needs verification) | -| 2 | Dedup pattern: `QUALIFY rn = 1` / `WHERE rn = 1` after `ROW_NUMBER()` | L2 | Likely works (`qualify_lineage` exists); verify not dropped through CTE | -| 3 | **MERGE WHEN MATCHED trigger columns** — condition `t.name <> s.name` should contribute to the lineage of `end_time` and `is_active`, not just the assigned exprs | L3a Step 1 | **P / I** (likely current behavior only records assigned exprs) | -| 4 | **Self-referencing target** — Step 2 `LEFT JOIN dim_customer` on a table Step 1 just mutated. Pipeline lineage must treat the same table as both an input and output between statements | L3a | I (pipeline_lineage_builder behavior unknown) | -| 5 | Literal-only columns (`'Y' AS is_active`, `TIMESTAMP '9999-...'`) — should appear as terminal nodes, not be silently dropped | L3a Step 2 | Likely works; verify | -| 6 | `current_timestamp()` — function-only source (no column deps) | L3a both steps | Likely works; verify | -| 7 | `BETWEEN d.start_time AND d.end_time` in JOIN ON — does `fact_orders.customer_city_at_order` lineage include `d.start_time/end_time` as condition columns? | L3→fact | **P / I** (join predicate columns often omitted from column lineage) | -| 8 | `WHERE t.id IS NULL OR (...)` as sentinel for new-vs-versioned inserts — does the NULL branch get recorded? 
| L3a Step 2 | I (expected; logical branch not in column graph today) | -| 9 | MERGE's `ON t.id = s.id AND t.is_active = 'Y'` with literal predicate — match_columns extraction looks at `EQ` pairs only; literal-bound predicate may be dropped | L3a Step 1 | I | -| 10 | Does MERGE's close-action (UPDATE of `end_time`) show `is_active` as a **dependency** of `end_time` via the WHEN MATCHED condition? Impact analysis depends on this | L3a Step 1 | P (suspected) | +| # | Gap | Where | Expected classification | Status | +|---|-----|-------|------------------------|--------| +| 1 | Struct field access on CDC envelope: `after.id` → `staging.id` | L1→L2 | **I** — sqlglot parses `after.id` as `Column(table="after", name="id")`, indistinguishable from table-qualified ref. | **Closed** (67859c7) — struct fallback emits edges with `nested_path`/`access_type="struct"` when `Column.table` doesn't resolve | +| 2 | Dedup pattern: `QUALIFY rn = 1` / `WHERE rn = 1` after `ROW_NUMBER()` | L2 | **I** — Subquery-based dedup did not propagate qualify metadata to final output. | **Closed** (622e651) — detects ranking functions in subquery + `WHERE rn = 1` outer filter, promotes qualify metadata | +| 3 | **MERGE WHEN MATCHED trigger columns** — condition `t.name <> s.name` should contribute to the lineage of `end_time` and `is_active`, not just the assigned exprs | L3a Step 1 | **P / I** (likely current behavior only records assigned exprs) | **Closed** (778f918) | +| 4 | **Self-referencing target** — Step 2 `LEFT JOIN dim_customer` on a table Step 1 just mutated. Pipeline lineage must treat the same table as both an input and output between statements | L3a | I (pipeline_lineage_builder behavior unknown) | **Closed** (5ef8dad) | +| 5 | Literal-only columns (`'Y' AS is_active`, `TIMESTAMP '9999-...'`) — should appear as terminal nodes, not be silently dropped | L3a Step 2 | Verified: terminal nodes with zero upstream edges. Minor: `is_literal` flag not set (only used for VALUES clauses). 
| **Closed** (verified) | +| 6 | `current_timestamp()` — function-only source (no column deps) | L3a both steps | Verified: output nodes with `node_type=expression`, no incoming edges. Works in both MERGE UPDATE and INSERT contexts. | **Closed** (verified) | +| 7 | `BETWEEN d.start_time AND d.end_time` in JOIN ON — does `fact_orders.customer_city_at_order` lineage include `d.start_time/end_time` as condition columns? | L3→fact | **P / I** (join predicate columns often omitted from column lineage) | **Closed** (12f2b62) | +| 8 | `WHERE t.id IS NULL OR (...)` as sentinel for new-vs-versioned inserts — does the NULL branch get recorded? | L3a Step 2 | **I** — WHERE clause columns not tracked in column lineage | **Closed** (bf6972d) — `where_filter` edges with `is_where_filter=True` and `where_condition` metadata | +| 9 | MERGE's `ON t.id = s.id AND t.is_active = 'Y'` with literal predicate — match_columns extraction looks at `EQ` pairs only; literal-bound predicate may be dropped | L3a Step 1 | I | **Closed** (dfbd6b7) | +| 10 | Does MERGE's close-action (UPDATE of `end_time`) show `is_active` as a **dependency** of `end_time` via the WHEN MATCHED condition? Impact analysis depends on this | L3a Step 1 | P (suspected) | **Closed** (778f918, same fix as Gap 3) | **Probably-already-works (sanity checks, not gaps):** cross-CTE propagation (fixed by 8aaa454), column extraction for `DATE(order_ts)`, `SUM(amount)` aggregates, GROUP BY columns. @@ -122,30 +142,31 @@ Three artifacts, in order: ### 2. Gap fixes (scope-limited, each in its own commit) Only fixes that are **localized and low-risk** land in this effort. Larger architectural changes (e.g., introducing a new edge type for "predicate-conditional" columns) are documented as follow-ups in `TODO.md`, not attempted here. -Likely in-scope fixes: -- Gap 3/10: extend `_parse_merge_statement` to record WHEN MATCHED `condition` columns as inputs to each assigned target column's mapping. 
-- Gap 9: generalize match_columns extraction to skip literal-bound predicates cleanly rather than dropping the whole predicate. - -Likely out-of-scope (documented as follow-ups): -- Gap 7 (join-predicate columns in column lineage) — this is a project-wide convention question, not a CDC-specific fix. -- Gap 8 (logical-branch lineage from NULL sentinels) — needs a new edge semantic. +All fixes completed (each in its own commit): +- **Gap 1** (67859c7): Struct dot-access fallback — when `Column.table` doesn't resolve to a known table/alias/subquery in scope, emits a lineage edge with `nested_path` and `access_type="struct"` using the first base table from the dependency chain as the source. Handles recursive base table resolution for CDC-like subquery patterns. +- **Gap 2** (622e651): Subquery-based dedup promotion — detects the common pattern (ROW_NUMBER/RANK/DENSE_RANK/NTILE in subquery + `WHERE rn = 1` in outer query) and promotes qualify metadata (`is_qualify_column`, `qualify_context`, `qualify_function`) to the outer unit. Adds `ranking_window_columns` to `QueryUnit` for cross-unit metadata propagation. +- **Gap 3/10** (778f918): Extended MERGE parsing to extract column references from WHEN MATCHED conditions and emit edges with `merge_column_role='condition'`. Condition columns (e.g., `staging.name`) now appear as upstream inputs to assigned target columns (e.g., `dim_customer.end_time`). +- **Gap 4** (5ef8dad): Implemented statement-scoped table versioning with `{query_id}:self_read:{table}.{col}` naming. Self-read nodes represent the prior table state, enabling correct lineage when the same table is both input and output across pipeline statements. +- **Gap 7** (12f2b62): Emits tagged predicate edges for JOIN ON clause columns with `is_join_predicate=True`, `join_condition`, and `join_side` metadata. Supports equi-joins, range/BETWEEN, function-wrapped predicates, and multi-join chains. 
+- **Gap 8** (bf6972d): WHERE filter lineage — columns referenced in WHERE clauses now produce `where_filter` edges to all non-star output columns, with `is_where_filter=True` and `where_condition` metadata. Subquery columns within WHERE are excluded from the outer query's predicates. Also fixes `trace_forward` BFS to treat nodes as terminals when all outgoing targets are already visited. +- **Gap 9** (dfbd6b7): Extracts literal-bound ON predicates in MERGE (e.g., `t.is_active = 'Y'`) and emits lineage edges with `merge_match_filter` edge type and `merge_column_role="condition"`. ### 3. Showcase notebook (`examples/cdc_scd_pipeline.ipynb`) Structure: 1. **Narrative intro** — what a CDC/SCD2 pipeline is, why it matters, what we're testing. 2. **The pipeline SQL** — the 6 statements above, with comments pointing out the interesting structures. 3. **Build + visualize** — `Pipeline(...)` → lineage graph rendered (GraphViz), table-level DAG. -4. **What clgraph captures** — 4-5 concrete impact-analysis queries (e.g., "what downstream columns depend on `raw_customer_cdc.after.city`?"), showing them returning correct results. -5. **Known limitations** — honest subsection listing the remaining gaps (7, 8, and any others not fixed), each with a short SQL snippet and what the ideal answer would be. Links to the xfail tests. +4. **What clgraph captures** — concrete impact-analysis queries (e.g., "what downstream columns depend on `raw_customer_cdc.after.city`?"), showcasing all 10 resolved gaps: struct dot-access (1), dedup qualify promotion (2), MERGE condition-gating (3/10), self-referencing targets (4), literal terminals (5), function-only sources (6), JOIN predicate columns (7), WHERE filter lineage (8), and MERGE literal predicates (9). +5. **Edge semantics showcase** — demonstrate the new edge types and metadata: `access_type="struct"`, `is_qualify_column`, `merge_column_role`, `is_join_predicate`, `is_where_filter`, `merge_match_filter`. 
## Acceptance Criteria - [ ] `tests/test_cdc_scd_pipeline.py` exists, runs in CI, and has tests for every row in the gap table. -- [ ] Each xfail is tagged with a specific gap number and a reason string. +- [ ] All 10 gap tests pass (no xfails remaining). - [ ] Every fix commit has: failing test → fix → passing test (TDD, per repo convention). - [ ] The notebook runs end-to-end via `run_all_notebooks.py` with no errors. -- [ ] The notebook's "Known limitations" section matches the current xfail list (no silent discrepancies). -- [ ] `TODO.md` updated: item B checked off; follow-up gaps listed as their own new entries. +- [ ] The notebook showcases all 10 resolved gaps with concrete lineage queries. +- [ ] `TODO.md` updated: item B checked off. ## Out of Scope @@ -157,26 +178,26 @@ Structure: ## Risks & Open Questions -1. **Risk: gap 3/10 fix has wider blast radius than expected.** The MERGE WHEN-MATCHED condition column inputs may already be partially captured elsewhere (e.g., in `match_columns`) in a different way; we need to not duplicate or contradict that. Mitigation: read `lineage_builder`'s MERGE handling before editing `query_parser`. -2. **Open: how should "trigger columns" be represented in the column graph?** Options: (a) same edge type as assignment inputs — simplest, but flattens semantics; (b) new edge attribute `role: "trigger" | "assignment"`. Recommend (a) for this iteration — keep it simple, revisit if users ask for the distinction. -3. **Open: should the staging CTE preserve `op` column for dim?** CDC deletes usually produce SCD2 tombstones (`is_active='N', end_time=now()`), not hard deletes. For this iteration we filter `op IN ('c','u')` and list delete-handling as a follow-up gap. +1. **~~Risk: gap 3/10 fix has wider blast radius than expected.~~** **RESOLVED.** The fix uses `merge_column_role` to distinguish condition vs assignment edges, avoiding duplication with `match_columns`. No blast radius issues encountered. +2. 
**~~Open: how should "trigger columns" be represented in the column graph?~~** **RESOLVED.** Chose option (b): edges carry `merge_column_role='condition'` vs `merge_column_role='assignment'`, giving users the ability to distinguish trigger vs assignment semantics without flattening. +3. **Open: should the staging CTE preserve `op` column for dim?** CDC deletes usually produce SCD2 tombstones (`is_active='N', end_time=now()`), not hard deletes. For this iteration we filter `op IN ('c','u')` and list delete-handling as a follow-up gap. **Analysis:** The `op` column is selected into `staging_customer_latest` but never referenced by the downstream MERGE or INSERT statements — it's present but unused. The test fixture in `test_cdc_scd_pipeline.py` assumes `staging_customer_latest` already exists, so the staging CTE is not yet tested. This is a data modeling choice, not a lineage bug. Reconsider if future requirements need full-envelope preservation or delete-handling. ### Additional risks surfaced from code inspection -4. **Risk: gap 3/10 may already be half-plumbed, which makes the "fix" subtler than it looks.** `trace_strategies.trace_merge_columns` (`trace_strategies.py:186-229`) already stamps each MERGE `ColumnEdge` with `merge_condition` (raw SQL of the WHEN clause) and `merge_action`. The condition is therefore *recorded as edge metadata* but the columns *referenced by that condition* are not emitted as upstream inputs of the assigned target columns. The fix is not "start capturing the condition" — it's "parse the stored condition into column refs and add them as edges." Mitigation: the parser already has a column-extraction pass we can reuse; don't re-implement it in the MERGE path. +4. **~~Risk: gap 3/10 may already be half-plumbed.~~** **RESOLVED.** The fix (778f918) parses the stored WHEN condition into column refs and adds them as condition-gating edges with `merge_column_role='condition'`. Reused the existing column-extraction pass as recommended. -5. 
**Risk: gap 4 (self-referencing target) is load-bearing and unmitigated today.** Confirmed: clgraph has *no* multi-statement table versioning — `depends_on_tables` / `depends_on_units` in `pipeline_lineage_builder.py:76-108` reference tables by name only, with no N-vs-N+1 snapshot distinction. Step 2's `LEFT JOIN dim_customer t` will collapse onto the same node Step 1 just wrote, producing a self-loop in the pipeline graph. This is the dominant correctness issue for SCD2 and is *not* in the "likely in-scope fixes" list. Decide explicitly: either (a) accept the self-loop and document it as gap 4's known limitation in the notebook, or (b) promote gap 4 to in-scope and design a minimal "statement-index-scoped table ref" extension. Recommend (a) for this iteration — the architectural change for (b) is out of proportion with the stated deliverable. +5. **~~Risk: gap 4 (self-referencing target) is load-bearing and unmitigated.~~** **RESOLVED.** Promoted to in-scope; implemented statement-scoped table versioning with self-read nodes (`{query_id}:self_read:{table}.{col}`). Design doc: `2026-04-13-gap4-self-referencing-target-design.md`. Tests in `test_cdc_scd_pipeline.py` (16 test classes). -6. **Risk: gap 7 is worse than the design implies.** Design says "join predicate columns often omitted." Code search confirms JOIN ON predicate columns produce **zero** lineage edges today (no handling in `lineage_builder`). The fact-layer BETWEEN join therefore produces no evidence of `dim_customer.start_time/end_time` influence on `fact_orders.customer_city_at_order` beyond the equi-join on `customer_id`. The test assertion should be a hard `xfail` with a clear message, not a soft "incomplete" — and the notebook's "known limitations" section needs an explicit callout so users don't silently trust the temporal join. +6. 
**~~Risk: gap 7 is worse than the design implies.~~** **RESOLVED.** Implemented tagged predicate edges for JOIN ON clause columns with `is_join_predicate=True` metadata. Design doc: `2026-04-13-gap7-join-predicate-columns-design.md`. Tests in `test_join_predicate_columns.py` (38 test methods). -7. **Risk: gap 9 framing is inaccurate.** `match_columns` extraction (`query_parser.py:612-619`) walks EQ nodes and pairs columns — a literal-bound EQ like `t.is_active = 'Y'` produces an EQ with a literal on one side, which the current code likely filters (column-column pairs only) but may or may not drop cleanly. Before writing the fix, add a unit test that asserts current behavior on `ON t.id = s.id AND t.is_active = 'Y'`, then decide. The design's claim that "literal-bound predicate may be dropped" needs to be reduced to fact before coding. +7. **~~Risk: gap 9 framing is inaccurate.~~** **RESOLVED.** Fix (dfbd6b7) extracts literal-bound ON predicates and emits `merge_match_filter` edges. Unit tests confirm behavior on `ON t.id = s.id AND t.is_active = 'Y'`. -8. **Risk: struct-column node naming is underspecified for test assertions.** `_extract_nested_path_from_expression` (`lineage_utils.py:335-395`) emits a `(table_ref, column_name, json_path, json_function, nested_path, access_type)` tuple for `after.id` — meaning the column node shape is not a single string but a structured reference. Tests that assert `"after.id"` as a node label will be brittle. Pin the exact shape by reading one existing struct-access test before writing fixture assertions. +8. **~~Risk: struct-column node naming is underspecified for test assertions.~~** **RESOLVED.** Gap 1 fix (67859c7) implements the struct fallback using the same `nested_path`/`access_type` metadata pattern. Tests in `test_struct_dot_access.py` assert `edge.nested_path == ".id"`, `edge.access_type == "struct"`, consistent with existing `test_struct_array_subscript.py` conventions. -9. 
**Risk: CI execution model.** The notebook acceptance says "runs via `run_all_notebooks.py` with no errors." clgraph is a static parser, so the SQL does not need a live Delta/Databricks engine — but any cell that calls `spark.sql(...)`, `dbutils`, or prints a rendered Delta table will fail in CI. Keep all cells to: (a) strings of SQL, (b) `Pipeline(...)` calls, (c) `pyvis`/graphviz renders against file output only. +9. **Risk: CI execution model.** **OPEN — integration gap, not correctness bug.** All 25 existing notebooks use static SQL + `Pipeline()` only (no `spark.sql()`, `dbutils`). Convention is sound. However, `run_all_notebooks.py` is **not wired into CI** — `.github/workflows/ci.yml` runs only `uv run pytest tests/`. The acceptance criterion "runs via `run_all_notebooks.py`" is not enforced. Action: add CI step `python run_all_notebooks.py --skip-llm` after CDC/SCD notebook is created. -10. **Open: de-duplicate with existing MERGE tests.** `tests/test_merge_statements.py` already covers basic MERGE parsing, match_columns, matched/not-matched actions, and edge properties. The new `tests/test_cdc_scd_pipeline.py` should cover *only* the CDC-pipeline-shaped assertions (multi-statement, self-reference, envelope flatten, SCD2-pair semantics) and reference existing tests for the single-statement primitives. Otherwise the gap backlog becomes ambiguous. +10. **~~Open: de-duplicate with existing MERGE tests.~~** **RESOLVED.** Verified: `test_merge_statements.py` covers single-statement MERGE parsing (match_columns, action column mappings). `test_cdc_scd_pipeline.py` covers multi-statement cross-query semantics (self-reference, topological sort, cross-query edges). Zero overlap — properly scoped by design. -11. **Open: QUALIFY assertion target.** QUALIFY partition/order columns are emitted via edges with `context="qualify_partition"` / `context="qualify_order"` and `is_qualify_column=True` (`lineage_builder.py:376-442`). 
When writing gap 2's test, assert on `is_qualify_column`/`qualify_context` metadata, not on the plain node set — that's what will catch the "silently dropped through CTE" regression the design worries about. +11. **~~Open: QUALIFY assertion target.~~** **RESOLVED.** Gap 2 fix (622e651) promotes qualify metadata from subquery-based dedup patterns. Tests in `test_subquery_dedup_qualify.py` use the same metadata assertion pattern (`is_qualify_column`, `qualify_context`, `qualify_function`) as `test_qualify_clause.py`. -12. **Open: should gap 4 (self-reference) and gap 7 (join-predicate columns) graduate to their own design docs?** Both are cross-cutting architectural shifts (statement-scoped table refs; a new edge semantic for predicate-conditional columns). If we leave them as one-liners in `TODO.md`, they will rot. Recommend: this effort closes with two short follow-up design stubs, not just backlog entries. +12. **~~Open: should gap 4 and gap 7 graduate to their own design docs?~~** **RESOLVED.** Both now have dedicated design docs in `docs/superpowers/specs/`. diff --git a/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md b/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md index 7095f93..e09847e 100644 --- a/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md +++ b/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md @@ -48,7 +48,7 @@ For Step 2 (`INSERT INTO dim_customer ... LEFT JOIN dim_customer`), `destination **2. Single `TableNode` per name in `table.py:160-164`.** `TableDependencyGraph.add_table` deduplicates by name. Both the Step 1 MERGE target and the Step 2 INSERT target resolve to the same `TableNode("dim_customer")`. That node has a single `created_by` slot (set by the first DDL; MERGE/INSERT are DML so they go to `modified_by`). There is no concept of "version N" vs "version N+1." -**3. Shared column naming in `pipeline_lineage_builder.py:734-736`.** +**3. 
Shared column naming in `pipeline_lineage_builder.py:708-736` (`_make_full_name`, physical-table return at line 736).** `_make_full_name` for physical table columns returns `{table_name}.{column_name}` with no query-scoped prefix. Both Step 1's input `dim_customer.id` and Step 2's output `dim_customer.id` map to the same `full_name`, so the pipeline graph has exactly one `ColumnNode` for each column of `dim_customer`, shared across all statements. **4. Topological sort self-dependency in `table.py:188-204`.** @@ -121,8 +121,8 @@ staging_customer_latest.id ──► dim_customer@v2.id (Step 2 INSERT) **User-facing graph shape:** ``` staging.id ──► dim_customer.id (Step 1 output) -dim_customer.id ──[snapshot]──► q1:snapshot:dim_customer.id -q1:snapshot:dim_customer.id ──► dim_customer.id (Step 2 output, via INSERT) +dim_customer.id ──[snapshot]──► {query_id}:snapshot:dim_customer.id +{query_id}:snapshot:dim_customer.id ──► dim_customer.id (Step 2 output, via INSERT) staging.id ──► dim_customer.id (Step 2 output) ``` @@ -131,36 +131,36 @@ staging.id ──► dim_customer.id (Step 2 output) ### Option C: Read-Before-Write Detection with Ordered DML Edges -**Idea:** Do not create new nodes. Instead, recognize self-referencing tables at the pipeline level, stop excluding them from `source_tables`, and annotate the resulting column edges with a `statement_order` attribute. The self-read edges are distinguished from self-write edges by their `statement_order` and a new `edge_role` ("prior_state_read" vs "write"). +**Idea:** Do not create new nodes. Instead, recognize self-referencing tables at the pipeline level, stop excluding them from `source_tables`, and annotate the resulting column edges with a `statement_order` attribute. The self-read edges are distinguished from normal edges by their `statement_order` and a new `edge_role` (`"prior_state_read"` for within-query self-reads, `"cross_query_self_ref"` for cross-query wiring, `None` for normal edges). 
**Data model change:** - `ParsedQuery` gains `self_referenced_tables: Set[str]` (tables that appear in both `destination_table` and `source_tables` for the same query, or across consecutive queries targeting the same table). -- `ColumnEdge` gains `statement_order: Optional[int]` and `edge_role: Optional[str]` (values: `"prior_state_read"`, `"write"`, or `None` for normal edges). +- `ColumnEdge` gains `statement_order: Optional[int]` and `edge_role: Optional[str]` (values: `"prior_state_read"`, `"cross_query_self_ref"`, or `None` for normal edges). - `TableNode` gains `self_referencing_queries: List[str]` tracking which queries read-before-write. **Scope of code touched:** - `multi_query.py:286`: remove the `table_name == destination_table` filter; instead, when a source table matches the destination, add it to both `source_tables` and `self_referenced_tables`. - `table.py:188-204`: when building query dependencies for a self-referencing query, the query depends on all *prior* modifications to the same table, but not on itself. Add cycle-prevention logic. - `pipeline_lineage_builder.py`: - - `_infer_table_name` (line 680-700): for input columns from a self-referenced table, qualify the node name with `{query_id}:self_read:{table_name}` to avoid colliding with the output node. + - `_infer_table_name` (starts at line ~654): for input columns from a self-referenced table, qualify the node name with `{query_id}:self_read:{table_name}` to avoid colliding with the output node. - `_make_full_name`: add a branch for self-read input nodes. - `_add_query_edges`: stamp edges from self-read nodes with `edge_role="prior_state_read"` and `statement_order`. - `_add_cross_query_edges`: connect the prior output (from the previous statement) to the self-read input node of the current statement. - `models.py`: add `statement_order` and `edge_role` to `ColumnEdge`; add `self_referenced_tables` to `ParsedQuery`. - Exporters: include new edge attributes in JSON/GraphViz output. 
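
The node-naming and edge-tagging scheme described in the bullets above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not clgraph's actual code: `ColumnEdgeSketch`, `self_read_name`, and `self_read_edges` are hypothetical stand-ins. Only the `{query_id}:self_read:{table}.{column}` pattern and the `statement_order` / `edge_role` fields come from the design text.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ColumnEdgeSketch:
    """Hypothetical stand-in for ColumnEdge, limited to the fields discussed here."""
    source: str
    target: str
    statement_order: Optional[int] = None  # proposed new field
    edge_role: Optional[str] = None        # proposed new field; None means a normal edge


def self_read_name(query_id: str, table: str, column: str) -> str:
    """Build the query-scoped name for a column read from the table's prior state."""
    return f"{query_id}:self_read:{table}.{column}"


def self_read_edges(query_id: str, statement_order: int,
                    table: str, column: str) -> List[ColumnEdgeSketch]:
    """Return the two edges that route a self-read through a query-scoped node."""
    physical = f"{table}.{column}"
    scoped = self_read_name(query_id, table, column)
    return [
        # Prior statement's output feeds this query's self-read node.
        ColumnEdgeSketch(physical, scoped, edge_role="cross_query_self_ref"),
        # The self-read node feeds this query's output column.
        ColumnEdgeSketch(scoped, physical, statement_order, "prior_state_read"),
    ]


edges = self_read_edges("query_1", 1, "dim_customer", "id")
for edge in edges:
    print(f"{edge.source} -> {edge.target} [{edge.edge_role}]")
```

Because the intermediate node carries a distinct name, neither edge has the same source and target, so the physical column never links directly to itself.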
-**User-facing graph shape:** +**User-facing graph shape** (using concrete query IDs `query_0` for Step 1, `query_1` for Step 2): ``` ┌── staging.id ──────────────────┐ │ ▼ -staging.id ──► dim_customer.id ──► q1:self_read:dim_customer.id ──► dim_customer.id - (Step 1 output) (Step 2 reads prior state) (Step 2 output) +staging.id ──► dim_customer.id ──► query_1:self_read:dim_customer.id ──► dim_customer.id + (Step 1 output) (Step 2 reads prior state) (Step 2 output) ``` -The self-read node is query-scoped and represents "dim_customer as it existed before this query ran." Cross-query edges connect Step 1's output `dim_customer.id` to Step 2's `q1:self_read:dim_customer.id`. +The self-read node is query-scoped and represents "dim_customer as it existed before this query ran." Cross-query edges connect Step 1's output `dim_customer.id` to Step 2's `{query_id}:self_read:dim_customer.id`. **Pros:** Minimal new concepts; edges carry explicit semantics; physical output nodes keep their canonical names; no version numbering scheme. -**Cons:** Self-read nodes are still a new node variant; `q1:self_read:dim_customer.id` naming needs renderer support. +**Cons:** Self-read nodes are still a new node variant; `{query_id}:self_read:dim_customer.id` naming needs renderer support. ## Recommendation: Option C (Read-Before-Write with Ordered DML Edges) @@ -192,47 +192,96 @@ No edge from `dim_customer` to itself. The self-read in Step 2 is lost. 
**After (Option C):** ``` -Step 1 (MERGE): - staging_customer_latest.id ──[merge,write]──► dim_customer.id - staging_customer_latest.name ──[merge,write]──► dim_customer.name - staging_customer_latest.city ──[merge,write]──► dim_customer.city - dim_customer.id ──[merge,prior_state_read]──► dim_customer.end_time - dim_customer.is_active ──[merge,prior_state_read]──► dim_customer.end_time +Step 1 (MERGE, query_0): + staging_customer_latest.id ──► dim_customer.id (is_merge=true, edge_role=None) + staging_customer_latest.name ──► dim_customer.name (is_merge=true, edge_role=None) + staging_customer_latest.city ──► dim_customer.city (is_merge=true, edge_role=None) + dim_customer.id ──► dim_customer.end_time (is_merge=true, edge_role=None) + dim_customer.is_active ──► dim_customer.end_time (is_merge=true, edge_role=None) Cross-query (Step 1 output -> Step 2 self-read): - dim_customer.id ──[cross_query]──► step2:self_read:dim_customer.id - dim_customer.is_active ──[cross_query]──► step2:self_read:dim_customer.is_active - -Step 2 (INSERT): - staging_customer_latest.id ──[write]──► dim_customer.id - staging_customer_latest.name ──[write]──► dim_customer.name - step2:self_read:dim_customer.id ──[prior_state_read]──► dim_customer.id - step2:self_read:dim_customer.is_active ──[prior_state_read]──► dim_customer.is_active - step2:self_read:dim_customer.name ──[prior_state_read]──► dim_customer.name + dim_customer.id ──► query_1:self_read:dim_customer.id (edge_role=cross_query_self_ref) + dim_customer.is_active ──► query_1:self_read:dim_customer.is_active + (edge_role=cross_query_self_ref) + +Step 2 (INSERT, query_1): + staging_customer_latest.id ──► dim_customer.id (edge_role=None) + staging_customer_latest.name ──► dim_customer.name (edge_role=None) + query_1:self_read:dim_customer.id ──► dim_customer.id (edge_role=prior_state_read) + query_1:self_read:dim_customer.is_active ──► dim_customer.is_active + (edge_role=prior_state_read) + query_1:self_read:dim_customer.name ──► 
dim_customer.name (edge_role=prior_state_read) ``` +Note: `is_merge_operation` and `merge_action` are existing `ColumnEdge` fields; `edge_role` is the new field added by this design. They are independent attributes on the same edge, not comma-separated compound values. + The key difference: `dim_customer` columns that Step 2 reads from the LEFT JOIN flow through query-scoped `self_read` nodes, which are connected to Step 1's output via cross-query edges. The graph is a DAG (no self-loops). ## Implementation Sketch -### Phase 1: Stop filtering self-references +### Phase 1: Stop filtering self-references (source-scope only) File: `multi_query.py`, method `_extract_source_tables`. -Change: when `table_name == destination_table`, do not `continue`. Instead, add the table to both `source_tables` and a new field `self_referenced_tables` on `ParsedQuery`. +Change: when `table_name == destination_table`, check whether the `Table` node appears in a source scope (FROM, JOIN, USING subquery) rather than in the target slot. Calling `ast.find_all(exp.Table)` on the parsed statement also walks into the target slot (`Insert.this`, `Merge.this`, `Update.this`), so naively removing the filter would mark every INSERT/MERGE as self-referencing even when the body never reads the target. + +**Detection heuristic:** a Table node is a self-reference if it is a descendant of one of the source-scope AST containers (`exp.From`, `exp.Join`, `exp.Subquery`, or `exp.Merge.args["using"]`), but NOT if its parent chain leads to `Insert.this`, `Merge.this`, `Update.this`, or `Create.this`. ```python -if table_name == destination_table: - self_referenced = True - # Still add to source_tables — it IS a source +# Build set of Table nodes that sit in the target slot (not source scope). +# IMPORTANT: for INSERT INTO t (col1, col2) SELECT ... and CREATE TABLE t (col1 ...), +# sqlglot wraps the target in an exp.Schema (not exp.Table). The Schema's .this +# child is the actual exp.Table. 
We must collect both the Schema and its inner +# Table to avoid false-positive self-reference detection. +target_table_nodes = set() +if isinstance(ast, (exp.Create, exp.Insert, exp.Merge, exp.Update, exp.Delete)): + if ast.this: + # ast.this may be exp.Table, exp.Schema, or other node + target_table_nodes.add(id(ast.this)) + # Also collect all Table nodes nested inside the target slot + # (handles Schema -> Table, Schema -> Table with column list, etc.) + for t in ast.this.find_all(exp.Table): + target_table_nodes.add(id(t)) + +for table_node in ast.find_all(exp.Table): + table_name = self._get_table_name(table_node, tokenizer) + if not table_name: + continue + # Skip Table nodes that are the target slot itself + if id(table_node) in target_table_nodes: + continue + # Self-reference: table appears in source scope with same name as destination + if table_name == destination_table: + self_referenced_tables.add(table_name) + # Still add to source_tables — it IS a source + # ... existing CTE alias filter ... + tables.add(table_name) ``` +**Note on DELETE statements:** `_extract_operation_and_destination` (`multi_query.py:212-257`) currently does not handle `exp.Delete`. The following changes are required: + +(a) Add `DELETE = "DELETE"` to the `SQLOperation` enum (`models.py:817-841`) and include `DELETE` in `SQLOperation.is_dml()` so it is recognized as a DML operation. Note: the existing `DELETE_AND_INSERT = "DELETE+INSERT"` (`models.py:835`) is a pre-existing composite operation for the atomic delete-then-insert pattern, parsed as a single unit. The new `DELETE` is for standalone DELETE statements. Both enum values coexist and serve distinct purposes. `DELETE` must be added to the `is_dml()` method's member list alongside the existing DML operations. + +(b) Add an `exp.Delete` branch to `_extract_operation_and_destination` that extracts the target table name from `exp.Delete.this` and returns `(SQLOperation.DELETE, table_name)`. 
+ +(c) Add `exp.Delete` to the `isinstance` check at `_extract_source_tables` line 274 (the one that sets `destination_table` via `_extract_operation_and_destination`), not just the `target_table_nodes` check. Without this, DELETE statements will not populate `destination_table`, and the self-reference detection logic will not fire for DELETE targets. + +Without these three changes, DELETE statements will not be recognized as targeting a table, and Test 7 (DELETE-then-INSERT) will fail. + +This ensures `INSERT INTO dim_customer SELECT ... FROM staging` does NOT mark `dim_customer` as self-referenced (the Table node is in the target slot only), while `INSERT INTO dim_customer SELECT ... FROM staging LEFT JOIN dim_customer` DOES (the JOIN's Table node is in source scope). + +Acceptance criterion #3 updated to match: `_extract_source_tables` does not filter `destination_table` from `source_tables` when the table appears in a **source scope** (FROM/JOIN/USING), as detected by AST node identity rather than name matching alone. + ### Phase 2: Cycle-safe dependency graph +#### Phase 2a: `_build_query_dependencies` self-exclusion + File: `table.py`, method `_build_query_dependencies`. Change: when computing deps for a query, exclude the query itself from its own dependency set. A query that writes to and reads from `dim_customer` should depend on *prior* queries that wrote `dim_customer`, not on itself. +Note: this self-exclusion applies **globally** to all queries in `_build_query_dependencies`, not only to self-referencing ones. This is correct because a query should never depend on itself regardless of the reason. The guard is harmless for non-self-referencing queries (their `query_id` never appears in their own `modified_by`). 
+ ```python if table_node.created_by and table_node.created_by != query_id: deps[query_id].add(table_node.created_by) @@ -241,17 +290,134 @@ for mod_id in table_node.modified_by: deps[query_id].add(mod_id) ``` +#### Phase 2b: `_build_table_dependencies` self-exclusion + +File: `table.py`, method `_build_table_dependencies` (lines 206-243). + +This method builds table-level dependencies and is used by `get_dependencies`, `get_downstream`, and `get_execution_order`. Without a fix, a self-referencing table like `dim_customer` would list itself as its own dependency (since it appears in both source and destination roles). This creates a spurious self-dependency cycle at the table level even after Phase 2a fixes the query level. + +Change: when iterating source tables for a given table, skip entries where `source_table == table_name`: + +```python +for source_table in query.source_tables: + if source_table == table_name: + continue # skip self-dependency + if source_table in self.tables: + deps[table_name].add(source_table) +``` + ### Phase 3: Self-read node creation -File: `pipeline_lineage_builder.py`, methods `_infer_table_name` and `_make_full_name`. +Files: `pipeline_lineage_builder.py`, methods `_make_full_name`, `_add_query_columns`, and `_is_physical_table_column`. + +**3a. Naming (`_make_full_name`, line 708).** Add a branch before the physical-table check: if the node is an input-layer column whose inferred table is in `query.self_referenced_tables`, return query-scoped naming instead of shared physical naming. + +```python +def _make_full_name(self, node: ColumnNode, query: ParsedQuery) -> str: + table_name = self._infer_table_name(node, query) + unit_id = node.unit_id + + # Self-read input columns get query-scoped naming to avoid colliding + # with the physical output node for the same table.column. + # + # Fallback: _infer_table_name may return None for ambiguous columns + # (e.g., when no alias qualifier is present). 
For input-layer columns + # in a query with self-referenced tables, fall back to node.table_name + # (the raw alias/table name from the SQL AST) before giving up. + if table_name is None and node.layer == "input": + candidate = node.table_name + if candidate and candidate in getattr(query, "self_referenced_tables", set()): + table_name = candidate + + if ( + node.layer == "input" + and table_name + and table_name in getattr(query, "self_referenced_tables", set()) + ): + return f"{query.query_id}:self_read:{table_name}.{node.column_name}" + + # ... existing physical / CTE / subquery branches unchanged ... +``` + +When `_infer_table_name` returns `None` for an input-layer column in a query with self-referenced tables, the implementation attempts to match by `node.table_name` (the raw alias from the SQL AST) before falling through to physical-table naming. Without this fallback, ambiguous columns would silently receive shared physical naming, causing the exact node collapse this design is intended to fix. The fallback is safe because `node.table_name` is only used when it matches a known self-referenced table; for non-self-referenced tables, the existing physical-table path applies as before. + +The `getattr` guard ensures backward compatibility if `self_referenced_tables` is not yet populated on older `ParsedQuery` instances (e.g., during incremental rollout or tests that construct `ParsedQuery` manually without the new field). + +**3b. Node attributes (`_add_query_columns`, line 285).** When creating the `ColumnNode` for a self-read input column, set `node_type="self_read"` on the pipeline-level node (not the query-lineage-level node). This enables renderers to display a human-friendly label like "dim_customer (prior state)." `operation` retains the original query-lineage-level `node.node_type` (e.g., `"direct_column"`, `"expression"`); only the pipeline-level `node_type` is set to `"self_read"`. 
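The naming scheme from 3a and the renderer label mentioned in 3b can be sketched as follows. This is a minimal illustration, not the `PipelineLineageBuilder` code: `InputColumn`, `SELF_READ_MARKER`, `self_read_full_name`, and `display_label` are hypothetical names, the label format is only one possible choice, and the sketch assumes `table_name` has already been resolved from any SQL alias.

```python
from dataclasses import dataclass
from typing import Optional, Set

# Hypothetical constant mirroring the "{query_id}:self_read:{table}.{column}" scheme.
SELF_READ_MARKER = ":self_read:"


@dataclass(frozen=True)
class InputColumn:
    """Hypothetical stand-in for a query-lineage-level ColumnNode."""

    table_name: str
    column_name: str
    layer: str = "input"


def self_read_full_name(
    query_id: str, col: InputColumn, self_referenced_tables: Set[str]
) -> Optional[str]:
    """Return the query-scoped name for a self-read column, or None otherwise."""
    if col.layer == "input" and col.table_name in self_referenced_tables:
        return f"{query_id}{SELF_READ_MARKER}{col.table_name}.{col.column_name}"
    return None


def display_label(full_name: str) -> str:
    """Derive a human-friendly label; non-self-read names pass through unchanged."""
    if SELF_READ_MARKER not in full_name:
        return full_name
    _query_id, physical = full_name.split(SELF_READ_MARKER, 1)
    return f"{physical} (prior state)"


name = self_read_full_name("query_1", InputColumn("dim_customer", "id"), {"dim_customer"})
label = display_label(name) if name else None
```

Keeping the marker in one constant means the naming function and the renderer cannot drift apart on the separator.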
+ +**Diagnostic note:** In `_add_query_columns`, when a column with a physical-table `full_name` is dropped due to the deduplication guard (`if full_name in pipeline.columns: continue`) AND the current query has non-empty `self_referenced_tables`, emit a debug-level log message. This helps diagnose cases where self-read detection fails and a self-read column silently falls back to physical naming, colliding with an existing output node. + +**Recommendation:** Both Phase 3a (`_make_full_name`) and Phase 3b (`_add_query_columns`) independently compute whether a node is a self-read column using slightly different expressions. To avoid drift if one evolves without the other, extract a shared helper `_is_self_read_column(node, query) -> bool` that encapsulates the self-read detection logic (`node.layer == "input"` and inferred table name is in `query.self_referenced_tables`). Both call sites should delegate to this helper. + +```python +# Inside the node creation loop in _add_query_columns: +is_self_read = ( + node.layer == "input" + and (self._infer_table_name(node, query) or node.table_name) + in getattr(query, "self_referenced_tables", set()) +) + +column = ColumnNode( + column_name=node.column_name, + table_name=self._infer_table_name(node, query) or node.table_name, + full_name=full_name, + query_id=query.query_id, + unit_id=node.unit_id, + node_type="self_read" if is_self_read else node.node_type, + layer=node.layer, + # ... remaining fields unchanged ... +) +``` + +**3d. Alias resolution for self-referenced tables.** When SQL aliases a self-referenced table (e.g., `LEFT JOIN dim_customer t`), the query-lineage-level `ColumnNode` has `table_name="t"` (the alias), not `"dim_customer"`. Phase 3a's fallback checks `node.table_name in self_referenced_tables`, but `self_referenced_tables` contains `"dim_customer"`, not `"t"`. Without alias resolution, aliased self-references are never detected as self-reads. 
+ +Alias-to-table resolution belongs in `MultiQueryParser`, where `_get_table_name` and the `TemplateTokenizer` are available. The resolved mapping is stored on `ParsedQuery` so that `PipelineLineageBuilder` can consume it without needing AST-walking or tokenizer access. + +**During parsing** (`MultiQueryParser._parse_single_query` or `_extract_source_tables`): after populating `self_referenced_tables`, build the alias mapping in the same scope that already walks `exp.Table` nodes and calls `_get_table_name`. + +```python +# Inside _extract_source_tables (or _parse_single_query), after self_referenced_tables is populated: +self_ref_aliases: Dict[str, str] = {} +for table_node in ast.find_all(exp.Table): + resolved_name = self._get_table_name(table_node, tokenizer) + if resolved_name in self_referenced_tables: + alias = table_node.alias + if alias: + self_ref_aliases[alias] = resolved_name + +# Store on ParsedQuery: +# parsed_query.self_ref_aliases = self_ref_aliases +``` + +`ParsedQuery` gains a new field: `self_ref_aliases: Dict[str, str]` — a mapping from SQL alias to resolved table name, populated only for tables in `self_referenced_tables`. -Change: for input-layer columns whose table is in `query.self_referenced_tables`, produce a query-scoped name like `{query_id}:self_read:{table_name}.{column_name}` instead of the shared physical name. +**At `PipelineLineageBuilder` level** (Phase 3a fallback and Phase 3b): read `query.self_ref_aliases` directly — no AST walking or `_get_table_name` calls needed. 
-### Phase 4: Cross-query wiring +```python +# In the self-read check (Phase 3a fallback and condition): +candidate = table_name or node.table_name +resolved = query.self_ref_aliases.get(candidate, candidate) +if node.layer == "input" and resolved in query.self_referenced_tables: + return f"{query.query_id}:self_read:{resolved}.{node.column_name}" +``` + +The `self_ref_aliases` mapping must be available to both Phase 3a (`_make_full_name`) and Phase 3b (`_add_query_columns`). Store it as a per-query local or pass it through the shared helper `_is_self_read_column`. All `candidate in self_referenced_tables` checks in Phases 3a and 3b must resolve through this alias mapping first. + +**3c. Physical-table exclusion (`_is_physical_table_column`, line 757).** Self-read input columns must NOT be classified as physical table columns (which would route them to shared naming in `_make_full_name`). The current `_make_full_name` code calls `_is_physical_table_column` at line 732 and branches on it at line 734 **before** the self-read check would execute. Therefore, `_make_full_name` must be restructured so the self-read branch (from Phase 3a) is evaluated **before** the existing `_is_physical_table_column` call. The Phase 3a pseudocode already shows this correct ordering (the self-read check appears first); the implementation must match that ordering by moving the `_is_physical_table_column` call below the self-read branch. + +### Phase 4: Cross-query wiring (column-granular, topo-ordered) File: `pipeline_lineage_builder.py`, method `_add_cross_query_edges`. -Change: after the existing cross-query edge logic, add a pass that connects prior-statement output columns to self-read input columns. For each self-referenced table in a query, find the most recent query that wrote to that table (from topological order), and connect its output columns to the self-read nodes. 
+Change: the self-read cross-query wiring needs its own dedicated loop (or a separate method such as `_add_self_read_cross_query_edges`), because the existing `_add_cross_query_edges` loop starts with `if not table_node.created_by: continue`, which skips DML-only tables. MERGE/INSERT targets have `created_by=None` (they use `modified_by`), so the existing loop filters out the exact tables this feature targets. Do not attempt to add self-read wiring inside the existing loop. + +The new loop connects prior-statement output columns to self-read input columns. For each self-referenced table in a query, find the most recent query **that wrote each specific column** (from topological order, not `modified_by` list order), and connect that column's output node to the self-read input node. This lookup assumes all output nodes already exist when Phase 4 executes; this is guaranteed by the existing `PipelineLineageBuilder.build()` call sequence, which calls `_add_query_columns` (creating all output nodes) before `_add_cross_query_edges` (wiring cross-query relationships). + +**Note on DELETE queries:** DELETE queries produce no column lineage because `_extract_select_from_query` returns `None` for `exp.Delete`. The cross-query wiring loop naturally skips them (no output nodes to connect from). No special handling is needed — the loop simply finds no output columns for DELETE queries when searching for "most recent writer" of a given column. + +Column-granular lookup matters when multiple prior statements write to the same table but touch different columns. Example: Step 1 writes `id` + `name`, Step 2 writes only `name`, Step 3 self-reads both — Step 3's `self_read:id` should wire to Step 1 (only writer of `id`), while `self_read:name` wires to Step 2 (most recent writer of `name`). + +Implementation: the `sorted_query_ids` list from `PipelineLineageBuilder.build` (line 68, topological sort output) is the authoritative execution order. Pass it to the cross-query edge builder. 
For each self-read column, walk `sorted_query_ids` in reverse up to (but not including) the current query, and find the most recent query whose output nodes include that `{table_name}.{column_name}`. Connect that output node to the self-read input node. If no prior writer is found for a column, connect to the original source-state node (the physical input column from the table's pre-pipeline state). If no such pre-pipeline source-state node exists in `pipeline.columns` (e.g., the table is only ever self-referenced and was never an explicit source in a non-self-referencing query), the implementation must create one — a physical input `ColumnNode` with `full_name="{table_name}.{column_name}"`, `layer="input"`, representing the table's state before the pipeline began. ### Phase 5: Edge annotation @@ -259,6 +425,33 @@ File: `models.py`, class `ColumnEdge`. Change: add `statement_order: Optional[int] = None` and `edge_role: Optional[str] = None`. Populate these during Phase 3 and Phase 4. +`statement_order` is the **topological sort index** (0-based position in the `sorted_query_ids` list from `PipelineLineageBuilder.build`), not the insertion order or `query_id` string. This matches the execution-order semantics required by Phase 4 and aligns with Attack 8's design revision. + +`statement_order` is populated on **all** edges (not only self-read edges) for consistency — every edge carries the topo sort index of the query that produced it. This enables consumers to sort or filter edges by execution order without checking `edge_role` first. To make this possible, `PipelineLineageBuilder` stores `sorted_query_ids` as an instance attribute (set once during `build()` after topological sort completes), and `_add_query_edges` reads `self.sorted_query_ids` to look up the index for the current `query_id` when stamping each edge. 
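The `statement_order` stamping described above amounts to an index lookup into the topological order. The sketch below assumes a simplified `Edge` dataclass as a stand-in for `ColumnEdge`; the helper name `stamp_statement_order` is hypothetical, as is the choice to leave the field at `None` for an unknown `query_id`.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Edge:
    """Hypothetical stand-in for ColumnEdge with the two new optional fields."""

    from_node: str
    to_node: str
    query_id: str
    statement_order: Optional[int] = None
    edge_role: Optional[str] = None


def stamp_statement_order(edges: List[Edge], sorted_query_ids: List[str]) -> None:
    """Set statement_order on every edge to the topo-sort index of its query."""
    # Build the index once so each edge lookup is O(1).
    order_index: Dict[str, int] = {qid: i for i, qid in enumerate(sorted_query_ids)}
    for edge in edges:
        # An unknown query_id leaves the field at None instead of guessing.
        edge.statement_order = order_index.get(edge.query_id)


edges = [
    Edge("query_1:self_read:dim_customer.id", "dim_customer.id", "query_1",
         edge_role="prior_state_read"),
    Edge("staging_customer_latest.id", "dim_customer.id", "query_0"),
]
stamp_statement_order(edges, ["query_0", "query_1"])
```

Because the index comes from the sorted list rather than from submission order, consumers can sort edges by `statement_order` without inspecting `edge_role`.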
+ +`edge_role` values: `"prior_state_read"` (self-read edge within a query), `"cross_query_self_ref"` (cross-query edge connecting prior output to self-read input), or `None` (normal edge). These values are string literals, consistent with the existing `edge_type` convention on `ColumnEdge` (which also uses string literals like `"lineage"`, `"rename"`, etc.). An enum is not introduced to avoid a new import dependency for a three-value field. + +### Phase 6: `get_self_read_columns` convenience method + +File: `pipeline.py`, class `Pipeline`. + +```python +def get_self_read_columns(self, table_name: str) -> list[ColumnNode]: + return [ + node + for node in self.column_graph.nodes + if node.node_type == "self_read" and node.table_name == table_name + ] +``` + +This method filters the pipeline's column graph for nodes tagged `node_type="self_read"` with a matching `table_name`, providing a convenient API for acceptance criterion 7. It returns an empty list for tables that are never self-referenced. + +**`get_column` disambiguation:** After this change, `get_column(table_name, column_name)` could match both an output node and a self-read node for the same table and column. To avoid nondeterministic results, `get_column` should prefer `layer="output"` over `layer="input"` when multiple candidates match, regardless of whether `query_id` is specified. Even with a `query_id`, there can be two matches for the same `(table_name, column_name, query_id)` tuple: a self-read input node and an output node. Implementation: filter candidates as before, then prefer `layer="output"` over `layer="input"` when multiple matches exist. + +Note on backward compatibility: `ColumnEdge.__eq__` (`models.py:632-639`) compares only `from_node`, `to_node`, and `edge_type` — not all fields. `__hash__` uses the same three fields. Adding optional fields with `None` defaults does not change equality or hash behavior. 
All existing test assertions use either attribute access (`edge.is_merge_operation`) or keyword construction (`ColumnEdge(from_node=..., to_node=..., edge_type=...)`) — neither is affected. Verified by grep: 14 `ColumnEdge(` constructions across `test_metadata_propagation.py` and `test_pipeline_diff.py`, all use keyword arguments with no positional args. + +`ColumnNode.__hash__` and `__eq__` use `full_name` only, so self-read nodes with unique `{query_id}:self_read:*` names introduce no collision risk with existing physical or CTE column nodes. + ## Scope ### In scope @@ -273,27 +466,30 @@ Change: add `statement_order: Optional[int] = None` and `edge_role: Optional[str - **Cross-notebook state.** Each `Pipeline` instance is self-contained; table state does not carry across separate `Pipeline()` calls. - **Idempotent re-runs.** The design assumes statements execute once in order. Re-execution semantics (e.g., Airflow retries) are not modeled. - **Gap 7 (join-predicate columns).** The self-read nodes will not capture join-predicate influence on output columns. That is a separate design. -- **MERGE within a single statement that reads its own target.** A single MERGE statement's `USING` subquery referencing the target table is already handled by the query parser's alias resolution within one `QueryUnit`. This design addresses *cross-statement* self-reference only. +- **MERGE within a single statement that reads its own target via USING.** A single MERGE statement's `USING` subquery referencing the target table is already handled by the query parser's alias resolution within one `QueryUnit`. The USING clause creates an explicit source alias, so the target and source are already distinct within the single-query lineage builder. This non-goal applies specifically to MERGE's USING clause. **Note:** single-statement `INSERT INTO t SELECT ... 
FROM t` is **in scope** — it has a genuine self-reference (the FROM/JOIN reads the target's prior state) that is not resolved by alias separation. Test 9 covers this case. The distinction: MERGE's USING clause already separates target from source by design; INSERT's FROM clause does not. ## Acceptance Criteria ### Structural -1. `ParsedQuery` has a `self_referenced_tables: Set[str]` field. +1. `ParsedQuery` has a `self_referenced_tables: Set[str]` field and a `self_ref_aliases: Dict[str, str]` field (mapping SQL aliases to resolved table names for self-referenced tables only, populated during parsing). 2. `ColumnEdge` has `statement_order: Optional[int]` and `edge_role: Optional[str]` fields. -3. `_extract_source_tables` does not filter `destination_table` from `source_tables` when the table appears in the query body (not just as the INSERT/MERGE target). +3. `_extract_source_tables` does not filter `destination_table` from `source_tables` when the table appears in a **source scope** (FROM/JOIN/USING), as detected by AST node identity (target-slot vs. source-scope), not name matching alone. Table nodes in the target slot (`Insert.this`, `Merge.this`, etc.) are still excluded. 4. `_build_query_dependencies` does not create self-dependency cycles. 5. Self-read input columns use query-scoped naming (`{query_id}:self_read:{table}.{column}`). +6. Self-read `ColumnNode` instances have `node_type="self_read"`. +7. `Pipeline` exposes `get_self_read_columns(table_name: str) -> List[ColumnNode]` returning all self-read nodes for a given physical table. +8. `Pipeline.trace_column_forward` and `Pipeline.trace_column_backward` traverse all edge roles (including `"prior_state_read"` and `"cross_query_self_ref"`) by default. Note: the underlying BFS functions in `lineage_tracer.py` are module-level functions, not methods on a `LineageTracer` class. Verify that these functions traverse all edge types without filtering by `edge_role`. ### Functional -6. 
For the SCD2 two-step fixture (MERGE + INSERT on `dim_customer`), the pipeline graph contains: +9. For the SCD2 two-step fixture (MERGE + INSERT on `dim_customer`), the pipeline graph contains: - Edges from `staging_customer_latest` columns to `dim_customer` output columns (both steps). - Self-read nodes for `dim_customer` columns read by Step 2. - Cross-query edges from Step 1's `dim_customer` output to Step 2's self-read nodes. - No self-loop edges (no edge where `from_node.table_name == to_node.table_name` on the same physical node with no intermediate). -7. Impact analysis: "what depends on `staging_customer_latest.city`?" returns `dim_customer.city` (both steps) and `dim_customer.end_time` (Step 1 close), `dim_customer.is_active` (Step 1 close). -8. Backward lineage: "where does `dim_customer.id` come from?" returns both `staging_customer_latest.id` (direct) and the self-read chain through prior `dim_customer.id`. +10. Impact analysis: "what depends on `staging_customer_latest.city`?" returns `dim_customer.city` (both steps) and `dim_customer.end_time` (Step 1 close), `dim_customer.is_active` (Step 1 close). +11. Backward lineage: "where does `dim_customer.id` come from?" returns both `staging_customer_latest.id` (direct) and the self-read chain through prior `dim_customer.id`. ### Test Plan @@ -302,9 +498,12 @@ Tests live in `tests/test_cdc_scd_pipeline.py` (created by the parent design's d **Test 1: Self-reference detected.** Given the SCD2 two-step SQL, assert that Step 2's `ParsedQuery.self_referenced_tables` contains `"dim_customer"` and `ParsedQuery.source_tables` contains `"dim_customer"`. -**Test 2: No topological cycle.** +**Test 2a: No topological cycle.** Given the SCD2 two-step SQL, assert that `pipeline.table_graph.topological_sort()` succeeds (does not raise `CycleError`) and returns Step 1 before Step 2. 
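To see why the Phase 2a self-exclusion is a precondition for this test, the sketch below reproduces the cycle with the standard library's `graphlib.TopologicalSorter`, used here only as a stand-in for `pipeline.table_graph.topological_sort()`. `build_query_deps`, `WRITERS`, and `READS` are hypothetical names, and the data models the SCD2 fixture under the assumption that only Step 2 reads `dim_customer` in a source scope.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set

# Which queries write each table, and which tables each query reads.
WRITERS: Dict[str, List[str]] = {"dim_customer": ["query_0", "query_1"]}
READS: Dict[str, Set[str]] = {
    "query_0": {"staging_customer_latest"},
    "query_1": {"staging_customer_latest", "dim_customer"},
}


def build_query_deps(
    writers: Dict[str, List[str]], reads: Dict[str, Set[str]], exclude_self: bool
) -> Dict[str, Set[str]]:
    """Map each query to the queries that wrote the tables it reads."""
    deps: Dict[str, Set[str]] = {qid: set() for qid in reads}
    for qid, tables in reads.items():
        for table in tables:
            for writer in writers.get(table, []):
                if exclude_self and writer == qid:
                    continue  # a query never depends on itself
                deps[qid].add(writer)
    return deps


# With the guard in place, the sort succeeds and Step 1 precedes Step 2.
deps = build_query_deps(WRITERS, READS, exclude_self=True)
order = list(TopologicalSorter(deps).static_order())

# Without the guard, query_1 lists itself as a predecessor and the sort fails.
try:
    list(TopologicalSorter(build_query_deps(WRITERS, READS, exclude_self=False)).static_order())
    cycle_detected = False
except CycleError:
    cycle_detected = True
```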
+**Test 2b: Direct self-exclusion in `_build_query_dependencies`.** +Given the SCD2 two-step SQL, call `pipeline.table_graph._build_query_dependencies()` directly and assert that `deps["query_1"]` does not contain `"query_1"` (no self-dependency). This tests the Phase 2 fix independently of topological sort's cycle-handling behavior. + **Test 3: Self-read nodes exist.** Assert that the pipeline column graph contains nodes matching `*:self_read:dim_customer.*` with `layer="input"`. @@ -318,10 +517,38 @@ Assert that no edge in `pipeline.column_graph.edges` has `from_node.full_name == Starting from `staging_customer_latest.city`, forward-traverse the graph and assert `dim_customer.city` is reachable through both Step 1 and Step 2 paths. **Test 7: DELETE-then-INSERT pattern.** -Same shape as SCD2 but using `DELETE FROM dim_customer WHERE ...` followed by `INSERT INTO dim_customer SELECT ...`. Assert self-read nodes and cross-query edges are created. +Using `DELETE FROM dim_customer WHERE ...` followed by `INSERT INTO dim_customer SELECT ... FROM staging LEFT JOIN dim_customer ...`. Assert: +(a) DELETE is recognized as DML targeting `dim_customer` (requires `_extract_operation_and_destination` to handle `exp.Delete`). +(b) INSERT's self-read nodes exist for the `dim_customer` columns referenced in the LEFT JOIN. +(c) INSERT's self-read nodes wire to the original source-state `dim_customer` columns (pre-pipeline), not to DELETE output — because DELETE produces no column lineage (no output columns). +(d) No cross-query edges from DELETE to INSERT exist (since DELETE has no column output nodes to connect from). **Test 8: Non-self-referencing pipeline unchanged.** -A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT ... FROM a` should produce zero self-read nodes and zero `edge_role="prior_state_read"` edges. Regression guard. +A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT ... 
FROM a` should produce zero self-read nodes, zero `edge_role="prior_state_read"` edges, **and `self_referenced_tables == set()` on every `ParsedQuery`**. This locks down both the detection step (ParsedQuery field) and the wiring step (graph nodes/edges) independently. + +**Test 9: Single-statement self-reference.** +`INSERT INTO t SELECT a, b FROM source LEFT JOIN t ON source.id = t.id` as a single-query pipeline. Assert self-read nodes exist for `t.id` and no self-loop edges. + +**Test 10: `statement_order` reflects topo sort output for independent statements.** +Submit two DML statements that both target `dim_customer` but have no inter-dependency (neither reads a table the other creates — e.g., both read only from `staging_customer_latest`). Since there is no dependency edge between them, topo sort may return them in any valid order. The test asserts that `statement_order` on edges matches the topo sort output (whatever it is), not the submission order. Specifically: retrieve `sorted_query_ids` from the pipeline's topological sort, then for each edge verify that `edge.statement_order` equals the index of `edge.query_id` in `sorted_query_ids`. Note: for independent statements the topo sort order may coincide with submission order — the test validates the assignment mechanism, not forced reordering. Forced reordering requires an explicit dependency between statements (covered by Tests 2a, 4, and 11). + +**Test 11: Column-granular cross-query wiring (three-step chain).** +Three-query pipeline: Step 1 writes `{id, name, city}` to `dim_customer`, Step 2 writes only `{name}` to `dim_customer`, Step 3 self-reads `{id, name}` from `dim_customer`. Assert that Step 3's `self_read:id` wires to Step 1's output (the only writer of `id`) and `self_read:name` wires to Step 2's output (the most recent writer of `name`). This validates Phase 4's per-column "most recent writer" resolution. 
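The per-column "most recent writer" resolution that Test 11 exercises can be sketched as a reverse walk over the topological order. This is an illustration under stated assumptions, not the implementation: `most_recent_writer` and `outputs_by_query` are hypothetical names, and output columns are represented as plain `table.column` strings rather than `ColumnNode` objects.

```python
from typing import Dict, List, Optional, Set


def most_recent_writer(
    column: str,
    current_query: str,
    sorted_query_ids: List[str],
    outputs_by_query: Dict[str, Set[str]],
) -> Optional[str]:
    """Return the latest query before current_query whose outputs include column."""
    current_idx = sorted_query_ids.index(current_query)
    # Walk backwards from the statement just before the current one.
    for qid in reversed(sorted_query_ids[:current_idx]):
        if column in outputs_by_query.get(qid, set()):
            return qid
    # No prior writer: the caller wires to the pre-pipeline source-state node.
    return None


# The three-step chain from Test 11.
sorted_query_ids = ["query_0", "query_1", "query_2"]
outputs_by_query = {
    "query_0": {"dim_customer.id", "dim_customer.name", "dim_customer.city"},
    "query_1": {"dim_customer.name"},
    "query_2": {"dim_customer.id", "dim_customer.name"},
}

id_writer = most_recent_writer("dim_customer.id", "query_2", sorted_query_ids, outputs_by_query)
name_writer = most_recent_writer("dim_customer.name", "query_2", sorted_query_ids, outputs_by_query)
```

Slicing up to, but not including, the current query's index keeps a statement from wiring its self-read nodes to its own output.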
+ +**Test 12: INSERT with explicit column list does not spuriously self-reference.** +`INSERT INTO dim_customer (id, name, city) SELECT s.id, s.name, s.city FROM staging s` (no self-read in the body). Assert `self_referenced_tables == set()` — the target's `exp.Schema`-wrapped Table node must be excluded by Phase 1's `target_table_nodes` set. + +**Test 13: MERGE with USING does not spuriously self-reference.** +`MERGE INTO dim_customer t USING staging s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.name = s.name WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)` as a single-query pipeline. Assert `self_referenced_tables == set()`. The MERGE's USING clause already separates target from source by design (the target `t` is resolved via `Merge.this`, which is in the target slot). This validates the non-goal boundary claim that MERGE's USING-based self-reference is handled by existing alias resolution and should not trigger self-read node creation. Additionally, the test should include a structural assertion: verify that `_extract_source_tables` returns only `{"staging"}` (not `{"staging", "dim_customer"}`) for the MERGE statement, confirming the target's `exp.Table` node is correctly identified as target-slot-only and excluded from `source_tables`. + +**Test 14: `get_self_read_columns` API.** +Given the SCD2 two-step SQL, assert that `pipeline.get_self_read_columns("dim_customer")` returns a non-empty list of `ColumnNode` instances, each with `node_type="self_read"` and `table_name="dim_customer"`. For the non-self-referencing pipeline from Test 8, assert it returns an empty list. + +**Test 15: LineageTracer traverses self-read edges.** +Given the SCD2 two-step SQL, use `pipeline.trace_column_forward("staging_customer_latest", "city")` and assert the result includes `dim_customer.city` reached via the self-read path (through `query_1:self_read:dim_customer.*` nodes). 
Similarly, `pipeline.trace_column_backward("dim_customer", "id")` must include the self-read chain. This validates acceptance criterion 8 (Pipeline traversal). + +**Test 16: Aliased self-reference detected.** +`INSERT INTO dim_customer SELECT s.id, s.name, s.city, s.email FROM staging s LEFT JOIN dim_customer t ON s.id = t.id WHERE t.id IS NULL` as a single-query pipeline. The alias `t` refers to `dim_customer`. Assert that self-read nodes are created for the aliased reference (e.g., `query_0:self_read:dim_customer.id`), not silently missed because `node.table_name == "t"` fails to match `self_referenced_tables == {"dim_customer"}`. This validates Phase 3d's alias resolution. ## Hostile Review @@ -329,9 +556,9 @@ A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT . **Concern:** Users who call `pipeline.get_column("dim_customer", "id")` or iterate `pipeline.columns` filtering by `table_name == "dim_customer"` will not find the self-read nodes. Impact queries that walk edges will silently miss the self-read path. -**Response:** This is valid. The `get_column` API already accepts an optional `query_id` parameter (`pipeline.py:201`). Self-read nodes should be findable via `get_column("dim_customer", "id", query_id="step2")`. Additionally, impact-analysis traversal must be updated to follow cross-query edges through self-read nodes. The `LineageTracer` (lazy-loaded in `pipeline.py:148`) must be audited to confirm it traverses `edge_role="prior_state_read"` edges. +**Response:** This is valid. The `get_column` API already accepts an optional `query_id` parameter (`pipeline.py:201`). Self-read nodes should be findable via `get_column("dim_customer", "id", query_id="query_1")`. Additionally, impact-analysis traversal must be updated to follow cross-query edges through self-read nodes. The `LineageTracer` (lazy-loaded in `pipeline.py:148`) must be audited to confirm it traverses `edge_role="prior_state_read"` edges. 
-**Design revision:** Add a convenience method `pipeline.get_self_read_columns(table_name)` that returns all self-read nodes for a given physical table. Document that `pipeline.columns` includes self-read nodes (they are not hidden). Ensure `LineageTracer.trace_forward` and `trace_backward` traverse all edge roles by default. +**Design revision:** Add a convenience method `pipeline.get_self_read_columns(table_name)` that returns all self-read nodes for a given physical table. Document that `pipeline.columns` includes self-read nodes (they are not hidden). Ensure `LineageTracer.trace_column_forward` and `trace_column_backward` traverse all edge roles by default. ### Attack 2: Graph rendering blow-up — self-read nodes double the node count for self-referenced tables @@ -339,7 +566,7 @@ A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT . **Response:** Accepted limitation with mitigation. The number of self-read nodes scales with `(number of self-referencing statements - 1) * columns_read`, not total columns. In practice, SCD2 Step 2 reads only the match columns (id, is_active) from the prior state, not all 50 columns. The column extractor only creates input nodes for columns actually referenced in the SQL. -For rendering, add an optional `collapse_self_reads=True` parameter to the GraphViz/pyvis exporters that merges self-read nodes back into their physical counterparts (restoring the current collapsed view) for visual simplicity while keeping the full graph for programmatic queries. +For rendering, a future follow-up could add an optional `collapse_self_reads=True` parameter to the GraphViz/pyvis exporters that merges self-read nodes back into their physical counterparts (restoring the current collapsed view) for visual simplicity while keeping the full graph for programmatic queries. **This exporter enhancement is deferred to a separate PR** to keep this change focused on correctness. 
The core graph is always complete; rendering collapse is a presentation concern. **Tracked as:** Deferred Item D1 in the Deferred Work section below. ### Attack 3: Performance on large pipelines — extra pass over all queries for self-reference detection @@ -353,11 +580,11 @@ For rendering, add an optional `collapse_self_reads=True` parameter to the Graph **Response:** This is a real concern but is inherent to static analysis. clgraph does not execute SQL; it infers lineage from the SQL text. The "read prior state" assumption is correct for the dominant use case (Databricks/Delta SCD2) and is a reasonable default for all dialects. For dialects where MERGE has different visibility semantics, the lineage is still a useful approximation. -**Design revision:** Add a `dialect_note` to the self-read edge metadata explaining the assumption: "Assumes snapshot isolation: self-read sees table state before this statement executed." This is informational, not behavioral. +**Design revision:** Add a `dialect_note` to the self-read edge metadata explaining the assumption: "Assumes snapshot isolation: self-read sees table state before this statement executed." This is informational, not behavioral. **Tracked as:** Deferred Item D2 in the Deferred Work section below. The `dialect_note` field is a nice-to-have that does not affect correctness; it can land in a follow-up PR without blocking this design. ### Attack 5: User mental-model confusion — what IS a self-read node? -**Concern:** Users see `step2:self_read:dim_customer.id` in the graph and don't understand what it means. The naming convention (`query_id:self_read:table.column`) is internal jargon. +**Concern:** Users see `{query_id}:self_read:dim_customer.id` in the graph and don't understand what it means. The naming convention (`query_id:self_read:table.column`) is internal jargon. **Response:** Valid. 
The naming follows the existing convention for CTE nodes (`query_id:cte:name.column`) and subquery nodes (`query_id:subq:name.column`), both of which are already exposed in the graph. Users who understand CTE nodes will understand self-read nodes by analogy. @@ -367,9 +594,13 @@ For rendering, add an optional `collapse_self_reads=True` parameter to the Graph **Concern:** Adding `statement_order` and `edge_role` to `ColumnEdge` means every test that constructs or asserts on edges must account for new optional fields. If defaults are `None`, existing tests should pass, but assertion helpers that do exact-match comparisons may break. -**Response:** Both fields default to `None`. The `ColumnEdge` dataclass uses keyword arguments with defaults, so existing construction code is unaffected. Assertion helpers in tests use attribute access (`edge.is_merge_operation`, `edge.merge_action`), not positional matching. No existing test should break. +**Response:** Both fields default to `None`. The `ColumnEdge` dataclass uses keyword arguments with defaults, so existing construction code is unaffected. -**Verification:** grep all test files for `ColumnEdge(` construction and `assert.*edge` patterns to confirm no exact-match comparisons exist. +**Verification (completed):** Grep results confirm safety: +- 14 `ColumnEdge(` constructions across `test_metadata_propagation.py` (12) and `test_pipeline_diff.py` (2). All use keyword arguments (`from_node=..., to_node=..., edge_type=...`), no positional args. +- `ColumnEdge.__eq__` (`models.py:632-639`) compares only `from_node`, `to_node`, and `edge_type` — not all fields. `__hash__` (`models.py:629-630`) uses the same three fields. +- All test edge assertions use attribute access (`edge.is_merge_operation`, `edge.qualify_function`, etc.) or count checks (`len(edges) == N`), not full-object equality comparisons. +- No existing test should break. ### Attack 7: What happens when the same table is self-referenced within a SINGLE statement? 
@@ -389,10 +620,19 @@ This is actually **desirable** — a single `INSERT INTO t SELECT ... FROM t` do **Design revision:** Phase 4 (cross-query wiring) must use the topological sort order, not `modified_by` list order, to determine the "most recent prior writer." The `sorted_query_ids` list from `PipelineLineageBuilder.build` (line 68) is the correct ordering. Store it and pass it to the cross-query edge builder. -### Revised Test Plan Addition +*(Tests 9–16 are in the main Test Plan section above.)* -**Test 9: Single-statement self-reference.** -`INSERT INTO t SELECT a, b FROM source LEFT JOIN t ON source.id = t.id` as a single-query pipeline. Assert self-read nodes exist for `t.id` and no self-loop edges. +## Deferred Work + +Items deferred from this design to keep the PR focused on correctness. Each should be tracked as a follow-up issue or PR. + +**D1: Exporter collapse mode for self-read nodes.** +Add an optional `collapse_self_reads=True` parameter to GraphViz/pyvis exporters that merges self-read nodes back into their physical counterparts for visual simplicity. Origin: Attack 2. + +**D2: `dialect_note` metadata on self-read edges.** +Add a `dialect_note: Optional[str]` field to `ColumnEdge` populated with "Assumes snapshot isolation: self-read sees table state before this statement executed." for self-read edges. Informational only, no behavioral impact. Origin: Attack 4. + +## Known Limitations -**Test 10: Out-of-order query submission.** -Submit Step 2 (INSERT) before Step 1 (MERGE) in the pipeline constructor. Assert that topological sort still places MERGE before INSERT and self-read wiring is correct. +**Unqualified columns in self-referencing queries remain ambiguous.** +When a self-referencing query references columns without a table qualifier (e.g., bare `id` instead of `t.id`) and the query has multiple source tables, `_infer_table_name` may return `None` and `node.table_name` may also be `None`.
In this case, the column falls through to physical naming and will not receive self-read treatment. This is consistent with existing behavior for any multi-source query where unqualified columns are ambiguous — it is not a regression introduced by this design. Users should qualify columns with table aliases in self-referencing queries for correct lineage tracking. diff --git a/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md b/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md index c09f7c3..35cf578 100644 --- a/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md +++ b/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md @@ -36,7 +36,7 @@ for join in joins: self._parse_from_sources(join, unit, depth) ``` -`_parse_from_sources` (`query_parser.py:1370-1415`) processes the `join.this` (the table/subquery being joined) to extract table names, aliases, and subqueries. The ON clause (`join.args.get("on")`) is **never accessed** in the non-MERGE path. No column references from the ON clause are extracted or stored on the `QueryUnit`. +`_parse_from_sources` (`query_parser.py:693`) processes the `join.this` (the table/subquery being joined) to extract table names, aliases, and subqueries. The ON clause (`join.args.get("on")`) is **never accessed** in the non-MERGE path. No column references from the ON clause are extracted or stored on the `QueryUnit`. ### 2. The QueryUnit has no field for join predicates @@ -46,7 +46,7 @@ for join in joins: `lineage_builder.py:130-158` processes each unit by extracting output columns (`_extract_output_columns`) and tracing their dependencies (`_trace_column_dependencies`). Both operate exclusively on SELECT-list expressions. The ON clause is not examined. -`_extract_source_column_refs` (`lineage_builder.py:889-948`) walks expression ASTs to find `exp.Column` nodes -- but it only receives SELECT-list expressions, never ON-clause expressions. 
+`_extract_source_column_refs` (`lineage_builder.py:889-974`) walks expression ASTs to find `exp.Column` nodes -- but it only receives SELECT-list expressions, never ON-clause expressions. ### 4. MERGE already has the pattern we need @@ -96,7 +96,7 @@ join_side: Optional[str] = None # "left" or "right" (which side of the jo **Code touched:** - `models.py`: add three fields to `ColumnEdge`. -- `query_parser.py`: in `_parse_select_statement`, after parsing JOINs, extract ON-clause column refs and store on `QueryUnit` as `join_predicates: List[JoinPredicateInfo]`. +- `query_parser.py`: in `_parse_select_unit`, after parsing JOINs, extract ON-clause column refs and store on `QueryUnit` as `join_predicates: List[JoinPredicateInfo]`. - `models.py`: add `JoinPredicateInfo` dataclass (join_condition_sql, left_columns, right_columns, join_type). - `lineage_builder.py`: in `_process_unit`, after step 4 (trace dependencies), add a new step to create join-predicate edges. For each join predicate, for each projected output column from the joined table, emit an edge from each ON-clause-only column to that output column with `is_join_predicate=True`. - `trace_strategies.py`: no change needed (regular column tracing stays the same). @@ -217,9 +217,9 @@ raw_orders.customer_id ───[P]────────> output.customer_cit raw_orders.order_ts ──────[P]────────> output.customer_city (left-side pred -> right-side projected) ``` -This is the final recommended shape. Predicate columns from both sides connect to projected columns from the *opposite* side of the join (the side whose row selection they constrain). Self-side predicate columns (e.g., `d.start_time` constraining which `d` row is selected) also connect to their own side's projected columns, since they constrain the row that contributes the value. +This is the final recommended shape. -**Final refined rule:** all ON-clause columns connect to all projected output columns from the join's non-FROM side (the joined table). 
For multi-way joins, each JOIN's predicate columns connect to projected columns from that specific JOIN's right-side table. +**Final refined rule:** all ON-clause columns (from both sides of the join) connect to all projected output columns that are sourced from the JOIN's right-side table. For multi-way joins, each JOIN's predicate columns connect to projected columns from that specific JOIN's right-side table. ## Scope @@ -230,12 +230,13 @@ This is the final recommended shape. Predicate columns from both sides connect t - Emit `is_join_predicate=True` edges from predicate columns to projected output columns. - Support: equi-joins, range joins (BETWEEN), theta joins (>, <, >=, <=, <>), function-wrapped join keys (e.g., `UPPER(a.name) = UPPER(b.name)`), compound predicates (AND/OR). - All existing dialects (bigquery, postgres, snowflake, databricks, duckdb, spark, trino, redshift, mysql). +- **Interaction with Gap 4 self-referencing targets.** When a self-referencing query uses a JOIN whose right-side table is the self-referenced target, predicate columns must resolve through Gap 4's self-read naming. See "Gap 4 Interaction" section below. ### Non-goals - **USING clauses.** USING(col) is syntactic sugar for ON a.col = b.col. It can be handled as a follow-up; the AST shape is different (`exp.Using` vs `exp.On`). - **NATURAL JOIN.** Implicit equi-join on all same-named columns. Requires schema knowledge to resolve. Out of scope. -- **LATERAL JOIN correlation.** Already handled by the lateral correlation path (`lineage_builder.py:160-162`). This design does not change that path. +- **LATERAL JOIN correlation.** The outer correlation reference is already handled by the lateral correlation path (`lineage_builder.py:160-162`). This design does not change that path. JOINs within a LATERAL subquery are regular JOINs and are in scope. - **Correlated subqueries in ON.** E.g., `ON a.id = (SELECT max(id) FROM ...)`. 
Rare in practice; the subquery parsing path already handles subqueries in WHERE/HAVING but not in ON. Follow-up. - **CROSS JOIN.** No ON clause; nothing to extract. - **Implicit joins (comma joins with WHERE predicate).** WHERE-clause predicates that act as join conditions are a separate problem. Out of scope. @@ -257,7 +258,7 @@ This is the final recommended shape. Predicate columns from both sides connect t 2. Add `join_predicates: List[JoinPredicateInfo]` field to `QueryUnit`. -3. In `query_parser._parse_select_statement`, after the JOIN loop (line 178), iterate joins again and extract ON-clause columns: +3. In `query_parser._parse_select_unit`, after the JOIN loop (line 178), iterate joins again and extract ON-clause columns: ```python for join in joins: on_clause = join.args.get("on") @@ -273,7 +274,8 @@ This is the final recommended shape. Predicate columns from both sides connect t )) ``` -4. Implement `_extract_join_predicate_columns` by reusing `_extract_source_column_refs` or by walking `exp.Column` nodes in the ON clause. +4. Implement `_extract_join_predicate_columns` by walking `exp.Column` nodes in the ON-clause expression tree directly. Do NOT reuse `_extract_source_column_refs` — it returns 6-tuples with JSON/nested-path metadata irrelevant for join predicates. A simpler dedicated walker that returns `List[Tuple[Optional[str], str]]` is cleaner. + Note: literal comparisons in ON clauses (e.g., `t.is_active = 'Y'`) produce column refs for the column side only (`t.is_active`); the literal `'Y'` is not an `exp.Column` node and is correctly ignored by the `exp.Column` walker. No special handling is required. ### Phase 2: Emit predicate edges (lineage_builder.py, models.py) @@ -284,7 +286,7 @@ This is the final recommended shape. Predicate columns from both sides connect t join_side: Optional[str] = None ``` -2. In `lineage_builder._process_unit`, add a new step after step 4 (window function edges): +2. 
In `lineage_builder._process_unit`, add a new step after step 8 (window function edges): ```python # 9. Create join predicate edges if unit.join_predicates: @@ -293,9 +295,27 @@ This is the final recommended shape. Predicate columns from both sides connect t 3. Implement `_create_join_predicate_edges`: - For each `JoinPredicateInfo` on the unit: - - Identify which output columns are sourced from the joined (right-side) table. - - For each column ref in the predicate, resolve to a source node. + - **Identify which output columns are sourced from the right-side table** using alias qualification: + 1. For each output column, walk its source expression AST to find `exp.Column` nodes. + 2. Check `column.table` (the alias qualifier) against `info.right_table` (the right-side alias). + 3. If the alias matches → this output column is from the right side; it receives predicate edges. + 4. If an output column is unqualified and ambiguous (could belong to either side), emit a + debug-level warning and skip predicate edges for that column. Users are expected to write + lineage-friendly SQL with explicit table qualifiers. + 5. If an output column's expression combines columns from both sides (e.g., `COALESCE(a.val, b.val)`), + treat it as right-side if *any* source column is from the right side. + - **Resolve each predicate column ref to a source node:** + 1. Use `unit.alias_mapping` to resolve the table alias from `(table_ref, col_name)` to a physical table name. + 2. Look up the corresponding input `ColumnNode` in the lineage graph by `(resolved_table, col_name)`. + 3. If not found (column is referenced in ON but never created as an input node), create the input node. + This parallels how `_create_qualify_edges` resolves columns via `_resolve_qualify_column`. 
+ Note: unlike `_create_qualify_edges` (which targets a single representative output column — the first non-star output column), `_create_join_predicate_edges` targets ALL output columns sourced from the right-side table. - Emit a `ColumnEdge` from each predicate column to each right-side projected column, with `is_join_predicate=True`, `join_condition=info.condition_sql`, `edge_type="join_predicate"`. + Note: `"join_predicate"` is a new addition to the `edge_type` value set. Existing code that switches on `edge_type` (e.g., visualization, serialization, export) should be audited for exhaustive handling to ensure this new value is not silently ignored. + Note: `edge_type="join_predicate"` is intentionally distinct from the existing `"join"` type. Because `ColumnEdge.__eq__` and `__hash__` include `edge_type`, a column can have both a value edge (`edge_type="direct"`) and a predicate edge (`edge_type="join_predicate"`) to the same target node — this is by design and ensures both are preserved. + - **`join_side` assignment:** For each predicate column edge, set `join_side="left"` if the predicate column's table alias matches a left-side (FROM) table, `join_side="right"` if it matches the right-side table. This is determined by checking `table_ref` against `info.right_table`. + +4. **Update `_add_query_edges` in `pipeline_lineage_builder.py`** (around line 460): Add `is_join_predicate`, `join_condition`, and `join_side` to its explicit field list. Without this, predicate edge metadata is silently lost when edges are copied into the pipeline graph. ### Phase 3: Tests @@ -377,6 +397,78 @@ Using `SQLColumnTracer`, verify that: The CDC BETWEEN join produces identical predicate edges across bigquery, postgres, snowflake, databricks.
+### Test 8: Self-referencing query with JOIN predicates (Gap 4 interaction) + +```sql +INSERT INTO dim_customer +SELECT s.id, s.name, s.city, s.email, + COALESCE(t.is_active, 'Y') AS is_active +FROM staging s +LEFT JOIN dim_customer t + ON s.id = t.id AND t.is_active = 'Y' +WHERE t.id IS NULL OR (t.name <> s.name OR t.city <> s.city) +``` + +Single-query pipeline. Assert: +- Gap 4 self-read nodes exist: `query_0:self_read:dim_customer.id`, `query_0:self_read:dim_customer.is_active`. +- Gap 7 predicate edges exist from self-read nodes (not physical nodes): + - `query_0:self_read:dim_customer.id` -[P]-> `dim_customer.is_active` (output) with `is_join_predicate=True`. + - `query_0:self_read:dim_customer.is_active` -[P]-> `dim_customer.is_active` (output) with `is_join_predicate=True`. + - `staging.id` -[P]-> `dim_customer.is_active` (output) with `is_join_predicate=True`. +- No predicate edge has a `from_node` with physical `dim_customer.*` naming (all self-ref predicate sources go through self-read nodes). +- No predicate edge exists from `dim_customer.name` or `dim_customer.city` — those appear only in the WHERE clause, which is out of scope for this feature. + +### Test 9: Self-referencing multi-statement pipeline with JOIN predicates (Gap 4 + Gap 7) + +SCD2 two-step fixture (MERGE + INSERT). Assert: +- Step 2's ON-clause predicate columns (`t.id`, `t.is_active`) resolve to self-read nodes. +- Cross-query edges from Step 1's output to Step 2's self-read nodes exist (Gap 4). +- Predicate edges from self-read nodes to Step 2's output columns exist (Gap 7). +- Impact analysis from `staging.id` reaches `dim_customer.is_active` through both the value path and the predicate path. + +### Test 10: Unqualified output column emits warning + +```sql +SELECT a.id, name +FROM table_a a +INNER JOIN table_b b ON a.id = b.id +``` + +`name` is unqualified and ambiguous (could be from `a` or `b`). Assert: +- A debug-level warning is emitted about the ambiguous column.
+- Predicate edges are still created for the qualified ON-clause columns (`a.id`, `b.id`). +- The unqualified `name` output column does not receive predicate edges (conservative: skip on ambiguity). + +## Gap 4 Interaction + +Gap 4 (self-referencing targets, landed in PR #61) introduced query-scoped self-read nodes for tables that appear as both source and destination in the same query or across consecutive pipeline statements. Gap 7 must integrate with this mechanism. + +### How it works + +When a self-referencing query has a JOIN whose right-side table is the self-referenced target: + +1. **Parsing (Phase 1):** `JoinPredicateInfo.right_table` captures the alias of the joined table (e.g., `"t"` for `LEFT JOIN dim_customer t`). `ParsedQuery.self_ref_aliases` (from Gap 4) maps `"t"` -> `"dim_customer"`. + +2. **Predicate column resolution (Phase 2):** When resolving a predicate column ref like `(table_ref="t", col_name="is_active")`: + - Resolve alias: `t` -> `dim_customer` via `unit.alias_mapping`. + - Check if `dim_customer` is in `query.self_referenced_tables`. + - If yes: the input node uses Gap 4's self-read naming: `{query_id}:self_read:dim_customer.is_active`. + - Look up this self-read node in the lineage graph (it was created by `pipeline_lineage_builder._add_query_columns` during Gap 4 processing). + +3. **Edge emission:** The predicate edge's `from_node` is the self-read node, not the physical `dim_customer.is_active` node. This is correct: the predicate reads the *prior state* of `dim_customer`, which is exactly what self-read nodes represent. + +### Implementation note + +Gap 7's predicate column resolution in `_create_join_predicate_edges` must check `query.self_referenced_tables` (or the pipeline-level equivalent) before resolving column names. If the predicate column's table is self-referenced, use the `{query_id}:self_read:{table}.{column}` naming convention to find the source node. 
+ +At the single-query `lineage_builder.py` level, `self_referenced_tables` is not directly available (it lives on `ParsedQuery`, which is a multi-query concept). Two options: + +**Option A (recommended):** Gap 7's predicate edge emission happens at the `lineage_builder.py` level (single-query). The self-read renaming happens at the `pipeline_lineage_builder.py` level (multi-query) via `_make_full_name`. Since `_make_full_name` already routes self-read input columns to query-scoped names, predicate edges created at the single-query level will get renamed when `pipeline_lineage_builder._add_query_edges` copies them into the pipeline graph. **However, `_add_query_edges` (pipeline_lineage_builder.py:427-471) explicitly names every field it copies — it does NOT automatically copy new fields.** The new `is_join_predicate`, `join_condition`, and `join_side` fields must be added to the explicit field list in `_add_query_edges` (see Phase 2, step 4 above). Without this wiring, predicate edge metadata is silently lost when edges are copied into the pipeline graph. No special self-read handling is needed in `_create_join_predicate_edges` itself — but the pipeline-level copy must be updated. + +**Option B:** Add self-read awareness to `_create_join_predicate_edges`. This is only needed if the single-query lineage graph must be self-read-aware independently of the pipeline builder. + +Option A is preferred because it keeps the Gap 7 implementation simple and leverages Gap 4's existing renaming infrastructure. + ## Hostile Review ### Attack 1: Graph blow-up on wide schemas @@ -447,6 +539,8 @@ However, tests that use `graph.edges` directly (e.g., `total_edges = [e for e in **Revision:** Before landing, run the full test suite against the implementation and fix any count-based assertions. The likely fix is to filter: `[e for e in graph.edges if e.to_node.full_name == X and not e.is_join_predicate]` in tests that care about value-only edges, or update count expectations.
+**Risk note:** `test_join_types.py:254` uses `edges[0]` index access, which depends on edge insertion order. Since `_create_join_predicate_edges` runs as step 9 (after value edges are created), `[0]` will still return the value edge. However, this ordering is fragile. The implementation should verify this test still passes, and consider updating it to filter by `not e.is_join_predicate` if needed. + ### Attack 7: Self-join predicate edges **Concern:** In a self-join (`SELECT a.id, b.name FROM users a JOIN users b ON a.manager_id = b.id`), the "left" and "right" tables are the same physical table. Does `join_side` still work? Does `right_table` resolve correctly? @@ -480,3 +574,13 @@ However, tests that use `graph.edges` directly (e.g., `total_edges = [e for e in For adversarial cases (50 joins, 100 columns each), the edge count could reach tens of thousands. This is a pre-existing concern (star expansion on wide tables already produces similar edge counts) and is not specific to this change. **Accepted limitation:** No performance guardrails added. Monitor in practice; add edge-count warnings if needed (similar to existing `UNQUALIFIED_STAR_MULTIPLE_TABLES` warnings). + +### Attack 11: Gap 4 self-read nodes as predicate sources + +**Concern:** With Gap 4 landed (PR #61), self-referencing queries create `{query_id}:self_read:{table}.{column}` input nodes. If Gap 7 creates predicate edges from *physical* `dim_customer.id` instead of `query_0:self_read:dim_customer.id`, the edge connects to the wrong node — the output node rather than the prior-state input node. Impact analysis would miss the self-read chain. + +**Response:** This is handled by the implementation architecture. Gap 7 emits predicate edges at the single-query `lineage_builder.py` level, where nodes use raw `table.column` naming. 
When `pipeline_lineage_builder.py` copies these edges into the pipeline graph, `_make_full_name` (modified by Gap 4) already renames self-read input columns to `{query_id}:self_read:{table}.{column}`. The predicate edges are automatically remapped to self-read nodes without any Gap-7-specific code. + +**Verification:** Test 8 and Test 9 in the test plan validate this interaction explicitly. If the remapping fails, these tests will catch it. + +**No revision needed.** The existing Gap 4 renaming infrastructure handles this transparently. diff --git a/examples/cdc_scd_pipeline.ipynb b/examples/cdc_scd_pipeline.ipynb new file mode 100644 index 0000000..1fa6560 --- /dev/null +++ b/examples/cdc_scd_pipeline.ipynb @@ -0,0 +1,2048 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "a32b26c7", + "metadata": {}, + "source": [ + "# CDC / SCD Type 2 Pipeline — End-to-End Column Lineage\n", + "\n", + "**Example: A realistic Change Data Capture + Slowly Changing Dimension Type 2 pipeline, with column lineage through every layer.**\n", + "\n", + "This notebook stress-tests clgraph against the kind of pipeline you actually see in production:\n", + "\n", + "- **Raw CDC envelope** (Debezium-style `before`/`after` structs, operation codes)\n", + "- **Dedup staging** using `ROW_NUMBER()` + `WHERE rn = 1`\n", + "- **SCD2 target** via a two-step MERGE-then-INSERT pattern on the same dim table\n", + "- **Fact table** joined to the dim with a **point-in-time** range predicate\n", + "- **Mart** rollup by day and dimension attribute\n", + "\n", + "Along the way it demonstrates ten column-lineage features that were gaps in clgraph before this iteration (see `docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md`):\n", + "\n", + "| # | Feature | Edge / metadata |\n", + "|---|---------|-----------------|\n", + "| 1 | Struct dot-access on CDC envelope | `access_type=\"struct\"`, `nested_path` |\n", + "| 2 | Dedup via subquery + `WHERE rn = 1` | `is_qualify_column`, 
`qualify_function` |\n", + "| 3/10 | MERGE `WHEN MATCHED AND (...)` condition columns | `merge_column_role=\"condition\"` |\n", + "| 4 | Self-referencing target across statements | self-read nodes, `edge_role=\"prior_state_read\"` |\n", + "| 5 | Literal-only output columns (`'Y' AS is_active`) | terminal nodes, no upstream edges |\n", + "| 6 | Function-only output (`current_timestamp()`) | output nodes, no incoming edges |\n", + "| 7 | JOIN ON predicate columns (`BETWEEN`) | `is_join_predicate=True`, `join_side` |\n", + "| 8 | WHERE clause columns | `is_where_filter=True`, `where_condition` |\n", + "| 9 | MERGE ON literal-bound predicate (`t.is_active = 'Y'`) | `merge_match_filter` edges |\n", + "\n", + "Dialect: **Databricks / Delta Lake** (the natural target for Debezium CDC landing)." + ] + }, + { + "cell_type": "markdown", + "id": "fe790cad", + "metadata": {}, + "source": [ + "## 1. Pipeline architecture\n", + "\n", + "```\n", + "raw_customer_cdc (Debezium envelope: op, ts_ms, before STRUCT, after STRUCT)\n", + " |\n", + " v\n", + "staging_customer_latest (flatten after.*, dedup by PK via ROW_NUMBER + WHERE rn = 1)\n", + " |\n", + " +------------------> dim_customer (SCD2)\n", + " | - Step 1: MERGE WHEN MATCHED UPDATE (close old row)\n", + " | - Step 2: INSERT ... LEFT JOIN (open new version)\n", + " |\n", + " v\n", + "fact_orders (append-only; joins dim_customer with BETWEEN start_time/end_time)\n", + " |\n", + " v\n", + "mart_daily_revenue (rollup by day and customer_city)\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "e873e24a", + "metadata": {}, + "source": [ + "## 2. Define the SQL statements\n", + "\n", + "Five statements across four layers. Note the `-- Deletes filtered out; tombstone handling is a future extension` comment on the staging subquery; this example handles only creates and updates."
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "8af2f37a", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.770752Z", + "iopub.status.busy": "2026-04-15T21:49:27.770595Z", + "iopub.status.idle": "2026-04-15T21:49:27.836924Z", + "shell.execute_reply": "2026-04-15T21:49:27.836505Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Queries: 5\n", + "Columns: 40\n", + "Edges: 95\n", + "\n", + "Topological order:\n", + " 0. l1_l2_staging CREATE OR REPLACE TABLE -> staging_customer_latest\n", + " 1. l2_l3a_close MERGE -> dim_customer\n", + " 2. l2_l3a_open INSERT -> dim_customer\n", + " 3. l3_fact INSERT -> fact_orders\n", + " 4. fact_mart CREATE OR REPLACE TABLE -> mart_daily_revenue\n" + ] + } + ], + "source": [ + "from clgraph import Pipeline\n", + "\n", + "L1_L2_STAGING_SQL = \"\"\"\n", + "CREATE OR REPLACE TABLE staging_customer_latest AS\n", + "SELECT after.id, after.name, after.city, after.email, ts_ms, op\n", + "FROM (\n", + " SELECT *,\n", + " ROW_NUMBER() OVER (PARTITION BY after.id ORDER BY ts_ms DESC) AS rn\n", + " FROM raw_customer_cdc\n", + " WHERE op IN ('c', 'u') -- Deletes filtered out; tombstone handling is a future extension\n", + ")\n", + "WHERE rn = 1\n", + "\"\"\"\n", + "\n", + "L2_L3A_CLOSE_SQL = \"\"\"\n", + "MERGE INTO dim_customer t\n", + "USING staging_customer_latest s ON t.id = s.id AND t.is_active = 'Y'\n", + "WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city OR t.email <> s.email) THEN\n", + " UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N'\n", + "\"\"\"\n", + "\n", + "L2_L3A_OPEN_SQL = \"\"\"\n", + "INSERT INTO dim_customer\n", + "SELECT s.id, s.name, s.city, s.email,\n", + " current_timestamp() AS start_time,\n", + " TIMESTAMP '9999-12-31 00:00:00' AS end_time,\n", + " 'Y' AS is_active\n", + "FROM staging_customer_latest s\n", + "LEFT JOIN dim_customer t\n", + " ON s.id = t.id AND t.is_active = 'Y'\n", + "WHERE t.id IS NULL OR 
(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\n", + "\"\"\"\n", + "\n", + "L3_FACT_SQL = \"\"\"\n", + "INSERT INTO fact_orders\n", + "SELECT o.order_id, o.customer_id, o.order_ts, o.amount,\n", + " d.city AS customer_city_at_order\n", + "FROM raw_orders o\n", + "LEFT JOIN dim_customer d\n", + " ON o.customer_id = d.id\n", + " AND o.order_ts BETWEEN d.start_time AND d.end_time\n", + "\"\"\"\n", + "\n", + "FACT_MART_SQL = \"\"\"\n", + "CREATE OR REPLACE TABLE mart_daily_revenue AS\n", + "SELECT DATE(order_ts) AS order_date, customer_city_at_order,\n", + " SUM(amount) AS revenue, COUNT(*) AS orders\n", + "FROM fact_orders\n", + "GROUP BY 1, 2\n", + "\"\"\"\n", + "\n", + "pipeline = Pipeline(\n", + " queries=[\n", + " (\"l1_l2_staging\", L1_L2_STAGING_SQL),\n", + " (\"l2_l3a_close\", L2_L3A_CLOSE_SQL),\n", + " (\"l2_l3a_open\", L2_L3A_OPEN_SQL),\n", + " (\"l3_fact\", L3_FACT_SQL),\n", + " (\"fact_mart\", FACT_MART_SQL),\n", + " ],\n", + " dialect=\"databricks\",\n", + ")\n", + "\n", + "print(f\"Queries: {len(pipeline.table_graph.queries)}\")\n", + "print(f\"Columns: {len(pipeline.columns)}\")\n", + "print(f\"Edges: {len(pipeline.edges)}\")\n", + "print()\n", + "print(\"Topological order:\")\n", + "for i, qid in enumerate(pipeline.table_graph.topological_sort()):\n", + " q = pipeline.table_graph.queries[qid]\n", + " print(f\" {i}. {qid:18s} {q.operation.value:6s} -> {q.destination_table}\")" + ] + }, + { + "cell_type": "markdown", + "id": "cc144e72", + "metadata": {}, + "source": [ + "## 3. 
Visualize the pipeline\n", + "\n", + "### Table-level DAG" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "7b0ba00f", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.838097Z", + "iopub.status.busy": "2026-04-15T21:49:27.838019Z", + "iopub.status.idle": "2026-04-15T21:49:27.893404Z", + "shell.execute_reply": "2026-04-15T21:49:27.892971Z" + } + }, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest\n", + "\n", + "🔸 staging_customer_latest\n", + "(intermediate)\n", + "\n", + "\n", + "\n", + "dim_customer\n", + "\n", + "🔸 dim_customer\n", + "(intermediate)\n", + "\n", + "\n", + "\n", + "staging_customer_latest->dim_customer\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "MERGE\n", + "\n", + "\n", + "\n", + "staging_customer_latest->dim_customer\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "INSERT\n", + "\n", + "\n", + "\n", + "raw_customer_cdc\n", + "\n", + "\n", + "📊 raw_customer_cdc\n", + "(source)\n", + "\n", + "\n", + "\n", + "raw_customer_cdc->staging_customer_latest\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "CREATE\n", + "\n", + "\n", + "\n", + "dim_customer->dim_customer\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "INSERT\n", + "\n", + "\n", + "\n", + "fact_orders\n", + "\n", + "🔸 fact_orders\n", + "(intermediate)\n", + "\n", + "\n", + "\n", + "dim_customer->fact_orders\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "INSERT\n", + "\n", + "\n", + "\n", + "mart_daily_revenue\n", + "\n", + "🎯 mart_daily_revenue\n", + "(final)\n", + "\n", + "\n", + "\n", + "fact_orders->mart_daily_revenue\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "CREATE\n", + "\n", + "\n", + "\n", + "raw_orders\n", + "\n", + "\n", + "📊 raw_orders\n", + "(source)\n", + "\n", + "\n", + "\n", + "raw_orders->fact_orders\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "INSERT\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + 
"metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import shutil\n", + "\n", + "from clgraph import visualize_pipeline_lineage, visualize_table_dependencies\n", + "\n", + "if shutil.which(\"dot\") is None:\n", + " print(\"Graphviz not installed. Install with: brew install graphviz\")\n", + "else:\n", + " display(visualize_table_dependencies(pipeline.table_graph))" + ] + }, + { + "cell_type": "markdown", + "id": "1afd6697", + "metadata": {}, + "source": [ + "### Column-level lineage (simplified)\n", + "\n", + "The simplified view collapses per-query subquery intermediate nodes but keeps all cross-table flows — ideal for a bird's-eye view." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "45ebee32", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.894700Z", + "iopub.status.busy": "2026-04-15T21:49:27.894594Z", + "iopub.status.idle": "2026-04-15T21:49:27.950728Z", + "shell.execute_reply": "2026-04-15T21:49:27.950205Z" + } + }, + "outputs": [ + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster_fact_mart__main__mart_daily_revenue\n", + "\n", + "🎯 [fact_mart] mart_daily_revenue\n", + "\n", + "\n", + "cluster_l1_l2_staging__external__raw_customer_cdc\n", + "\n", + "📊 raw_customer_cdc\n", + "\n", + "\n", + "cluster_l1_l2_staging__main__staging_customer_latest\n", + "\n", + "🎯 [l1_l2_staging] staging_customer_latest\n", + "\n", + "\n", + "cluster_l2_l3a_close__external__staging_customer_latest\n", + "\n", + "📊 staging_customer_latest\n", + "\n", + "\n", + "cluster_l2_l3a_open__main__dim_customer\n", + "\n", + "🎯 [l2_l3a_open] dim_customer\n", + "\n", + "\n", + "cluster_l3_fact__external__raw_orders\n", + "\n", + "📊 raw_orders\n", + "\n", + "\n", + "cluster_l3_fact__main__fact_orders\n", + "\n", + "🎯 [l3_fact] fact_orders\n", + "\n", + "\n", + "\n", + "mart_daily_revenue_order_date\n", + "\n", + "\n", + "order_date\n", + "\n", + "\n", + "\n", + 
"\n", + "\n", + "mart_daily_revenue_customer_city_at_order\n", + "\n", + "\n", + "customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "mart_daily_revenue_revenue\n", + "\n", + "\n", + "revenue\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "mart_daily_revenue_orders\n", + "\n", + "\n", + "orders\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_after\n", + "\n", + "\n", + "after\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_id\n", + "\n", + "\n", + "id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_after->staging_customer_latest_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_after->staging_customer_latest_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name\n", + "\n", + "\n", + "name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_after->staging_customer_latest_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city\n", + "\n", + "\n", + "city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_after->staging_customer_latest_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email\n", + "\n", + "\n", + "email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_after->staging_customer_latest_email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_ts_ms\n", + "\n", + "\n", + "ts_ms\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_customer_cdc_op\n", + "\n", + "\n", + "op\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_id\n", + "\n", + "\n", + "id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_id->dim_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", 
+ "dim_customer_name\n", + "\n", + "\n", + "name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city\n", + "\n", + "\n", + "city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_email\n", + "\n", + "\n", + "email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_start_time\n", + "\n", + "\n", + "start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_end_time\n", + "\n", + "\n", + "end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_is_active\n", + "\n", + "\n", + "is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_name->dim_customer_is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_email\n", + "\n", + "\n", + "\n", + 
"\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_city->dim_customer_is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_email->dim_customer_is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_ts_ms\n", + "\n", + "\n", + "ts_ms\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_op\n", + "\n", + "\n", + "op\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "staging_customer_latest_is_active\n", + "\n", + "\n", + "is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_id->dim_customer_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_id->dim_customer_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_id->dim_customer_email\n", + "\n", + "\n", + "\n", + "\n", + "\n", 
+ "\n", + "\n", + "\n", + "dim_customer_id->dim_customer_start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_id->dim_customer_end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_id->dim_customer_is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_customer_city_at_order\n", + "\n", + "\n", + "customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_id->fact_orders_customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_name->dim_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_name->dim_customer_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_name->dim_customer_email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_name->dim_customer_start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_name->dim_customer_end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_name->dim_customer_is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city->dim_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city->dim_customer_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city->dim_customer_email\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city->dim_customer_start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city->dim_customer_end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city->dim_customer_is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_city->fact_orders_customer_city_at_order\n", + 
"\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_email->dim_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_email->dim_customer_name\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_email->dim_customer_city\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_email->dim_customer_start_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_email->dim_customer_end_time\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_email->dim_customer_is_active\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_start_time->fact_orders_customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "dim_customer_end_time->fact_orders_customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_order_id\n", + "\n", + "\n", + "order_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_order_id\n", + "\n", + "\n", + "order_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_order_id->fact_orders_order_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_customer_id\n", + "\n", + "\n", + "customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_customer_id\n", + "\n", + "\n", + "customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_customer_id->fact_orders_customer_id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_customer_id->fact_orders_customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_order_ts\n", + "\n", + "\n", + "order_ts\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_order_ts\n", + "\n", + "\n", + "order_ts\n", + "\n", + "\n", + "\n", + "\n", + "\n", + 
"raw_orders_order_ts->fact_orders_order_ts\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_order_ts->fact_orders_customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_amount\n", + "\n", + "\n", + "amount\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_amount\n", + "\n", + "\n", + "amount\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "raw_orders_amount->fact_orders_amount\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_order_ts->mart_daily_revenue_order_date\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_amount->mart_daily_revenue_revenue\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "fact_orders_customer_city_at_order->mart_daily_revenue_customer_city_at_order\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "if shutil.which(\"dot\") is None:\n", + " print(\"Graphviz not installed. Install with: brew install graphviz\")\n", + "else:\n", + " display(visualize_pipeline_lineage(pipeline.column_graph.to_simplified()))" + ] + }, + { + "cell_type": "markdown", + "id": "181e8cbc", + "metadata": {}, + "source": [ + "## 4. What clgraph captures — a gap-by-gap showcase\n", + "\n", + "Each section below demonstrates a lineage feature on the actual pipeline above." + ] + }, + { + "cell_type": "markdown", + "id": "2fbaa92d", + "metadata": {}, + "source": [ + "### Gap 1 — Struct dot-access (`after.id`, `after.city`, ...)\n", + "\n", + "Debezium-style CDC envelopes put the row payload under nested structs. sqlglot parses `after.id` as `Column(table=\"after\", name=\"id\")` — indistinguishable from a table-qualified reference. 
clgraph now detects when `Column.table` can't be resolved as any known table/alias and emits a **struct edge** instead, carrying `nested_path` and `access_type=\"struct\"` metadata." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "d0a8e597", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.951847Z", + "iopub.status.busy": "2026-04-15T21:49:27.951749Z", + "iopub.status.idle": "2026-04-15T21:49:27.953942Z", + "shell.execute_reply": "2026-04-15T21:49:27.953592Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Struct dot-access edges: 5\n", + "\n", + " raw_customer_cdc.after -> l1_l2_staging:subq:subquery_0.rn\n", + " nested_path='.id' access_type='struct'\n", + " raw_customer_cdc.after -> staging_customer_latest.id\n", + " nested_path='.id' access_type='struct'\n", + " raw_customer_cdc.after -> staging_customer_latest.name\n", + " nested_path='.name' access_type='struct'\n", + " raw_customer_cdc.after -> staging_customer_latest.city\n", + " nested_path='.city' access_type='struct'\n", + " raw_customer_cdc.after -> staging_customer_latest.email\n", + " nested_path='.email' access_type='struct'\n" + ] + } + ], + "source": [ + "struct_edges = [e for e in pipeline.edges if e.access_type == \"struct\"]\n", + "\n", + "print(f\"Struct dot-access edges: {len(struct_edges)}\\n\")\n", + "for e in struct_edges:\n", + " print(f\" {e.from_node.full_name:30s} -> {e.to_node.full_name}\")\n", + " print(f\" nested_path={e.nested_path!r:12s} access_type={e.access_type!r}\")" + ] + }, + { + "cell_type": "markdown", + "id": "040cdbf6", + "metadata": {}, + "source": [ + "### Gap 2 — Dedup via subquery + `WHERE rn = 1`\n", + "\n", + "The canonical dedup pattern: `ROW_NUMBER() OVER (PARTITION BY ...)` in a subquery, then `WHERE rn = 1` in the outer query. 
clgraph detects this shape and promotes qualify metadata — partition / order columns become visible as qualify-context sources of the final output." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "b92ade0b", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.954949Z", + "iopub.status.busy": "2026-04-15T21:49:27.954870Z", + "iopub.status.idle": "2026-04-15T21:49:27.956718Z", + "shell.execute_reply": "2026-04-15T21:49:27.956409Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Qualify (dedup) edges: 1\n", + "\n", + " raw_customer_cdc.after -> staging_customer_latest.id\n", + " qualify_context='partition' qualify_function='ROW_NUMBER'\n" + ] + } + ], + "source": [ + "qualify_edges = [e for e in pipeline.edges if e.is_qualify_column]\n", + "\n", + "print(f\"Qualify (dedup) edges: {len(qualify_edges)}\\n\")\n", + "for e in qualify_edges:\n", + " print(f\" {e.from_node.full_name:40s} -> {e.to_node.full_name}\")\n", + " print(f\" qualify_context={e.qualify_context!r} qualify_function={e.qualify_function!r}\")" + ] + }, + { + "cell_type": "markdown", + "id": "1917568d", + "metadata": {}, + "source": [ + "### Gap 3 / Gap 10 — MERGE `WHEN MATCHED AND (...)` condition columns\n", + "\n", + "The close-old-row MERGE only fires when `t.name <> s.name OR t.city <> s.city OR t.email <> s.email`. Those columns are not *assigned* — but they **gate** the assignment. Before this fix, impact analysis on `staging.name` would miss that it drives whether `dim_customer.end_time` gets updated.\n", + "\n", + "Condition edges now carry `merge_column_role=\"condition\"` (as distinct from `\"assignment\"`)." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "5dfc65e8", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.957614Z", + "iopub.status.busy": "2026-04-15T21:49:27.957552Z", + "iopub.status.idle": "2026-04-15T21:49:27.959468Z", + "shell.execute_reply": "2026-04-15T21:49:27.959210Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "MERGE condition edges: 13\n", + "\n", + " staging_customer_latest.city -> l2_l3a_close:subq:main.end_time (merge_update)\n", + " staging_customer_latest.city -> l2_l3a_close:subq:main.end_time (merge_update)\n", + " staging_customer_latest.city -> l2_l3a_close:subq:main.is_active (merge_update)\n", + " staging_customer_latest.city -> l2_l3a_close:subq:main.is_active (merge_update)\n", + " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update)\n", + " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update)\n", + " staging_customer_latest.email -> l2_l3a_close:subq:main.is_active (merge_update)\n", + " staging_customer_latest.email -> l2_l3a_close:subq:main.is_active (merge_update)\n", + " staging_customer_latest.is_active -> l2_l3a_close:subq:main.is_active (merge_match_filter)\n", + " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update)\n", + " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update)\n", + " staging_customer_latest.name -> l2_l3a_close:subq:main.is_active (merge_update)\n", + " staging_customer_latest.name -> l2_l3a_close:subq:main.is_active (merge_update)\n" + ] + } + ], + "source": [ + "cond_edges = [e for e in pipeline.edges if e.merge_column_role == \"condition\"]\n", + "\n", + "print(f\"MERGE condition edges: {len(cond_edges)}\\n\")\n", + "for e in sorted(cond_edges, key=lambda e: (e.from_node.full_name, e.to_node.full_name)):\n", + " print(f\" {e.from_node.full_name:30s} -> {e.to_node.full_name:50s} ({e.edge_type})\")" + ] + }, + { + 
"cell_type": "markdown", + "id": "4450cd84", + "metadata": {}, + "source": [ + "### Gap 4 — Self-referencing target across statements\n", + "\n", + "Step 2 (`INSERT INTO dim_customer ... LEFT JOIN dim_customer t`) reads from a table that Step 1 just mutated. Naively, this would produce a self-loop in the pipeline graph. clgraph instead creates **self-read nodes** — statement-scoped representations of the prior table state — and wires a **cross-query self-ref edge** from Step 1's output to Step 2's self-read input." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "9ffbe56b", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.960547Z", + "iopub.status.busy": "2026-04-15T21:49:27.960482Z", + "iopub.status.idle": "2026-04-15T21:49:27.962590Z", + "shell.execute_reply": "2026-04-15T21:49:27.962279Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Self-read columns for dim_customer: 4\n", + " l2_l3a_open:self_read:dim_customer.id node_type=self_read\n", + " l2_l3a_open:self_read:dim_customer.email node_type=self_read\n", + " l2_l3a_open:self_read:dim_customer.name node_type=self_read\n", + " l2_l3a_open:self_read:dim_customer.city node_type=self_read\n", + "\n", + "prior_state_read edges (inside step 2): 28\n", + "cross_query_self_ref edges (step 1 -> step 2): 4\n" + ] + } + ], + "source": [ + "sr_cols = pipeline.get_self_read_columns(\"dim_customer\")\n", + "print(f\"Self-read columns for dim_customer: {len(sr_cols)}\")\n", + "for c in sr_cols:\n", + " print(f\" {c.full_name:60s} node_type={c.node_type}\")\n", + "\n", + "print()\n", + "prior_state_edges = [e for e in pipeline.edges if e.edge_role == \"prior_state_read\"]\n", + "cross_query_edges = [e for e in pipeline.edges if e.edge_role == \"cross_query_self_ref\"]\n", + "print(f\"prior_state_read edges (inside step 2): {len(prior_state_edges)}\")\n", + "print(f\"cross_query_self_ref edges (step 1 -> step 2): 
{len(cross_query_edges)}\")" + ] + }, + { + "cell_type": "markdown", + "id": "6bd7c242", + "metadata": {}, + "source": [ + "### Gap 5 — Literal-only output columns (`'Y' AS is_active`)\n", + "\n", + "Columns derived purely from literals (e.g., `'Y' AS is_active`, `TIMESTAMP '9999-12-31 ...'`) appear as terminal output nodes with **zero upstream edges**. They're not silently dropped — they're visible as first-class columns on the dim table." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "10528c00", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.963407Z", + "iopub.status.busy": "2026-04-15T21:49:27.963343Z", + "iopub.status.idle": "2026-04-15T21:49:27.965592Z", + "shell.execute_reply": "2026-04-15T21:49:27.965311Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Literal-only / function-only output columns on dim_customer:\n", + "\n", + " city (1 value edges)\n", + " email (1 value edges)\n", + " end_time (literal/function only)\n", + " id (1 value edges)\n", + " is_active (literal/function only)\n", + " name (1 value edges)\n", + " start_time (literal/function only)\n" + ] + } + ], + "source": [ + "from collections import defaultdict\n", + "\n", + "# Find output columns with no incoming value edges — these are literal-only or function-only\n", + "incoming = defaultdict(list)\n", + "for e in pipeline.edges:\n", + " if not e.is_join_predicate and not e.is_where_filter and e.merge_column_role != \"condition\":\n", + " incoming[e.to_node.full_name].append(e)\n", + "\n", + "dim_output_cols = [\n", + " c for c in pipeline.columns.values() if c.table_name == \"dim_customer\" and c.layer == \"output\"\n", + "]\n", + "print(\"Literal-only / function-only output columns on dim_customer:\\n\")\n", + "for col in sorted(dim_output_cols, key=lambda c: c.column_name):\n", + " n_value_in = len(incoming.get(col.full_name, []))\n", + " marker = \"(literal/function only)\" if n_value_in == 0 
else f\"({n_value_in} value edges)\"\n", + " print(f\" {col.column_name:15s} {marker}\")" + ] + }, + { + "cell_type": "markdown", + "id": "98ce3eda", + "metadata": {}, + "source": [ + "### Gap 6 — Function-only outputs (`current_timestamp()`)\n", + "\n", + "`current_timestamp()` has no column dependencies — it's a function-only source. The resulting columns (`start_time`, `end_time` after MERGE) appear as outputs in the graph without incoming column edges, visible for downstream impact analysis even though nothing upstream drives their value." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "b3dec72f", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.966622Z", + "iopub.status.busy": "2026-04-15T21:49:27.966554Z", + "iopub.status.idle": "2026-04-15T21:49:27.968728Z", + "shell.execute_reply": "2026-04-15T21:49:27.968390Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Edges targeting *.end_time (by query):\n", + "\n", + " query=l2_l3a_close\n", + " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n", + " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n", + " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n", + " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n", + " staging_customer_latest.city -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n", + " ... 
and 1 more\n", + "\n", + " query=l2_l3a_open\n", + " l2_l3a_open:self_read:dim_customer.id -> dim_customer.end_time (where_filter, role=None)\n", + " l2_l3a_open:self_read:dim_customer.email -> dim_customer.end_time (where_filter, role=None)\n", + " staging_customer_latest.email -> dim_customer.end_time (where_filter, role=None)\n", + " l2_l3a_open:self_read:dim_customer.name -> dim_customer.end_time (where_filter, role=None)\n", + " staging_customer_latest.name -> dim_customer.end_time (where_filter, role=None)\n", + " ... and 2 more\n" + ] + } + ], + "source": [ + "# end_time appears twice: assigned via current_timestamp() in the MERGE,\n", + "# and (separately) derived from the '9999-...' literal in the INSERT.\n", + "# Both are handled correctly.\n", + "\n", + "end_time_edges = [e for e in pipeline.edges if e.to_node.column_name == \"end_time\"]\n", + "print(\"Edges targeting *.end_time (by query):\")\n", + "by_query: dict[str, list] = {}\n", + "for e in end_time_edges:\n", + " by_query.setdefault(e.query_id or \"?\", []).append(e)\n", + "\n", + "for qid, edges in by_query.items():\n", + " print(f\"\\n query={qid}\")\n", + " for e in edges[:5]:\n", + " print(\n", + " f\" {e.from_node.full_name:50s} -> {e.to_node.full_name} ({e.edge_type}, role={e.merge_column_role})\"\n", + " )\n", + " if len(edges) > 5:\n", + " print(f\" ... and {len(edges) - 5} more\")" + ] + }, + { + "cell_type": "markdown", + "id": "94194131", + "metadata": {}, + "source": [ + "### Gap 7 — JOIN ON predicate columns (including `BETWEEN`)\n", + "\n", + "The fact-table join uses a point-in-time predicate: `o.order_ts BETWEEN d.start_time AND d.end_time`. 
Before this fix, `d.start_time` and `d.end_time` were invisible to column lineage (the JOIN ON clause wasn't traced at all).\n", + "\n", + "Now predicate-only columns produce **predicate edges** with `is_join_predicate=True` to every output column of the joined query, enabling impact analysis (\"if I change the SCD2 `end_time` semantics, what downstream columns are affected?\")." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "352af875", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.969547Z", + "iopub.status.busy": "2026-04-15T21:49:27.969476Z", + "iopub.status.idle": "2026-04-15T21:49:27.971143Z", + "shell.execute_reply": "2026-04-15T21:49:27.970829Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "JOIN predicate edges: 5\n", + "\n", + " dim_customer.end_time -> fact_orders.customer_city_at_order (side=right)\n", + " dim_customer.id -> fact_orders.customer_city_at_order (side=right)\n", + " dim_customer.start_time -> fact_orders.customer_city_at_order (side=right)\n", + " raw_orders.customer_id -> fact_orders.customer_city_at_order (side=left)\n", + " raw_orders.order_ts -> fact_orders.customer_city_at_order (side=left)\n" + ] + } + ], + "source": [ + "join_pred_edges = [e for e in pipeline.edges if e.is_join_predicate]\n", + "print(f\"JOIN predicate edges: {len(join_pred_edges)}\\n\")\n", + "for e in sorted(join_pred_edges, key=lambda e: (e.from_node.full_name, e.to_node.full_name)):\n", + " print(f\" {e.from_node.full_name:30s} -> {e.to_node.full_name:45s} (side={e.join_side})\")" + ] + }, + { + "cell_type": "markdown", + "id": "63d46ef8", + "metadata": {}, + "source": [ + "### Gap 8 — WHERE clause columns\n", + "\n", + "WHERE predicates don't produce output values directly, but they gate which rows land. `WHERE t.id IS NULL OR (t.name <> s.name OR t.city <> s.city OR t.email <> s.email)` in Step 2 is the sentinel that distinguishes *new* rows from *changed* rows. 
Column lineage now records these as `where_filter` edges." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "07ffd56f", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.972025Z", + "iopub.status.busy": "2026-04-15T21:49:27.971955Z", + "iopub.status.idle": "2026-04-15T21:49:27.974004Z", + "shell.execute_reply": "2026-04-15T21:49:27.973647Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "WHERE filter edges: 50\n", + "\n", + " l2_l3a_open:self_read:dim_customer.id -> 7 output columns (where_filter)\n", + " l2_l3a_open:self_read:dim_customer.email -> 7 output columns (where_filter)\n", + " staging_customer_latest.email -> 7 output columns (where_filter)\n", + " l2_l3a_open:self_read:dim_customer.name -> 7 output columns (where_filter)\n", + " staging_customer_latest.name -> 7 output columns (where_filter)\n", + " l2_l3a_open:self_read:dim_customer.city -> 7 output columns (where_filter)\n", + " staging_customer_latest.city -> 7 output columns (where_filter)\n", + " raw_customer_cdc.op -> 1 output columns (where_filter)\n", + "\n", + "Sample where_condition metadata:\n", + " raw_customer_cdc.op -> l1_l2_staging:subq:subquery_0.rn\n", + " where_condition=\"op IN ('c', 'u') /* Deletes filtered out; tombstone handling is a future extension */\"\n", + " l2_l3a_open:self_read:dim_customer.id -> dim_customer.id\n", + " where_condition='t.id IS NULL OR (t.name <> s.name OR t.city <> s.city OR t.email <> s.email)'\n", + " l2_l3a_open:self_read:dim_customer.id -> dim_customer.name\n", + " where_condition='t.id IS NULL OR (t.name <> s.name OR t.city <> s.city OR t.email <> s.email)'\n" + ] + } + ], + "source": [ + "from collections import Counter\n", + "\n", + "where_edges = [e for e in pipeline.edges if e.is_where_filter]\n", + "print(f\"WHERE filter edges: {len(where_edges)}\\n\")\n", + "\n", + "# Group by source column to avoid spam — each source predicate column fans out\n", + "# to 
every non-star output column of the query\n", + "by_source = Counter(e.from_node.full_name for e in where_edges)\n", + "for col, count in by_source.most_common():\n", + " print(f\" {col:40s} -> {count} output columns (where_filter)\")\n", + "\n", + "print(\"\\nSample where_condition metadata:\")\n", + "for e in where_edges[:3]:\n", + " print(f\" {e.from_node.full_name} -> {e.to_node.full_name}\")\n", + " print(f\" where_condition={e.where_condition!r}\")" + ] + }, + { + "cell_type": "markdown", + "id": "4b2d16e5", + "metadata": {}, + "source": [ + "### Gap 9 — MERGE ON literal-bound predicate (`t.is_active = 'Y'`)\n", + "\n", + "The close-old-row MERGE's ON clause is `t.id = s.id AND t.is_active = 'Y'`. The second conjunct is a literal filter — `match_columns` extraction only captures `EQ` column pairs, so before this fix `t.is_active` was dropped. Now it's emitted as a **`merge_match_filter`** edge with `merge_column_role=\"condition\"`." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "75a6f4b3", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.974812Z", + "iopub.status.busy": "2026-04-15T21:49:27.974744Z", + "iopub.status.idle": "2026-04-15T21:49:27.976293Z", + "shell.execute_reply": "2026-04-15T21:49:27.976031Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "merge_match_filter edges: 1\n", + "\n", + " staging_customer_latest.is_active -> l2_l3a_close:subq:main.is_active\n", + " merge_column_role='condition' merge_condition=None\n" + ] + } + ], + "source": [ + "mmf_edges = [e for e in pipeline.edges if e.edge_type == \"merge_match_filter\"]\n", + "print(f\"merge_match_filter edges: {len(mmf_edges)}\\n\")\n", + "for e in mmf_edges:\n", + " print(f\" {e.from_node.full_name:40s} -> {e.to_node.full_name}\")\n", + " print(f\" merge_column_role={e.merge_column_role!r} merge_condition={e.merge_condition!r}\")" + ] + }, + { + "cell_type": "markdown", + "id": "84235fc7", + 
"metadata": {}, + "source": [ + "## 5. Impact analysis — putting it all together\n", + "\n", + "With all 10 gap features in place, impact analysis queries return realistic, production-accurate answers. Two worked examples:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "d69e186e", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.977182Z", + "iopub.status.busy": "2026-04-15T21:49:27.977118Z", + "iopub.status.idle": "2026-04-15T21:49:27.979531Z", + "shell.execute_reply": "2026-04-15T21:49:27.979104Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Q1. Forward impact of raw_customer_cdc.after.city:\n", + "\n", + " dim_customer -> ['city', 'email', 'id', 'is_active', 'name']\n", + " main -> ['end_time', 'id', 'is_active']\n", + " mart_daily_revenue -> ['customer_city_at_order']\n", + " subquery_0 -> ['rn']\n" + ] + } + ], + "source": [ + "# Q1: \"What downstream columns depend on raw_customer_cdc.after.city?\"\n", + "# (The CDC envelope struct field, via dedup -> staging -> SCD2 dim -> fact -> mart)\n", + "print(\"Q1. Forward impact of raw_customer_cdc.after.city:\\n\")\n", + "downstream = pipeline.trace_column_forward(\"raw_customer_cdc\", \"after\")\n", + "by_table: dict[str, list] = {}\n", + "for col in downstream:\n", + " by_table.setdefault(col.table_name, []).append(col.column_name)\n", + "for table, cols in sorted(by_table.items()):\n", + " print(f\" {table:30s} -> {sorted(set(cols))}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "15739966", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.980423Z", + "iopub.status.busy": "2026-04-15T21:49:27.980354Z", + "iopub.status.idle": "2026-04-15T21:49:27.982085Z", + "shell.execute_reply": "2026-04-15T21:49:27.981804Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Q2. 
Backward trace of mart_daily_revenue.customer_city_at_order:\n", + "\n", + " raw_customer_cdc <- ['after']\n", + " raw_orders <- ['customer_id', 'order_ts']\n" + ] + } + ], + "source": [ + "# Q2: \"What upstream columns drive mart_daily_revenue.customer_city_at_order?\"\n", + "print(\"Q2. Backward trace of mart_daily_revenue.customer_city_at_order:\\n\")\n", + "upstream = pipeline.trace_column_backward(\"mart_daily_revenue\", \"customer_city_at_order\")\n", + "by_table = {}\n", + "for col in upstream:\n", + " by_table.setdefault(col.table_name, []).append(col.column_name)\n", + "for table, cols in sorted(by_table.items()):\n", + " print(f\" {table:30s} <- {sorted(set(cols))}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "1f0afe39", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.982891Z", + "iopub.status.busy": "2026-04-15T21:49:27.982836Z", + "iopub.status.idle": "2026-04-15T21:49:27.984722Z", + "shell.execute_reply": "2026-04-15T21:49:27.984392Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Q3. Forward impact of dim_customer.end_time (Gap 7 predicate-edge reachability):\n", + "\n", + " mart_daily_revenue -> ['customer_city_at_order']\n" + ] + } + ], + "source": [ + "# Q3: \"If we change the SCD2 validity semantics (dim_customer.end_time),\n", + "# what downstream columns in fact_orders / mart are affected via the BETWEEN join?\"\n", + "print(\"Q3. Forward impact of dim_customer.end_time (Gap 7 predicate-edge reachability):\\n\")\n", + "downstream = pipeline.trace_column_forward(\"dim_customer\", \"end_time\")\n", + "by_table = {}\n", + "for col in downstream:\n", + " by_table.setdefault(col.table_name, []).append(col.column_name)\n", + "for table, cols in sorted(by_table.items()):\n", + " print(f\" {table:30s} -> {sorted(set(cols))}\")" + ] + }, + { + "cell_type": "markdown", + "id": "7f84ebc6", + "metadata": {}, + "source": [ + "## 6. 
Edge-metadata summary\n", + "\n", + "A one-glance summary of what this pipeline produces, by edge type / flag." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "9e27071f", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-15T21:49:27.985588Z", + "iopub.status.busy": "2026-04-15T21:49:27.985539Z", + "iopub.status.idle": "2026-04-15T21:49:27.988035Z", + "shell.execute_reply": "2026-04-15T21:49:27.987656Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Total edges: 95\n", + "\n", + "By edge_type:\n", + " where_filter 50\n", + " merge_update 12\n", + " direct_column 9\n", + " expression 6\n", + " join_predicate 5\n", + " cross_query_self_ref 4\n", + " window 2\n", + " star_passthrough 1\n", + " window_partition 1\n", + " window_order 1\n", + " qualify_partition 1\n", + " merge_match 1\n", + " merge_match_filter 1\n", + " aggregate 1\n", + "\n", + "By feature flag:\n", + " is_join_predicate 5\n", + " is_where_filter 50\n", + " is_qualify_column 1\n", + " access_type=struct 5\n", + " merge_column_role=condition 13\n", + " merge_column_role=assignment 0\n", + " edge_role=prior_state_read 28\n", + " edge_role=cross_query_self_ref 4\n" + ] + } + ], + "source": [ + "from collections import Counter\n", + "\n", + "print(f\"Total edges: {len(pipeline.edges)}\\n\")\n", + "\n", + "edge_type_counts = Counter(e.edge_type for e in pipeline.edges)\n", + "print(\"By edge_type:\")\n", + "for et, n in edge_type_counts.most_common():\n", + " print(f\" {et:25s} {n}\")\n", + "\n", + "print(\"\\nBy feature flag:\")\n", + "flag_counts = {\n", + " \"is_join_predicate\": sum(1 for e in pipeline.edges if e.is_join_predicate),\n", + " \"is_where_filter\": sum(1 for e in pipeline.edges if e.is_where_filter),\n", + " \"is_qualify_column\": sum(1 for e in pipeline.edges if e.is_qualify_column),\n", + " \"access_type=struct\": sum(1 for e in pipeline.edges if e.access_type == \"struct\"),\n", + " 
\"merge_column_role=condition\": sum(\n", + " 1 for e in pipeline.edges if e.merge_column_role == \"condition\"\n", + " ),\n", + " \"merge_column_role=assignment\": sum(\n", + " 1 for e in pipeline.edges if e.merge_column_role == \"assignment\"\n", + " ),\n", + " \"edge_role=prior_state_read\": sum(\n", + " 1 for e in pipeline.edges if e.edge_role == \"prior_state_read\"\n", + " ),\n", + " \"edge_role=cross_query_self_ref\": sum(\n", + " 1 for e in pipeline.edges if e.edge_role == \"cross_query_self_ref\"\n", + " ),\n", + "}\n", + "for flag, n in flag_counts.items():\n", + " print(f\" {flag:35s} {n}\")" + ] + }, + { + "cell_type": "markdown", + "id": "104ee433", + "metadata": {}, + "source": [ + "## 7. Further reading\n", + "\n", + "- **Design doc:** `docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md`\n", + "- **Gap-specific designs:**\n", + " - `2026-04-13-gap4-self-referencing-target-design.md`\n", + " - `2026-04-13-gap7-join-predicate-columns-design.md`\n", + " - `2026-04-14-gaps-1-2-8-design.md`\n", + "- **Focused examples:**\n", + " - `examples/self_referencing_lineage.ipynb` — Gap 4 in isolation\n", + " - `examples/join_predicate_lineage.ipynb` — Gap 7 in isolation\n", + " - `examples/merge_lineage.ipynb` — Gap 3/9/10 MERGE mechanics\n", + "- **Integration tests:** `tests/test_cdc_scd_pipeline.py`" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.1" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}