diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index bf004cd..f4176e2 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -58,3 +58,29 @@ jobs:
- name: Run tests
run: uv run pytest tests/ -v --cov=src/clgraph --cov-report=term-missing
+
+ notebooks:
+ name: Example Notebooks
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.12'
+
+ - name: Install Graphviz
+ run: sudo apt-get update && sudo apt-get install -y graphviz
+
+ - name: Install uv
+ run: pip install uv
+
+ - name: Install dependencies
+ run: |
+ uv sync
+ uv pip install -e ".[dev]"
+
+ - name: Run example notebooks (skip LLM)
+ run: uv run python run_all_notebooks.py --skip-llm
diff --git a/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md b/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md
index 2572151..555c338 100644
--- a/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md
+++ b/docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md
@@ -1,9 +1,29 @@
# CDC/SCD Pipeline: Gap Analysis & Example Notebook
-**Date:** 2026-04-13
+**Date:** 2026-04-13 (updated 2026-04-14)
**TODO item:** B — `examples/cdc_scd_pipeline.ipynb`
**Goal:** Stress-test clgraph with a realistic CDC/SCD Type 2 pipeline, surface gaps in column-lineage capture, fix what's practical, and deliver a showcase notebook that honestly documents what works and what doesn't.
+## Progress Summary
+
+**All 10 gaps closed.** No remaining open issues.
+
+| Closed | Fix |
+|--------|-----|
+| Gap 1 | Struct dot-access fallback for unresolvable table refs with `nested_path`/`access_type="struct"` (67859c7) |
+| Gap 2 | Promote qualify metadata from subquery-based dedup `WHERE rn = 1` pattern (622e651) |
+| Gap 3/10 | MERGE condition-gating edges with `merge_column_role='condition'` (778f918) |
+| Gap 4 | Statement-scoped self-read nodes for self-referencing targets (5ef8dad) |
+| Gap 5 | Verified: literal-only columns appear as terminal nodes with zero upstream edges |
+| Gap 6 | Verified: `current_timestamp()` columns appear as output nodes, no incoming edges |
+| Gap 7 | Tagged predicate edges for JOIN ON columns with `is_join_predicate=True` (12f2b62) |
+| Gap 8 | WHERE filter lineage with `is_where_filter=True` and `where_condition` edges (bf6972d) |
+| Gap 9 | Literal-bound MERGE ON predicate extraction as `merge_match_filter` edges (dfbd6b7) |
+
+Design docs: [Gap 4](2026-04-13-gap4-self-referencing-target-design.md), [Gap 7](2026-04-13-gap7-join-predicate-columns-design.md), [Gaps 1+2+8](2026-04-14-gaps-1-2-8-design.md)
+
+Test suites: `test_struct_dot_access.py` (Gap 1), `test_subquery_dedup_qualify.py` (Gap 2), `test_where_filter_lineage.py` (Gap 8), `test_cdc_scd_pipeline.py` (integration), `test_join_predicate_columns.py` (Gap 7)
+
## Background
clgraph already has MERGE parsing (`query_parser._parse_merge_statement`) that captures target, source, match columns, and matched/not-matched actions with per-column mappings. An example exists at `examples/merge_lineage.ipynb`. What is *not* yet proven is whether clgraph produces **correct and complete** column lineage for the specific MERGE patterns that appear in real-world CDC/SCD Type 2 pipelines.
@@ -95,18 +115,18 @@ Each gap will be tested by running the fixture through `Pipeline` and inspecting
- **I** — *Parses but lineage incomplete* (missing edges)
- **F** — *Fails to parse*
-| # | Gap | Where | Expected classification |
-|---|-----|-------|------------------------|
-| 1 | Struct field access on CDC envelope: `after.id` → `staging.id` | L1→L2 | I (may work via existing struct support, needs verification) |
-| 2 | Dedup pattern: `QUALIFY rn = 1` / `WHERE rn = 1` after `ROW_NUMBER()` | L2 | Likely works (`qualify_lineage` exists); verify not dropped through CTE |
-| 3 | **MERGE WHEN MATCHED trigger columns** — condition `t.name <> s.name` should contribute to the lineage of `end_time` and `is_active`, not just the assigned exprs | L3a Step 1 | **P / I** (likely current behavior only records assigned exprs) |
-| 4 | **Self-referencing target** — Step 2 `LEFT JOIN dim_customer` on a table Step 1 just mutated. Pipeline lineage must treat the same table as both an input and output between statements | L3a | I (pipeline_lineage_builder behavior unknown) |
-| 5 | Literal-only columns (`'Y' AS is_active`, `TIMESTAMP '9999-...'`) — should appear as terminal nodes, not be silently dropped | L3a Step 2 | Likely works; verify |
-| 6 | `current_timestamp()` — function-only source (no column deps) | L3a both steps | Likely works; verify |
-| 7 | `BETWEEN d.start_time AND d.end_time` in JOIN ON — does `fact_orders.customer_city_at_order` lineage include `d.start_time/end_time` as condition columns? | L3→fact | **P / I** (join predicate columns often omitted from column lineage) |
-| 8 | `WHERE t.id IS NULL OR (...)` as sentinel for new-vs-versioned inserts — does the NULL branch get recorded? | L3a Step 2 | I (expected; logical branch not in column graph today) |
-| 9 | MERGE's `ON t.id = s.id AND t.is_active = 'Y'` with literal predicate — match_columns extraction looks at `EQ` pairs only; literal-bound predicate may be dropped | L3a Step 1 | I |
-| 10 | Does MERGE's close-action (UPDATE of `end_time`) show `is_active` as a **dependency** of `end_time` via the WHEN MATCHED condition? Impact analysis depends on this | L3a Step 1 | P (suspected) |
+| # | Gap | Where | Expected classification | Status |
+|---|-----|-------|------------------------|--------|
+| 1 | Struct field access on CDC envelope: `after.id` → `staging.id` | L1→L2 | **I** — sqlglot parses `after.id` as `Column(table="after", name="id")`, indistinguishable from table-qualified ref. | **Closed** (67859c7) — struct fallback emits edges with `nested_path`/`access_type="struct"` when `Column.table` doesn't resolve |
+| 2 | Dedup pattern: `QUALIFY rn = 1` / `WHERE rn = 1` after `ROW_NUMBER()` | L2 | **I** — Subquery-based dedup did not propagate qualify metadata to final output. | **Closed** (622e651) — detects ranking functions in subquery + `WHERE rn = 1` outer filter, promotes qualify metadata |
+| 3 | **MERGE WHEN MATCHED trigger columns** — condition `t.name <> s.name` should contribute to the lineage of `end_time` and `is_active`, not just the assigned exprs | L3a Step 1 | **P / I** (likely current behavior only records assigned exprs) | **Closed** (778f918) |
+| 4 | **Self-referencing target** — Step 2 `LEFT JOIN dim_customer` on a table Step 1 just mutated. Pipeline lineage must treat the same table as both an input and output between statements | L3a | I (pipeline_lineage_builder behavior unknown) | **Closed** (5ef8dad) |
+| 5 | Literal-only columns (`'Y' AS is_active`, `TIMESTAMP '9999-...'`) — should appear as terminal nodes, not be silently dropped | L3a Step 2 | Verified: terminal nodes with zero upstream edges. Minor: `is_literal` flag not set (only used for VALUES clauses). | **Closed** (verified) |
+| 6 | `current_timestamp()` — function-only source (no column deps) | L3a both steps | Verified: output nodes with `node_type=expression`, no incoming edges. Works in both MERGE UPDATE and INSERT contexts. | **Closed** (verified) |
+| 7 | `BETWEEN d.start_time AND d.end_time` in JOIN ON — does `fact_orders.customer_city_at_order` lineage include `d.start_time/end_time` as condition columns? | L3→fact | **P / I** (join predicate columns often omitted from column lineage) | **Closed** (12f2b62) |
+| 8 | `WHERE t.id IS NULL OR (...)` as sentinel for new-vs-versioned inserts — does the NULL branch get recorded? | L3a Step 2 | **I** — WHERE clause columns not tracked in column lineage | **Closed** (bf6972d) — `where_filter` edges with `is_where_filter=True` and `where_condition` metadata |
+| 9 | MERGE's `ON t.id = s.id AND t.is_active = 'Y'` with literal predicate — match_columns extraction looks at `EQ` pairs only; literal-bound predicate may be dropped | L3a Step 1 | I | **Closed** (dfbd6b7) |
+| 10 | Does MERGE's close-action (UPDATE of `end_time`) show `is_active` as a **dependency** of `end_time` via the WHEN MATCHED condition? Impact analysis depends on this | L3a Step 1 | P (suspected) | **Closed** (778f918, same fix as Gap 3) |
**Probably-already-works (sanity checks, not gaps):** cross-CTE propagation (fixed by 8aaa454), column extraction for `DATE(order_ts)`, `SUM(amount)` aggregates, GROUP BY columns.
@@ -122,30 +142,31 @@ Three artifacts, in order:
### 2. Gap fixes (scope-limited, each in its own commit)
Only fixes that are **localized and low-risk** land in this effort. Larger architectural changes (e.g., introducing a new edge type for "predicate-conditional" columns) are documented as follow-ups in `TODO.md`, not attempted here.
-Likely in-scope fixes:
-- Gap 3/10: extend `_parse_merge_statement` to record WHEN MATCHED `condition` columns as inputs to each assigned target column's mapping.
-- Gap 9: generalize match_columns extraction to skip literal-bound predicates cleanly rather than dropping the whole predicate.
-
-Likely out-of-scope (documented as follow-ups):
-- Gap 7 (join-predicate columns in column lineage) — this is a project-wide convention question, not a CDC-specific fix.
-- Gap 8 (logical-branch lineage from NULL sentinels) — needs a new edge semantic.
+All fixes completed (each in its own commit):
+- **Gap 1** (67859c7): Struct dot-access fallback — when `Column.table` doesn't resolve to a known table/alias/subquery in scope, emits a lineage edge with `nested_path` and `access_type="struct"` using the first base table from the dependency chain as the source. Handles recursive base table resolution for CDC-like subquery patterns.
+- **Gap 2** (622e651): Subquery-based dedup promotion — detects the common pattern (ROW_NUMBER/RANK/DENSE_RANK/NTILE in subquery + `WHERE rn = 1` in outer query) and promotes qualify metadata (`is_qualify_column`, `qualify_context`, `qualify_function`) to the outer unit. Adds `ranking_window_columns` to `QueryUnit` for cross-unit metadata propagation.
+- **Gap 3/10** (778f918): Extended MERGE parsing to extract column references from WHEN MATCHED conditions and emit edges with `merge_column_role='condition'`. Condition columns (e.g., `staging.name`) now appear as upstream inputs to assigned target columns (e.g., `dim_customer.end_time`).
+- **Gap 4** (5ef8dad): Implemented statement-scoped table versioning with `{query_id}:self_read:{table}.{col}` naming. Self-read nodes represent the prior table state, enabling correct lineage when the same table is both input and output across pipeline statements.
+- **Gap 7** (12f2b62): Emits tagged predicate edges for JOIN ON clause columns with `is_join_predicate=True`, `join_condition`, and `join_side` metadata. Supports equi-joins, range/BETWEEN, function-wrapped predicates, and multi-join chains.
+- **Gap 8** (bf6972d): WHERE filter lineage — columns referenced in WHERE clauses now produce `where_filter` edges to all non-star output columns, with `is_where_filter=True` and `where_condition` metadata. Subquery columns within WHERE are excluded from the outer query's predicates. Also fixes `trace_forward` BFS to treat nodes as terminals when all outgoing targets are already visited.
+- **Gap 9** (dfbd6b7): Extracts literal-bound ON predicates in MERGE (e.g., `t.is_active = 'Y'`) and emits lineage edges with `merge_match_filter` edge type and `merge_column_role="condition"`.
### 3. Showcase notebook (`examples/cdc_scd_pipeline.ipynb`)
Structure:
1. **Narrative intro** — what a CDC/SCD2 pipeline is, why it matters, what we're testing.
2. **The pipeline SQL** — the 6 statements above, with comments pointing out the interesting structures.
3. **Build + visualize** — `Pipeline(...)` → lineage graph rendered (GraphViz), table-level DAG.
-4. **What clgraph captures** — 4-5 concrete impact-analysis queries (e.g., "what downstream columns depend on `raw_customer_cdc.after.city`?"), showing them returning correct results.
-5. **Known limitations** — honest subsection listing the remaining gaps (7, 8, and any others not fixed), each with a short SQL snippet and what the ideal answer would be. Links to the xfail tests.
+4. **What clgraph captures** — concrete impact-analysis queries (e.g., "what downstream columns depend on `raw_customer_cdc.after.city`?"), showcasing all 10 resolved gaps: struct dot-access (1), dedup qualify promotion (2), MERGE condition-gating (3/10), self-referencing targets (4), literal terminals (5), function-only sources (6), JOIN predicate columns (7), WHERE filter lineage (8), and MERGE literal predicates (9).
+5. **Edge semantics showcase** — demonstrate the new edge types and metadata: `access_type="struct"`, `is_qualify_column`, `merge_column_role`, `is_join_predicate`, `is_where_filter`, `merge_match_filter`.
## Acceptance Criteria
- [ ] `tests/test_cdc_scd_pipeline.py` exists, runs in CI, and has tests for every row in the gap table.
-- [ ] Each xfail is tagged with a specific gap number and a reason string.
+- [ ] All 10 gap tests pass (no xfails remaining).
- [ ] Every fix commit has: failing test → fix → passing test (TDD, per repo convention).
- [ ] The notebook runs end-to-end via `run_all_notebooks.py` with no errors.
-- [ ] The notebook's "Known limitations" section matches the current xfail list (no silent discrepancies).
-- [ ] `TODO.md` updated: item B checked off; follow-up gaps listed as their own new entries.
+- [ ] The notebook showcases all 10 resolved gaps with concrete lineage queries.
+- [ ] `TODO.md` updated: item B checked off.
## Out of Scope
@@ -157,26 +178,26 @@ Structure:
## Risks & Open Questions
-1. **Risk: gap 3/10 fix has wider blast radius than expected.** The MERGE WHEN-MATCHED condition column inputs may already be partially captured elsewhere (e.g., in `match_columns`) in a different way; we need to not duplicate or contradict that. Mitigation: read `lineage_builder`'s MERGE handling before editing `query_parser`.
-2. **Open: how should "trigger columns" be represented in the column graph?** Options: (a) same edge type as assignment inputs — simplest, but flattens semantics; (b) new edge attribute `role: "trigger" | "assignment"`. Recommend (a) for this iteration — keep it simple, revisit if users ask for the distinction.
-3. **Open: should the staging CTE preserve `op` column for dim?** CDC deletes usually produce SCD2 tombstones (`is_active='N', end_time=now()`), not hard deletes. For this iteration we filter `op IN ('c','u')` and list delete-handling as a follow-up gap.
+1. **~~Risk: gap 3/10 fix has wider blast radius than expected.~~** **RESOLVED.** The fix uses `merge_column_role` to distinguish condition vs assignment edges, avoiding duplication with `match_columns`. No blast radius issues encountered.
+2. **~~Open: how should "trigger columns" be represented in the column graph?~~** **RESOLVED.** Chose option (b): edges carry `merge_column_role='condition'` vs `merge_column_role='assignment'`, giving users the ability to distinguish trigger vs assignment semantics without flattening.
+3. **Open: should the staging CTE preserve `op` column for dim?** CDC deletes usually produce SCD2 tombstones (`is_active='N', end_time=now()`), not hard deletes. For this iteration we filter `op IN ('c','u')` and list delete-handling as a follow-up gap. **Analysis:** The `op` column is selected into `staging_customer_latest` but never referenced by the downstream MERGE or INSERT statements — it's present but unused. The test fixture in `test_cdc_scd_pipeline.py` assumes `staging_customer_latest` already exists, so the staging CTE is not yet tested. This is a data modeling choice, not a lineage bug. Reconsider if future requirements need full-envelope preservation or delete-handling.
### Additional risks surfaced from code inspection
-4. **Risk: gap 3/10 may already be half-plumbed, which makes the "fix" subtler than it looks.** `trace_strategies.trace_merge_columns` (`trace_strategies.py:186-229`) already stamps each MERGE `ColumnEdge` with `merge_condition` (raw SQL of the WHEN clause) and `merge_action`. The condition is therefore *recorded as edge metadata* but the columns *referenced by that condition* are not emitted as upstream inputs of the assigned target columns. The fix is not "start capturing the condition" — it's "parse the stored condition into column refs and add them as edges." Mitigation: the parser already has a column-extraction pass we can reuse; don't re-implement it in the MERGE path.
+4. **~~Risk: gap 3/10 may already be half-plumbed.~~** **RESOLVED.** The fix (778f918) parses the stored WHEN condition into column refs and adds them as condition-gating edges with `merge_column_role='condition'`. Reused the existing column-extraction pass as recommended.
-5. **Risk: gap 4 (self-referencing target) is load-bearing and unmitigated today.** Confirmed: clgraph has *no* multi-statement table versioning — `depends_on_tables` / `depends_on_units` in `pipeline_lineage_builder.py:76-108` reference tables by name only, with no N-vs-N+1 snapshot distinction. Step 2's `LEFT JOIN dim_customer t` will collapse onto the same node Step 1 just wrote, producing a self-loop in the pipeline graph. This is the dominant correctness issue for SCD2 and is *not* in the "likely in-scope fixes" list. Decide explicitly: either (a) accept the self-loop and document it as gap 4's known limitation in the notebook, or (b) promote gap 4 to in-scope and design a minimal "statement-index-scoped table ref" extension. Recommend (a) for this iteration — the architectural change for (b) is out of proportion with the stated deliverable.
+5. **~~Risk: gap 4 (self-referencing target) is load-bearing and unmitigated.~~** **RESOLVED.** Promoted to in-scope; implemented statement-scoped table versioning with self-read nodes (`{query_id}:self_read:{table}.{col}`). Design doc: `2026-04-13-gap4-self-referencing-target-design.md`. Tests in `test_cdc_scd_pipeline.py` (16 test classes).
-6. **Risk: gap 7 is worse than the design implies.** Design says "join predicate columns often omitted." Code search confirms JOIN ON predicate columns produce **zero** lineage edges today (no handling in `lineage_builder`). The fact-layer BETWEEN join therefore produces no evidence of `dim_customer.start_time/end_time` influence on `fact_orders.customer_city_at_order` beyond the equi-join on `customer_id`. The test assertion should be a hard `xfail` with a clear message, not a soft "incomplete" — and the notebook's "known limitations" section needs an explicit callout so users don't silently trust the temporal join.
+6. **~~Risk: gap 7 is worse than the design implies.~~** **RESOLVED.** Implemented tagged predicate edges for JOIN ON clause columns with `is_join_predicate=True` metadata. Design doc: `2026-04-13-gap7-join-predicate-columns-design.md`. Tests in `test_join_predicate_columns.py` (38 test methods).
-7. **Risk: gap 9 framing is inaccurate.** `match_columns` extraction (`query_parser.py:612-619`) walks EQ nodes and pairs columns — a literal-bound EQ like `t.is_active = 'Y'` produces an EQ with a literal on one side, which the current code likely filters (column-column pairs only) but may or may not drop cleanly. Before writing the fix, add a unit test that asserts current behavior on `ON t.id = s.id AND t.is_active = 'Y'`, then decide. The design's claim that "literal-bound predicate may be dropped" needs to be reduced to fact before coding.
+7. **~~Risk: gap 9 framing is inaccurate.~~** **RESOLVED.** Fix (dfbd6b7) extracts literal-bound ON predicates and emits `merge_match_filter` edges. Unit tests confirm behavior on `ON t.id = s.id AND t.is_active = 'Y'`.
-8. **Risk: struct-column node naming is underspecified for test assertions.** `_extract_nested_path_from_expression` (`lineage_utils.py:335-395`) emits a `(table_ref, column_name, json_path, json_function, nested_path, access_type)` tuple for `after.id` — meaning the column node shape is not a single string but a structured reference. Tests that assert `"after.id"` as a node label will be brittle. Pin the exact shape by reading one existing struct-access test before writing fixture assertions.
+8. **~~Risk: struct-column node naming is underspecified for test assertions.~~** **RESOLVED.** Gap 1 fix (67859c7) implements the struct fallback using the same `nested_path`/`access_type` metadata pattern. Tests in `test_struct_dot_access.py` assert `edge.nested_path == ".id"`, `edge.access_type == "struct"`, consistent with existing `test_struct_array_subscript.py` conventions.
-9. **Risk: CI execution model.** The notebook acceptance says "runs via `run_all_notebooks.py` with no errors." clgraph is a static parser, so the SQL does not need a live Delta/Databricks engine — but any cell that calls `spark.sql(...)`, `dbutils`, or prints a rendered Delta table will fail in CI. Keep all cells to: (a) strings of SQL, (b) `Pipeline(...)` calls, (c) `pyvis`/graphviz renders against file output only.
+9. **Risk: CI execution model.** **OPEN — integration gap, not correctness bug.** All 25 existing notebooks use static SQL + `Pipeline()` only (no `spark.sql()`, `dbutils`). Convention is sound. However, `run_all_notebooks.py` is **not wired into CI** — `.github/workflows/ci.yml` runs only `uv run pytest tests/`. The acceptance criterion "runs via `run_all_notebooks.py`" is not enforced. Action: add CI step `python run_all_notebooks.py --skip-llm` after CDC/SCD notebook is created.
-10. **Open: de-duplicate with existing MERGE tests.** `tests/test_merge_statements.py` already covers basic MERGE parsing, match_columns, matched/not-matched actions, and edge properties. The new `tests/test_cdc_scd_pipeline.py` should cover *only* the CDC-pipeline-shaped assertions (multi-statement, self-reference, envelope flatten, SCD2-pair semantics) and reference existing tests for the single-statement primitives. Otherwise the gap backlog becomes ambiguous.
+10. **~~Open: de-duplicate with existing MERGE tests.~~** **RESOLVED.** Verified: `test_merge_statements.py` covers single-statement MERGE parsing (match_columns, action column mappings). `test_cdc_scd_pipeline.py` covers multi-statement cross-query semantics (self-reference, topological sort, cross-query edges). Zero overlap — properly scoped by design.
-11. **Open: QUALIFY assertion target.** QUALIFY partition/order columns are emitted via edges with `context="qualify_partition"` / `context="qualify_order"` and `is_qualify_column=True` (`lineage_builder.py:376-442`). When writing gap 2's test, assert on `is_qualify_column`/`qualify_context` metadata, not on the plain node set — that's what will catch the "silently dropped through CTE" regression the design worries about.
+11. **~~Open: QUALIFY assertion target.~~** **RESOLVED.** Gap 2 fix (622e651) promotes qualify metadata from subquery-based dedup patterns. Tests in `test_subquery_dedup_qualify.py` use the same metadata assertion pattern (`is_qualify_column`, `qualify_context`, `qualify_function`) as `test_qualify_clause.py`.
-12. **Open: should gap 4 (self-reference) and gap 7 (join-predicate columns) graduate to their own design docs?** Both are cross-cutting architectural shifts (statement-scoped table refs; a new edge semantic for predicate-conditional columns). If we leave them as one-liners in `TODO.md`, they will rot. Recommend: this effort closes with two short follow-up design stubs, not just backlog entries.
+12. **~~Open: should gap 4 and gap 7 graduate to their own design docs?~~** **RESOLVED.** Both now have dedicated design docs in `docs/superpowers/specs/`.
diff --git a/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md b/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md
index 7095f93..e09847e 100644
--- a/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md
+++ b/docs/superpowers/specs/2026-04-13-gap4-self-referencing-target-design.md
@@ -48,7 +48,7 @@ For Step 2 (`INSERT INTO dim_customer ... LEFT JOIN dim_customer`), `destination
**2. Single `TableNode` per name in `table.py:160-164`.**
`TableDependencyGraph.add_table` deduplicates by name. Both the Step 1 MERGE target and the Step 2 INSERT target resolve to the same `TableNode("dim_customer")`. That node has a single `created_by` slot (set by the first DDL; MERGE/INSERT are DML so they go to `modified_by`). There is no concept of "version N" vs "version N+1."
-**3. Shared column naming in `pipeline_lineage_builder.py:734-736`.**
+**3. Shared column naming in `pipeline_lineage_builder.py:708-736` (`_make_full_name`, physical-table return at line 736).**
`_make_full_name` for physical table columns returns `{table_name}.{column_name}` with no query-scoped prefix. Both Step 1's input `dim_customer.id` and Step 2's output `dim_customer.id` map to the same `full_name`, so the pipeline graph has exactly one `ColumnNode` for each column of `dim_customer`, shared across all statements.
**4. Topological sort self-dependency in `table.py:188-204`.**
@@ -121,8 +121,8 @@ staging_customer_latest.id ──► dim_customer@v2.id (Step 2 INSERT)
**User-facing graph shape:**
```
staging.id ──► dim_customer.id (Step 1 output)
-dim_customer.id ──[snapshot]──► q1:snapshot:dim_customer.id
-q1:snapshot:dim_customer.id ──► dim_customer.id (Step 2 output, via INSERT)
+dim_customer.id ──[snapshot]──► {query_id}:snapshot:dim_customer.id
+{query_id}:snapshot:dim_customer.id ──► dim_customer.id (Step 2 output, via INSERT)
staging.id ──► dim_customer.id (Step 2 output)
```
@@ -131,36 +131,36 @@ staging.id ──► dim_customer.id (Step 2 output)
### Option C: Read-Before-Write Detection with Ordered DML Edges
-**Idea:** Do not create new nodes. Instead, recognize self-referencing tables at the pipeline level, stop excluding them from `source_tables`, and annotate the resulting column edges with a `statement_order` attribute. The self-read edges are distinguished from self-write edges by their `statement_order` and a new `edge_role` ("prior_state_read" vs "write").
+**Idea:** Do not create new nodes. Instead, recognize self-referencing tables at the pipeline level, stop excluding them from `source_tables`, and annotate the resulting column edges with a `statement_order` attribute. The self-read edges are distinguished from normal edges by their `statement_order` and a new `edge_role` (`"prior_state_read"` for within-query self-reads, `"cross_query_self_ref"` for cross-query wiring, `None` for normal edges).
**Data model change:**
- `ParsedQuery` gains `self_referenced_tables: Set[str]` (tables that appear in both `destination_table` and `source_tables` for the same query, or across consecutive queries targeting the same table).
-- `ColumnEdge` gains `statement_order: Optional[int]` and `edge_role: Optional[str]` (values: `"prior_state_read"`, `"write"`, or `None` for normal edges).
+- `ColumnEdge` gains `statement_order: Optional[int]` and `edge_role: Optional[str]` (values: `"prior_state_read"`, `"cross_query_self_ref"`, or `None` for normal edges).
- `TableNode` gains `self_referencing_queries: List[str]` tracking which queries read-before-write.
**Scope of code touched:**
- `multi_query.py:286`: remove the `table_name == destination_table` filter; instead, when a source table matches the destination, add it to both `source_tables` and `self_referenced_tables`.
- `table.py:188-204`: when building query dependencies for a self-referencing query, the query depends on all *prior* modifications to the same table, but not on itself. Add cycle-prevention logic.
- `pipeline_lineage_builder.py`:
- - `_infer_table_name` (line 680-700): for input columns from a self-referenced table, qualify the node name with `{query_id}:self_read:{table_name}` to avoid colliding with the output node.
+ - `_infer_table_name` (starts at line ~654): for input columns from a self-referenced table, qualify the node name with `{query_id}:self_read:{table_name}` to avoid colliding with the output node.
- `_make_full_name`: add a branch for self-read input nodes.
- `_add_query_edges`: stamp edges from self-read nodes with `edge_role="prior_state_read"` and `statement_order`.
- `_add_cross_query_edges`: connect the prior output (from the previous statement) to the self-read input node of the current statement.
- `models.py`: add `statement_order` and `edge_role` to `ColumnEdge`; add `self_referenced_tables` to `ParsedQuery`.
- Exporters: include new edge attributes in JSON/GraphViz output.
-**User-facing graph shape:**
+**User-facing graph shape** (using concrete query IDs `query_0` for Step 1, `query_1` for Step 2):
```
┌── staging.id ──────────────────┐
│ ▼
-staging.id ──► dim_customer.id ──► q1:self_read:dim_customer.id ──► dim_customer.id
- (Step 1 output) (Step 2 reads prior state) (Step 2 output)
+staging.id ──► dim_customer.id ──► query_1:self_read:dim_customer.id ──► dim_customer.id
+ (Step 1 output) (Step 2 reads prior state) (Step 2 output)
```
-The self-read node is query-scoped and represents "dim_customer as it existed before this query ran." Cross-query edges connect Step 1's output `dim_customer.id` to Step 2's `q1:self_read:dim_customer.id`.
+The self-read node is query-scoped and represents "dim_customer as it existed before this query ran." Cross-query edges connect Step 1's output `dim_customer.id` to Step 2's `{query_id}:self_read:dim_customer.id`.
**Pros:** Minimal new concepts; edges carry explicit semantics; physical output nodes keep their canonical names; no version numbering scheme.
-**Cons:** Self-read nodes are still a new node variant; `q1:self_read:dim_customer.id` naming needs renderer support.
+**Cons:** Self-read nodes are still a new node variant; `{query_id}:self_read:dim_customer.id` naming needs renderer support.
## Recommendation: Option C (Read-Before-Write with Ordered DML Edges)
@@ -192,47 +192,96 @@ No edge from `dim_customer` to itself. The self-read in Step 2 is lost.
**After (Option C):**
```
-Step 1 (MERGE):
- staging_customer_latest.id ──[merge,write]──► dim_customer.id
- staging_customer_latest.name ──[merge,write]──► dim_customer.name
- staging_customer_latest.city ──[merge,write]──► dim_customer.city
- dim_customer.id ──[merge,prior_state_read]──► dim_customer.end_time
- dim_customer.is_active ──[merge,prior_state_read]──► dim_customer.end_time
+Step 1 (MERGE, query_0):
+ staging_customer_latest.id ──► dim_customer.id (is_merge=true, edge_role=None)
+ staging_customer_latest.name ──► dim_customer.name (is_merge=true, edge_role=None)
+ staging_customer_latest.city ──► dim_customer.city (is_merge=true, edge_role=None)
+ dim_customer.id ──► dim_customer.end_time (is_merge=true, edge_role=None)
+ dim_customer.is_active ──► dim_customer.end_time (is_merge=true, edge_role=None)
Cross-query (Step 1 output -> Step 2 self-read):
- dim_customer.id ──[cross_query]──► step2:self_read:dim_customer.id
- dim_customer.is_active ──[cross_query]──► step2:self_read:dim_customer.is_active
-
-Step 2 (INSERT):
- staging_customer_latest.id ──[write]──► dim_customer.id
- staging_customer_latest.name ──[write]──► dim_customer.name
- step2:self_read:dim_customer.id ──[prior_state_read]──► dim_customer.id
- step2:self_read:dim_customer.is_active ──[prior_state_read]──► dim_customer.is_active
- step2:self_read:dim_customer.name ──[prior_state_read]──► dim_customer.name
+ dim_customer.id ──► query_1:self_read:dim_customer.id (edge_role=cross_query_self_ref)
+ dim_customer.is_active ──► query_1:self_read:dim_customer.is_active
+ (edge_role=cross_query_self_ref)
+
+Step 2 (INSERT, query_1):
+ staging_customer_latest.id ──► dim_customer.id (edge_role=None)
+ staging_customer_latest.name ──► dim_customer.name (edge_role=None)
+ query_1:self_read:dim_customer.id ──► dim_customer.id (edge_role=prior_state_read)
+ query_1:self_read:dim_customer.is_active ──► dim_customer.is_active
+ (edge_role=prior_state_read)
+ query_1:self_read:dim_customer.name ──► dim_customer.name (edge_role=prior_state_read)
```
+Note: `is_merge_operation` and `merge_action` are existing `ColumnEdge` fields; `edge_role` is the new field added by this design. They are independent attributes on the same edge, not comma-separated compound values.
+
The key difference: `dim_customer` columns that Step 2 reads from the LEFT JOIN flow through query-scoped `self_read` nodes, which are connected to Step 1's output via cross-query edges. The graph is a DAG (no self-loops).
## Implementation Sketch
-### Phase 1: Stop filtering self-references
+### Phase 1: Stop filtering self-references (source-scope only)
File: `multi_query.py`, method `_extract_source_tables`.
-Change: when `table_name == destination_table`, do not `continue`. Instead, add the table to both `source_tables` and a new field `self_referenced_tables` on `ParsedQuery`.
+Change: when `table_name == destination_table`, check whether the `Table` node appears in a source scope (FROM, JOIN, USING subquery) rather than in the target slot. `sqlglot.ast.find_all(exp.Table)` walks into the target slot (`Insert.this`, `Merge.this`, `Update.this`), so naively removing the filter would mark every INSERT/MERGE as self-referencing even when the body never reads the target.
+
+**Detection heuristic:** a Table node is a self-reference if it is a descendant of one of the source-scope AST containers: `exp.From`, `exp.Join`, `exp.Subquery`, or `exp.Merge.args["using"]` — but NOT if its immediate parent chain leads to `Insert.this`, `Merge.this`, `Update.this`, or `Create.this`.
```python
-if table_name == destination_table:
- self_referenced = True
- # Still add to source_tables — it IS a source
+# Build set of Table nodes that sit in the target slot (not source scope).
+# IMPORTANT: for INSERT INTO t (col1, col2) SELECT ... and CREATE TABLE t (col1 ...),
+# sqlglot wraps the target in an exp.Schema (not exp.Table). The Schema's .this
+# child is the actual exp.Table. We must collect both the Schema and its inner
+# Table to avoid false-positive self-reference detection.
+target_table_nodes = set()
+if isinstance(ast, (exp.Create, exp.Insert, exp.Merge, exp.Update, exp.Delete)):
+ if ast.this:
+ # ast.this may be exp.Table, exp.Schema, or other node
+ target_table_nodes.add(id(ast.this))
+ # Also collect all Table nodes nested inside the target slot
+ # (handles Schema -> Table, Schema -> Table with column list, etc.)
+ for t in ast.this.find_all(exp.Table):
+ target_table_nodes.add(id(t))
+
+for table_node in ast.find_all(exp.Table):
+ table_name = self._get_table_name(table_node, tokenizer)
+ if not table_name:
+ continue
+ # Skip Table nodes that are the target slot itself
+ if id(table_node) in target_table_nodes:
+ continue
+ # Self-reference: table appears in source scope with same name as destination
+ if table_name == destination_table:
+ self_referenced_tables.add(table_name)
+ # Still add to source_tables — it IS a source
+ # ... existing CTE alias filter ...
+ tables.add(table_name)
```
+**Note on DELETE statements:** `_extract_operation_and_destination` (`multi_query.py:212-257`) currently does not handle `exp.Delete`. The following changes are required:
+
+(a) Add `DELETE = "DELETE"` to the `SQLOperation` enum (`models.py:817-841`) and include `DELETE` in `SQLOperation.is_dml()` so it is recognized as a DML operation. Note: the existing `DELETE_AND_INSERT = "DELETE+INSERT"` (`models.py:835`) is a pre-existing composite operation for the atomic delete-then-insert pattern, parsed as a single unit. The new `DELETE` is for standalone DELETE statements. Both enum values coexist and serve distinct purposes. `DELETE` must be added to the `is_dml()` method's member list alongside the existing DML operations.
+
+(b) Add an `exp.Delete` branch to `_extract_operation_and_destination` that extracts the target table name from `exp.Delete.this` and returns `(SQLOperation.DELETE, table_name)`.
+
+(c) Add `exp.Delete` to the `isinstance` check at `_extract_source_tables` line 274 (the one that sets `destination_table` via `_extract_operation_and_destination`), not just the `target_table_nodes` check. Without this, DELETE statements will not populate `destination_table`, and the self-reference detection logic will not fire for DELETE targets.
+
+Without these three changes, DELETE statements will not be recognized as targeting a table, and Test 7 (DELETE-then-INSERT) will fail.
+
+This ensures `INSERT INTO dim_customer SELECT ... FROM staging` does NOT mark `dim_customer` as self-referenced (the Table node is in the target slot only), while `INSERT INTO dim_customer SELECT ... FROM staging LEFT JOIN dim_customer` DOES (the JOIN's Table node is in source scope).
+
+Acceptance criterion #3 updated to match: `_extract_source_tables` does not filter `destination_table` from `source_tables` when the table appears in a **source scope** (FROM/JOIN/USING), as detected by AST node identity rather than name matching alone.
+
### Phase 2: Cycle-safe dependency graph
+#### Phase 2a: `_build_query_dependencies` self-exclusion
+
File: `table.py`, method `_build_query_dependencies`.
Change: when computing deps for a query, exclude the query itself from its own dependency set. A query that writes to and reads from `dim_customer` should depend on *prior* queries that wrote `dim_customer`, not on itself.
+Note: this self-exclusion applies **globally** to all queries in `_build_query_dependencies`, not only to self-referencing ones. This is correct because a query should never depend on itself regardless of the reason. The guard is harmless for non-self-referencing queries (their `query_id` never appears in their own `modified_by`).
+
```python
if table_node.created_by and table_node.created_by != query_id:
deps[query_id].add(table_node.created_by)
@@ -241,17 +290,134 @@ for mod_id in table_node.modified_by:
deps[query_id].add(mod_id)
```
+#### Phase 2b: `_build_table_dependencies` self-exclusion
+
+File: `table.py`, method `_build_table_dependencies` (lines 206-243).
+
+This method builds table-level dependencies and is used by `get_dependencies`, `get_downstream`, and `get_execution_order`. Without a fix, a self-referencing table like `dim_customer` would list itself as its own dependency (since it appears in both source and destination roles). This creates a spurious self-dependency cycle at the table level even after Phase 2a fixes the query level.
+
+Change: when iterating source tables for a given table, skip entries where `source_table == table_name`:
+
+```python
+for source_table in query.source_tables:
+ if source_table == table_name:
+ continue # skip self-dependency
+ if source_table in self.tables:
+ deps[table_name].add(source_table)
+```
+
### Phase 3: Self-read node creation
-File: `pipeline_lineage_builder.py`, methods `_infer_table_name` and `_make_full_name`.
+Files: `pipeline_lineage_builder.py`, methods `_make_full_name`, `_add_query_columns`, and `_is_physical_table_column`.
+
+**3a. Naming (`_make_full_name`, line 708).** Add a branch before the physical-table check: if the node is an input-layer column whose inferred table is in `query.self_referenced_tables`, return query-scoped naming instead of shared physical naming.
+
+```python
+def _make_full_name(self, node: ColumnNode, query: ParsedQuery) -> str:
+ table_name = self._infer_table_name(node, query)
+ unit_id = node.unit_id
+
+ # Self-read input columns get query-scoped naming to avoid colliding
+ # with the physical output node for the same table.column.
+ #
+ # Fallback: _infer_table_name may return None for ambiguous columns
+ # (e.g., when no alias qualifier is present). For input-layer columns
+ # in a query with self-referenced tables, fall back to node.table_name
+ # (the raw alias/table name from the SQL AST) before giving up.
+ if table_name is None and node.layer == "input":
+ candidate = node.table_name
+ if candidate and candidate in getattr(query, "self_referenced_tables", set()):
+ table_name = candidate
+
+ if (
+ node.layer == "input"
+ and table_name
+ and table_name in getattr(query, "self_referenced_tables", set())
+ ):
+ return f"{query.query_id}:self_read:{table_name}.{node.column_name}"
+
+ # ... existing physical / CTE / subquery branches unchanged ...
+```
+
+When `_infer_table_name` returns `None` for an input-layer column in a query with self-referenced tables, the implementation attempts to match by `node.table_name` (the raw alias from the SQL AST) before falling through to physical-table naming. Without this fallback, ambiguous columns would silently receive shared physical naming, causing the exact node collapse this design is intended to fix. The fallback is safe because `node.table_name` is only used when it matches a known self-referenced table; for non-self-referenced tables, the existing physical-table path applies as before.
+
+The `getattr` guard ensures backward compatibility if `self_referenced_tables` is not yet populated on older `ParsedQuery` instances (e.g., during incremental rollout or tests that construct `ParsedQuery` manually without the new field).
+
+**3b. Node attributes (`_add_query_columns`, line 285).** When creating the `ColumnNode` for a self-read input column, set `node_type="self_read"` on the pipeline-level node (not the query-lineage-level node). This enables renderers to display a human-friendly label like "dim_customer (prior state)." `operation` retains the original query-lineage-level `node.node_type` (e.g., `"direct_column"`, `"expression"`); only the pipeline-level `node_type` is set to `"self_read"`.
+
+**Diagnostic note:** In `_add_query_columns`, when a column with a physical-table `full_name` is dropped due to the deduplication guard (`if full_name in pipeline.columns: continue`) AND the current query has non-empty `self_referenced_tables`, emit a debug-level log warning. This helps diagnose cases where self-read detection fails and a self-read column silently falls back to physical naming, colliding with an existing output node.
+
+**Recommendation:** Both Phase 3a (`_make_full_name`) and Phase 3b (`_add_query_columns`) independently compute whether a node is a self-read column using slightly different expressions. To avoid drift if one evolves without the other, extract a shared helper `_is_self_read_column(node, query) -> bool` that encapsulates the self-read detection logic (`node.layer == "input"` and inferred table name is in `query.self_referenced_tables`). Both call sites should delegate to this helper.
+
+```python
+# Inside the node creation loop in _add_query_columns:
+is_self_read = (
+ node.layer == "input"
+ and (self._infer_table_name(node, query) or node.table_name)
+ in getattr(query, "self_referenced_tables", set())
+)
+
+column = ColumnNode(
+ column_name=node.column_name,
+ table_name=self._infer_table_name(node, query) or node.table_name,
+ full_name=full_name,
+ query_id=query.query_id,
+ unit_id=node.unit_id,
+ node_type="self_read" if is_self_read else node.node_type,
+ layer=node.layer,
+ # ... remaining fields unchanged ...
+)
+```
+
+**3d. Alias resolution for self-referenced tables.** When SQL aliases a self-referenced table (e.g., `LEFT JOIN dim_customer t`), the query-lineage-level `ColumnNode` has `table_name="t"` (the alias), not `"dim_customer"`. Phase 3a's fallback checks `node.table_name in self_referenced_tables`, but `self_referenced_tables` contains `"dim_customer"`, not `"t"`. Without alias resolution, aliased self-references are never detected as self-reads.
+
+Alias-to-table resolution belongs in `MultiQueryParser`, where `_get_table_name` and the `TemplateTokenizer` are available. The resolved mapping is stored on `ParsedQuery` so that `PipelineLineageBuilder` can consume it without needing AST-walking or tokenizer access.
+
+**During parsing** (`MultiQueryParser._parse_single_query` or `_extract_source_tables`): after populating `self_referenced_tables`, build the alias mapping in the same scope that already walks `exp.Table` nodes and calls `_get_table_name`.
+
+```python
+# Inside _extract_source_tables (or _parse_single_query), after self_referenced_tables is populated:
+self_ref_aliases: Dict[str, str] = {}
+for table_node in ast.find_all(exp.Table):
+ resolved_name = self._get_table_name(table_node, tokenizer)
+ if resolved_name in self_referenced_tables:
+ alias = table_node.alias
+ if alias:
+ self_ref_aliases[alias] = resolved_name
+
+# Store on ParsedQuery:
+# parsed_query.self_ref_aliases = self_ref_aliases
+```
+
+`ParsedQuery` gains a new field: `self_ref_aliases: Dict[str, str]` — a mapping from SQL alias to resolved table name, populated only for tables in `self_referenced_tables`.
-Change: for input-layer columns whose table is in `query.self_referenced_tables`, produce a query-scoped name like `{query_id}:self_read:{table_name}.{column_name}` instead of the shared physical name.
+**At `PipelineLineageBuilder` level** (Phase 3a fallback and Phase 3b): read `query.self_ref_aliases` directly — no AST walking or `_get_table_name` calls needed.
-### Phase 4: Cross-query wiring
+```python
+# In the self-read check (Phase 3a fallback and condition):
+candidate = table_name or node.table_name
+resolved = query.self_ref_aliases.get(candidate, candidate)
+if node.layer == "input" and resolved in query.self_referenced_tables:
+ return f"{query.query_id}:self_read:{resolved}.{node.column_name}"
+```
+
+The `self_ref_aliases` mapping must be available to both Phase 3a (`_make_full_name`) and Phase 3b (`_add_query_columns`). Store it as a per-query local or pass it through the shared helper `_is_self_read_column`. All `candidate in self_referenced_tables` checks in Phases 3a and 3b must resolve through this alias mapping first.
+
+**3c. Physical-table exclusion (`_is_physical_table_column`, line 757).** Self-read input columns must NOT be classified as physical table columns (which would route them to shared naming in `_make_full_name`). The current `_make_full_name` code calls `_is_physical_table_column` at line 732 and branches on it at line 734 **before** the self-read check would execute. Therefore, `_make_full_name` must be restructured so the self-read branch (from Phase 3a) is evaluated **before** the existing `_is_physical_table_column` call. The Phase 3a pseudocode already shows this correct ordering (the self-read check appears first); the implementation must match that ordering by moving the `_is_physical_table_column` call below the self-read branch.
+
+### Phase 4: Cross-query wiring (column-granular, topo-ordered)
File: `pipeline_lineage_builder.py`, method `_add_cross_query_edges`.
-Change: after the existing cross-query edge logic, add a pass that connects prior-statement output columns to self-read input columns. For each self-referenced table in a query, find the most recent query that wrote to that table (from topological order), and connect its output columns to the self-read nodes.
+Change: the self-read cross-query wiring needs its own dedicated loop (or a separate method such as `_add_self_read_cross_query_edges`), because the existing `_add_cross_query_edges` loop starts with `if not table_node.created_by: continue`, which skips DML-only tables. MERGE/INSERT targets have `created_by=None` (they use `modified_by`), so the existing loop filters out the exact tables this feature targets. Do not attempt to add self-read wiring inside the existing loop.
+
+The new loop connects prior-statement output columns to self-read input columns. For each self-referenced table in a query, find the most recent query **that wrote each specific column** (from topological order, not `modified_by` list order), and connect that column's output node to the self-read input node. This lookup assumes all output nodes already exist when Phase 4 executes; this is guaranteed by the existing `PipelineLineageBuilder.build()` call sequence, which calls `_add_query_columns` (creating all output nodes) before `_add_cross_query_edges` (wiring cross-query relationships).
+
+**Note on DELETE queries:** DELETE queries produce no column lineage because `_extract_select_from_query` returns `None` for `exp.Delete`. The cross-query wiring loop naturally skips them (no output nodes to connect from). No special handling is needed — the loop simply finds no output columns for DELETE queries when searching for "most recent writer" of a given column.
+
+Column-granular lookup matters when multiple prior statements write to the same table but touch different columns. Example: Step 1 writes `id` + `name`, Step 2 writes only `name`, Step 3 self-reads both — Step 3's `self_read:id` should wire to Step 1 (only writer of `id`), while `self_read:name` wires to Step 2 (most recent writer of `name`).
+
+Implementation: the `sorted_query_ids` list from `PipelineLineageBuilder.build` (line 68, topological sort output) is the authoritative execution order. Pass it to the cross-query edge builder. For each self-read column, walk `sorted_query_ids` in reverse up to (but not including) the current query, and find the most recent query whose output nodes include that `{table_name}.{column_name}`. Connect that output node to the self-read input node. If no prior writer is found for a column, connect to the original source-state node (the physical input column from the table's pre-pipeline state). If no such pre-pipeline source-state node exists in `pipeline.columns` (e.g., the table is only ever self-referenced and was never an explicit source in a non-self-referencing query), the implementation must create one — a physical input `ColumnNode` with `full_name="{table_name}.{column_name}"`, `layer="input"`, representing the table's state before the pipeline began.
### Phase 5: Edge annotation
@@ -259,6 +425,33 @@ File: `models.py`, class `ColumnEdge`.
Change: add `statement_order: Optional[int] = None` and `edge_role: Optional[str] = None`. Populate these during Phase 3 and Phase 4.
+`statement_order` is the **topological sort index** (0-based position in the `sorted_query_ids` list from `PipelineLineageBuilder.build`), not the insertion order or `query_id` string. This matches the execution-order semantics required by Phase 4 and aligns with Attack 8's design revision.
+
+`statement_order` is populated on **all** edges (not only self-read edges) for consistency — every edge carries the topo sort index of the query that produced it. This enables consumers to sort or filter edges by execution order without checking `edge_role` first. To make this possible, `PipelineLineageBuilder` stores `sorted_query_ids` as an instance attribute (set once during `build()` after topological sort completes), and `_add_query_edges` reads `self.sorted_query_ids` to look up the index for the current `query_id` when stamping each edge.
+
+`edge_role` values: `"prior_state_read"` (self-read edge within a query), `"cross_query_self_ref"` (cross-query edge connecting prior output to self-read input), or `None` (normal edge). These values are string literals, consistent with the existing `edge_type` convention on `ColumnEdge` (which also uses string literals like `"lineage"`, `"rename"`, etc.). An enum is not introduced to avoid a new import dependency for a three-value field.
+
+### Phase 6: `get_self_read_columns` convenience method
+
+File: `pipeline.py`, class `Pipeline`.
+
+```python
+def get_self_read_columns(self, table_name: str) -> list[ColumnNode]:
+ return [
+ node
+ for node in self.column_graph.nodes
+ if node.node_type == "self_read" and node.table_name == table_name
+ ]
+```
+
+This method filters the pipeline's column graph for nodes tagged `node_type="self_read"` with a matching `table_name`, providing a convenient API for acceptance criterion 7. It returns an empty list for tables that are never self-referenced.
+
+**`get_column` disambiguation:** After this change, `get_column(table_name, column_name)` could match both an output node and a self-read node for the same table and column. To avoid nondeterministic results, `get_column` should prefer `layer="output"` over `layer="input"` when multiple candidates match, regardless of whether `query_id` is specified. Even with a `query_id`, there can be two matches for the same `(table_name, column_name, query_id)` tuple: a self-read input node and an output node. Implementation: filter candidates as before, then prefer `layer="output"` over `layer="input"` when multiple matches exist.
+
+Note on backward compatibility: `ColumnEdge.__eq__` (`models.py:632-639`) compares only `from_node`, `to_node`, and `edge_type` — not all fields. `__hash__` uses the same three fields. Adding optional fields with `None` defaults does not change equality or hash behavior. All existing test assertions use either attribute access (`edge.is_merge_operation`) or keyword construction (`ColumnEdge(from_node=..., to_node=..., edge_type=...)`) — neither is affected. Verified by grep: 14 `ColumnEdge(` constructions across `test_metadata_propagation.py` and `test_pipeline_diff.py`, all use keyword arguments with no positional args.
+
+`ColumnNode.__hash__` and `__eq__` use `full_name` only, so self-read nodes with unique `{query_id}:self_read:*` names introduce no collision risk with existing physical or CTE column nodes.
+
## Scope
### In scope
@@ -273,27 +466,30 @@ Change: add `statement_order: Optional[int] = None` and `edge_role: Optional[str
- **Cross-notebook state.** Each `Pipeline` instance is self-contained; table state does not carry across separate `Pipeline()` calls.
- **Idempotent re-runs.** The design assumes statements execute once in order. Re-execution semantics (e.g., Airflow retries) are not modeled.
- **Gap 7 (join-predicate columns).** The self-read nodes will not capture join-predicate influence on output columns. That is a separate design.
-- **MERGE within a single statement that reads its own target.** A single MERGE statement's `USING` subquery referencing the target table is already handled by the query parser's alias resolution within one `QueryUnit`. This design addresses *cross-statement* self-reference only.
+- **MERGE within a single statement that reads its own target via USING.** A single MERGE statement's `USING` subquery referencing the target table is already handled by the query parser's alias resolution within one `QueryUnit`. The USING clause creates an explicit source alias, so the target and source are already distinct within the single-query lineage builder. This non-goal applies specifically to MERGE's USING clause. **Note:** single-statement `INSERT INTO t SELECT ... FROM t` is **in scope** — it has a genuine self-reference (the FROM/JOIN reads the target's prior state) that is not resolved by alias separation. Test 9 covers this case. The distinction: MERGE's USING clause already separates target from source by design; INSERT's FROM clause does not.
## Acceptance Criteria
### Structural
-1. `ParsedQuery` has a `self_referenced_tables: Set[str]` field.
+1. `ParsedQuery` has a `self_referenced_tables: Set[str]` field and a `self_ref_aliases: Dict[str, str]` field (mapping SQL aliases to resolved table names for self-referenced tables only, populated during parsing).
2. `ColumnEdge` has `statement_order: Optional[int]` and `edge_role: Optional[str]` fields.
-3. `_extract_source_tables` does not filter `destination_table` from `source_tables` when the table appears in the query body (not just as the INSERT/MERGE target).
+3. `_extract_source_tables` does not filter `destination_table` from `source_tables` when the table appears in a **source scope** (FROM/JOIN/USING), as detected by AST node identity (target-slot vs. source-scope), not name matching alone. Table nodes in the target slot (`Insert.this`, `Merge.this`, etc.) are still excluded.
4. `_build_query_dependencies` does not create self-dependency cycles.
5. Self-read input columns use query-scoped naming (`{query_id}:self_read:{table}.{column}`).
+6. Self-read `ColumnNode` instances have `node_type="self_read"`.
+7. `Pipeline` exposes `get_self_read_columns(table_name: str) -> List[ColumnNode]` returning all self-read nodes for a given physical table.
+8. `Pipeline.trace_column_forward` and `Pipeline.trace_column_backward` traverse all edge roles (including `"prior_state_read"` and `"cross_query_self_ref"`) by default. Note: the underlying BFS functions in `lineage_tracer.py` are module-level functions, not methods on a `LineageTracer` class. Verify that these functions traverse all edge types without filtering by `edge_role`.
### Functional
-6. For the SCD2 two-step fixture (MERGE + INSERT on `dim_customer`), the pipeline graph contains:
+9. For the SCD2 two-step fixture (MERGE + INSERT on `dim_customer`), the pipeline graph contains:
- Edges from `staging_customer_latest` columns to `dim_customer` output columns (both steps).
- Self-read nodes for `dim_customer` columns read by Step 2.
- Cross-query edges from Step 1's `dim_customer` output to Step 2's self-read nodes.
- No self-loop edges (no edge where `from_node.table_name == to_node.table_name` on the same physical node with no intermediate).
-7. Impact analysis: "what depends on `staging_customer_latest.city`?" returns `dim_customer.city` (both steps) and `dim_customer.end_time` (Step 1 close), `dim_customer.is_active` (Step 1 close).
-8. Backward lineage: "where does `dim_customer.id` come from?" returns both `staging_customer_latest.id` (direct) and the self-read chain through prior `dim_customer.id`.
+10. Impact analysis: "what depends on `staging_customer_latest.city`?" returns `dim_customer.city` (both steps) and `dim_customer.end_time` (Step 1 close), `dim_customer.is_active` (Step 1 close).
+11. Backward lineage: "where does `dim_customer.id` come from?" returns both `staging_customer_latest.id` (direct) and the self-read chain through prior `dim_customer.id`.
### Test Plan
@@ -302,9 +498,12 @@ Tests live in `tests/test_cdc_scd_pipeline.py` (created by the parent design's d
**Test 1: Self-reference detected.**
Given the SCD2 two-step SQL, assert that Step 2's `ParsedQuery.self_referenced_tables` contains `"dim_customer"` and `ParsedQuery.source_tables` contains `"dim_customer"`.
-**Test 2: No topological cycle.**
+**Test 2a: No topological cycle.**
Given the SCD2 two-step SQL, assert that `pipeline.table_graph.topological_sort()` succeeds (does not raise `CycleError`) and returns Step 1 before Step 2.
+**Test 2b: Direct self-exclusion in `_build_query_dependencies`.**
+Given the SCD2 two-step SQL, call `pipeline.table_graph._build_query_dependencies()` directly and assert that `deps["query_1"]` does not contain `"query_1"` (no self-dependency). This tests the Phase 2 fix independently of topological sort's cycle-handling behavior.
+
**Test 3: Self-read nodes exist.**
Assert that the pipeline column graph contains nodes matching `*:self_read:dim_customer.*` with `layer="input"`.
@@ -318,10 +517,38 @@ Assert that no edge in `pipeline.column_graph.edges` has `from_node.full_name ==
Starting from `staging_customer_latest.city`, forward-traverse the graph and assert `dim_customer.city` is reachable through both Step 1 and Step 2 paths.
**Test 7: DELETE-then-INSERT pattern.**
-Same shape as SCD2 but using `DELETE FROM dim_customer WHERE ...` followed by `INSERT INTO dim_customer SELECT ...`. Assert self-read nodes and cross-query edges are created.
+Using `DELETE FROM dim_customer WHERE ...` followed by `INSERT INTO dim_customer SELECT ... FROM staging LEFT JOIN dim_customer ...`. Assert:
+(a) DELETE is recognized as DML targeting `dim_customer` (requires `_extract_operation_and_destination` to handle `exp.Delete`).
+(b) INSERT's self-read nodes exist for the `dim_customer` columns referenced in the LEFT JOIN.
+(c) INSERT's self-read nodes wire to the original source-state `dim_customer` columns (pre-pipeline), not to DELETE output — because DELETE produces no column lineage (no output columns).
+(d) No cross-query edges from DELETE to INSERT exist (since DELETE has no column output nodes to connect from).
**Test 8: Non-self-referencing pipeline unchanged.**
-A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT ... FROM a` should produce zero self-read nodes and zero `edge_role="prior_state_read"` edges. Regression guard.
+A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT ... FROM a` should produce zero self-read nodes, zero `edge_role="prior_state_read"` edges, **and `self_referenced_tables == set()` on every `ParsedQuery`**. This locks down both the detection step (ParsedQuery field) and the wiring step (graph nodes/edges) independently.
+
+**Test 9: Single-statement self-reference.**
+`INSERT INTO t SELECT a, b FROM source LEFT JOIN t ON source.id = t.id` as a single-query pipeline. Assert self-read nodes exist for `t.id` and no self-loop edges.
+
+**Test 10: `statement_order` reflects topo sort output for independent statements.**
+Submit two DML statements that both target `dim_customer` but have no inter-dependency (neither reads a table the other creates — e.g., both read only from `staging_customer_latest`). Since there is no dependency edge between them, topo sort may return them in any valid order. The test asserts that `statement_order` on edges matches the topo sort output (whatever it is), not the submission order. Specifically: retrieve `sorted_query_ids` from the pipeline's topological sort, then for each edge verify that `edge.statement_order` equals the index of `edge.query_id` in `sorted_query_ids`. Note: for independent statements the topo sort order may coincide with submission order — the test validates the assignment mechanism, not forced reordering. Forced reordering requires an explicit dependency between statements (covered by Tests 2a, 4, and 11).
+
+**Test 11: Column-granular cross-query wiring (three-step chain).**
+Three-query pipeline: Step 1 writes `{id, name, city}` to `dim_customer`, Step 2 writes only `{name}` to `dim_customer`, Step 3 self-reads `{id, name}` from `dim_customer`. Assert that Step 3's `self_read:id` wires to Step 1's output (the only writer of `id`) and `self_read:name` wires to Step 2's output (the most recent writer of `name`). This validates Phase 4's per-column "most recent writer" resolution.
+
+**Test 12: INSERT with explicit column list does not spuriously self-reference.**
+`INSERT INTO dim_customer (id, name, city) SELECT s.id, s.name, s.city FROM staging s` (no self-read in the body). Assert `self_referenced_tables == set()` — the target's `exp.Schema`-wrapped Table node must be excluded by Phase 1's `target_table_nodes` set.
+
+**Test 13: MERGE with USING does not spuriously self-reference.**
+`MERGE INTO dim_customer t USING staging s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.name = s.name WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name)` as a single-query pipeline. Assert `self_referenced_tables == set()`. The MERGE's USING clause already separates target from source by design (the target `t` is resolved via `Merge.this`, which is in the target slot). This validates the non-goal boundary claim that MERGE's USING-based self-reference is handled by existing alias resolution and should not trigger self-read node creation. Additionally, the test should include a structural assertion: verify that `_extract_source_tables` returns only `{"staging"}` (not `{"staging", "dim_customer"}`) for the MERGE statement, confirming the target's `exp.Table` node is correctly identified as target-slot-only and excluded from `source_tables`.
+
+**Test 14: `get_self_read_columns` API.**
+Given the SCD2 two-step SQL, assert that `pipeline.get_self_read_columns("dim_customer")` returns a non-empty list of `ColumnNode` instances, each with `node_type="self_read"` and `table_name="dim_customer"`. For the non-self-referencing pipeline from Test 8, assert it returns an empty list.
+
+**Test 15: LineageTracer traverses self-read edges.**
+Given the SCD2 two-step SQL, use `pipeline.trace_column_forward("staging_customer_latest", "city")` and assert the result includes `dim_customer.city` reached via the self-read path (through `query_1:self_read:dim_customer.*` nodes). Similarly, `pipeline.trace_column_backward("dim_customer", "id")` must include the self-read chain. This validates acceptance criterion 8 (Pipeline traversal).
+
+**Test 16: Aliased self-reference detected.**
+`INSERT INTO dim_customer SELECT s.id, s.name, s.city, s.email FROM staging s LEFT JOIN dim_customer t ON s.id = t.id WHERE t.id IS NULL` as a single-query pipeline. The alias `t` refers to `dim_customer`. Assert that self-read nodes are created for the aliased reference (e.g., `query_0:self_read:dim_customer.id`), not silently missed because `node.table_name == "t"` fails to match `self_referenced_tables == {"dim_customer"}`. This validates Phase 3d's alias resolution.
## Hostile Review
@@ -329,9 +556,9 @@ A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT .
**Concern:** Users who call `pipeline.get_column("dim_customer", "id")` or iterate `pipeline.columns` filtering by `table_name == "dim_customer"` will not find the self-read nodes. Impact queries that walk edges will silently miss the self-read path.
-**Response:** This is valid. The `get_column` API already accepts an optional `query_id` parameter (`pipeline.py:201`). Self-read nodes should be findable via `get_column("dim_customer", "id", query_id="step2")`. Additionally, impact-analysis traversal must be updated to follow cross-query edges through self-read nodes. The `LineageTracer` (lazy-loaded in `pipeline.py:148`) must be audited to confirm it traverses `edge_role="prior_state_read"` edges.
+**Response:** This is valid. The `get_column` API already accepts an optional `query_id` parameter (`pipeline.py:201`). Self-read nodes should be findable via `get_column("dim_customer", "id", query_id="query_1")`. Additionally, impact-analysis traversal must be updated to follow cross-query edges through self-read nodes. The `LineageTracer` (lazy-loaded in `pipeline.py:148`) must be audited to confirm it traverses `edge_role="prior_state_read"` edges.
-**Design revision:** Add a convenience method `pipeline.get_self_read_columns(table_name)` that returns all self-read nodes for a given physical table. Document that `pipeline.columns` includes self-read nodes (they are not hidden). Ensure `LineageTracer.trace_forward` and `trace_backward` traverse all edge roles by default.
+**Design revision:** Add a convenience method `pipeline.get_self_read_columns(table_name)` that returns all self-read nodes for a given physical table. Document that `pipeline.columns` includes self-read nodes (they are not hidden). Ensure `LineageTracer.trace_column_forward` and `trace_column_backward` traverse all edge roles by default.
### Attack 2: Graph rendering blow-up — self-read nodes double the node count for self-referenced tables
@@ -339,7 +566,7 @@ A pipeline with `CREATE TABLE a AS SELECT ... FROM b; CREATE TABLE c AS SELECT .
**Response:** Accepted limitation with mitigation. The number of self-read nodes scales with `(number of self-referencing statements - 1) * columns_read`, not total columns. In practice, SCD2 Step 2 reads only the match columns (id, is_active) from the prior state, not all 50 columns. The column extractor only creates input nodes for columns actually referenced in the SQL.
-For rendering, add an optional `collapse_self_reads=True` parameter to the GraphViz/pyvis exporters that merges self-read nodes back into their physical counterparts (restoring the current collapsed view) for visual simplicity while keeping the full graph for programmatic queries.
+For rendering, a future follow-up could add an optional `collapse_self_reads=True` parameter to the GraphViz/pyvis exporters that merges self-read nodes back into their physical counterparts (restoring the current collapsed view) for visual simplicity while keeping the full graph for programmatic queries. **This exporter enhancement is deferred to a separate PR** to keep this change focused on correctness. The core graph is always complete; rendering collapse is a presentation concern. **Tracked as:** Deferred Item D1 in the Deferred Work section below.
### Attack 3: Performance on large pipelines — extra pass over all queries for self-reference detection
@@ -353,11 +580,11 @@ For rendering, add an optional `collapse_self_reads=True` parameter to the Graph
**Response:** This is a real concern but is inherent to static analysis. clgraph does not execute SQL; it infers lineage from the SQL text. The "read prior state" assumption is correct for the dominant use case (Databricks/Delta SCD2) and is a reasonable default for all dialects. For dialects where MERGE has different visibility semantics, the lineage is still a useful approximation.
-**Design revision:** Add a `dialect_note` to the self-read edge metadata explaining the assumption: "Assumes snapshot isolation: self-read sees table state before this statement executed." This is informational, not behavioral.
+**Design revision:** Add a `dialect_note` to the self-read edge metadata explaining the assumption: "Assumes snapshot isolation: self-read sees table state before this statement executed." This is informational, not behavioral. **Tracked as:** Deferred Item D2 in the Deferred Work section below. The `dialect_note` field is a nice-to-have that does not affect correctness; it can land in a follow-up PR without blocking this design.
### Attack 5: User mental-model confusion — what IS a self-read node?
-**Concern:** Users see `step2:self_read:dim_customer.id` in the graph and don't understand what it means. The naming convention (`query_id:self_read:table.column`) is internal jargon.
+**Concern:** Users see `{query_id}:self_read:dim_customer.id` in the graph and don't understand what it means. The naming convention (`query_id:self_read:table.column`) is internal jargon.
**Response:** Valid. The naming follows the existing convention for CTE nodes (`query_id:cte:name.column`) and subquery nodes (`query_id:subq:name.column`), both of which are already exposed in the graph. Users who understand CTE nodes will understand self-read nodes by analogy.
@@ -367,9 +594,13 @@ For rendering, add an optional `collapse_self_reads=True` parameter to the Graph
**Concern:** Adding `statement_order` and `edge_role` to `ColumnEdge` means every test that constructs or asserts on edges must account for new optional fields. If defaults are `None`, existing tests should pass, but assertion helpers that do exact-match comparisons may break.
-**Response:** Both fields default to `None`. The `ColumnEdge` dataclass uses keyword arguments with defaults, so existing construction code is unaffected. Assertion helpers in tests use attribute access (`edge.is_merge_operation`, `edge.merge_action`), not positional matching. No existing test should break.
+**Response:** Both fields default to `None`. The `ColumnEdge` dataclass uses keyword arguments with defaults, so existing construction code is unaffected.
-**Verification:** grep all test files for `ColumnEdge(` construction and `assert.*edge` patterns to confirm no exact-match comparisons exist.
+**Verification (completed):** Grep results confirm safety:
+- 14 `ColumnEdge(` constructions across `test_metadata_propagation.py` (12) and `test_pipeline_diff.py` (2). All use keyword arguments (`from_node=..., to_node=..., edge_type=...`), no positional args.
+- `ColumnEdge.__eq__` (`models.py:632-639`) compares only `from_node`, `to_node`, and `edge_type` — not all fields. `__hash__` (`models.py:629-630`) uses the same three fields.
+- All test edge assertions use attribute access (`edge.is_merge_operation`, `edge.qualify_function`, etc.) or count checks (`len(edges) == N`), not full-object equality comparisons.
+- No existing test should break.
### Attack 7: What happens when the same table is self-referenced within a SINGLE statement?
@@ -389,10 +620,19 @@ This is actually **desirable** — a single `INSERT INTO t SELECT ... FROM t` do
**Design revision:** Phase 4 (cross-query wiring) must use the topological sort order, not `modified_by` list order, to determine the "most recent prior writer." The `sorted_query_ids` list from `PipelineLineageBuilder.build` (line 68) is the correct ordering. Store it and pass it to the cross-query edge builder.
-### Revised Test Plan Addition
+*(Tests 9–15 are in the main Test Plan section above.)*
-**Test 9: Single-statement self-reference.**
-`INSERT INTO t SELECT a, b FROM source LEFT JOIN t ON source.id = t.id` as a single-query pipeline. Assert self-read nodes exist for `t.id` and no self-loop edges.
+## Deferred Work
+
+Items deferred from this design to keep the PR focused on correctness. Each should be tracked as a follow-up issue or PR.
+
+**D1: Exporter collapse mode for self-read nodes.**
+Add an optional `collapse_self_reads=True` parameter to GraphViz/pyvis exporters that merges self-read nodes back into their physical counterparts for visual simplicity. Origin: Attack 2.
+
+**D2: `dialect_note` metadata on self-read edges.**
+Add a `dialect_note: Optional[str]` field to `ColumnEdge` populated with "Assumes snapshot isolation: self-read sees table state before this statement executed." for self-read edges. Informational only, no behavioral impact. Origin: Attack 4.
+
+## Known Limitations
-**Test 10: Out-of-order query submission.**
-Submit Step 2 (INSERT) before Step 1 (MERGE) in the pipeline constructor. Assert that topological sort still places MERGE before INSERT and self-read wiring is correct.
+**Unqualified columns in self-referencing queries remain ambiguous.**
+When a self-referencing query references columns without a table qualifier (e.g., bare `id` instead of `t.id`) and the query has multiple source tables, `_infer_table_name` may return `None` and `node.table_name` may also be `None`. In this case, the column falls through to physical naming and will not receive self-read treatment. This is consistent with existing behavior for any multi-source query where unqualified columns are ambiguous — it is not a regression introduced by this design. Users should qualify columns with table aliases in self-referencing queries for correct lineage tracking.
diff --git a/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md b/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md
index c09f7c3..35cf578 100644
--- a/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md
+++ b/docs/superpowers/specs/2026-04-13-gap7-join-predicate-columns-design.md
@@ -36,7 +36,7 @@ for join in joins:
self._parse_from_sources(join, unit, depth)
```
-`_parse_from_sources` (`query_parser.py:1370-1415`) processes the `join.this` (the table/subquery being joined) to extract table names, aliases, and subqueries. The ON clause (`join.args.get("on")`) is **never accessed** in the non-MERGE path. No column references from the ON clause are extracted or stored on the `QueryUnit`.
+`_parse_from_sources` (`query_parser.py:693`) processes the `join.this` (the table/subquery being joined) to extract table names, aliases, and subqueries. The ON clause (`join.args.get("on")`) is **never accessed** in the non-MERGE path. No column references from the ON clause are extracted or stored on the `QueryUnit`.
### 2. The QueryUnit has no field for join predicates
@@ -46,7 +46,7 @@ for join in joins:
`lineage_builder.py:130-158` processes each unit by extracting output columns (`_extract_output_columns`) and tracing their dependencies (`_trace_column_dependencies`). Both operate exclusively on SELECT-list expressions. The ON clause is not examined.
-`_extract_source_column_refs` (`lineage_builder.py:889-948`) walks expression ASTs to find `exp.Column` nodes -- but it only receives SELECT-list expressions, never ON-clause expressions.
+`_extract_source_column_refs` (`lineage_builder.py:889-974`) walks expression ASTs to find `exp.Column` nodes -- but it only receives SELECT-list expressions, never ON-clause expressions.
### 4. MERGE already has the pattern we need
@@ -96,7 +96,7 @@ join_side: Optional[str] = None # "left" or "right" (which side of the jo
**Code touched:**
- `models.py`: add three fields to `ColumnEdge`.
-- `query_parser.py`: in `_parse_select_statement`, after parsing JOINs, extract ON-clause column refs and store on `QueryUnit` as `join_predicates: List[JoinPredicateInfo]`.
+- `query_parser.py`: in `_parse_select_unit`, after parsing JOINs, extract ON-clause column refs and store on `QueryUnit` as `join_predicates: List[JoinPredicateInfo]`.
- `models.py`: add `JoinPredicateInfo` dataclass (join_condition_sql, left_columns, right_columns, join_type).
- `lineage_builder.py`: in `_process_unit`, after step 4 (trace dependencies), add a new step to create join-predicate edges. For each join predicate, for each projected output column from the joined table, emit an edge from each ON-clause-only column to that output column with `is_join_predicate=True`.
- `trace_strategies.py`: no change needed (regular column tracing stays the same).
@@ -217,9 +217,9 @@ raw_orders.customer_id ───[P]────────> output.customer_cit
raw_orders.order_ts ──────[P]────────> output.customer_city (left-side pred -> right-side projected)
```
-This is the final recommended shape. Predicate columns from both sides connect to projected columns from the *opposite* side of the join (the side whose row selection they constrain). Self-side predicate columns (e.g., `d.start_time` constraining which `d` row is selected) also connect to their own side's projected columns, since they constrain the row that contributes the value.
+This is the final recommended shape.
-**Final refined rule:** all ON-clause columns connect to all projected output columns from the join's non-FROM side (the joined table). For multi-way joins, each JOIN's predicate columns connect to projected columns from that specific JOIN's right-side table.
+**Final refined rule:** all ON-clause columns (from both sides of the join) connect to all projected output columns that are sourced from the JOIN's right-side table. For multi-way joins, each JOIN's predicate columns connect to projected columns from that specific JOIN's right-side table.
## Scope
@@ -230,12 +230,13 @@ This is the final recommended shape. Predicate columns from both sides connect t
- Emit `is_join_predicate=True` edges from predicate columns to projected output columns.
- Support: equi-joins, range joins (BETWEEN), theta joins (>, <, >=, <=, <>), function-wrapped join keys (e.g., `UPPER(a.name) = UPPER(b.name)`), compound predicates (AND/OR).
- All existing dialects (bigquery, postgres, snowflake, databricks, duckdb, spark, trino, redshift, mysql).
+- **Interaction with Gap 4 self-referencing targets.** When a self-referencing query uses a JOIN whose right-side table is the self-referenced target, predicate columns must resolve through Gap 4's self-read naming. See "Gap 4 Interaction" section below.
### Non-goals
- **USING clauses.** USING(col) is syntactic sugar for ON a.col = b.col. It can be handled as a follow-up; the AST shape is different (`exp.Using` vs `exp.On`).
- **NATURAL JOIN.** Implicit equi-join on all same-named columns. Requires schema knowledge to resolve. Out of scope.
-- **LATERAL JOIN correlation.** Already handled by the lateral correlation path (`lineage_builder.py:160-162`). This design does not change that path.
+- **LATERAL JOIN correlation.** The outer correlation reference is already handled by the lateral correlation path (`lineage_builder.py:160-162`). This design does not change that path. JOINs within a LATERAL subquery are regular JOINs and are in scope.
- **Correlated subqueries in ON.** E.g., `ON a.id = (SELECT max(id) FROM ...)`. Rare in practice; the subquery parsing path already handles subqueries in WHERE/HAVING but not in ON. Follow-up.
- **CROSS JOIN.** No ON clause; nothing to extract.
- **Implicit joins (comma joins with WHERE predicate).** WHERE-clause predicates that act as join conditions are a separate problem. Out of scope.
@@ -257,7 +258,7 @@ This is the final recommended shape. Predicate columns from both sides connect t
2. Add `join_predicates: List[JoinPredicateInfo]` field to `QueryUnit`.
-3. In `query_parser._parse_select_statement`, after the JOIN loop (line 178), iterate joins again and extract ON-clause columns:
+3. In `query_parser._parse_select_unit`, after the JOIN loop (line 178), iterate joins again and extract ON-clause columns:
```python
for join in joins:
on_clause = join.args.get("on")
@@ -273,7 +274,8 @@ This is the final recommended shape. Predicate columns from both sides connect t
))
```
-4. Implement `_extract_join_predicate_columns` by reusing `_extract_source_column_refs` or by walking `exp.Column` nodes in the ON clause.
+4. Implement `_extract_join_predicate_columns` by walking `exp.Column` nodes in the ON-clause expression tree directly. Do NOT reuse `_extract_source_column_refs` — it returns 6-tuples with JSON/nested-path metadata irrelevant for join predicates. A simpler dedicated walker that returns `List[Tuple[Optional[str], str]]` is cleaner.
+ Note: literal comparisons in ON clauses (e.g., `t.is_active = 'Y'`) produce column refs for the column side only (`t.is_active`); the literal `'Y'` is not an `exp.Column` node and is correctly ignored by the `exp.Column` walker. No special handling is required.
### Phase 2: Emit predicate edges (lineage_builder.py, models.py)
@@ -284,7 +286,7 @@ This is the final recommended shape. Predicate columns from both sides connect t
join_side: Optional[str] = None
```
-2. In `lineage_builder._process_unit`, add a new step after step 4 (window function edges):
+2. In `lineage_builder._process_unit`, add a new step after step 8 (window function edges):
```python
# 9. Create join predicate edges
if unit.join_predicates:
@@ -293,9 +295,27 @@ This is the final recommended shape. Predicate columns from both sides connect t
3. Implement `_create_join_predicate_edges`:
- For each `JoinPredicateInfo` on the unit:
- - Identify which output columns are sourced from the joined (right-side) table.
- - For each column ref in the predicate, resolve to a source node.
+ - **Identify which output columns are sourced from the right-side table** using alias qualification:
+ 1. For each output column, walk its source expression AST to find `exp.Column` nodes.
+ 2. Check `column.table` (the alias qualifier) against `info.right_table` (the right-side alias).
+ 3. If the alias matches → this output column is from the right side; it receives predicate edges.
+ 4. If an output column is unqualified and ambiguous (could belong to either side), emit a
+ debug-level warning and skip predicate edges for that column. Users are expected to write
+ lineage-friendly SQL with explicit table qualifiers.
+ 5. If an output column's expression combines columns from both sides (e.g., `COALESCE(a.val, b.val)`),
+ treat it as right-side if *any* source column is from the right side.
+ - **Resolve each predicate column ref to a source node:**
+ 1. Use `unit.alias_mapping` to resolve the table alias from `(table_ref, col_name)` to a physical table name.
+ 2. Look up the corresponding input `ColumnNode` in the lineage graph by `(resolved_table, col_name)`.
+ 3. If not found (column is referenced in ON but never created as an input node), create the input node.
+ This parallels how `_create_qualify_edges` resolves columns via `_resolve_qualify_column`.
+ Note: unlike `_create_qualify_edges` (which targets a single representative output column — the first non-star output column), `_create_join_predicate_edges` targets ALL output columns sourced from the right-side table.
- Emit a `ColumnEdge` from each predicate column to each right-side projected column, with `is_join_predicate=True`, `join_condition=info.condition_sql`, `edge_type="join_predicate"`.
+ Note: `"join_predicate"` is a new addition to the `edge_type` value set. Existing code that switches on `edge_type` (e.g., visualization, serialization, export) should be audited for exhaustive handling to ensure this new value is not silently ignored.
+ Note: `edge_type="join_predicate"` is intentionally distinct from the existing `"join"` type. Because `ColumnEdge.__eq__` and `__hash__` include `edge_type`, a column can have both a value edge (`edge_type="direct"`) and a predicate edge (`edge_type="join_predicate"`) to the same target node — this is by design and ensures both are preserved.
+ - **`join_side` assignment:** For each predicate column edge, set `join_side="left"` if the predicate column's table alias matches a left-side (FROM) table, `join_side="right"` if it matches the right-side table. This is determined by checking `table_ref` against `info.right_table`.
+
+4. **Update `_add_query_edges` in `pipeline_lineage_builder.py`** (around line 460): Add `is_join_predicate`, `join_condition`, and `join_side` to the explicit field list in `_add_query_edges` (pipeline_lineage_builder.py, around line 460). Without this, predicate edge metadata is silently lost when edges are copied into the pipeline graph.
### Phase 3: Tests
@@ -377,6 +397,78 @@ Using `SQLColumnTracer`, verify that:
The CDC BETWEEN join produces identical predicate edges across bigquery, postgres, snowflake, databricks.
+### Test 8: Self-referencing query with JOIN predicates (Gap 4 interaction)
+
+```sql
+INSERT INTO dim_customer
+SELECT s.id, s.name, s.city, s.email,
+ COALESCE(t.is_active, 'Y') AS is_active
+FROM staging s
+LEFT JOIN dim_customer t
+ ON s.id = t.id AND t.is_active = 'Y'
+WHERE t.id IS NULL OR (t.name <> s.name OR t.city <> s.city)
+```
+
+Single-query pipeline. Assert:
+- Gap 4 self-read nodes exist: `query_0:self_read:dim_customer.id`, `query_0:self_read:dim_customer.is_active`.
+- Gap 7 predicate edges exist from self-read nodes (not physical nodes):
+ - `query_0:self_read:dim_customer.id` -[P]-> `dim_customer.is_active` (output) with `is_join_predicate=True`.
+ - `query_0:self_read:dim_customer.is_active` -[P]-> `dim_customer.is_active` (output) with `is_join_predicate=True`.
+ - `staging.id` -[P]-> `dim_customer.is_active` (output) with `is_join_predicate=True`.
+- No predicate edge has a `from_node` with physical `dim_customer.*` naming (all self-ref predicate sources go through self-read nodes).
+- No predicate edge exists from `dim_customer.name` or `dim_customer.city` — those appear only in the WHERE clause, which is out of scope for this feature.
+
+### Test 9: Self-referencing multi-statement pipeline with JOIN predicates (Gap 4 + Gap 7)
+
+SCD2 two-step fixture (MERGE + INSERT). Assert:
+- Step 2's ON-clause predicate columns (`t.id`, `t.is_active`) resolve to self-read nodes.
+- Cross-query edges from Step 1's output to Step 2's self-read nodes exist (Gap 4).
+- Predicate edges from self-read nodes to Step 2's output columns exist (Gap 7).
+- Impact analysis from `staging.id` reaches `dim_customer.is_active` through both the value path and the predicate path.
+
+### Test 10: Unqualified predicate column emits warning
+
+```sql
+SELECT a.id, name
+FROM table_a a
+INNER JOIN table_b b ON a.id = b.id
+```
+
+`name` is unqualified and ambiguous (could be from `a` or `b`). Assert:
+- A debug-level warning is emitted about the ambiguous column.
+- Predicate edges are still created for the qualified ON-clause columns (`a.id`, `b.id`).
+- The unqualified `name` output column does not receive predicate edges (conservative: skip on ambiguity).
+
+## Gap 4 Interaction
+
+Gap 4 (self-referencing targets, landed in PR #61) introduced query-scoped self-read nodes for tables that appear as both source and destination in the same query or across consecutive pipeline statements. Gap 7 must integrate with this mechanism.
+
+### How it works
+
+When a self-referencing query has a JOIN whose right-side table is the self-referenced target:
+
+1. **Parsing (Phase 1):** `JoinPredicateInfo.right_table` captures the alias of the joined table (e.g., `"t"` for `LEFT JOIN dim_customer t`). `ParsedQuery.self_ref_aliases` (from Gap 4) maps `"t"` -> `"dim_customer"`.
+
+2. **Predicate column resolution (Phase 2):** When resolving a predicate column ref like `(table_ref="t", col_name="is_active")`:
+ - Resolve alias: `t` -> `dim_customer` via `unit.alias_mapping`.
+ - Check if `dim_customer` is in `query.self_referenced_tables`.
+ - If yes: the input node uses Gap 4's self-read naming: `{query_id}:self_read:dim_customer.is_active`.
+ - Look up this self-read node in the lineage graph (it was created by `pipeline_lineage_builder._add_query_columns` during Gap 4 processing).
+
+3. **Edge emission:** The predicate edge's `from_node` is the self-read node, not the physical `dim_customer.is_active` node. This is correct: the predicate reads the *prior state* of `dim_customer`, which is exactly what self-read nodes represent.
+
+### Implementation note
+
+Gap 7's predicate column resolution in `_create_join_predicate_edges` must check `query.self_referenced_tables` (or the pipeline-level equivalent) before resolving column names. If the predicate column's table is self-referenced, use the `{query_id}:self_read:{table}.{column}` naming convention to find the source node.
+
+At the single-query `lineage_builder.py` level, `self_referenced_tables` is not directly available (it lives on `ParsedQuery`, which is a multi-query concept). Two options:
+
+**Option A (recommended):** Gap 7's predicate edge emission happens at the `lineage_builder.py` level (single-query). The self-read renaming happens at the `pipeline_lineage_builder.py` level (multi-query) via `_make_full_name`. Since `_make_full_name` already routes self-read input columns to query-scoped names, predicate edges created at the single-query level will get renamed when `pipeline_lineage_builder._add_query_edges` copies them into the pipeline graph. **However, `_add_query_edges` (pipeline_lineage_builder.py:427-471) explicitly names every field it copies — it does NOT automatically copy new fields.** The new `is_join_predicate`, `join_condition`, and `join_side` fields must be added to the explicit field list in `_add_query_edges` (see Phase 2 required step below). Without this wiring, predicate edge metadata is silently lost when edges are copied into the pipeline graph. No special self-read handling is needed in `_create_join_predicate_edges` itself — but the pipeline-level copy must be updated.
+
+**Option B:** Add self-read awareness to `_create_join_predicate_edges`. This is only needed if the single-query lineage graph must be self-read-aware independently of the pipeline builder.
+
+Option A is preferred because it keeps Gap 7 implementation simple and leverages Gap 4's existing renaming infrastructure.
+
## Hostile Review
### Attack 1: Graph blow-up on wide schemas
@@ -447,6 +539,8 @@ However, tests that use `graph.edges` directly (e.g., `total_edges = [e for e in
**Revision:** Before landing, run the full test suite against the implementation and fix any count-based assertions. The likely fix is to filter: `[e for e in graph.edges if e.to_node.full_name == X and not e.is_join_predicate]` in tests that care about value-only edges, or update count expectations.
+**Risk note:** `test_join_types.py:254` uses `edges[0]` index access, which depends on edge insertion order. Since `_create_join_predicate_edges` runs as step 9 (after value edges are created), `[0]` will still return the value edge. However, this ordering is fragile. The implementation should verify this test still passes, and consider updating it to filter by `not e.is_join_predicate` if needed.
+
### Attack 7: Self-join predicate edges
**Concern:** In a self-join (`SELECT a.id, b.name FROM users a JOIN users b ON a.manager_id = b.id`), the "left" and "right" tables are the same physical table. Does `join_side` still work? Does `right_table` resolve correctly?
@@ -480,3 +574,13 @@ However, tests that use `graph.edges` directly (e.g., `total_edges = [e for e in
For adversarial cases (50 joins, 100 columns each), the edge count could reach tens of thousands. This is a pre-existing concern (star expansion on wide tables already produces similar edge counts) and is not specific to this change.
**Accepted limitation:** No performance guardrails added. Monitor in practice; add edge-count warnings if needed (similar to existing `UNQUALIFIED_STAR_MULTIPLE_TABLES` warnings).
+
+### Attack 11: Gap 4 self-read nodes as predicate sources
+
+**Concern:** With Gap 4 landed (PR #61), self-referencing queries create `{query_id}:self_read:{table}.{column}` input nodes. If Gap 7 creates predicate edges from *physical* `dim_customer.id` instead of `query_0:self_read:dim_customer.id`, the edge connects to the wrong node — the output node rather than the prior-state input node. Impact analysis would miss the self-read chain.
+
+**Response:** This is handled by the implementation architecture. Gap 7 emits predicate edges at the single-query `lineage_builder.py` level, where nodes use raw `table.column` naming. When `pipeline_lineage_builder.py` copies these edges into the pipeline graph, `_make_full_name` (modified by Gap 4) already renames self-read input columns to `{query_id}:self_read:{table}.{column}`. The predicate edges are automatically remapped to self-read nodes without any Gap-7-specific code.
+
+**Verification:** Test 8 and Test 9 in the test plan validate this interaction explicitly. If the remapping fails, these tests will catch it.
+
+**No revision needed.** The existing Gap 4 renaming infrastructure handles this transparently.
diff --git a/examples/cdc_scd_pipeline.ipynb b/examples/cdc_scd_pipeline.ipynb
new file mode 100644
index 0000000..1fa6560
--- /dev/null
+++ b/examples/cdc_scd_pipeline.ipynb
@@ -0,0 +1,2048 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "a32b26c7",
+ "metadata": {},
+ "source": [
+ "# CDC / SCD Type 2 Pipeline — End-to-End Column Lineage\n",
+ "\n",
+ "**Example: A realistic Change Data Capture + Slowly Changing Dimension Type 2 pipeline, with column lineage through every layer.**\n",
+ "\n",
+ "This notebook stress-tests clgraph against the kind of pipeline you actually see in production:\n",
+ "\n",
+ "- **Raw CDC envelope** (Debezium-style `before`/`after` structs, operation codes)\n",
+ "- **Dedup staging** using `ROW_NUMBER()` + `WHERE rn = 1`\n",
+ "- **SCD2 target** via a two-step MERGE-then-INSERT pattern on the same dim table\n",
+ "- **Fact table** joined to the dim with a **point-in-time** range predicate\n",
+ "- **Mart** rollup by day and dimension attribute\n",
+ "\n",
+ "Along the way it demonstrates ten column-lineage features that were gaps in clgraph before this iteration (see `docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md`):\n",
+ "\n",
+ "| # | Feature | Edge / metadata |\n",
+ "|---|---------|-----------------|\n",
+ "| 1 | Struct dot-access on CDC envelope | `access_type=\"struct\"`, `nested_path` |\n",
+ "| 2 | Dedup via subquery + `WHERE rn = 1` | `is_qualify_column`, `qualify_function` |\n",
+ "| 3/10 | MERGE `WHEN MATCHED AND (...)` condition columns | `merge_column_role=\"condition\"` |\n",
+ "| 4 | Self-referencing target across statements | self-read nodes, `edge_role=\"prior_state_read\"` |\n",
+ "| 5 | Literal-only output columns (`'Y' AS is_active`) | terminal nodes, no upstream edges |\n",
+ "| 6 | Function-only output (`current_timestamp()`) | output nodes, no incoming edges |\n",
+ "| 7 | JOIN ON predicate columns (`BETWEEN`) | `is_join_predicate=True`, `join_side` |\n",
+ "| 8 | WHERE clause columns | `is_where_filter=True`, `where_condition` |\n",
+ "| 9 | MERGE ON literal-bound predicate (`t.is_active = 'Y'`) | `merge_match_filter` edges |\n",
+ "\n",
+ "Dialect: **Databricks / Delta Lake** (the natural target for Debezium CDC landing)."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "fe790cad",
+ "metadata": {},
+ "source": [
+ "## 1. Pipeline architecture\n",
+ "\n",
+ "```\n",
+ "raw_customer_cdc (Debezium envelope: op, ts_ms, before STRUCT, after STRUCT)\n",
+ " |\n",
+ " v\n",
+ "staging_customer_latest (flatten after.*, dedup by PK via ROW_NUMBER + WHERE rn = 1)\n",
+ " |\n",
+ " +------------------> dim_customer (SCD2)\n",
+ " | - Step 1: MERGE WHEN MATCHED UPDATE (close old row)\n",
+ " | - Step 2: INSERT ... LEFT JOIN (open new version)\n",
+ " |\n",
+ " v\n",
+ "fact_orders (append-only; joins dim_customer with BETWEEN valid_from/valid_to)\n",
+ " |\n",
+ " v\n",
+ "mart_daily_revenue (rollup by day and customer_city)\n",
+ "```"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "e873e24a",
+ "metadata": {},
+ "source": [
+ "## 2. Define the SQL statements\n",
+ "\n",
+ "Six statements across four layers. Note the `-- Deletes filtered out; tombstone handling is a future extension` comment on the staging CTE — for this example we only handle creates and updates."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "8af2f37a",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.770752Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.770595Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.836924Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.836505Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Queries: 5\n",
+ "Columns: 40\n",
+ "Edges: 95\n",
+ "\n",
+ "Topological order:\n",
+ " 0. l1_l2_staging CREATE OR REPLACE TABLE -> staging_customer_latest\n",
+ " 1. l2_l3a_close MERGE -> dim_customer\n",
+ " 2. l2_l3a_open INSERT -> dim_customer\n",
+ " 3. l3_fact INSERT -> fact_orders\n",
+ " 4. fact_mart CREATE OR REPLACE TABLE -> mart_daily_revenue\n"
+ ]
+ }
+ ],
+ "source": [
+ "from clgraph import Pipeline\n",
+ "\n",
+ "L1_L2_STAGING_SQL = \"\"\"\n",
+ "CREATE OR REPLACE TABLE staging_customer_latest AS\n",
+ "SELECT after.id, after.name, after.city, after.email, ts_ms, op\n",
+ "FROM (\n",
+ " SELECT *,\n",
+ " ROW_NUMBER() OVER (PARTITION BY after.id ORDER BY ts_ms DESC) AS rn\n",
+ " FROM raw_customer_cdc\n",
+ " WHERE op IN ('c', 'u') -- Deletes filtered out; tombstone handling is a future extension\n",
+ ")\n",
+ "WHERE rn = 1\n",
+ "\"\"\"\n",
+ "\n",
+ "L2_L3A_CLOSE_SQL = \"\"\"\n",
+ "MERGE INTO dim_customer t\n",
+ "USING staging_customer_latest s ON t.id = s.id AND t.is_active = 'Y'\n",
+ "WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city OR t.email <> s.email) THEN\n",
+ " UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N'\n",
+ "\"\"\"\n",
+ "\n",
+ "L2_L3A_OPEN_SQL = \"\"\"\n",
+ "INSERT INTO dim_customer\n",
+ "SELECT s.id, s.name, s.city, s.email,\n",
+ " current_timestamp() AS start_time,\n",
+ " TIMESTAMP '9999-12-31 00:00:00' AS end_time,\n",
+ " 'Y' AS is_active\n",
+ "FROM staging_customer_latest s\n",
+ "LEFT JOIN dim_customer t\n",
+ " ON s.id = t.id AND t.is_active = 'Y'\n",
+ "WHERE t.id IS NULL OR (t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\n",
+ "\"\"\"\n",
+ "\n",
+ "L3_FACT_SQL = \"\"\"\n",
+ "INSERT INTO fact_orders\n",
+ "SELECT o.order_id, o.customer_id, o.order_ts, o.amount,\n",
+ " d.city AS customer_city_at_order\n",
+ "FROM raw_orders o\n",
+ "LEFT JOIN dim_customer d\n",
+ " ON o.customer_id = d.id\n",
+ " AND o.order_ts BETWEEN d.start_time AND d.end_time\n",
+ "\"\"\"\n",
+ "\n",
+ "FACT_MART_SQL = \"\"\"\n",
+ "CREATE OR REPLACE TABLE mart_daily_revenue AS\n",
+ "SELECT DATE(order_ts) AS order_date, customer_city_at_order,\n",
+ " SUM(amount) AS revenue, COUNT(*) AS orders\n",
+ "FROM fact_orders\n",
+ "GROUP BY 1, 2\n",
+ "\"\"\"\n",
+ "\n",
+ "pipeline = Pipeline(\n",
+ " queries=[\n",
+ " (\"l1_l2_staging\", L1_L2_STAGING_SQL),\n",
+ " (\"l2_l3a_close\", L2_L3A_CLOSE_SQL),\n",
+ " (\"l2_l3a_open\", L2_L3A_OPEN_SQL),\n",
+ " (\"l3_fact\", L3_FACT_SQL),\n",
+ " (\"fact_mart\", FACT_MART_SQL),\n",
+ " ],\n",
+ " dialect=\"databricks\",\n",
+ ")\n",
+ "\n",
+ "print(f\"Queries: {len(pipeline.table_graph.queries)}\")\n",
+ "print(f\"Columns: {len(pipeline.columns)}\")\n",
+ "print(f\"Edges: {len(pipeline.edges)}\")\n",
+ "print()\n",
+ "print(\"Topological order:\")\n",
+ "for i, qid in enumerate(pipeline.table_graph.topological_sort()):\n",
+ " q = pipeline.table_graph.queries[qid]\n",
+ " print(f\" {i}. {qid:18s} {q.operation.value:6s} -> {q.destination_table}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "cc144e72",
+ "metadata": {},
+ "source": [
+ "## 3. Visualize the pipeline\n",
+ "\n",
+ "### Table-level DAG"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "7b0ba00f",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.838097Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.838019Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.893404Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.892971Z"
+ }
+ },
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "import shutil\n",
+ "\n",
+ "from clgraph import visualize_pipeline_lineage, visualize_table_dependencies\n",
+ "\n",
+ "if shutil.which(\"dot\") is None:\n",
+ " print(\"Graphviz not installed. Install with: brew install graphviz\")\n",
+ "else:\n",
+ " display(visualize_table_dependencies(pipeline.table_graph))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1afd6697",
+ "metadata": {},
+ "source": [
+ "### Column-level lineage (simplified)\n",
+ "\n",
+ "The simplified view collapses per-query subquery intermediate nodes but keeps all cross-table flows — ideal for a bird's-eye view."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "45ebee32",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.894700Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.894594Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.950728Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.950205Z"
+ }
+ },
+ "outputs": [
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "if shutil.which(\"dot\") is None:\n",
+ " print(\"Graphviz not installed. Install with: brew install graphviz\")\n",
+ "else:\n",
+ " display(visualize_pipeline_lineage(pipeline.column_graph.to_simplified()))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "181e8cbc",
+ "metadata": {},
+ "source": [
+ "## 4. What clgraph captures — a gap-by-gap showcase\n",
+ "\n",
+ "Each section below demonstrates a lineage feature on the actual pipeline above."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2fbaa92d",
+ "metadata": {},
+ "source": [
+ "### Gap 1 — Struct dot-access (`after.id`, `after.city`, ...)\n",
+ "\n",
+ "Debezium-style CDC envelopes put the row payload under nested structs. sqlglot parses `after.id` as `Column(table=\"after\", name=\"id\")` — indistinguishable from a table-qualified reference. clgraph now detects when `Column.table` can't be resolved as any known table/alias and emits a **struct edge** instead, carrying `nested_path` and `access_type=\"struct\"` metadata."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "d0a8e597",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.951847Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.951749Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.953942Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.953592Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Struct dot-access edges: 5\n",
+ "\n",
+ " raw_customer_cdc.after -> l1_l2_staging:subq:subquery_0.rn\n",
+ " nested_path='.id' access_type='struct'\n",
+ " raw_customer_cdc.after -> staging_customer_latest.id\n",
+ " nested_path='.id' access_type='struct'\n",
+ " raw_customer_cdc.after -> staging_customer_latest.name\n",
+ " nested_path='.name' access_type='struct'\n",
+ " raw_customer_cdc.after -> staging_customer_latest.city\n",
+ " nested_path='.city' access_type='struct'\n",
+ " raw_customer_cdc.after -> staging_customer_latest.email\n",
+ " nested_path='.email' access_type='struct'\n"
+ ]
+ }
+ ],
+ "source": [
+ "struct_edges = [e for e in pipeline.edges if e.access_type == \"struct\"]\n",
+ "\n",
+ "print(f\"Struct dot-access edges: {len(struct_edges)}\\n\")\n",
+ "for e in struct_edges:\n",
+ " print(f\" {e.from_node.full_name:30s} -> {e.to_node.full_name}\")\n",
+ " print(f\" nested_path={e.nested_path!r:12s} access_type={e.access_type!r}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "040cdbf6",
+ "metadata": {},
+ "source": [
+ "### Gap 2 — Dedup via subquery + `WHERE rn = 1`\n",
+ "\n",
+ "The canonical dedup pattern: `ROW_NUMBER() OVER (PARTITION BY ...)` in a subquery, then `WHERE rn = 1` in the outer query. clgraph detects this shape and promotes qualify metadata — partition / order columns become visible as qualify-context sources of the final output."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "b92ade0b",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.954949Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.954870Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.956718Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.956409Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Qualify (dedup) edges: 1\n",
+ "\n",
+ " raw_customer_cdc.after -> staging_customer_latest.id\n",
+ " qualify_context='partition' qualify_function='ROW_NUMBER'\n"
+ ]
+ }
+ ],
+ "source": [
+ "qualify_edges = [e for e in pipeline.edges if e.is_qualify_column]\n",
+ "\n",
+ "print(f\"Qualify (dedup) edges: {len(qualify_edges)}\\n\")\n",
+ "for e in qualify_edges:\n",
+ " print(f\" {e.from_node.full_name:40s} -> {e.to_node.full_name}\")\n",
+ " print(f\" qualify_context={e.qualify_context!r} qualify_function={e.qualify_function!r}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1917568d",
+ "metadata": {},
+ "source": [
+ "### Gap 3 / Gap 10 — MERGE `WHEN MATCHED AND (...)` condition columns\n",
+ "\n",
+ "The close-old-row MERGE only fires when `t.name <> s.name OR t.city <> s.city OR t.email <> s.email`. Those columns are not *assigned* — but they **gate** the assignment. Before this fix, impact analysis on `staging.name` would miss that it drives whether `dim_customer.end_time` gets updated.\n",
+ "\n",
+ "Condition edges now carry `merge_column_role=\"condition\"` (as distinct from `\"assignment\"`)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "5dfc65e8",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.957614Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.957552Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.959468Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.959210Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "MERGE condition edges: 13\n",
+ "\n",
+ " staging_customer_latest.city -> l2_l3a_close:subq:main.end_time (merge_update)\n",
+ " staging_customer_latest.city -> l2_l3a_close:subq:main.end_time (merge_update)\n",
+ " staging_customer_latest.city -> l2_l3a_close:subq:main.is_active (merge_update)\n",
+ " staging_customer_latest.city -> l2_l3a_close:subq:main.is_active (merge_update)\n",
+ " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update)\n",
+ " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update)\n",
+ " staging_customer_latest.email -> l2_l3a_close:subq:main.is_active (merge_update)\n",
+ " staging_customer_latest.email -> l2_l3a_close:subq:main.is_active (merge_update)\n",
+ " staging_customer_latest.is_active -> l2_l3a_close:subq:main.is_active (merge_match_filter)\n",
+ " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update)\n",
+ " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update)\n",
+ " staging_customer_latest.name -> l2_l3a_close:subq:main.is_active (merge_update)\n",
+ " staging_customer_latest.name -> l2_l3a_close:subq:main.is_active (merge_update)\n"
+ ]
+ }
+ ],
+ "source": [
+ "cond_edges = [e for e in pipeline.edges if e.merge_column_role == \"condition\"]\n",
+ "\n",
+ "print(f\"MERGE condition edges: {len(cond_edges)}\\n\")\n",
+ "for e in sorted(cond_edges, key=lambda e: (e.from_node.full_name, e.to_node.full_name)):\n",
+ " print(f\" {e.from_node.full_name:30s} -> {e.to_node.full_name:50s} ({e.edge_type})\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "4450cd84",
+ "metadata": {},
+ "source": [
+ "### Gap 4 — Self-referencing target across statements\n",
+ "\n",
+ "Step 2 (`INSERT INTO dim_customer ... LEFT JOIN dim_customer t`) reads from a table that Step 1 just mutated. Naively, this would produce a self-loop in the pipeline graph. clgraph instead creates **self-read nodes** — statement-scoped representations of the prior table state — and wires a **cross-query self-ref edge** from Step 1's output to Step 2's self-read input."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "id": "9ffbe56b",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.960547Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.960482Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.962590Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.962279Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Self-read columns for dim_customer: 4\n",
+ " l2_l3a_open:self_read:dim_customer.id node_type=self_read\n",
+ " l2_l3a_open:self_read:dim_customer.email node_type=self_read\n",
+ " l2_l3a_open:self_read:dim_customer.name node_type=self_read\n",
+ " l2_l3a_open:self_read:dim_customer.city node_type=self_read\n",
+ "\n",
+ "prior_state_read edges (inside step 2): 28\n",
+ "cross_query_self_ref edges (step 1 -> step 2): 4\n"
+ ]
+ }
+ ],
+ "source": [
+ "sr_cols = pipeline.get_self_read_columns(\"dim_customer\")\n",
+ "print(f\"Self-read columns for dim_customer: {len(sr_cols)}\")\n",
+ "for c in sr_cols:\n",
+ " print(f\" {c.full_name:60s} node_type={c.node_type}\")\n",
+ "\n",
+ "print()\n",
+ "prior_state_edges = [e for e in pipeline.edges if e.edge_role == \"prior_state_read\"]\n",
+ "cross_query_edges = [e for e in pipeline.edges if e.edge_role == \"cross_query_self_ref\"]\n",
+ "print(f\"prior_state_read edges (inside step 2): {len(prior_state_edges)}\")\n",
+ "print(f\"cross_query_self_ref edges (step 1 -> step 2): {len(cross_query_edges)}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "6bd7c242",
+ "metadata": {},
+ "source": [
+ "### Gap 5 — Literal-only output columns (`'Y' AS is_active`)\n",
+ "\n",
+ "Columns derived purely from literals (e.g., `'Y' AS is_active`, `TIMESTAMP '9999-12-31 ...'`) appear as terminal output nodes with **zero upstream edges**. They're not silently dropped — they're visible as first-class columns on the dim table."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "10528c00",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.963407Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.963343Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.965592Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.965311Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Literal-only / function-only output columns on dim_customer:\n",
+ "\n",
+ " city (1 value edges)\n",
+ " email (1 value edges)\n",
+ " end_time (literal/function only)\n",
+ " id (1 value edges)\n",
+ " is_active (literal/function only)\n",
+ " name (1 value edges)\n",
+ " start_time (literal/function only)\n"
+ ]
+ }
+ ],
+ "source": [
+ "from collections import defaultdict\n",
+ "\n",
+ "# Find output columns with no incoming value edges — these are literal-only or function-only\n",
+ "incoming = defaultdict(list)\n",
+ "for e in pipeline.edges:\n",
+ " if not e.is_join_predicate and not e.is_where_filter and e.merge_column_role != \"condition\":\n",
+ " incoming[e.to_node.full_name].append(e)\n",
+ "\n",
+ "dim_output_cols = [\n",
+ " c for c in pipeline.columns.values() if c.table_name == \"dim_customer\" and c.layer == \"output\"\n",
+ "]\n",
+ "print(\"Literal-only / function-only output columns on dim_customer:\\n\")\n",
+ "for col in sorted(dim_output_cols, key=lambda c: c.column_name):\n",
+ " n_value_in = len(incoming.get(col.full_name, []))\n",
+ " marker = \"(literal/function only)\" if n_value_in == 0 else f\"({n_value_in} value edges)\"\n",
+ " print(f\" {col.column_name:15s} {marker}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "98ce3eda",
+ "metadata": {},
+ "source": [
+ "### Gap 6 — Function-only outputs (`current_timestamp()`)\n",
+ "\n",
+ "`current_timestamp()` has no column dependencies — it's a function-only source. The resulting columns (`start_time`, `end_time` after MERGE) appear as outputs in the graph without incoming column edges, visible for downstream impact analysis even though nothing upstream drives their value."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "b3dec72f",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.966622Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.966554Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.968728Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.968390Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Edges targeting *.end_time (by query):\n",
+ "\n",
+ " query=l2_l3a_close\n",
+ " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n",
+ " staging_customer_latest.email -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n",
+ " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n",
+ " staging_customer_latest.name -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n",
+ " staging_customer_latest.city -> l2_l3a_close:subq:main.end_time (merge_update, role=condition)\n",
+ " ... and 1 more\n",
+ "\n",
+ " query=l2_l3a_open\n",
+ " l2_l3a_open:self_read:dim_customer.id -> dim_customer.end_time (where_filter, role=None)\n",
+ " l2_l3a_open:self_read:dim_customer.email -> dim_customer.end_time (where_filter, role=None)\n",
+ " staging_customer_latest.email -> dim_customer.end_time (where_filter, role=None)\n",
+ " l2_l3a_open:self_read:dim_customer.name -> dim_customer.end_time (where_filter, role=None)\n",
+ " staging_customer_latest.name -> dim_customer.end_time (where_filter, role=None)\n",
+ " ... and 2 more\n"
+ ]
+ }
+ ],
+ "source": [
+ "# end_time appears twice: assigned via current_timestamp() in the MERGE,\n",
+ "# and (separately) derived from the '9999-...' literal in the INSERT.\n",
+ "# Both are handled correctly.\n",
+ "\n",
+ "end_time_edges = [e for e in pipeline.edges if e.to_node.column_name == \"end_time\"]\n",
+ "print(\"Edges targeting *.end_time (by query):\")\n",
+ "by_query: dict[str, list] = {}\n",
+ "for e in end_time_edges:\n",
+ " by_query.setdefault(e.query_id or \"?\", []).append(e)\n",
+ "\n",
+ "for qid, edges in by_query.items():\n",
+ " print(f\"\\n query={qid}\")\n",
+ " for e in edges[:5]:\n",
+ " print(\n",
+ " f\" {e.from_node.full_name:50s} -> {e.to_node.full_name} ({e.edge_type}, role={e.merge_column_role})\"\n",
+ " )\n",
+ " if len(edges) > 5:\n",
+ " print(f\" ... and {len(edges) - 5} more\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "94194131",
+ "metadata": {},
+ "source": [
+ "### Gap 7 — JOIN ON predicate columns (including `BETWEEN`)\n",
+ "\n",
+ "The fact-table join uses a point-in-time predicate: `o.order_ts BETWEEN d.start_time AND d.end_time`. Before this fix, `d.start_time` and `d.end_time` were invisible to column lineage (the JOIN ON clause wasn't traced at all).\n",
+ "\n",
+ "Now predicate-only columns produce **predicate edges** with `is_join_predicate=True` to every output column of the joined query, enabling impact analysis (\"if I change the SCD2 `end_time` semantics, what downstream columns are affected?\")."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "id": "352af875",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.969547Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.969476Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.971143Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.970829Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "JOIN predicate edges: 5\n",
+ "\n",
+ " dim_customer.end_time -> fact_orders.customer_city_at_order (side=right)\n",
+ " dim_customer.id -> fact_orders.customer_city_at_order (side=right)\n",
+ " dim_customer.start_time -> fact_orders.customer_city_at_order (side=right)\n",
+ " raw_orders.customer_id -> fact_orders.customer_city_at_order (side=left)\n",
+ " raw_orders.order_ts -> fact_orders.customer_city_at_order (side=left)\n"
+ ]
+ }
+ ],
+ "source": [
+ "join_pred_edges = [e for e in pipeline.edges if e.is_join_predicate]\n",
+ "print(f\"JOIN predicate edges: {len(join_pred_edges)}\\n\")\n",
+ "for e in sorted(join_pred_edges, key=lambda e: (e.from_node.full_name, e.to_node.full_name)):\n",
+ " print(f\" {e.from_node.full_name:30s} -> {e.to_node.full_name:45s} (side={e.join_side})\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "63d46ef8",
+ "metadata": {},
+ "source": [
+ "### Gap 8 — WHERE clause columns\n",
+ "\n",
+ "WHERE predicates don't produce output values directly, but they gate which rows land. `WHERE t.id IS NULL OR (t.name <> s.name OR t.city <> s.city OR t.email <> s.email)` in Step 2 is the sentinel that distinguishes *new* rows from *changed* rows. Column lineage now records these as `where_filter` edges."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "07ffd56f",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.972025Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.971955Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.974004Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.973647Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "WHERE filter edges: 50\n",
+ "\n",
+ " l2_l3a_open:self_read:dim_customer.id -> 7 output columns (where_filter)\n",
+ " l2_l3a_open:self_read:dim_customer.email -> 7 output columns (where_filter)\n",
+ " staging_customer_latest.email -> 7 output columns (where_filter)\n",
+ " l2_l3a_open:self_read:dim_customer.name -> 7 output columns (where_filter)\n",
+ " staging_customer_latest.name -> 7 output columns (where_filter)\n",
+ " l2_l3a_open:self_read:dim_customer.city -> 7 output columns (where_filter)\n",
+ " staging_customer_latest.city -> 7 output columns (where_filter)\n",
+ " raw_customer_cdc.op -> 1 output columns (where_filter)\n",
+ "\n",
+ "Sample where_condition metadata:\n",
+ " raw_customer_cdc.op -> l1_l2_staging:subq:subquery_0.rn\n",
+ " where_condition=\"op IN ('c', 'u') /* Deletes filtered out; tombstone handling is a future extension */\"\n",
+ " l2_l3a_open:self_read:dim_customer.id -> dim_customer.id\n",
+ " where_condition='t.id IS NULL OR (t.name <> s.name OR t.city <> s.city OR t.email <> s.email)'\n",
+ " l2_l3a_open:self_read:dim_customer.id -> dim_customer.name\n",
+ " where_condition='t.id IS NULL OR (t.name <> s.name OR t.city <> s.city OR t.email <> s.email)'\n"
+ ]
+ }
+ ],
+ "source": [
+ "from collections import Counter\n",
+ "\n",
+ "where_edges = [e for e in pipeline.edges if e.is_where_filter]\n",
+ "print(f\"WHERE filter edges: {len(where_edges)}\\n\")\n",
+ "\n",
+ "# Group by source column to avoid spam — each source predicate column fans out\n",
+ "# to every non-star output column of the query\n",
+ "by_source = Counter(e.from_node.full_name for e in where_edges)\n",
+ "for col, count in by_source.most_common():\n",
+ " print(f\" {col:40s} -> {count} output columns (where_filter)\")\n",
+ "\n",
+ "print(\"\\nSample where_condition metadata:\")\n",
+ "for e in where_edges[:3]:\n",
+ " print(f\" {e.from_node.full_name} -> {e.to_node.full_name}\")\n",
+ " print(f\" where_condition={e.where_condition!r}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "4b2d16e5",
+ "metadata": {},
+ "source": [
+ "### Gap 9 — MERGE ON literal-bound predicate (`t.is_active = 'Y'`)\n",
+ "\n",
+ "The close-old-row MERGE's ON clause is `t.id = s.id AND t.is_active = 'Y'`. The second conjunct is a literal filter — `match_columns` extraction only captures `EQ` column pairs, so before this fix `t.is_active` was dropped. Now it's emitted as a **`merge_match_filter`** edge with `merge_column_role=\"condition\"`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "id": "75a6f4b3",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.974812Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.974744Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.976293Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.976031Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "merge_match_filter edges: 1\n",
+ "\n",
+ " staging_customer_latest.is_active -> l2_l3a_close:subq:main.is_active\n",
+ " merge_column_role='condition' merge_condition=None\n"
+ ]
+ }
+ ],
+ "source": [
+ "mmf_edges = [e for e in pipeline.edges if e.edge_type == \"merge_match_filter\"]\n",
+ "print(f\"merge_match_filter edges: {len(mmf_edges)}\\n\")\n",
+ "for e in mmf_edges:\n",
+ " print(f\" {e.from_node.full_name:40s} -> {e.to_node.full_name}\")\n",
+ " print(f\" merge_column_role={e.merge_column_role!r} merge_condition={e.merge_condition!r}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "84235fc7",
+ "metadata": {},
+ "source": [
+ "## 5. Impact analysis — putting it all together\n",
+ "\n",
+ "With all 10 gap features in place, impact analysis queries return realistic, production-accurate answers. Two worked examples:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "id": "d69e186e",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.977182Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.977118Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.979531Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.979104Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Q1. Forward impact of raw_customer_cdc.after.city:\n",
+ "\n",
+ " dim_customer -> ['city', 'email', 'id', 'is_active', 'name']\n",
+ " main -> ['end_time', 'id', 'is_active']\n",
+ " mart_daily_revenue -> ['customer_city_at_order']\n",
+ " subquery_0 -> ['rn']\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Q1: \"What downstream columns depend on raw_customer_cdc.after.city?\"\n",
+ "# (The CDC envelope struct field, via dedup -> staging -> SCD2 dim -> fact -> mart)\n",
+ "print(\"Q1. Forward impact of raw_customer_cdc.after.city:\\n\")\n",
+ "downstream = pipeline.trace_column_forward(\"raw_customer_cdc\", \"after\")\n",
+ "by_table: dict[str, list] = {}\n",
+ "for col in downstream:\n",
+ " by_table.setdefault(col.table_name, []).append(col.column_name)\n",
+ "for table, cols in sorted(by_table.items()):\n",
+ " print(f\" {table:30s} -> {sorted(set(cols))}\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "id": "15739966",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.980423Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.980354Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.982085Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.981804Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Q2. Backward trace of mart_daily_revenue.customer_city_at_order:\n",
+ "\n",
+ " raw_customer_cdc <- ['after']\n",
+ " raw_orders <- ['customer_id', 'order_ts']\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Q2: \"What upstream columns drive mart_daily_revenue.customer_city_at_order?\"\n",
+ "print(\"Q2. Backward trace of mart_daily_revenue.customer_city_at_order:\\n\")\n",
+ "upstream = pipeline.trace_column_backward(\"mart_daily_revenue\", \"customer_city_at_order\")\n",
+ "by_table = {}\n",
+ "for col in upstream:\n",
+ " by_table.setdefault(col.table_name, []).append(col.column_name)\n",
+ "for table, cols in sorted(by_table.items()):\n",
+ " print(f\" {table:30s} <- {sorted(set(cols))}\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "id": "1f0afe39",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.982891Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.982836Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.984722Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.984392Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Q3. Forward impact of dim_customer.end_time (Gap 7 predicate-edge reachability):\n",
+ "\n",
+ " mart_daily_revenue -> ['customer_city_at_order']\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Q3: \"If we change the SCD2 validity semantics (dim_customer.end_time),\n",
+ "# what downstream columns in fact_orders / mart are affected via the BETWEEN join?\"\n",
+ "print(\"Q3. Forward impact of dim_customer.end_time (Gap 7 predicate-edge reachability):\\n\")\n",
+ "downstream = pipeline.trace_column_forward(\"dim_customer\", \"end_time\")\n",
+ "by_table = {}\n",
+ "for col in downstream:\n",
+ " by_table.setdefault(col.table_name, []).append(col.column_name)\n",
+ "for table, cols in sorted(by_table.items()):\n",
+ " print(f\" {table:30s} -> {sorted(set(cols))}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "7f84ebc6",
+ "metadata": {},
+ "source": [
+ "## 6. Edge-metadata summary\n",
+ "\n",
+ "A one-glance summary of what this pipeline produces, by edge type / flag."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "id": "9e27071f",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-15T21:49:27.985588Z",
+ "iopub.status.busy": "2026-04-15T21:49:27.985539Z",
+ "iopub.status.idle": "2026-04-15T21:49:27.988035Z",
+ "shell.execute_reply": "2026-04-15T21:49:27.987656Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Total edges: 95\n",
+ "\n",
+ "By edge_type:\n",
+ " where_filter 50\n",
+ " merge_update 12\n",
+ " direct_column 9\n",
+ " expression 6\n",
+ " join_predicate 5\n",
+ " cross_query_self_ref 4\n",
+ " window 2\n",
+ " star_passthrough 1\n",
+ " window_partition 1\n",
+ " window_order 1\n",
+ " qualify_partition 1\n",
+ " merge_match 1\n",
+ " merge_match_filter 1\n",
+ " aggregate 1\n",
+ "\n",
+ "By feature flag:\n",
+ " is_join_predicate 5\n",
+ " is_where_filter 50\n",
+ " is_qualify_column 1\n",
+ " access_type=struct 5\n",
+ " merge_column_role=condition 13\n",
+ " merge_column_role=assignment 0\n",
+ " edge_role=prior_state_read 28\n",
+ " edge_role=cross_query_self_ref 4\n"
+ ]
+ }
+ ],
+ "source": [
+ "from collections import Counter\n",
+ "\n",
+ "print(f\"Total edges: {len(pipeline.edges)}\\n\")\n",
+ "\n",
+ "edge_type_counts = Counter(e.edge_type for e in pipeline.edges)\n",
+ "print(\"By edge_type:\")\n",
+ "for et, n in edge_type_counts.most_common():\n",
+ " print(f\" {et:25s} {n}\")\n",
+ "\n",
+ "print(\"\\nBy feature flag:\")\n",
+ "flag_counts = {\n",
+ " \"is_join_predicate\": sum(1 for e in pipeline.edges if e.is_join_predicate),\n",
+ " \"is_where_filter\": sum(1 for e in pipeline.edges if e.is_where_filter),\n",
+ " \"is_qualify_column\": sum(1 for e in pipeline.edges if e.is_qualify_column),\n",
+ " \"access_type=struct\": sum(1 for e in pipeline.edges if e.access_type == \"struct\"),\n",
+ " \"merge_column_role=condition\": sum(\n",
+ " 1 for e in pipeline.edges if e.merge_column_role == \"condition\"\n",
+ " ),\n",
+ " \"merge_column_role=assignment\": sum(\n",
+ " 1 for e in pipeline.edges if e.merge_column_role == \"assignment\"\n",
+ " ),\n",
+ " \"edge_role=prior_state_read\": sum(\n",
+ " 1 for e in pipeline.edges if e.edge_role == \"prior_state_read\"\n",
+ " ),\n",
+ " \"edge_role=cross_query_self_ref\": sum(\n",
+ " 1 for e in pipeline.edges if e.edge_role == \"cross_query_self_ref\"\n",
+ " ),\n",
+ "}\n",
+ "for flag, n in flag_counts.items():\n",
+ " print(f\" {flag:35s} {n}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "104ee433",
+ "metadata": {},
+ "source": [
+ "## 7. Further reading\n",
+ "\n",
+ "- **Design doc:** `docs/superpowers/specs/2026-04-13-cdc-scd-pipeline-gaps-design.md`\n",
+ "- **Gap-specific designs:**\n",
+ " - `2026-04-13-gap4-self-referencing-target-design.md`\n",
+ " - `2026-04-13-gap7-join-predicate-columns-design.md`\n",
+ " - `2026-04-14-gaps-1-2-8-design.md`\n",
+ "- **Focused examples:**\n",
+ " - `examples/self_referencing_lineage.ipynb` — Gap 4 in isolation\n",
+ " - `examples/join_predicate_lineage.ipynb` — Gap 7 in isolation\n",
+ " - `examples/merge_lineage.ipynb` — Gap 3/9/10 MERGE mechanics\n",
+ "- **Integration tests:** `tests/test_cdc_scd_pipeline.py`"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.13.1"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}