From f5fe584d4e61d5ed70bfd4609b5d7d27b2c6c446 Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:30:21 -0400 Subject: [PATCH 1/9] feat: add merge_column_role field to ColumnEdge model Adds Optional[str] field to distinguish 'value' (RHS assignment) from 'condition' (WHEN clause gating) edges. No behavior change yet. Refs #63 --- src/clgraph/models.py | 1 + tests/test_merge_statements.py | 45 ++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/clgraph/models.py b/src/clgraph/models.py index f096de8..6639274 100644 --- a/src/clgraph/models.py +++ b/src/clgraph/models.py @@ -613,6 +613,7 @@ class ColumnEdge: is_merge_operation: bool = False # True if this edge is from a MERGE statement merge_action: Optional[str] = None # "match", "update", "insert", "delete" merge_condition: Optional[str] = None # Condition for conditional WHEN clauses + merge_column_role: Optional[str] = None # "value" or "condition" # ─── QUALIFY Clause Metadata ─── is_qualify_column: bool = False # True if this column is used in QUALIFY clause diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py index 197c689..f46a11e 100644 --- a/tests/test_merge_statements.py +++ b/tests/test_merge_statements.py @@ -347,5 +347,50 @@ def test_merge_with_delete(self): assert "delete" in action_types or "update" in action_types +# ============================================================================ +# Test Group 10: MERGE Condition Column Role (Gap 10) +# ============================================================================ + + +class TestMergeColumnRole: + """Test that merge_column_role field exists on ColumnEdge.""" + + def test_merge_column_role_field_exists(self): + """Test that ColumnEdge has merge_column_role field.""" + from clgraph.models import ColumnEdge, ColumnNode + + source = ColumnNode(table_name="source", column_name="name", full_name="source.name") + target = ColumnNode( + table_name="target", column_name="end_time", full_name="target.end_time" + ) + edge = ColumnEdge( + from_node=source, + to_node=target, + edge_type="merge_update", + transformation="merge_update", + context="merge", + is_merge_operation=True, + merge_action="update", + merge_column_role="condition", + ) + assert edge.merge_column_role == "condition" + + def test_merge_column_role_defaults_none(self): + """Test that merge_column_role defaults to None.""" + from clgraph.models import ColumnEdge, ColumnNode + + source = ColumnNode(table_name="source", column_name="val", full_name="source.val") + target = ColumnNode(table_name="target", column_name="val", full_name="target.val") + edge = ColumnEdge( + from_node=source, + to_node=target, + edge_type="merge_update", + transformation="merge_update", + context="merge", + is_merge_operation=True, + ) + assert edge.merge_column_role is None + + if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"]) From dfbd6b777c8ab48df0549807a9165f2b30adf0f0 Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:31:51 -0400 Subject: [PATCH 2/9] feat: extract literal-bound ON predicates in MERGE (Gap 9) Column-literal EQ pairs in ON clause (e.g., t.is_active = 'Y') are now captured in match_filter_columns. Column-column pairs continue to flow through match_columns unchanged. Refs #63 --- src/clgraph/query_parser.py | 6 +++ tests/test_merge_statements.py | 69 ++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/src/clgraph/query_parser.py b/src/clgraph/query_parser.py index 9c23860..dbc67cc 100644 --- a/src/clgraph/query_parser.py +++ b/src/clgraph/query_parser.py @@ -628,12 +628,17 @@ def _parse_merge_statement( # Extract match columns from ON condition match_columns: List[Tuple[str, str]] = [] + match_filter_columns: List[Tuple[str, str]] = [] if match_condition: for eq in match_condition.find_all(exp.EQ): left_col = eq.left right_col = eq.right if isinstance(left_col, exp.Column) and isinstance(right_col, exp.Column): match_columns.append((left_col.name, right_col.name)) + elif isinstance(left_col, exp.Column) and not isinstance(right_col, exp.Column): + match_filter_columns.append((left_col.name, right_col.sql())) + elif isinstance(right_col, exp.Column) and not isinstance(left_col, exp.Column): + match_filter_columns.append((right_col.name, left_col.sql())) # Parse WHEN clauses from the 'whens' arg whens = merge_node.args.get("whens") @@ -698,6 +703,7 @@ def _parse_merge_statement( "source_alias": source_alias, "match_condition": match_condition_sql, "match_columns": match_columns, + "match_filter_columns": match_filter_columns, "matched_actions": matched_actions, "not_matched_actions": not_matched_actions, } diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py index f46a11e..51c565d 100644 --- a/tests/test_merge_statements.py +++ b/tests/test_merge_statements.py @@ -132,6 +132,75 @@ def test_multiple_match_columns(self): assert "region" in col_names +class TestMatchFilterColumns: + """Test literal-bound predicates in MERGE ON clause (Gap 9).""" + + def test_literal_predicate_extracted(self): + """ON t.id = s.id AND t.is_active = 'Y' extracts is_active as filter column.""" + sql = """ + MERGE INTO dim_customer t + USING staging s ON t.id = s.id AND t.is_active = 'Y' + WHEN MATCHED THEN UPDATE SET t.name = s.name + """ + parser = RecursiveQueryParser(sql, dialect="postgres") + graph = parser.parse() + + merge_units = [u for u in graph.units.values() if u.unit_type == QueryUnitType.MERGE] + unit = merge_units[0] + config = unit.unpivot_config + + # Column-column pair should still work + match_columns = config.get("match_columns", []) + col_names = [col[0] for col in match_columns] + assert "id" in col_names + + # Literal-bound predicate should be in match_filter_columns + match_filter_columns = config.get("match_filter_columns", []) + assert len(match_filter_columns) >= 1 + filter_col_names = [col[0] for col in match_filter_columns] + assert "is_active" in filter_col_names + + def test_no_filter_columns_for_pure_equijoin(self): + """ON t.id = s.id with no literals has match_filter_columns key but empty list. + + Note: We assert the KEY exists (not just .get() with default) so + the test fails before Task 2 adds match_filter_columns to the config. + """ + sql = """ + MERGE INTO target t + USING source s ON t.id = s.id + WHEN MATCHED THEN UPDATE SET t.value = s.new_value + """ + parser = RecursiveQueryParser(sql, dialect="postgres") + graph = parser.parse() + + merge_units = [u for u in graph.units.values() if u.unit_type == QueryUnitType.MERGE] + unit = merge_units[0] + config = unit.unpivot_config + + assert "match_filter_columns" in config + assert config["match_filter_columns"] == [] + + def test_multiple_literal_predicates(self): + """Multiple literal predicates are all extracted.""" + sql = """ + MERGE INTO target t + USING source s ON t.id = s.id AND t.is_active = 'Y' AND t.region = 'US' + WHEN MATCHED THEN UPDATE SET t.value = s.new_value + """ + parser = RecursiveQueryParser(sql, dialect="postgres") + graph = parser.parse() + + merge_units = [u for u in graph.units.values() if u.unit_type == QueryUnitType.MERGE] + unit = merge_units[0] + config = unit.unpivot_config + + match_filter_columns = config.get("match_filter_columns", []) + filter_col_names = [col[0] for col in match_filter_columns] + assert "is_active" in filter_col_names + assert "region" in filter_col_names + + # ============================================================================ # Test Group 3: WHEN MATCHED Actions # ============================================================================ From 2dd657d99fd968f004422ab9e8044f9a02dc6c61 Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:33:33 -0400 Subject: [PATCH 3/9] feat: emit lineage edges for MERGE ON literal filter columns (Gap 9) Literal-bound ON predicates now produce merge_match_filter edges with merge_column_role='condition'. The target-side column (e.g., dim_customer.is_active) appears in lineage as a self-referencing condition dependency. Refs #63 --- src/clgraph/column_extractor.py | 17 +++++++++++ src/clgraph/lineage_builder.py | 7 ++++- src/clgraph/trace_strategies.py | 1 + tests/test_merge_statements.py | 50 +++++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 1 deletion(-) diff --git a/src/clgraph/column_extractor.py b/src/clgraph/column_extractor.py index 85059bc..18a8279 100644 --- a/src/clgraph/column_extractor.py +++ b/src/clgraph/column_extractor.py @@ -284,6 +284,23 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict] output_cols.append(col_info) idx += 1 + # 1b. Literal-bound match filter columns (edges for ON clause literal predicates) + match_filter_columns = config.get("match_filter_columns", []) + for col_name, literal_val in match_filter_columns: + col_info = { + "index": idx, + "name": col_name, + "is_star": False, + "type": "merge_match_filter", + "expression": f"{target_alias}.{col_name} = {literal_val}", + "ast_node": None, + "source_columns": [(target_alias, col_name)], + "merge_action": "match", + "merge_column_role": "condition", + } + output_cols.append(col_info) + idx += 1 + # 2. WHEN MATCHED -> UPDATE columns for action in matched_actions: if action.get("action_type") == "update": diff --git a/src/clgraph/lineage_builder.py b/src/clgraph/lineage_builder.py index 1e190e2..573f47a 100644 --- a/src/clgraph/lineage_builder.py +++ b/src/clgraph/lineage_builder.py @@ -849,7 +849,12 @@ def _trace_column_dependencies(self, unit: QueryUnit, output_node: ColumnNode, c return # Branch 4: MERGE - if col_info.get("type") in ("merge_match", "merge_update", "merge_insert"): + if col_info.get("type") in ( + "merge_match", + "merge_update", + "merge_insert", + "merge_match_filter", + ): trace_merge_columns( self.lineage_graph, unit, diff --git a/src/clgraph/trace_strategies.py b/src/clgraph/trace_strategies.py index d9fc65a..23bcdf2 100644 --- a/src/clgraph/trace_strategies.py +++ b/src/clgraph/trace_strategies.py @@ -225,6 +225,7 @@ def trace_merge_columns( is_merge_operation=True, merge_action=merge_action, merge_condition=merge_condition, + merge_column_role=col_info.get("merge_column_role"), ) graph.add_edge(edge) diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py index 51c565d..6b054b5 100644 --- a/tests/test_merge_statements.py +++ b/tests/test_merge_statements.py @@ -461,5 +461,55 @@ def test_merge_column_role_defaults_none(self): assert edge.merge_column_role is None +class TestMergeFilterLineage: + """Test lineage edges for literal-bound ON predicates (Gap 9 lineage).""" + + def test_literal_filter_column_produces_lineage_edge(self): + """ON t.is_active = 'Y' should produce a lineage edge for is_active.""" + sql = """ + MERGE INTO dim_customer t + USING staging s ON t.id = s.id AND t.is_active = 'Y' + WHEN MATCHED THEN UPDATE SET t.name = s.name + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + + # Should have an edge for is_active with merge_column_role="condition" + filter_edges = [ + e + for e in merge_edges + if e.merge_column_role == "condition" and e.to_node.column_name == "is_active" + ] + assert len(filter_edges) >= 1 + + # The filter edge should be tagged as merge_match_filter + edge = filter_edges[0] + assert edge.edge_type == "merge_match_filter" + # source_columns uses target_alias "t", but resolve_base_table_name + # resolves "t" -> "dim_customer" via alias_mapping + assert edge.from_node.table_name == "dim_customer" + assert edge.from_node.column_name == "is_active" + + def test_filter_edge_coexists_with_match_edge(self): + """Both column-column match edges and literal filter edges are produced.""" + sql = """ + MERGE INTO dim_customer t + USING staging s ON t.id = s.id AND t.is_active = 'Y' + WHEN MATCHED THEN UPDATE SET t.name = s.name + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + + match_edges = [e for e in merge_edges if e.edge_type == "merge_match"] + assert len(match_edges) >= 1 + + filter_edges = [e for e in merge_edges if e.edge_type == "merge_match_filter"] + assert len(filter_edges) >= 1 + + if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"]) From 778f918e1be50dbc901317989292b5d4f9087b21 Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:35:46 -0400 Subject: [PATCH 4/9] feat: parse WHEN condition columns and emit condition-gating edges (Gap 3+10) Reuses extract_columns_from_expr to extract column references from WHEN MATCHED AND conditions. trace_merge_columns emits condition edges with merge_column_role='condition'. Impact analysis on condition columns (e.g., staging.name -> dim_customer.end_time) now works. Also tags value-assignment edges with merge_column_role='value' and keeps merge_match edges with merge_column_role=None. Refs #63 --- src/clgraph/column_extractor.py | 8 ++ src/clgraph/trace_strategies.py | 40 ++++++++-- tests/test_merge_statements.py | 127 ++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 7 deletions(-) diff --git a/src/clgraph/column_extractor.py b/src/clgraph/column_extractor.py index 18a8279..1daa970 100644 --- a/src/clgraph/column_extractor.py +++ b/src/clgraph/column_extractor.py @@ -305,6 +305,13 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict] for action in matched_actions: if action.get("action_type") == "update": condition = action.get("condition") + # Note: target_alias is used as default_table, but WHEN conditions + # typically use qualified refs (t.name, s.name). extract_columns_from_expr + # uses the qualified table ref when present, so the default_table only + # applies to unqualified column names. + condition_columns = ( + extract_columns_from_expr(condition, target_alias) if condition else [] + ) for target_col, source_expr in action.get("column_mappings", {}).items(): col_info = { "index": idx, @@ -316,6 +323,7 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict] "source_columns": extract_columns_from_expr(source_expr, source_alias), "merge_action": "update", "merge_condition": condition, + "condition_columns": condition_columns, } output_cols.append(col_info) idx += 1 diff --git a/src/clgraph/trace_strategies.py b/src/clgraph/trace_strategies.py index 23bcdf2..18867bd 100644 --- a/src/clgraph/trace_strategies.py +++ b/src/clgraph/trace_strategies.py @@ -196,25 +196,32 @@ def trace_merge_columns( merge_action = col_info.get("merge_action", col_info.get("type")) merge_condition = col_info.get("merge_condition") source_refs = col_info.get("source_columns", []) + condition_refs = col_info.get("condition_columns", []) - for source_ref in source_refs: - table_ref, col_name = source_ref[:2] - - # Try to resolve as a source unit or base table + def _resolve_to_node(table_ref, col_name): + """Resolve a (table, column) ref to a ColumnNode.""" source_node = None source_unit = resolve_source_unit(unit, table_ref) if table_ref else None if source_unit: source_node = find_column_in_unit(source_unit, col_name) if not source_node: - # Try as base table base_table = resolve_base_table_name(unit, table_ref) if table_ref else None if base_table: source_node = find_or_create_table_column_node(graph, base_table, col_name) elif table_ref: - # Fallback: use table_ref directly source_node = find_or_create_table_column_node(graph, table_ref, col_name) + return source_node + # Value-assignment edges (RHS of SET) or match/filter edges + for source_ref in source_refs: + table_ref, col_name = source_ref[:2] + source_node = _resolve_to_node(table_ref, col_name) if source_node: + # Determine role: match edges get None, update/insert get "value", + # merge_match_filter edges keep their explicit "condition" role + role = col_info.get("merge_column_role") + if role is None and col_info["type"] in ("merge_update", "merge_insert"): + role = "value" edge = ColumnEdge( from_node=source_node, to_node=output_node, @@ -225,7 +232,26 @@ def trace_merge_columns( is_merge_operation=True, merge_action=merge_action, merge_condition=merge_condition, - merge_column_role=col_info.get("merge_column_role"), + merge_column_role=role, + ) + graph.add_edge(edge) + + # Condition-gating edges (from WHEN AND clause) + for cond_ref in condition_refs: + table_ref, col_name = cond_ref[:2] + source_node = _resolve_to_node(table_ref, col_name) + if source_node: + edge = ColumnEdge( + from_node=source_node, + to_node=output_node, + edge_type=col_info["type"], + transformation=col_info["type"], + context=unit.unit_type.value, + expression=merge_condition, + is_merge_operation=True, + merge_action=merge_action, + merge_condition=merge_condition, + merge_column_role="condition", ) graph.add_edge(edge) diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py index 6b054b5..c8526e8 100644 --- a/tests/test_merge_statements.py +++ b/tests/test_merge_statements.py @@ -511,5 +511,132 @@ def test_filter_edge_coexists_with_match_edge(self): assert len(filter_edges) >= 1 +# ============================================================================ +# Test Group 11: MERGE Condition Column Parsing (Gap 3) +# ============================================================================ + + +class TestMergeConditionColumns: + """Test WHEN MATCHED condition columns extracted as refs (Gap 3).""" + + def test_when_condition_columns_extracted(self): + """WHEN MATCHED AND (t.name <> s.name) extracts name as condition column.""" + sql = """ + MERGE INTO dim_customer t + USING staging s ON t.id = s.id + WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city) THEN + UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N' + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + + end_time_cond_edges = [ + e + for e in merge_edges + if e.to_node.column_name == "end_time" and e.merge_column_role == "condition" + ] + cond_col_names = {e.from_node.column_name for e in end_time_cond_edges} + assert "name" in cond_col_names + assert "city" in cond_col_names + + def test_is_active_has_same_condition_columns(self): + """Both assigned columns share the same condition columns.""" + sql = """ + MERGE INTO dim_customer t + USING staging s ON t.id = s.id + WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city) THEN + UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N' + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + + is_active_cond_edges = [ + e + for e in merge_edges + if e.to_node.column_name == "is_active" and e.merge_column_role == "condition" + ] + cond_col_names = {e.from_node.column_name for e in is_active_cond_edges} + assert "name" in cond_col_names + assert "city" in cond_col_names + + def test_condition_columns_include_both_sides(self): + """Condition t.name <> s.name includes columns from both target and source.""" + sql = """ + MERGE INTO dim_customer t + USING staging s ON t.id = s.id + WHEN MATCHED AND (t.name <> s.name) THEN + UPDATE SET t.end_time = current_timestamp() + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + end_time_cond_edges = [ + e + for e in merge_edges + if e.to_node.column_name == "end_time" and e.merge_column_role == "condition" + ] + assert len(end_time_cond_edges) >= 2 + source_tables = {e.from_node.table_name for e in end_time_cond_edges} + assert "dim_customer" in source_tables, "Expected target-side condition column" + assert "staging" in source_tables, "Expected source-side condition column" + + def test_no_condition_columns_for_unconditional_update(self): + """WHEN MATCHED without AND condition produces no condition edges.""" + sql = """ + MERGE INTO target t + USING source s ON t.id = s.id + WHEN MATCHED THEN UPDATE SET t.value = s.new_value + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + assert len(merge_edges) > 0, "Expected merge edges to exist" + cond_edges = [e for e in merge_edges if e.merge_column_role == "condition"] + assert cond_edges == [] + + +class TestMergeValueRole: + """Test that value-assignment edges are tagged with merge_column_role='value'.""" + + def test_update_value_edges_tagged(self): + """UPDATE SET t.value = s.new_value produces edge with role='value'.""" + sql = """ + MERGE INTO target t + USING source s ON t.id = s.id + WHEN MATCHED THEN UPDATE SET t.value = s.new_value + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + update_edges = [ + e for e in graph.edges if e.is_merge_operation and e.merge_action == "update" + ] + assert len(update_edges) >= 1 + for edge in update_edges: + assert edge.merge_column_role == "value" + + def test_match_edges_have_no_role(self): + """ON clause match edges should have merge_column_role=None.""" + sql = """ + MERGE INTO target t + USING source s ON t.id = s.id + WHEN MATCHED THEN UPDATE SET t.value = s.new_value + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + match_edges = [ + e for e in graph.edges if e.is_merge_operation and e.edge_type == "merge_match" + ] + for edge in match_edges: + assert edge.merge_column_role is None + + if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"]) From 189953665bcd41b72142d7751214711dc74513ac Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:39:16 -0400 Subject: [PATCH 5/9] feat: extract condition columns for NOT MATCHED INSERT actions Applies the same condition_columns extraction to WHEN NOT MATCHED INSERT actions, so conditional inserts like 'AND s.op = c' produce condition-gating lineage edges. Refs #63 --- src/clgraph/column_extractor.py | 4 ++++ tests/test_merge_statements.py | 24 ++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/src/clgraph/column_extractor.py b/src/clgraph/column_extractor.py index 1daa970..24fd9fb 100644 --- a/src/clgraph/column_extractor.py +++ b/src/clgraph/column_extractor.py @@ -332,6 +332,9 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict] for action in not_matched_actions: if action.get("action_type") == "insert": condition = action.get("condition") + condition_columns = ( + extract_columns_from_expr(condition, source_alias) if condition else [] + ) for target_col, source_expr in action.get("column_mappings", {}).items(): col_info = { "index": idx, @@ -343,6 +346,7 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict] "source_columns": extract_columns_from_expr(source_expr, source_alias), "merge_action": "insert", "merge_condition": condition, + "condition_columns": condition_columns, } output_cols.append(col_info) idx += 1 diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py index c8526e8..fa0afd5 100644 --- a/tests/test_merge_statements.py +++ b/tests/test_merge_statements.py @@ -638,5 +638,29 @@ def test_match_edges_have_no_role(self): assert edge.merge_column_role is None +class TestMergeInsertConditionColumns: + """Test WHEN NOT MATCHED condition columns for INSERT actions.""" + + def test_not_matched_condition_columns_extracted(self): + """WHEN NOT MATCHED AND s.op = 'c' should produce condition edges.""" + sql = """ + MERGE INTO target t + USING source s ON t.id = s.id + WHEN NOT MATCHED AND s.op = 'c' THEN + INSERT (id, value) VALUES (s.id, s.value) + """ + builder = RecursiveLineageBuilder(sql, dialect="postgres") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + id_cond_edges = [ + e + for e in merge_edges + if e.to_node.column_name == "id" and e.merge_column_role == "condition" + ] + cond_col_names = {e.from_node.column_name for e in id_cond_edges} + assert "op" in cond_col_names + + if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"]) From e6e2867d9a9baea83c3fb4e347ce52317a31d08e Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:40:13 -0400 Subject: [PATCH 6/9] test: add SCD2 integration tests for MERGE condition lineage End-to-end tests verify all three gaps (3, 9, 10) working together on the canonical SCD2 MERGE pattern from the CDC pipeline design. Refs #63 --- tests/test_merge_statements.py | 72 ++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py index fa0afd5..ada9688 100644 --- a/tests/test_merge_statements.py +++ b/tests/test_merge_statements.py @@ -662,5 +662,77 @@ def test_not_matched_condition_columns_extracted(self): assert "op" in cond_col_names +# ============================================================================ +# Test Group 12: SCD2 End-to-End Integration (Gaps 3, 9, 10) +# ============================================================================ + + +class TestSCD2MergeConditionLineage: + """End-to-end test: SCD2 MERGE with all three gaps fixed.""" + + SCD2_SQL = """ + MERGE INTO dim_customer t + USING staging_customer_latest s ON t.id = s.id AND t.is_active = 'Y' + WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city OR t.email <> s.email) THEN + UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N' + """ + + def test_gap9_on_literal_filter_in_lineage(self): + """Gap 9: is_active from ON clause appears in lineage.""" + builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + filter_edges = [ + e + for e in merge_edges + if e.edge_type == "merge_match_filter" and e.from_node.column_name == "is_active" + ] + assert len(filter_edges) >= 1 + assert filter_edges[0].merge_column_role == "condition" + + def test_gap3_when_condition_columns_in_lineage(self): + """Gap 3: name, city, email from WHEN condition are upstream of end_time.""" + builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + end_time_cond_edges = [ + e + for e in merge_edges + if e.to_node.column_name == "end_time" and e.merge_column_role == "condition" + ] + cond_col_names = {e.from_node.column_name for e in end_time_cond_edges} + assert "name" in cond_col_names + assert "city" in cond_col_names + assert "email" in cond_col_names + + def test_gap10_condition_only_on_literal_assigned_column(self): + """Gap 10: is_active = 'N' (literal RHS) has condition edges but no value edges.""" + builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + + is_active_edges = [e for e in merge_edges if e.to_node.column_name == "is_active"] + roles = {e.merge_column_role for e in is_active_edges} + assert "condition" in roles + assert "value" not in roles + + def test_gap10_impact_analysis_name_reaches_end_time(self): + """Gap 10: impact analysis from staging.name should reach end_time.""" + builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks") + graph = builder.build() + + merge_edges = [e for e in graph.edges if e.is_merge_operation] + + end_time_upstream = [ + e.from_node.column_name for e in merge_edges if e.to_node.column_name == "end_time" + ] + assert "name" in end_time_upstream + assert "city" in end_time_upstream + assert "email" in end_time_upstream + + if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"]) From 8429f89aa5d170c028d1384053125d91a1dbfa9b Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:41:42 -0400 Subject: [PATCH 7/9] feat: include merge_column_role in JSON export MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds merge_column_role to the MERGE metadata block in JSONExporter so condition vs value edges are visible in exported data. Also fixes pipeline_lineage_builder.py to propagate merge_column_role when reconstructing ColumnEdge objects during pipeline assembly — without this, the field was always None even though trace_strategies set it correctly. Refs #63 --- src/clgraph/export.py | 1 + src/clgraph/pipeline_lineage_builder.py | 1 + tests/test_merge_statements.py | 21 +++++++++++++++++++++ 3 files changed, 23 insertions(+) diff --git a/src/clgraph/export.py b/src/clgraph/export.py index 1c64437..7e80e48 100644 --- a/src/clgraph/export.py +++ b/src/clgraph/export.py @@ -161,6 +161,7 @@ def export( edge_dict["is_merge_operation"] = True edge_dict["merge_action"] = getattr(edge, "merge_action", None) edge_dict["merge_condition"] = getattr(edge, "merge_condition", None) + edge_dict["merge_column_role"] = getattr(edge, "merge_column_role", None) # Include QUALIFY clause metadata if present if getattr(edge, "is_qualify_column", False): diff --git a/src/clgraph/pipeline_lineage_builder.py b/src/clgraph/pipeline_lineage_builder.py index 55191af..64e4dd8 100644 --- a/src/clgraph/pipeline_lineage_builder.py +++ b/src/clgraph/pipeline_lineage_builder.py @@ -450,6 +450,7 @@ def _add_query_edges( is_merge_operation=getattr(edge, "is_merge_operation", False), merge_action=getattr(edge, "merge_action", None), merge_condition=getattr(edge, "merge_condition", None), + merge_column_role=getattr(edge, "merge_column_role", None), # Preserve QUALIFY clause metadata is_qualify_column=getattr(edge, "is_qualify_column", False), qualify_context=getattr(edge, "qualify_context", None), diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py index ada9688..e111f4d 100644 --- a/tests/test_merge_statements.py +++ b/tests/test_merge_statements.py @@ -734,5 +734,26 @@ def test_gap10_impact_analysis_name_reaches_end_time(self): assert "email" in end_time_upstream +class TestMergeRoleExport: + """Test that merge_column_role appears in JSON export.""" + + def test_merge_column_role_in_export(self): + """merge_column_role should appear in exported edge data.""" + sql = """ + MERGE INTO dim_customer t + USING staging s ON t.id = s.id + WHEN MATCHED AND (t.name <> s.name) THEN + UPDATE SET t.end_time = current_timestamp() + """ + pipeline = Pipeline([("scd2_close", sql)], dialect="postgres") + + exporter = JSONExporter() + data = exporter.export(pipeline) + + edges = data.get("edges", []) + condition_edges = [e for e in edges if e.get("merge_column_role") == "condition"] + assert len(condition_edges) >= 1 + + if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"]) From 68fe1a6c1523b75d4b752a68661b9e240c035477 Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 12:44:57 -0400 Subject: [PATCH 8/9] docs: improve comments for merge_column_role and NOT MATCHED default_table Refs #63 --- src/clgraph/column_extractor.py | 2 ++ src/clgraph/models.py | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/clgraph/column_extractor.py b/src/clgraph/column_extractor.py index 24fd9fb..7a04d4f 100644 --- a/src/clgraph/column_extractor.py +++ b/src/clgraph/column_extractor.py @@ -332,6 +332,8 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict] for action in not_matched_actions: if action.get("action_type") == "insert": condition = action.get("condition") + # Note: source_alias is used as default_table (not target_alias) because + # NOT MATCHED conditions reference source rows (target row doesn't exist). condition_columns = ( extract_columns_from_expr(condition, source_alias) if condition else [] ) diff --git a/src/clgraph/models.py b/src/clgraph/models.py index 6639274..07631f4 100644 --- a/src/clgraph/models.py +++ b/src/clgraph/models.py @@ -613,7 +613,9 @@ class ColumnEdge: is_merge_operation: bool = False # True if this edge is from a MERGE statement merge_action: Optional[str] = None # "match", "update", "insert", "delete" merge_condition: Optional[str] = None # Condition for conditional WHEN clauses - merge_column_role: Optional[str] = None # "value" or "condition" + merge_column_role: Optional[str] = ( + None # None (match), "value" (SET RHS), or "condition" (WHEN guard / ON filter) + ) # ─── QUALIFY Clause Metadata ─── is_qualify_column: bool = False # True if this column is used in QUALIFY clause From 56a50a6ccbf316ece3088c3448dc1c6f0c6a055a Mon Sep 17 00:00:00 2001 From: Ming Jer Lee Date: Tue, 14 Apr 2026 13:00:55 -0400 Subject: [PATCH 9/9] docs: add condition column lineage examples to merge notebook Examples 5-8 demonstrate SCD2 condition dependencies, impact analysis, ON clause literal filters, and JSON export with merge_column_role. Refs #63 --- examples/merge_lineage.ipynb | 510 ++++++++++++++++++++++++++++++++++- 1 file changed, 497 insertions(+), 13 deletions(-) diff --git a/examples/merge_lineage.ipynb b/examples/merge_lineage.ipynb index a7a5cd0..bd56a26 100644 --- a/examples/merge_lineage.ipynb +++ b/examples/merge_lineage.ipynb @@ -35,10 +35,10 @@ "id": "76babcb3", "metadata": { "execution": { - "iopub.execute_input": "2025-12-30T20:10:44.159064Z", - "iopub.status.busy": "2025-12-30T20:10:44.158846Z", - "iopub.status.idle": "2025-12-30T20:10:44.230550Z", - "shell.execute_reply": "2025-12-30T20:10:44.230171Z" + "iopub.execute_input": "2026-04-14T17:00:42.052943Z", + "iopub.status.busy": "2026-04-14T17:00:42.052862Z", + "iopub.status.idle": "2026-04-14T17:00:42.108311Z", + "shell.execute_reply": "2026-04-14T17:00:42.107982Z" } }, "outputs": [ @@ -134,7 +134,8 @@ " \"query_id\": \"merge_op\",\n", " \"is_merge_operation\": true,\n", " \"merge_action\": \"match\",\n", - " \"merge_condition\": null\n", + " \"merge_condition\": null,\n", + " \"merge_column_role\": null\n", "}\n", "{\n", " \"from_column\": \"source.new_value\",\n", @@ -144,7 +145,8 @@ " \"query_id\": \"merge_op\",\n", " \"is_merge_operation\": true,\n", " \"merge_action\": \"update\",\n", - " \"merge_condition\": null\n", + " \"merge_condition\": null,\n", + " \"merge_column_role\": \"value\"\n", "}\n", "\n", "============================================================\n", @@ -284,16 +286,498 @@ { "cell_type": "markdown", "id": "c733616w8q4", - "source": "### Visualize Pipeline Lineage\n\nDisplay the simplified column lineage for MERGE statement queries.", - "metadata": {} + "metadata": {}, + "source": [ + "### Visualize Pipeline Lineage\n", + "\n", + "Display the simplified column lineage for MERGE statement queries." + ] }, { "cell_type": "code", + "execution_count": 2, "id": "uzr9h77vv6", - "source": "import shutil\n\nfrom clgraph import visualize_pipeline_lineage\n\n# Create pipeline for visualization\nsql_merge = \"\"\"\nMERGE INTO target t\nUSING source s ON t.id = s.id\nWHEN MATCHED THEN UPDATE SET t.value = s.new_value\nWHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.new_value)\n\"\"\"\nmerge_pipeline = Pipeline([(\"merge_op\", sql_merge)], dialect=\"postgres\")\n\nif shutil.which(\"dot\") is None:\n print(\"⚠️ Graphviz not installed. Install with: brew install graphviz\")\nelse:\n print(\"MERGE Pipeline - Simplified Lineage:\")\n display(visualize_pipeline_lineage(merge_pipeline.column_graph.to_simplified()))", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-14T17:00:42.109611Z", + "iopub.status.busy": "2026-04-14T17:00:42.109539Z", + "iopub.status.idle": "2026-04-14T17:00:42.192679Z", + "shell.execute_reply": "2026-04-14T17:00:42.192304Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "MERGE Pipeline - Simplified Lineage:\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "cluster_merge_op__external__source\n", + "\n", + "📊 source\n", + "\n", + "\n", + "\n", + "source_id\n", + "\n", + "\n", + "id\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "source_new_value\n", + "\n", + "\n", + "new_value\n", + "\n", + "\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import shutil\n", + "\n", + "from clgraph import visualize_pipeline_lineage\n", + "\n", + "# Create pipeline for visualization\n", + "sql_merge = \"\"\"\n", + "MERGE INTO target t\n", + "USING source s ON t.id = s.id\n", + "WHEN MATCHED THEN UPDATE SET t.value = s.new_value\n", + "WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.new_value)\n", + "\"\"\"\n", + "merge_pipeline = Pipeline([(\"merge_op\", sql_merge)], dialect=\"postgres\")\n", + "\n", + "if shutil.which(\"dot\") is None:\n", + " print(\"⚠️ Graphviz not installed. Install with: brew install graphviz\")\n", + "else:\n", + " print(\"MERGE Pipeline - Simplified Lineage:\")\n", + " display(visualize_pipeline_lineage(merge_pipeline.column_graph.to_simplified()))" + ] + }, + { + "cell_type": "markdown", + "id": "a22e9db8", + "metadata": {}, + "source": [ + "## Condition Column Lineage (Gaps 3, 9, 10)\n", + "\n", + "MERGE statements have two kinds of upstream dependencies for assigned target columns:\n", + "\n", + "- **Value dependencies** — the RHS of `SET` (e.g., `t.end_time = current_timestamp()`)\n", + "- **Condition dependencies** — columns in `WHEN MATCHED AND (...)` or literal-bound `ON` predicates (e.g., `t.is_active = 'Y'`)\n", + "\n", + "clgraph now tracks both, distinguishing them via `merge_column_role` on each edge:\n", + "- `None` — ON clause match columns (equi-join pairs)\n", + "- `\"value\"` — SET assignment RHS columns\n", + "- `\"condition\"` — WHEN guard or ON literal filter columns" + ] + }, + { + "cell_type": "markdown", + "id": "2a9b42c3", + "metadata": {}, + "source": [ + "### Example 5: SCD2 MERGE — Condition Columns as Dependencies\n", + "\n", + "In a Slowly Changing Dimension Type 2 pattern, the WHEN MATCHED condition determines\n", + "*which rows get closed*. The columns in that condition (`name`, `city`, `email`) are\n", + "gating dependencies of the assigned columns (`end_time`, `is_active`)." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "0c55e121", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-14T17:00:42.193907Z", + "iopub.status.busy": "2026-04-14T17:00:42.193816Z", + "iopub.status.idle": "2026-04-14T17:00:42.202124Z", + "shell.execute_reply": "2026-04-14T17:00:42.201754Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "SCD2 MERGE — All lineage edges with roles:\n", + "\n", + " dim_customer.email -> end_time [condition]\n", + " staging_customer_latest.email -> end_time [condition]\n", + " dim_customer.name -> end_time [condition]\n", + " staging_customer_latest.name -> end_time [condition]\n", + " dim_customer.city -> end_time [condition]\n", + " staging_customer_latest.city -> end_time [condition]\n", + " staging_customer_latest.id -> id [match]\n", + " dim_customer.is_active -> is_active [condition]\n", + " dim_customer.email -> is_active [condition]\n", + " staging_customer_latest.email -> is_active [condition]\n", + " dim_customer.name -> is_active [condition]\n", + " staging_customer_latest.name -> is_active [condition]\n", + " dim_customer.city -> is_active [condition]\n", + " staging_customer_latest.city -> is_active [condition]\n" + ] + } + ], + "source": [ + "scd2_sql = \"\"\"\n", + "MERGE INTO dim_customer t\n", + "USING staging_customer_latest s ON t.id = s.id AND t.is_active = 'Y'\n", + "WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city OR t.email <> s.email) THEN\n", + " UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N'\n", + "\"\"\"\n", + "\n", + "builder = RecursiveLineageBuilder(scd2_sql, dialect=\"databricks\")\n", + "graph = builder.build()\n", + "\n", + "print(\"SCD2 MERGE — All lineage edges with roles:\\n\")\n", + "for edge in sorted(graph.edges, key=lambda e: (e.to_node.column_name, e.merge_column_role or \"\")):\n", + " if edge.is_merge_operation:\n", + " role = edge.merge_column_role or \"match\"\n", + " print(f\" {edge.from_node.full_name:>35s} -> {edge.to_node.column_name:<12s} [{role}]\")" + ] + }, + { + "cell_type": "markdown", + "id": "c2a11f10", "metadata": {}, - "execution_count": null, - "outputs": [] + "source": [ + "### Example 6: Impact Analysis — \"What breaks if `staging.name` changes?\"\n", + "\n", + "Condition edges make impact analysis complete. Without them, `end_time` would appear\n", + "to have *no* upstream column dependency (since `current_timestamp()` has none).\n", + "With condition edges, we can see that changes to `name`, `city`, or `email` affect\n", + "which rows get their `end_time` updated." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "c4fdba7f", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-14T17:00:42.203108Z", + "iopub.status.busy": "2026-04-14T17:00:42.203043Z", + "iopub.status.idle": "2026-04-14T17:00:42.205168Z", + "shell.execute_reply": "2026-04-14T17:00:42.204798Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Impact analysis: What does end_time depend on?\n", + "\n", + " dim_customer.email role=condition\n", + " staging_customer_latest.email role=condition\n", + " dim_customer.name role=condition\n", + " staging_customer_latest.name role=condition\n", + " dim_customer.city role=condition\n", + " staging_customer_latest.city role=condition\n", + "\n", + "---\n", + "\n", + "Impact analysis: What does is_active depend on?\n", + "\n", + " dim_customer.is_active role=condition\n", + " dim_customer.email role=condition\n", + " staging_customer_latest.email role=condition\n", + " dim_customer.name role=condition\n", + " staging_customer_latest.name role=condition\n", + " dim_customer.city role=condition\n", + " staging_customer_latest.city role=condition\n" + ] + } + ], + "source": [ + "print(\"Impact analysis: What does end_time depend on?\\n\")\n", + "for edge in graph.edges:\n", + " if edge.is_merge_operation and edge.to_node.column_name == \"end_time\":\n", + " role = edge.merge_column_role or \"match\"\n", + " print(f\" {edge.from_node.full_name:<40s} role={role}\")\n", + "\n", + "print(\"\\n---\")\n", + "print(\"\\nImpact analysis: What does is_active depend on?\\n\")\n", + "for edge in graph.edges:\n", + " if edge.is_merge_operation and edge.to_node.column_name == \"is_active\":\n", + " role = edge.merge_column_role or \"match\"\n", + " print(f\" {edge.from_node.full_name:<40s} role={role}\")" + ] + }, + { + "cell_type": "markdown", + "id": "58cc7953", + "metadata": {}, + "source": [ + "### Example 7: ON Clause Literal Filter (Gap 9)\n", + "\n", + "The `ON t.is_active = 'Y'` literal predicate is now tracked as a `merge_match_filter`\n", + "edge, distinct from the column-column `merge_match` edge for `t.id = s.id`." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "0341d366", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-14T17:00:42.206274Z", + "iopub.status.busy": "2026-04-14T17:00:42.206205Z", + "iopub.status.idle": "2026-04-14T17:00:42.207959Z", + "shell.execute_reply": "2026-04-14T17:00:42.207680Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "ON clause edges:\n", + "\n", + " staging_customer_latest.id type=merge_match role=None\n", + " dim_customer.is_active type=merge_match_filter role=condition\n" + ] + } + ], + "source": [ + "print(\"ON clause edges:\\n\")\n", + "for edge in graph.edges:\n", + " if edge.is_merge_operation and edge.edge_type in (\"merge_match\", \"merge_match_filter\"):\n", + " print(\n", + " f\" {edge.from_node.full_name:<35s} type={edge.edge_type:<20s} role={edge.merge_column_role}\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "38c12f1b", + "metadata": {}, + "source": [ + "### Example 8: JSON Export with `merge_column_role`\n", + "\n", + "The `merge_column_role` field appears in JSON exports, allowing downstream tools\n", + "to filter or weight condition vs value dependencies differently." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "f888f685", + "metadata": { + "execution": { + "iopub.execute_input": "2026-04-14T17:00:42.208875Z", + "iopub.status.busy": "2026-04-14T17:00:42.208817Z", + "iopub.status.idle": "2026-04-14T17:00:42.212202Z", + "shell.execute_reply": "2026-04-14T17:00:42.211921Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Condition edges in JSON export:\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.is_active\",\n", + " \"to_column\": \"scd2_close:subq:main.is_active\",\n", + " \"edge_type\": \"merge_match_filter\",\n", + " \"transformation\": \"merge_match_filter\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"match\",\n", + " \"merge_condition\": null,\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.email\",\n", + " \"to_column\": \"scd2_close:subq:main.end_time\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.email\",\n", + " \"to_column\": \"scd2_close:subq:main.end_time\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.name\",\n", + " \"to_column\": \"scd2_close:subq:main.end_time\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.name\",\n", + " \"to_column\": \"scd2_close:subq:main.end_time\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.city\",\n", + " \"to_column\": \"scd2_close:subq:main.end_time\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.city\",\n", + " \"to_column\": \"scd2_close:subq:main.end_time\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.email\",\n", + " \"to_column\": \"scd2_close:subq:main.is_active\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.email\",\n", + " \"to_column\": \"scd2_close:subq:main.is_active\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.name\",\n", + " \"to_column\": \"scd2_close:subq:main.is_active\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.name\",\n", + " \"to_column\": \"scd2_close:subq:main.is_active\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.city\",\n", + " \"to_column\": \"scd2_close:subq:main.is_active\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n", + "{\n", + " \"from_column\": \"staging_customer_latest.city\",\n", + " \"to_column\": \"scd2_close:subq:main.is_active\",\n", + " \"edge_type\": \"merge_update\",\n", + " \"transformation\": \"merge_update\",\n", + " \"query_id\": \"scd2_close\",\n", + " \"is_merge_operation\": true,\n", + " \"merge_action\": \"update\",\n", + " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n", + " \"merge_column_role\": \"condition\"\n", + "}\n", + "\n" + ] + } + ], + "source": [ + "import json\n", + "\n", + "scd2_pipeline = Pipeline([(\"scd2_close\", scd2_sql)], dialect=\"databricks\")\n", + "export_data = JSONExporter().export(scd2_pipeline)\n", + "\n", + "print(\"Condition edges in JSON export:\\n\")\n", + "for edge in export_data.get(\"edges\", []):\n", + " if edge.get(\"merge_column_role\") == \"condition\":\n", + " print(json.dumps(edge, indent=2))\n", + " print()" + ] } ], "metadata": { @@ -312,9 +796,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.13.0" + "version": "3.13.1" } }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +}