diff --git a/examples/merge_lineage.ipynb b/examples/merge_lineage.ipynb
index a7a5cd0..bd56a26 100644
--- a/examples/merge_lineage.ipynb
+++ b/examples/merge_lineage.ipynb
@@ -35,10 +35,10 @@
"id": "76babcb3",
"metadata": {
"execution": {
- "iopub.execute_input": "2025-12-30T20:10:44.159064Z",
- "iopub.status.busy": "2025-12-30T20:10:44.158846Z",
- "iopub.status.idle": "2025-12-30T20:10:44.230550Z",
- "shell.execute_reply": "2025-12-30T20:10:44.230171Z"
+ "iopub.execute_input": "2026-04-14T17:00:42.052943Z",
+ "iopub.status.busy": "2026-04-14T17:00:42.052862Z",
+ "iopub.status.idle": "2026-04-14T17:00:42.108311Z",
+ "shell.execute_reply": "2026-04-14T17:00:42.107982Z"
}
},
"outputs": [
@@ -134,7 +134,8 @@
" \"query_id\": \"merge_op\",\n",
" \"is_merge_operation\": true,\n",
" \"merge_action\": \"match\",\n",
- " \"merge_condition\": null\n",
+ " \"merge_condition\": null,\n",
+ " \"merge_column_role\": null\n",
"}\n",
"{\n",
" \"from_column\": \"source.new_value\",\n",
@@ -144,7 +145,8 @@
" \"query_id\": \"merge_op\",\n",
" \"is_merge_operation\": true,\n",
" \"merge_action\": \"update\",\n",
- " \"merge_condition\": null\n",
+ " \"merge_condition\": null,\n",
+ " \"merge_column_role\": \"value\"\n",
"}\n",
"\n",
"============================================================\n",
@@ -284,16 +286,498 @@
{
"cell_type": "markdown",
"id": "c733616w8q4",
- "source": "### Visualize Pipeline Lineage\n\nDisplay the simplified column lineage for MERGE statement queries.",
- "metadata": {}
+ "metadata": {},
+ "source": [
+ "### Visualize Pipeline Lineage\n",
+ "\n",
+ "Display the simplified column lineage for MERGE statement queries."
+ ]
},
{
"cell_type": "code",
+ "execution_count": 2,
"id": "uzr9h77vv6",
- "source": "import shutil\n\nfrom clgraph import visualize_pipeline_lineage\n\n# Create pipeline for visualization\nsql_merge = \"\"\"\nMERGE INTO target t\nUSING source s ON t.id = s.id\nWHEN MATCHED THEN UPDATE SET t.value = s.new_value\nWHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.new_value)\n\"\"\"\nmerge_pipeline = Pipeline([(\"merge_op\", sql_merge)], dialect=\"postgres\")\n\nif shutil.which(\"dot\") is None:\n print(\"⚠️ Graphviz not installed. Install with: brew install graphviz\")\nelse:\n print(\"MERGE Pipeline - Simplified Lineage:\")\n display(visualize_pipeline_lineage(merge_pipeline.column_graph.to_simplified()))",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-14T17:00:42.109611Z",
+ "iopub.status.busy": "2026-04-14T17:00:42.109539Z",
+ "iopub.status.idle": "2026-04-14T17:00:42.192679Z",
+ "shell.execute_reply": "2026-04-14T17:00:42.192304Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "MERGE Pipeline - Simplified Lineage:\n"
+ ]
+ },
+ {
+ "data": {
+ "image/svg+xml": [
+ "\n",
+ "\n",
+ "\n",
+ "\n",
+ "\n"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "import shutil\n",
+ "\n",
+ "from clgraph import visualize_pipeline_lineage\n",
+ "\n",
+ "# Create pipeline for visualization\n",
+ "sql_merge = \"\"\"\n",
+ "MERGE INTO target t\n",
+ "USING source s ON t.id = s.id\n",
+ "WHEN MATCHED THEN UPDATE SET t.value = s.new_value\n",
+ "WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.new_value)\n",
+ "\"\"\"\n",
+ "merge_pipeline = Pipeline([(\"merge_op\", sql_merge)], dialect=\"postgres\")\n",
+ "\n",
+ "if shutil.which(\"dot\") is None:\n",
+ " print(\"⚠️ Graphviz not installed. Install with: brew install graphviz\")\n",
+ "else:\n",
+ " print(\"MERGE Pipeline - Simplified Lineage:\")\n",
+ " display(visualize_pipeline_lineage(merge_pipeline.column_graph.to_simplified()))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a22e9db8",
+ "metadata": {},
+ "source": [
+ "## Condition Column Lineage (Gaps 3, 9, 10)\n",
+ "\n",
+ "MERGE statements have two kinds of upstream dependencies for assigned target columns:\n",
+ "\n",
+ "- **Value dependencies** — the RHS of `SET` (e.g., `t.end_time = current_timestamp()`)\n",
+ "- **Condition dependencies** — columns in `WHEN MATCHED AND (...)` or literal-bound `ON` predicates (e.g., `t.is_active = 'Y'`)\n",
+ "\n",
+ "clgraph now tracks both, distinguishing them via `merge_column_role` on each edge:\n",
+ "- `None` — ON clause match columns (equi-join pairs)\n",
+ "- `\"value\"` — SET assignment RHS columns\n",
+ "- `\"condition\"` — WHEN guard or ON literal filter columns"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "2a9b42c3",
+ "metadata": {},
+ "source": [
+ "### Example 5: SCD2 MERGE — Condition Columns as Dependencies\n",
+ "\n",
+ "In a Slowly Changing Dimension Type 2 pattern, the WHEN MATCHED condition determines\n",
+ "*which rows get closed*. The columns in that condition (`name`, `city`, `email`) are\n",
+ "gating dependencies of the assigned columns (`end_time`, `is_active`)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "0c55e121",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-14T17:00:42.193907Z",
+ "iopub.status.busy": "2026-04-14T17:00:42.193816Z",
+ "iopub.status.idle": "2026-04-14T17:00:42.202124Z",
+ "shell.execute_reply": "2026-04-14T17:00:42.201754Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "SCD2 MERGE — All lineage edges with roles:\n",
+ "\n",
+ " dim_customer.email -> end_time [condition]\n",
+ " staging_customer_latest.email -> end_time [condition]\n",
+ " dim_customer.name -> end_time [condition]\n",
+ " staging_customer_latest.name -> end_time [condition]\n",
+ " dim_customer.city -> end_time [condition]\n",
+ " staging_customer_latest.city -> end_time [condition]\n",
+ " staging_customer_latest.id -> id [match]\n",
+ " dim_customer.is_active -> is_active [condition]\n",
+ " dim_customer.email -> is_active [condition]\n",
+ " staging_customer_latest.email -> is_active [condition]\n",
+ " dim_customer.name -> is_active [condition]\n",
+ " staging_customer_latest.name -> is_active [condition]\n",
+ " dim_customer.city -> is_active [condition]\n",
+ " staging_customer_latest.city -> is_active [condition]\n"
+ ]
+ }
+ ],
+ "source": [
+ "scd2_sql = \"\"\"\n",
+ "MERGE INTO dim_customer t\n",
+ "USING staging_customer_latest s ON t.id = s.id AND t.is_active = 'Y'\n",
+ "WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city OR t.email <> s.email) THEN\n",
+ " UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N'\n",
+ "\"\"\"\n",
+ "\n",
+ "builder = RecursiveLineageBuilder(scd2_sql, dialect=\"databricks\")\n",
+ "graph = builder.build()\n",
+ "\n",
+ "print(\"SCD2 MERGE — All lineage edges with roles:\\n\")\n",
+ "for edge in sorted(graph.edges, key=lambda e: (e.to_node.column_name, e.merge_column_role or \"\")):\n",
+ " if edge.is_merge_operation:\n",
+ " role = edge.merge_column_role or \"match\"\n",
+ " print(f\" {edge.from_node.full_name:>35s} -> {edge.to_node.column_name:<12s} [{role}]\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c2a11f10",
"metadata": {},
- "execution_count": null,
- "outputs": []
+ "source": [
+ "### Example 6: Impact Analysis — \"What breaks if `staging.name` changes?\"\n",
+ "\n",
+ "Condition edges make impact analysis complete. Without them, `end_time` would appear\n",
+ "to have *no* upstream column dependency (since `current_timestamp()` has none).\n",
+ "With condition edges, we can see that changes to `name`, `city`, or `email` affect\n",
+ "which rows get their `end_time` updated."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "c4fdba7f",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-14T17:00:42.203108Z",
+ "iopub.status.busy": "2026-04-14T17:00:42.203043Z",
+ "iopub.status.idle": "2026-04-14T17:00:42.205168Z",
+ "shell.execute_reply": "2026-04-14T17:00:42.204798Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Impact analysis: What does end_time depend on?\n",
+ "\n",
+ " dim_customer.email role=condition\n",
+ " staging_customer_latest.email role=condition\n",
+ " dim_customer.name role=condition\n",
+ " staging_customer_latest.name role=condition\n",
+ " dim_customer.city role=condition\n",
+ " staging_customer_latest.city role=condition\n",
+ "\n",
+ "---\n",
+ "\n",
+ "Impact analysis: What does is_active depend on?\n",
+ "\n",
+ " dim_customer.is_active role=condition\n",
+ " dim_customer.email role=condition\n",
+ " staging_customer_latest.email role=condition\n",
+ " dim_customer.name role=condition\n",
+ " staging_customer_latest.name role=condition\n",
+ " dim_customer.city role=condition\n",
+ " staging_customer_latest.city role=condition\n"
+ ]
+ }
+ ],
+ "source": [
+ "print(\"Impact analysis: What does end_time depend on?\\n\")\n",
+ "for edge in graph.edges:\n",
+ " if edge.is_merge_operation and edge.to_node.column_name == \"end_time\":\n",
+ " role = edge.merge_column_role or \"match\"\n",
+ " print(f\" {edge.from_node.full_name:<40s} role={role}\")\n",
+ "\n",
+ "print(\"\\n---\")\n",
+ "print(\"\\nImpact analysis: What does is_active depend on?\\n\")\n",
+ "for edge in graph.edges:\n",
+ " if edge.is_merge_operation and edge.to_node.column_name == \"is_active\":\n",
+ " role = edge.merge_column_role or \"match\"\n",
+ " print(f\" {edge.from_node.full_name:<40s} role={role}\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "58cc7953",
+ "metadata": {},
+ "source": [
+ "### Example 7: ON Clause Literal Filter (Gap 9)\n",
+ "\n",
+ "The `ON t.is_active = 'Y'` literal predicate is now tracked as a `merge_match_filter`\n",
+ "edge, distinct from the column-column `merge_match` edge for `t.id = s.id`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "0341d366",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-14T17:00:42.206274Z",
+ "iopub.status.busy": "2026-04-14T17:00:42.206205Z",
+ "iopub.status.idle": "2026-04-14T17:00:42.207959Z",
+ "shell.execute_reply": "2026-04-14T17:00:42.207680Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "ON clause edges:\n",
+ "\n",
+ " staging_customer_latest.id type=merge_match role=None\n",
+ " dim_customer.is_active type=merge_match_filter role=condition\n"
+ ]
+ }
+ ],
+ "source": [
+ "print(\"ON clause edges:\\n\")\n",
+ "for edge in graph.edges:\n",
+ " if edge.is_merge_operation and edge.edge_type in (\"merge_match\", \"merge_match_filter\"):\n",
+ " print(\n",
+ " f\" {edge.from_node.full_name:<35s} type={edge.edge_type:<20s} role={edge.merge_column_role}\"\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "38c12f1b",
+ "metadata": {},
+ "source": [
+ "### Example 8: JSON Export with `merge_column_role`\n",
+ "\n",
+ "The `merge_column_role` field appears in JSON exports, allowing downstream tools\n",
+ "to filter or weight condition vs value dependencies differently."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "f888f685",
+ "metadata": {
+ "execution": {
+ "iopub.execute_input": "2026-04-14T17:00:42.208875Z",
+ "iopub.status.busy": "2026-04-14T17:00:42.208817Z",
+ "iopub.status.idle": "2026-04-14T17:00:42.212202Z",
+ "shell.execute_reply": "2026-04-14T17:00:42.211921Z"
+ }
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Condition edges in JSON export:\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.is_active\",\n",
+ " \"to_column\": \"scd2_close:subq:main.is_active\",\n",
+ " \"edge_type\": \"merge_match_filter\",\n",
+ " \"transformation\": \"merge_match_filter\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"match\",\n",
+ " \"merge_condition\": null,\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.email\",\n",
+ " \"to_column\": \"scd2_close:subq:main.end_time\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.email\",\n",
+ " \"to_column\": \"scd2_close:subq:main.end_time\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.name\",\n",
+ " \"to_column\": \"scd2_close:subq:main.end_time\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.name\",\n",
+ " \"to_column\": \"scd2_close:subq:main.end_time\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.city\",\n",
+ " \"to_column\": \"scd2_close:subq:main.end_time\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.city\",\n",
+ " \"to_column\": \"scd2_close:subq:main.end_time\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.email\",\n",
+ " \"to_column\": \"scd2_close:subq:main.is_active\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.email\",\n",
+ " \"to_column\": \"scd2_close:subq:main.is_active\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.name\",\n",
+ " \"to_column\": \"scd2_close:subq:main.is_active\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.name\",\n",
+ " \"to_column\": \"scd2_close:subq:main.is_active\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.city\",\n",
+ " \"to_column\": \"scd2_close:subq:main.is_active\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n",
+ "{\n",
+ " \"from_column\": \"staging_customer_latest.city\",\n",
+ " \"to_column\": \"scd2_close:subq:main.is_active\",\n",
+ " \"edge_type\": \"merge_update\",\n",
+ " \"transformation\": \"merge_update\",\n",
+ " \"query_id\": \"scd2_close\",\n",
+ " \"is_merge_operation\": true,\n",
+ " \"merge_action\": \"update\",\n",
+ " \"merge_condition\": \"(t.name <> s.name OR t.city <> s.city OR t.email <> s.email)\",\n",
+ " \"merge_column_role\": \"condition\"\n",
+ "}\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "import json\n",
+ "\n",
+ "scd2_pipeline = Pipeline([(\"scd2_close\", scd2_sql)], dialect=\"databricks\")\n",
+ "export_data = JSONExporter().export(scd2_pipeline)\n",
+ "\n",
+ "print(\"Condition edges in JSON export:\\n\")\n",
+ "for edge in export_data.get(\"edges\", []):\n",
+ " if edge.get(\"merge_column_role\") == \"condition\":\n",
+ " print(json.dumps(edge, indent=2))\n",
+ " print()"
+ ]
}
],
"metadata": {
@@ -312,9 +796,9 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
- "version": "3.13.0"
+ "version": "3.13.1"
}
},
"nbformat": 4,
"nbformat_minor": 5
-}
\ No newline at end of file
+}
diff --git a/src/clgraph/column_extractor.py b/src/clgraph/column_extractor.py
index 85059bc..7a04d4f 100644
--- a/src/clgraph/column_extractor.py
+++ b/src/clgraph/column_extractor.py
@@ -284,10 +284,34 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict]
output_cols.append(col_info)
idx += 1
+ # 1b. Literal-bound match filter columns (edges for ON clause literal predicates)
+ match_filter_columns = config.get("match_filter_columns", [])
+ for col_name, literal_val in match_filter_columns:
+ col_info = {
+ "index": idx,
+ "name": col_name,
+ "is_star": False,
+ "type": "merge_match_filter",
+ "expression": f"{target_alias}.{col_name} = {literal_val}",
+ "ast_node": None,
+ "source_columns": [(target_alias, col_name)],
+ "merge_action": "match",
+ "merge_column_role": "condition",
+ }
+ output_cols.append(col_info)
+ idx += 1
+
# 2. WHEN MATCHED -> UPDATE columns
for action in matched_actions:
if action.get("action_type") == "update":
condition = action.get("condition")
+ # Note: target_alias is used as default_table, but WHEN conditions
+ # typically use qualified refs (t.name, s.name). extract_columns_from_expr
+ # uses the qualified table ref when present, so the default_table only
+ # applies to unqualified column names.
+ condition_columns = (
+ extract_columns_from_expr(condition, target_alias) if condition else []
+ )
for target_col, source_expr in action.get("column_mappings", {}).items():
col_info = {
"index": idx,
@@ -299,6 +323,7 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict]
"source_columns": extract_columns_from_expr(source_expr, source_alias),
"merge_action": "update",
"merge_condition": condition,
+ "condition_columns": condition_columns,
}
output_cols.append(col_info)
idx += 1
@@ -307,6 +332,11 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict]
for action in not_matched_actions:
if action.get("action_type") == "insert":
condition = action.get("condition")
+ # Note: source_alias is used as default_table (not target_alias) because
+ # NOT MATCHED conditions reference source rows (target row doesn't exist).
+ condition_columns = (
+ extract_columns_from_expr(condition, source_alias) if condition else []
+ )
for target_col, source_expr in action.get("column_mappings", {}).items():
col_info = {
"index": idx,
@@ -318,6 +348,7 @@ def extract_merge_columns(ctx: ExtractionContext, unit: QueryUnit) -> List[Dict]
"source_columns": extract_columns_from_expr(source_expr, source_alias),
"merge_action": "insert",
"merge_condition": condition,
+ "condition_columns": condition_columns,
}
output_cols.append(col_info)
idx += 1
diff --git a/src/clgraph/export.py b/src/clgraph/export.py
index 1c64437..7e80e48 100644
--- a/src/clgraph/export.py
+++ b/src/clgraph/export.py
@@ -161,6 +161,7 @@ def export(
edge_dict["is_merge_operation"] = True
edge_dict["merge_action"] = getattr(edge, "merge_action", None)
edge_dict["merge_condition"] = getattr(edge, "merge_condition", None)
+ edge_dict["merge_column_role"] = getattr(edge, "merge_column_role", None)
# Include QUALIFY clause metadata if present
if getattr(edge, "is_qualify_column", False):
diff --git a/src/clgraph/lineage_builder.py b/src/clgraph/lineage_builder.py
index 1e190e2..573f47a 100644
--- a/src/clgraph/lineage_builder.py
+++ b/src/clgraph/lineage_builder.py
@@ -849,7 +849,12 @@ def _trace_column_dependencies(self, unit: QueryUnit, output_node: ColumnNode, c
return
# Branch 4: MERGE
- if col_info.get("type") in ("merge_match", "merge_update", "merge_insert"):
+ if col_info.get("type") in (
+ "merge_match",
+ "merge_update",
+ "merge_insert",
+ "merge_match_filter",
+ ):
trace_merge_columns(
self.lineage_graph,
unit,
diff --git a/src/clgraph/models.py b/src/clgraph/models.py
index f096de8..07631f4 100644
--- a/src/clgraph/models.py
+++ b/src/clgraph/models.py
@@ -613,6 +613,9 @@ class ColumnEdge:
is_merge_operation: bool = False # True if this edge is from a MERGE statement
merge_action: Optional[str] = None # "match", "update", "insert", "delete"
merge_condition: Optional[str] = None # Condition for conditional WHEN clauses
+ merge_column_role: Optional[str] = (
+ None # None (match), "value" (SET RHS), or "condition" (WHEN guard / ON filter)
+ )
# ─── QUALIFY Clause Metadata ───
is_qualify_column: bool = False # True if this column is used in QUALIFY clause
diff --git a/src/clgraph/pipeline_lineage_builder.py b/src/clgraph/pipeline_lineage_builder.py
index 55191af..64e4dd8 100644
--- a/src/clgraph/pipeline_lineage_builder.py
+++ b/src/clgraph/pipeline_lineage_builder.py
@@ -450,6 +450,7 @@ def _add_query_edges(
is_merge_operation=getattr(edge, "is_merge_operation", False),
merge_action=getattr(edge, "merge_action", None),
merge_condition=getattr(edge, "merge_condition", None),
+ merge_column_role=getattr(edge, "merge_column_role", None),
# Preserve QUALIFY clause metadata
is_qualify_column=getattr(edge, "is_qualify_column", False),
qualify_context=getattr(edge, "qualify_context", None),
diff --git a/src/clgraph/query_parser.py b/src/clgraph/query_parser.py
index 9c23860..dbc67cc 100644
--- a/src/clgraph/query_parser.py
+++ b/src/clgraph/query_parser.py
@@ -628,12 +628,17 @@ def _parse_merge_statement(
# Extract match columns from ON condition
match_columns: List[Tuple[str, str]] = []
+ match_filter_columns: List[Tuple[str, str]] = []
if match_condition:
for eq in match_condition.find_all(exp.EQ):
left_col = eq.left
right_col = eq.right
if isinstance(left_col, exp.Column) and isinstance(right_col, exp.Column):
match_columns.append((left_col.name, right_col.name))
+ elif isinstance(left_col, exp.Column) and not isinstance(right_col, exp.Column):
+ match_filter_columns.append((left_col.name, right_col.sql()))
+ elif isinstance(right_col, exp.Column) and not isinstance(left_col, exp.Column):
+ match_filter_columns.append((right_col.name, left_col.sql()))
# Parse WHEN clauses from the 'whens' arg
whens = merge_node.args.get("whens")
@@ -698,6 +703,7 @@ def _parse_merge_statement(
"source_alias": source_alias,
"match_condition": match_condition_sql,
"match_columns": match_columns,
+ "match_filter_columns": match_filter_columns,
"matched_actions": matched_actions,
"not_matched_actions": not_matched_actions,
}
diff --git a/src/clgraph/trace_strategies.py b/src/clgraph/trace_strategies.py
index d9fc65a..18867bd 100644
--- a/src/clgraph/trace_strategies.py
+++ b/src/clgraph/trace_strategies.py
@@ -196,25 +196,32 @@ def trace_merge_columns(
merge_action = col_info.get("merge_action", col_info.get("type"))
merge_condition = col_info.get("merge_condition")
source_refs = col_info.get("source_columns", [])
+ condition_refs = col_info.get("condition_columns", [])
- for source_ref in source_refs:
- table_ref, col_name = source_ref[:2]
-
- # Try to resolve as a source unit or base table
+ def _resolve_to_node(table_ref, col_name):
+ """Resolve a (table, column) ref to a ColumnNode."""
source_node = None
source_unit = resolve_source_unit(unit, table_ref) if table_ref else None
if source_unit:
source_node = find_column_in_unit(source_unit, col_name)
if not source_node:
- # Try as base table
base_table = resolve_base_table_name(unit, table_ref) if table_ref else None
if base_table:
source_node = find_or_create_table_column_node(graph, base_table, col_name)
elif table_ref:
- # Fallback: use table_ref directly
source_node = find_or_create_table_column_node(graph, table_ref, col_name)
+ return source_node
+ # Value-assignment edges (RHS of SET) or match/filter edges
+ for source_ref in source_refs:
+ table_ref, col_name = source_ref[:2]
+ source_node = _resolve_to_node(table_ref, col_name)
if source_node:
+ # Determine role: match edges get None, update/insert get "value",
+ # merge_match_filter edges keep their explicit "condition" role
+ role = col_info.get("merge_column_role")
+ if role is None and col_info["type"] in ("merge_update", "merge_insert"):
+ role = "value"
edge = ColumnEdge(
from_node=source_node,
to_node=output_node,
@@ -225,6 +232,26 @@ def trace_merge_columns(
is_merge_operation=True,
merge_action=merge_action,
merge_condition=merge_condition,
+ merge_column_role=role,
+ )
+ graph.add_edge(edge)
+
+ # Condition-gating edges (from WHEN AND clause)
+ for cond_ref in condition_refs:
+ table_ref, col_name = cond_ref[:2]
+ source_node = _resolve_to_node(table_ref, col_name)
+ if source_node:
+ edge = ColumnEdge(
+ from_node=source_node,
+ to_node=output_node,
+ edge_type=col_info["type"],
+ transformation=col_info["type"],
+ context=unit.unit_type.value,
+ expression=merge_condition,
+ is_merge_operation=True,
+ merge_action=merge_action,
+ merge_condition=merge_condition,
+ merge_column_role="condition",
)
graph.add_edge(edge)
diff --git a/tests/test_merge_statements.py b/tests/test_merge_statements.py
index 197c689..e111f4d 100644
--- a/tests/test_merge_statements.py
+++ b/tests/test_merge_statements.py
@@ -132,6 +132,75 @@ def test_multiple_match_columns(self):
assert "region" in col_names
+class TestMatchFilterColumns:
+ """Test literal-bound predicates in MERGE ON clause (Gap 9)."""
+
+ def test_literal_predicate_extracted(self):
+ """ON t.id = s.id AND t.is_active = 'Y' extracts is_active as filter column."""
+ sql = """
+ MERGE INTO dim_customer t
+ USING staging s ON t.id = s.id AND t.is_active = 'Y'
+ WHEN MATCHED THEN UPDATE SET t.name = s.name
+ """
+ parser = RecursiveQueryParser(sql, dialect="postgres")
+ graph = parser.parse()
+
+ merge_units = [u for u in graph.units.values() if u.unit_type == QueryUnitType.MERGE]
+ unit = merge_units[0]
+ config = unit.unpivot_config
+
+ # Column-column pair should still work
+ match_columns = config.get("match_columns", [])
+ col_names = [col[0] for col in match_columns]
+ assert "id" in col_names
+
+ # Literal-bound predicate should be in match_filter_columns
+ match_filter_columns = config.get("match_filter_columns", [])
+ assert len(match_filter_columns) >= 1
+ filter_col_names = [col[0] for col in match_filter_columns]
+ assert "is_active" in filter_col_names
+
+ def test_no_filter_columns_for_pure_equijoin(self):
+ """ON t.id = s.id with no literals has match_filter_columns key but empty list.
+
+ Note: We assert the KEY exists (not just .get() with default) so
+ the test fails before Task 2 adds match_filter_columns to the config.
+ """
+ sql = """
+ MERGE INTO target t
+ USING source s ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.value = s.new_value
+ """
+ parser = RecursiveQueryParser(sql, dialect="postgres")
+ graph = parser.parse()
+
+ merge_units = [u for u in graph.units.values() if u.unit_type == QueryUnitType.MERGE]
+ unit = merge_units[0]
+ config = unit.unpivot_config
+
+ assert "match_filter_columns" in config
+ assert config["match_filter_columns"] == []
+
+ def test_multiple_literal_predicates(self):
+ """Multiple literal predicates are all extracted."""
+ sql = """
+ MERGE INTO target t
+ USING source s ON t.id = s.id AND t.is_active = 'Y' AND t.region = 'US'
+ WHEN MATCHED THEN UPDATE SET t.value = s.new_value
+ """
+ parser = RecursiveQueryParser(sql, dialect="postgres")
+ graph = parser.parse()
+
+ merge_units = [u for u in graph.units.values() if u.unit_type == QueryUnitType.MERGE]
+ unit = merge_units[0]
+ config = unit.unpivot_config
+
+ match_filter_columns = config.get("match_filter_columns", [])
+ filter_col_names = [col[0] for col in match_filter_columns]
+ assert "is_active" in filter_col_names
+ assert "region" in filter_col_names
+
+
# ============================================================================
# Test Group 3: WHEN MATCHED Actions
# ============================================================================
@@ -347,5 +416,344 @@ def test_merge_with_delete(self):
assert "delete" in action_types or "update" in action_types
+# ============================================================================
+# Test Group 10: MERGE Condition Column Role (Gap 10)
+# ============================================================================
+
+
+class TestMergeColumnRole:
+ """Test that merge_column_role field exists on ColumnEdge."""
+
+ def test_merge_column_role_field_exists(self):
+ """Test that ColumnEdge has merge_column_role field."""
+ from clgraph.models import ColumnEdge, ColumnNode
+
+ source = ColumnNode(table_name="source", column_name="name", full_name="source.name")
+ target = ColumnNode(
+ table_name="target", column_name="end_time", full_name="target.end_time"
+ )
+ edge = ColumnEdge(
+ from_node=source,
+ to_node=target,
+ edge_type="merge_update",
+ transformation="merge_update",
+ context="merge",
+ is_merge_operation=True,
+ merge_action="update",
+ merge_column_role="condition",
+ )
+ assert edge.merge_column_role == "condition"
+
+ def test_merge_column_role_defaults_none(self):
+ """Test that merge_column_role defaults to None."""
+ from clgraph.models import ColumnEdge, ColumnNode
+
+ source = ColumnNode(table_name="source", column_name="val", full_name="source.val")
+ target = ColumnNode(table_name="target", column_name="val", full_name="target.val")
+ edge = ColumnEdge(
+ from_node=source,
+ to_node=target,
+ edge_type="merge_update",
+ transformation="merge_update",
+ context="merge",
+ is_merge_operation=True,
+ )
+ assert edge.merge_column_role is None
+
+
+class TestMergeFilterLineage:
+ """Test lineage edges for literal-bound ON predicates (Gap 9 lineage)."""
+
+ def test_literal_filter_column_produces_lineage_edge(self):
+ """ON t.is_active = 'Y' should produce a lineage edge for is_active."""
+ sql = """
+ MERGE INTO dim_customer t
+ USING staging s ON t.id = s.id AND t.is_active = 'Y'
+ WHEN MATCHED THEN UPDATE SET t.name = s.name
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+
+ # Should have an edge for is_active with merge_column_role="condition"
+ filter_edges = [
+ e
+ for e in merge_edges
+ if e.merge_column_role == "condition" and e.to_node.column_name == "is_active"
+ ]
+ assert len(filter_edges) >= 1
+
+ # The filter edge should be tagged as merge_match_filter
+ edge = filter_edges[0]
+ assert edge.edge_type == "merge_match_filter"
+ # source_columns uses target_alias "t", but resolve_base_table_name
+ # resolves "t" -> "dim_customer" via alias_mapping
+ assert edge.from_node.table_name == "dim_customer"
+ assert edge.from_node.column_name == "is_active"
+
+ def test_filter_edge_coexists_with_match_edge(self):
+ """Both column-column match edges and literal filter edges are produced."""
+ sql = """
+ MERGE INTO dim_customer t
+ USING staging s ON t.id = s.id AND t.is_active = 'Y'
+ WHEN MATCHED THEN UPDATE SET t.name = s.name
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+
+ match_edges = [e for e in merge_edges if e.edge_type == "merge_match"]
+ assert len(match_edges) >= 1
+
+ filter_edges = [e for e in merge_edges if e.edge_type == "merge_match_filter"]
+ assert len(filter_edges) >= 1
+
+
+# ============================================================================
+# Test Group 11: MERGE Condition Column Parsing (Gap 3)
+# ============================================================================
+
+
+class TestMergeConditionColumns:
+ """Test WHEN MATCHED condition columns extracted as refs (Gap 3)."""
+
+ def test_when_condition_columns_extracted(self):
+ """WHEN MATCHED AND (t.name <> s.name) extracts name as condition column."""
+ sql = """
+ MERGE INTO dim_customer t
+ USING staging s ON t.id = s.id
+ WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city) THEN
+ UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N'
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+
+ end_time_cond_edges = [
+ e
+ for e in merge_edges
+ if e.to_node.column_name == "end_time" and e.merge_column_role == "condition"
+ ]
+ cond_col_names = {e.from_node.column_name for e in end_time_cond_edges}
+ assert "name" in cond_col_names
+ assert "city" in cond_col_names
+
+ def test_is_active_has_same_condition_columns(self):
+ """Both assigned columns share the same condition columns."""
+ sql = """
+ MERGE INTO dim_customer t
+ USING staging s ON t.id = s.id
+ WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city) THEN
+ UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N'
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+
+ is_active_cond_edges = [
+ e
+ for e in merge_edges
+ if e.to_node.column_name == "is_active" and e.merge_column_role == "condition"
+ ]
+ cond_col_names = {e.from_node.column_name for e in is_active_cond_edges}
+ assert "name" in cond_col_names
+ assert "city" in cond_col_names
+
+ def test_condition_columns_include_both_sides(self):
+ """Condition t.name <> s.name includes columns from both target and source."""
+ sql = """
+ MERGE INTO dim_customer t
+ USING staging s ON t.id = s.id
+ WHEN MATCHED AND (t.name <> s.name) THEN
+ UPDATE SET t.end_time = current_timestamp()
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+ end_time_cond_edges = [
+ e
+ for e in merge_edges
+ if e.to_node.column_name == "end_time" and e.merge_column_role == "condition"
+ ]
+ assert len(end_time_cond_edges) >= 2
+ source_tables = {e.from_node.table_name for e in end_time_cond_edges}
+ assert "dim_customer" in source_tables, "Expected target-side condition column"
+ assert "staging" in source_tables, "Expected source-side condition column"
+
+ def test_no_condition_columns_for_unconditional_update(self):
+ """WHEN MATCHED without AND condition produces no condition edges."""
+ sql = """
+ MERGE INTO target t
+ USING source s ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.value = s.new_value
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+ assert len(merge_edges) > 0, "Expected merge edges to exist"
+ cond_edges = [e for e in merge_edges if e.merge_column_role == "condition"]
+ assert cond_edges == []
+
+
+class TestMergeValueRole:
+ """Test that value-assignment edges are tagged with merge_column_role='value'."""
+
+ def test_update_value_edges_tagged(self):
+ """UPDATE SET t.value = s.new_value produces edge with role='value'."""
+ sql = """
+ MERGE INTO target t
+ USING source s ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.value = s.new_value
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ update_edges = [
+ e for e in graph.edges if e.is_merge_operation and e.merge_action == "update"
+ ]
+ assert len(update_edges) >= 1
+ for edge in update_edges:
+ assert edge.merge_column_role == "value"
+
+ def test_match_edges_have_no_role(self):
+ """ON clause match edges should have merge_column_role=None."""
+ sql = """
+ MERGE INTO target t
+ USING source s ON t.id = s.id
+ WHEN MATCHED THEN UPDATE SET t.value = s.new_value
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ match_edges = [
+ e for e in graph.edges if e.is_merge_operation and e.edge_type == "merge_match"
+ ]
+ for edge in match_edges:
+ assert edge.merge_column_role is None
+
+
+class TestMergeInsertConditionColumns:
+ """Test WHEN NOT MATCHED condition columns for INSERT actions."""
+
+ def test_not_matched_condition_columns_extracted(self):
+ """WHEN NOT MATCHED AND s.op = 'c' should produce condition edges."""
+ sql = """
+ MERGE INTO target t
+ USING source s ON t.id = s.id
+ WHEN NOT MATCHED AND s.op = 'c' THEN
+ INSERT (id, value) VALUES (s.id, s.value)
+ """
+ builder = RecursiveLineageBuilder(sql, dialect="postgres")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+ id_cond_edges = [
+ e
+ for e in merge_edges
+ if e.to_node.column_name == "id" and e.merge_column_role == "condition"
+ ]
+ cond_col_names = {e.from_node.column_name for e in id_cond_edges}
+ assert "op" in cond_col_names
+
+
+# ============================================================================
+# Test Group 12: SCD2 End-to-End Integration (Gaps 3, 9, 10)
+# ============================================================================
+
+
+class TestSCD2MergeConditionLineage:
+ """End-to-end test: SCD2 MERGE with all three gaps fixed."""
+
+ SCD2_SQL = """
+ MERGE INTO dim_customer t
+ USING staging_customer_latest s ON t.id = s.id AND t.is_active = 'Y'
+ WHEN MATCHED AND (t.name <> s.name OR t.city <> s.city OR t.email <> s.email) THEN
+ UPDATE SET t.end_time = current_timestamp(), t.is_active = 'N'
+ """
+
+ def test_gap9_on_literal_filter_in_lineage(self):
+ """Gap 9: is_active from ON clause appears in lineage."""
+ builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+ filter_edges = [
+ e
+ for e in merge_edges
+ if e.edge_type == "merge_match_filter" and e.from_node.column_name == "is_active"
+ ]
+ assert len(filter_edges) >= 1
+ assert filter_edges[0].merge_column_role == "condition"
+
+ def test_gap3_when_condition_columns_in_lineage(self):
+ """Gap 3: name, city, email from WHEN condition are upstream of end_time."""
+ builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+ end_time_cond_edges = [
+ e
+ for e in merge_edges
+ if e.to_node.column_name == "end_time" and e.merge_column_role == "condition"
+ ]
+ cond_col_names = {e.from_node.column_name for e in end_time_cond_edges}
+ assert "name" in cond_col_names
+ assert "city" in cond_col_names
+ assert "email" in cond_col_names
+
+ def test_gap10_condition_only_on_literal_assigned_column(self):
+ """Gap 10: is_active = 'N' (literal RHS) has condition edges but no value edges."""
+ builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+
+ is_active_edges = [e for e in merge_edges if e.to_node.column_name == "is_active"]
+ roles = {e.merge_column_role for e in is_active_edges}
+ assert "condition" in roles
+ assert "value" not in roles
+
+ def test_gap10_impact_analysis_name_reaches_end_time(self):
+ """Gap 10: impact analysis from staging.name should reach end_time."""
+ builder = RecursiveLineageBuilder(self.SCD2_SQL, dialect="databricks")
+ graph = builder.build()
+
+ merge_edges = [e for e in graph.edges if e.is_merge_operation]
+
+ end_time_upstream = [
+ e.from_node.column_name for e in merge_edges if e.to_node.column_name == "end_time"
+ ]
+ assert "name" in end_time_upstream
+ assert "city" in end_time_upstream
+ assert "email" in end_time_upstream
+
+
+class TestMergeRoleExport:
+ """Test that merge_column_role appears in JSON export."""
+
+ def test_merge_column_role_in_export(self):
+ """merge_column_role should appear in exported edge data."""
+ sql = """
+ MERGE INTO dim_customer t
+ USING staging s ON t.id = s.id
+ WHEN MATCHED AND (t.name <> s.name) THEN
+ UPDATE SET t.end_time = current_timestamp()
+ """
+ pipeline = Pipeline([("scd2_close", sql)], dialect="postgres")
+
+ exporter = JSONExporter()
+ data = exporter.export(pipeline)
+
+ edges = data.get("edges", [])
+ condition_edges = [e for e in edges if e.get("merge_column_role") == "condition"]
+ assert len(condition_edges) >= 1
+
+
if __name__ == "__main__":
pytest.main([__file__, "-v", "--tb=short"])