From 0a0cbdc4ab8250c5ec83254b242fb0d0d1464f9a Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 5 Jun 2026 06:28:01 -0700 Subject: [PATCH 1/5] deltalake working for views --- .../blueprints/deltalake/api.py | 65 ++++-- .../workflows/deltalake_export.py | 210 +++++++++++++++--- tests/test_deltalake_export.py | 117 ++++++++++ 3 files changed, 343 insertions(+), 49 deletions(-) diff --git a/materializationengine/blueprints/deltalake/api.py b/materializationengine/blueprints/deltalake/api.py index 35fdc897..4d957efb 100644 --- a/materializationengine/blueprints/deltalake/api.py +++ b/materializationengine/blueprints/deltalake/api.py @@ -139,11 +139,14 @@ def discover_specs(): _DEFAULT_DROP_COLUMNS, TableSource, _build_frozen_db_connection_string, + _classify_relation, _get_redis_client, _resolve_select_columns, _validate_identifier, discover_default_output_specs, + discover_view_output_specs, estimate_bytes_per_row, + estimate_view_rows, resolve_n_partitions, ) @@ -207,31 +210,47 @@ def discover_specs(): {"error": f"Cannot connect to frozen DB for version {version}: {e}"} ), 404 - # Look up row count. - with db_manager.session_scope(analysis_database) as session: - metadata_row = ( - session.query(MaterializedMetadata) - .filter(MaterializedMetadata.table_name == table_name) - .first() + # Classify as table vs view. Views are materialized views cloned into the + # frozen DB; they are not tracked in MaterializedMetadata and have no + # segmentation join, so they need view-specific row-count and spec discovery. + relation_kind = _classify_relation(connection_string, table_name) + if relation_kind is None: + return jsonify( + {"error": f"Table {table_name!r} not found in version {version}"} + ), 404 + is_view = relation_kind == "view" + + if is_view: + row_count = estimate_view_rows(connection_string, table_name) + source = TableSource(annotation_table=table_name) + resolved_specs = discover_view_output_specs(source, connection_string) + else: + # Look up row count. + with db_manager.session_scope(analysis_database) as session: + metadata_row = ( + session.query(MaterializedMetadata) + .filter(MaterializedMetadata.table_name == table_name) + .first() + ) + if metadata_row is None: + return jsonify( + {"error": f"Table {table_name!r} not found in version {version}"} + ), 404 + row_count = metadata_row.row_count + + # Detect segmentation table. + seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) + has_seg_table = engine.dialect.has_table(engine, seg_table_name) + segmentation_table_name = seg_table_name if has_seg_table else None + + source = TableSource( + annotation_table=table_name, + segmentation_table=segmentation_table_name, ) - if metadata_row is None: - return jsonify( - {"error": f"Table {table_name!r} not found in version {version}"} - ), 404 - row_count = metadata_row.row_count - - # Detect segmentation table. - seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) - has_seg_table = engine.dialect.has_table(engine, seg_table_name) - segmentation_table_name = seg_table_name if has_seg_table else None - - source = TableSource( - annotation_table=table_name, - segmentation_table=segmentation_table_name, - ) - # Discover specs. - resolved_specs = discover_default_output_specs(source, engine) + # Discover specs. + resolved_specs = discover_default_output_specs(source, engine) + bytes_per_row = estimate_bytes_per_row(connection_string, source) # Track which specs had "auto" before resolution (for caching). diff --git a/materializationengine/workflows/deltalake_export.py b/materializationengine/workflows/deltalake_export.py index 96ce8a3a..a9bb5c37 100644 --- a/materializationengine/workflows/deltalake_export.py +++ b/materializationengine/workflows/deltalake_export.py @@ -86,6 +86,20 @@ def __post_init__(self): raise ValueError("DeltaLakeOutputSpec.name must be a non-empty string") +# Spatial column prefix priority (lower index = higher priority) used when a +# table/view exposes more than one geometry column and we must pick a single +# one to partition on via its Morton code. +_SPATIAL_PREFIX_PRIORITY = ["ctr", "post", "pre", "pt", "bb"] + + +def _spatial_col_rank(col_name: str) -> int: + """Rank a geometry column by ``_SPATIAL_PREFIX_PRIORITY`` (lower = preferred).""" + for i, prefix in enumerate(_SPATIAL_PREFIX_PRIORITY): + if col_name.startswith(prefix): + return i + return len(_SPATIAL_PREFIX_PRIORITY) + + _IDENTIFIER_RE = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_$]*$") @@ -147,6 +161,77 @@ def _resolve_select_columns( return columns +def _classify_relation(connection_string: str, name: str) -> str | None: + """Classify a frozen-DB relation as ``"table"``, ``"view"``, or ``None``. + + Inspects ``pg_class.relkind``: ordinary/partitioned tables (``r``/``p``) + are ``"table"``; ordinary and materialized views (``v``/``m``) are + ``"view"``. Returns ``None`` when no relation with *name* exists. + + Analysis views are cloned into the frozen DB as materialized views (via + ``CREATE DATABASE ... WITH TEMPLATE``) and so are not registered in + ``MaterializedMetadata`` — this lets callers route them to the + view-specific row-count and spec-discovery paths. + """ + _validate_identifier(name) + row = _adbc_fetchone( + connection_string, + f"SELECT relkind FROM pg_class WHERE relname = '{name}'", + ) + if row is None: + return None + relkind = row[0] + if relkind in ("r", "p"): + return "table" + if relkind in ("v", "m"): + return "view" + return None + + +def estimate_view_rows(connection_string: str, name: str) -> int: + """Estimate the row count of a view from ``pg_class.reltuples``. + + Views are not tracked in ``MaterializedMetadata``, so their row count must + come from elsewhere. ``reltuples`` is a fast catalog lookup that is exact + for a materialized view immediately after ``REFRESH``/``ANALYZE``. Because + the cloned frozen DB can carry stale or zero stats, fall back to an exact + ``COUNT(*)`` only when ``reltuples <= 0`` — otherwise partition sizing would + silently collapse to a single file. + + The row count only drives partition-count heuristics, progress reporting, + and the existing-export guard's message; it never gates the data written + (the export streams ``SELECT *`` in full), so the estimate is safe. + """ + _validate_identifier(name) + row = _adbc_fetchone( + connection_string, + f"SELECT reltuples FROM pg_class WHERE relname = '{name}'", + ) + if row is not None and row[0] is not None and row[0] > 0: + return int(row[0]) + + count_row = _adbc_fetchone(connection_string, f'SELECT COUNT(*) FROM "{name}"') + return int(count_row[0]) if count_row is not None else 0 + + +def _get_geometry_columns(connection_string: str, name: str) -> list[str]: + """Return PostGIS geometry column names on *name*, in ordinal order. + + Detects columns via ``information_schema.columns.udt_name = 'geometry'``, + which works identically for tables, views, and materialized views. This is + necessary because we alias the SQLAlchemy ``geometry`` type to ``BYTEA`` for + reflection, so the inspector cannot distinguish geometry from plain bytea. + """ + _validate_identifier(name) + rows = _adbc_fetchall( + connection_string, + f"SELECT column_name FROM information_schema.columns " + f"WHERE table_name = '{name}' AND udt_name = 'geometry' " + f"ORDER BY ordinal_position", + ) + return [col_name for (col_name,) in rows] + + def discover_default_output_specs( source: TableSource, engine: Engine, @@ -212,15 +297,6 @@ def discover_default_output_specs( ) ) - # Spatial column prefix priority (lower index = higher priority). - _spatial_prefix_priority = ["ctr", "post", "pre", "pt", "bb"] - - def _spatial_col_rank(col_name: str) -> int: - for i, prefix in enumerate(_spatial_prefix_priority): - if col_name.startswith(prefix): - return i - return len(_spatial_prefix_priority) - # Collect spatial (GiST) candidates; pick at most one after the loop. spatial_candidates: list[tuple[str, str]] = [] # (column_name, owning_table) @@ -275,6 +351,67 @@ def _spatial_col_rank(col_name: str) -> int: return specs +def discover_view_output_specs( + source: TableSource, + connection_string: str, +) -> list[DeltaLakeOutputSpec]: + """Derive output specs for a view, inferring from columns instead of indexes. + + Materialized views generally lack the B-tree/GiST indexes and primary-key + constraint that :func:`discover_default_output_specs` relies on, so specs + are inferred from the view's columns to give views the same partition + quality tables get: + + * If the view has an ``id`` column → a ``percentile_range`` spec on ``id`` + (mirrors the PK spec emitted for tables). + * Among PostGIS geometry columns, at most **one** is selected by the prefix + priority ``ctr > post > pre > pt > bb`` and partitioned on its Morton code + (``uniform_range``), exactly as the table spatial branch does — so the + downstream geometry-decode / Morton pipeline is reused unchanged. + + If neither an ``id`` column nor any geometry column is present, a single + un-partitioned ``flat`` spec is returned so the export still produces output. + """ + view_name = source.annotation_table + columns = _resolve_select_columns(connection_string, source, []) + + specs: list[DeltaLakeOutputSpec] = [] + + if "id" in columns: + specs.append( + DeltaLakeOutputSpec( + name="id", + partition_by="id", + partition_strategy="percentile_range", + n_partitions="auto", + zorder_columns=["id"], + bloom_filter_columns=[], + source_table=view_name, + ) + ) + + geometry_columns = _get_geometry_columns(connection_string, view_name) + if geometry_columns: + col = min(geometry_columns, key=_spatial_col_rank) + specs.append( + DeltaLakeOutputSpec( + name=f"{col}_morton", + partition_by=f"{col}_morton", + partition_strategy="uniform_range", + n_partitions="auto", + zorder_columns=[f"{col}_morton"], + bloom_filter_columns=[], + source_geometry_column=col, + source_table=view_name, + ) + ) + + if not specs: + specs.append(DeltaLakeOutputSpec(name="flat")) + + return specs + + # Fallback estimate when pg_class stats are unavailable. _DEFAULT_BYTES_PER_ROW = 200 @@ -1504,27 +1641,41 @@ def _phase(phase: str, message: str, **status_kwargs) -> None: # --- Resolve table structure from frozen DB metadata --- engine = db_manager.get_engine(analysis_database) - # Determine if the table was merged and look up row count. - with db_manager.session_scope(analysis_database) as session: - metadata_row = ( - session.query(MaterializedMetadata) - .filter(MaterializedMetadata.table_name == table_name) - .first() + # Views are materialized views cloned into the frozen DB; they are not + # tracked in MaterializedMetadata and have no segmentation join. + relation_kind = _classify_relation(connection_string, table_name) + if relation_kind is None: + raise ValueError( + f"No table or view named {table_name!r} in {analysis_database}" ) - if metadata_row is None: - raise ValueError( - f"No MaterializedMetadata entry for table {table_name!r} " - f"in {analysis_database}" - ) - row_count = metadata_row.row_count + is_view = relation_kind == "view" - # Detect segmentation table presence. - seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) - has_seg_table = engine.dialect.has_table(engine, seg_table_name) - segmentation_table_name = seg_table_name if has_seg_table else None + if is_view: + row_count = estimate_view_rows(connection_string, table_name) + segmentation_table_name = None + else: + # Determine if the table was merged and look up row count. + with db_manager.session_scope(analysis_database) as session: + metadata_row = ( + session.query(MaterializedMetadata) + .filter(MaterializedMetadata.table_name == table_name) + .first() + ) + if metadata_row is None: + raise ValueError( + f"No MaterializedMetadata entry for table {table_name!r} " + f"in {analysis_database}" + ) + row_count = metadata_row.row_count + + # Detect segmentation table presence. + seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) + has_seg_table = engine.dialect.has_table(engine, seg_table_name) + segmentation_table_name = seg_table_name if has_seg_table else None celery_logger.info( - "Table %s (v%d): %d rows, segmentation=%s", + "%s %s (v%d): %d rows, segmentation=%s", + "View" if is_view else "Table", table_name, version, row_count, @@ -1544,6 +1695,13 @@ def _phase(phase: str, message: str, **status_kwargs) -> None: total_rows=row_count, ) resolved_specs = [DeltaLakeOutputSpec(**s) for s in output_specs] + elif is_view: + _phase( + "discovering_specs", + "Discovering output specs...", + total_rows=row_count, + ) + resolved_specs = discover_view_output_specs(source, connection_string) else: _phase( "discovering_specs", diff --git a/tests/test_deltalake_export.py b/tests/test_deltalake_export.py index 6f012556..7803ba7f 100644 --- a/tests/test_deltalake_export.py +++ b/tests/test_deltalake_export.py @@ -16,6 +16,7 @@ _DEFAULT_BYTES_PER_ROW, DeltaLakeOutputSpec, TableSource, + _classify_relation, _compute_sampled_percentile_bounds, _flush_buffer, _validate_identifier, @@ -26,7 +27,9 @@ compute_uniform_range_bounds, decode_geometry_columns, discover_default_output_specs, + discover_view_output_specs, estimate_bytes_per_row, + estimate_view_rows, export_table_to_deltalake, morton_encode_3d, optimize_deltalake, @@ -160,6 +163,120 @@ def test_no_indexes(self): assert specs == [] +# --------------------------------------------------------------------------- +# _classify_relation / estimate_view_rows / discover_view_output_specs +# --------------------------------------------------------------------------- + + +class TestClassifyRelation: + """_classify_relation maps pg_class.relkind to table/view/None.""" + + @pytest.mark.parametrize( + "relkind,expected", + [ + ("r", "table"), + ("p", "table"), + ("v", "view"), + ("m", "view"), + ], + ) + @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") + def test_relkind_mapping(self, mock_fetchone, relkind, expected): + mock_fetchone.return_value = (relkind,) + assert _classify_relation("postgresql://localhost/db", "thing") == expected + + @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") + def test_missing_relation_returns_none(self, mock_fetchone): + mock_fetchone.return_value = None + assert _classify_relation("postgresql://localhost/db", "nope") is None + + +class TestEstimateViewRows: + """estimate_view_rows uses reltuples, falling back to COUNT(*) when <= 0.""" + + @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") + def test_uses_reltuples_when_positive(self, mock_fetchone): + mock_fetchone.return_value = (12345.0,) + result = estimate_view_rows("postgresql://localhost/db", "my_view") + assert result == 12345 + # Only the reltuples query should run — no COUNT(*) fallback. + assert mock_fetchone.call_count == 1 + + @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") + def test_falls_back_to_count_when_zero(self, mock_fetchone): + # First call: reltuples = 0 (stale stats). Second call: COUNT(*). + mock_fetchone.side_effect = [(0.0,), (777,)] + result = estimate_view_rows("postgresql://localhost/db", "my_view") + assert result == 777 + assert mock_fetchone.call_count == 2 + + @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") + def test_falls_back_when_no_stats_row(self, mock_fetchone): + mock_fetchone.side_effect = [None, (42,)] + result = estimate_view_rows("postgresql://localhost/db", "my_view") + assert result == 42 + + +class TestDiscoverViewOutputSpecs: + """discover_view_output_specs infers specs from columns, not indexes.""" + + def _patch(self, columns, geometry_columns): + return ( + patch( + "materializationengine.workflows.deltalake_export." + "_resolve_select_columns", + return_value=columns, + ), + patch( + "materializationengine.workflows.deltalake_export." + "_get_geometry_columns", + return_value=geometry_columns, + ), + ) + + def test_id_and_geometry(self): + source = TableSource(annotation_table="my_view") + cols_patch, geom_patch = self._patch( + ["id", "pt_position", "score"], ["pt_position"] + ) + with cols_patch, geom_patch: + specs = discover_view_output_specs(source, "postgresql://localhost/db") + + assert len(specs) == 2 + id_spec = next(s for s in specs if s.name == "id") + assert id_spec.partition_by == "id" + assert id_spec.partition_strategy == "percentile_range" + assert id_spec.source_table == "my_view" + + morton_spec = next(s for s in specs if s.name == "pt_position_morton") + assert morton_spec.partition_by == "pt_position_morton" + assert morton_spec.partition_strategy == "uniform_range" + assert morton_spec.source_geometry_column == "pt_position" + + def test_multiple_geometry_columns_use_prefix_priority(self): + source = TableSource(annotation_table="my_view") + # pt (rank 3) should win over bb (rank 4). + cols_patch, geom_patch = self._patch( + ["id", "bb_position", "pt_position"], ["bb_position", "pt_position"] + ) + with cols_patch, geom_patch: + specs = discover_view_output_specs(source, "postgresql://localhost/db") + + morton_specs = [s for s in specs if s.source_geometry_column is not None] + assert len(morton_specs) == 1 + assert morton_specs[0].source_geometry_column == "pt_position" + + def test_no_id_no_geometry_returns_flat(self): + source = TableSource(annotation_table="my_view") + cols_patch, geom_patch = self._patch(["score", "label"], []) + with cols_patch, geom_patch: + specs = discover_view_output_specs(source, "postgresql://localhost/db") + + assert len(specs) == 1 + assert specs[0].name == "flat" + assert specs[0].partition_by is None + + # --------------------------------------------------------------------------- # estimate_bytes_per_row # --------------------------------------------------------------------------- From 2873b36018c54d139d5e6d488357a8907417872b Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 5 Jun 2026 07:11:16 -0700 Subject: [PATCH 2/5] add more robust error handling --- .../blueprints/deltalake/api.py | 216 ++++++++++-------- 1 file changed, 121 insertions(+), 95 deletions(-) diff --git a/materializationengine/blueprints/deltalake/api.py b/materializationengine/blueprints/deltalake/api.py index 4d957efb..51679656 100644 --- a/materializationengine/blueprints/deltalake/api.py +++ b/materializationengine/blueprints/deltalake/api.py @@ -210,109 +210,135 @@ def discover_specs(): {"error": f"Cannot connect to frozen DB for version {version}: {e}"} ), 404 - # Classify as table vs view. Views are materialized views cloned into the - # frozen DB; they are not tracked in MaterializedMetadata and have no - # segmentation join, so they need view-specific row-count and spec discovery. - relation_kind = _classify_relation(connection_string, table_name) - if relation_kind is None: - return jsonify( - {"error": f"Table {table_name!r} not found in version {version}"} - ), 404 - is_view = relation_kind == "view" - - if is_view: - row_count = estimate_view_rows(connection_string, table_name) - source = TableSource(annotation_table=table_name) - resolved_specs = discover_view_output_specs(source, connection_string) - else: - # Look up row count. - with db_manager.session_scope(analysis_database) as session: - metadata_row = ( - session.query(MaterializedMetadata) - .filter(MaterializedMetadata.table_name == table_name) - .first() + # Wrap discovery in a JSON error handler so backend failures (a missing + # frozen DB, an unreachable instance, a bug in the view path, etc.) surface + # as a readable JSON error instead of an HTML 500 page that the wizard + # reports as the opaque "Unexpected token '<'... is not valid JSON". + try: + # Classify as table vs view. Views are materialized views cloned into the + # frozen DB; they are not tracked in MaterializedMetadata and have no + # segmentation join, so they need view-specific row-count and spec + # discovery. + relation_kind = _classify_relation(connection_string, table_name) + if relation_kind is None: + return jsonify( + {"error": f"Table {table_name!r} not found in version {version}"} + ), 404 + is_view = relation_kind == "view" + + if is_view: + row_count = estimate_view_rows(connection_string, table_name) + source = TableSource(annotation_table=table_name) + resolved_specs = discover_view_output_specs(source, connection_string) + else: + # Look up row count. + with db_manager.session_scope(analysis_database) as session: + metadata_row = ( + session.query(MaterializedMetadata) + .filter(MaterializedMetadata.table_name == table_name) + .first() + ) + if metadata_row is None: + return jsonify( + { + "error": f"Table {table_name!r} not found in version {version}" + } + ), 404 + row_count = metadata_row.row_count + + # Detect segmentation table. + seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) + has_seg_table = engine.dialect.has_table(engine, seg_table_name) + segmentation_table_name = seg_table_name if has_seg_table else None + + source = TableSource( + annotation_table=table_name, + segmentation_table=segmentation_table_name, ) - if metadata_row is None: - return jsonify( - {"error": f"Table {table_name!r} not found in version {version}"} - ), 404 - row_count = metadata_row.row_count - - # Detect segmentation table. - seg_table_name = build_segmentation_table_name(table_name, pcg_table_name) - has_seg_table = engine.dialect.has_table(engine, seg_table_name) - segmentation_table_name = seg_table_name if has_seg_table else None - - source = TableSource( - annotation_table=table_name, - segmentation_table=segmentation_table_name, - ) - # Discover specs. - resolved_specs = discover_default_output_specs(source, engine) + # Discover specs. + resolved_specs = discover_default_output_specs(source, engine) - bytes_per_row = estimate_bytes_per_row(connection_string, source) + bytes_per_row = estimate_bytes_per_row(connection_string, source) - # Track which specs had "auto" before resolution (for caching). - was_auto = [spec.n_partitions == "auto" for spec in resolved_specs] + # Track which specs had "auto" before resolution (for caching). + was_auto = [spec.n_partitions == "auto" for spec in resolved_specs] - # Resolve partition counts. - for spec in resolved_specs: - if spec.n_partitions == "auto": - effective_target = spec.target_file_size_mb or target_partition_size_mb - spec.n_partitions = resolve_n_partitions( - "auto", - row_count, - target_file_size_mb=effective_target, - bytes_per_row=bytes_per_row, - ) + # Resolve partition counts. + for spec in resolved_specs: + if spec.n_partitions == "auto": + effective_target = ( + spec.target_file_size_mb or target_partition_size_mb + ) + spec.n_partitions = resolve_n_partitions( + "auto", + row_count, + target_file_size_mb=effective_target, + bytes_per_row=bytes_per_row, + ) - from dataclasses import asdict + from dataclasses import asdict - # Build available columns list (base columns + computed columns from specs). - available_columns = _resolve_select_columns( - connection_string, source, _DEFAULT_DROP_COLUMNS - ) - for spec in resolved_specs: - if spec.source_geometry_column: - col = spec.source_geometry_column - for suffix in ["_x", "_y", "_z", "_morton"]: - computed = f"{col}{suffix}" - if computed not in available_columns: - available_columns.append(computed) - - # Collect geometry columns (position columns that get morton-encoded). - geometry_columns = sorted( - {s.source_geometry_column for s in resolved_specs if s.source_geometry_column} - ) + # Build available columns list (base columns + computed columns from specs). + available_columns = _resolve_select_columns( + connection_string, source, _DEFAULT_DROP_COLUMNS + ) + for spec in resolved_specs: + if spec.source_geometry_column: + col = spec.source_geometry_column + for suffix in ["_x", "_y", "_z", "_morton"]: + computed = f"{col}{suffix}" + if computed not in available_columns: + available_columns.append(computed) + + # Collect geometry columns (position columns that get morton-encoded). + geometry_columns = sorted( + { + s.source_geometry_column + for s in resolved_specs + if s.source_geometry_column + } + ) + + # Cache raw specs (before n_partitions resolution) so the cache stays + # valid regardless of the caller's target_partition_size_mb. + raw_specs = [asdict(s) for s in resolved_specs] + # Reset resolved n_partitions back to "auto" for specs that were auto. + for raw, auto in zip(raw_specs, was_auto): + if auto: + raw["n_partitions"] = "auto" + + cache_result = { + "row_count": row_count, + "bytes_per_row": bytes_per_row, + "available_columns": available_columns, + "geometry_columns": geometry_columns, + "specs": raw_specs, + } + redis_client.set(cache_key, json.dumps(cache_result), ex=600) + + # Return the result with resolved partition counts. + result = { + "row_count": row_count, + "bytes_per_row": bytes_per_row, + "available_columns": available_columns, + "geometry_columns": geometry_columns, + "specs": [asdict(s) for s in resolved_specs], + } - # Cache raw specs (before n_partitions resolution) so the cache stays - # valid regardless of the caller's target_partition_size_mb. - raw_specs = [asdict(s) for s in resolved_specs] - # Reset resolved n_partitions back to "auto" for specs that were auto. - for raw, auto in zip(raw_specs, was_auto): - if auto: - raw["n_partitions"] = "auto" - - cache_result = { - "row_count": row_count, - "bytes_per_row": bytes_per_row, - "available_columns": available_columns, - "geometry_columns": geometry_columns, - "specs": raw_specs, - } - redis_client.set(cache_key, json.dumps(cache_result), ex=600) - - # Return the result with resolved partition counts. - result = { - "row_count": row_count, - "bytes_per_row": bytes_per_row, - "available_columns": available_columns, - "geometry_columns": geometry_columns, - "specs": [asdict(s) for s in resolved_specs], - } - - return jsonify(result) + return jsonify(result) + except Exception as e: + current_app.logger.error( + "discover_specs failed for %s v%s table %r: %s", + datastack, + version, + table_name, + e, + exc_info=True, + ) + return jsonify( + {"error": f"Spec discovery failed for {table_name!r}: {e}"} + ), 500 @deltalake_bp.route("/api/check-exists", methods=["POST"]) From becf590d0bbe3b55bd67d21e30230a27de7710fc Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 5 Jun 2026 09:11:38 -0700 Subject: [PATCH 3/5] trying two stage process for views --- .../blueprints/deltalake/api.py | 7 +- .../workflows/deltalake_export.py | 516 +++++++++++++++--- tests/test_deltalake_export.py | 216 +++++++- 3 files changed, 637 insertions(+), 102 deletions(-) diff --git a/materializationengine/blueprints/deltalake/api.py b/materializationengine/blueprints/deltalake/api.py index 51679656..bb14d463 100644 --- a/materializationengine/blueprints/deltalake/api.py +++ b/materializationengine/blueprints/deltalake/api.py @@ -227,7 +227,7 @@ def discover_specs(): is_view = relation_kind == "view" if is_view: - row_count = estimate_view_rows(connection_string, table_name) + row_count = estimate_view_rows(engine, table_name) source = TableSource(annotation_table=table_name) resolved_specs = discover_view_output_specs(source, connection_string) else: @@ -308,8 +308,12 @@ def discover_specs(): if auto: raw["n_partitions"] = "auto" + # For views, row_count is a fast Postgres planner estimate (an exact + # count would execute the full view); it can be far off for aggregating + # views and is advisory only — the exact count is determined at export. cache_result = { "row_count": row_count, + "row_count_estimated": is_view, "bytes_per_row": bytes_per_row, "available_columns": available_columns, "geometry_columns": geometry_columns, @@ -320,6 +324,7 @@ def discover_specs(): # Return the result with resolved partition counts. result = { "row_count": row_count, + "row_count_estimated": is_view, "bytes_per_row": bytes_per_row, "available_columns": available_columns, "geometry_columns": geometry_columns, diff --git a/materializationengine/workflows/deltalake_export.py b/materializationengine/workflows/deltalake_export.py index a9bb5c37..cfe94c3d 100644 --- a/materializationengine/workflows/deltalake_export.py +++ b/materializationengine/workflows/deltalake_export.py @@ -15,7 +15,7 @@ import pyarrow as pa import shapely from celery.utils.log import get_task_logger -from sqlalchemy import inspect +from sqlalchemy import inspect, text from sqlalchemy.dialects.postgresql import BYTEA # Register a minimal geometry type so SQLAlchemy's inspector doesn't warn @@ -188,30 +188,48 @@ def _classify_relation(connection_string: str, name: str) -> str | None: return None -def estimate_view_rows(connection_string: str, name: str) -> int: - """Estimate the row count of a view from ``pg_class.reltuples``. +def estimate_view_rows(engine: Engine, name: str) -> int: + """Estimate the row count of a view without scanning it. Views are not tracked in ``MaterializedMetadata``, so their row count must - come from elsewhere. ``reltuples`` is a fast catalog lookup that is exact - for a materialized view immediately after ``REFRESH``/``ANALYZE``. Because - the cloned frozen DB can carry stale or zero stats, fall back to an exact - ``COUNT(*)`` only when ``reltuples <= 0`` — otherwise partition sizing would - silently collapse to a single file. + come from elsewhere. ``pg_class.reltuples`` is a fast catalog lookup that + is exact for a materialized view immediately after ``REFRESH``/``ANALYZE``, + so it is used when positive. Otherwise — a *regular* view (which has no + stored stats) or a materialized view with stale/never-analyzed stats + (``reltuples`` is ``-1``) — fall back to the Postgres **planner estimate** + from ``EXPLAIN``, which is computed from base-table statistics without + executing the query. + + A plain ``COUNT(*)`` is deliberately avoided: for a regular view that + aggregates over a huge base table it runs the full query and can take + minutes (or have its connection dropped by the gateway), which previously + surfaced in the wizard as an HTML timeout / "not valid JSON" error. The row count only drives partition-count heuristics, progress reporting, and the existing-export guard's message; it never gates the data written - (the export streams ``SELECT *`` in full), so the estimate is safe. + (the export streams ``SELECT *`` in full), so an estimate is safe. + + ``EXPLAIN`` is run through the SQLAlchemy *engine* rather than the ADBC + helpers because the ADBC postgres driver wraps statements in + ``COPY (...) TO STDOUT``, which rejects ``EXPLAIN``. """ _validate_identifier(name) - row = _adbc_fetchone( - connection_string, - f"SELECT reltuples FROM pg_class WHERE relname = '{name}'", - ) - if row is not None and row[0] is not None and row[0] > 0: - return int(row[0]) + with engine.connect() as conn: + row = conn.execute( + text("SELECT reltuples FROM pg_class WHERE relname = :name"), + {"name": name}, + ).fetchone() + if row is not None and row[0] is not None and row[0] > 0: + return int(row[0]) + + plan_row = conn.execute( + text(f'EXPLAIN (FORMAT JSON) SELECT * FROM "{name}"') + ).fetchone() - count_row = _adbc_fetchone(connection_string, f'SELECT COUNT(*) FROM "{name}"') - return int(count_row[0]) if count_row is not None else 0 + plan = plan_row[0] + if isinstance(plan, str): + plan = json.loads(plan) + return max(0, int(plan[0]["Plan"]["Plan Rows"])) def _get_geometry_columns(connection_string: str, name: str) -> list[str]: @@ -357,10 +375,14 @@ def discover_view_output_specs( ) -> list[DeltaLakeOutputSpec]: """Derive output specs for a view, inferring from columns instead of indexes. - Materialized views generally lack the B-tree/GiST indexes and primary-key - constraint that :func:`discover_default_output_specs` relies on, so specs - are inferred from the view's columns to give views the same partition - quality tables get: + A view export always produces a ``flat`` (un-partitioned) lake. It is both a + useful full export *and* the staging input for :func:`export_view_to_deltalake`'s + re-partitioning pass — the view query is executed once into the flat lake, and + any partitioned specs are then derived from that materialized data (the view + itself cannot be cheaply counted or sampled; see ``export_view_to_deltalake``). + + In addition to ``flat``, columns are inspected to give views the same + partition quality tables get: * If the view has an ``id`` column → a ``percentile_range`` spec on ``id`` (mirrors the PK spec emitted for tables). @@ -368,14 +390,13 @@ def discover_view_output_specs( priority ``ctr > post > pre > pt > bb`` and partitioned on its Morton code (``uniform_range``), exactly as the table spatial branch does — so the downstream geometry-decode / Morton pipeline is reused unchanged. - - If neither an ``id`` column nor any geometry column is present, a single - un-partitioned ``flat`` spec is returned so the export still produces output. """ view_name = source.annotation_table columns = _resolve_select_columns(connection_string, source, []) - specs: list[DeltaLakeOutputSpec] = [] + # The flat base is always produced first; it doubles as the staging lake + # that partitioned specs are re-partitioned from. + specs: list[DeltaLakeOutputSpec] = [DeltaLakeOutputSpec(name="flat")] if "id" in columns: specs.append( @@ -406,9 +427,6 @@ def discover_view_output_specs( ) ) - if not specs: - specs.append(DeltaLakeOutputSpec(name="flat")) - return specs @@ -1259,6 +1277,324 @@ def export_table_to_deltalake( optimize_callback(spec.name, "vacuum") +def _flush_flat_buffer( + buffer: list[pa.RecordBatch], + base_uri: str, + geometry_columns: list[str], +) -> None: + """Decode geometry and append accumulated batches to a flat Delta Lake. + + Like :func:`_flush_buffer` but writes a single un-partitioned lake (no + partition assignment). Used by :func:`_stream_view_to_flat_lake`. + """ + from deltalake import write_deltalake + + arrow_table = pa.Table.from_batches(buffer) + buffer.clear() + arrow_table = _strip_arrow_extension_types(arrow_table) + df = pl.from_arrow(arrow_table) + del arrow_table + if geometry_columns: + df = decode_geometry_columns(df, geometry_columns) + write_deltalake(base_uri, df.to_arrow(), mode="append") + + +def _stream_view_to_flat_lake( + connection_string: str, + source: TableSource, + base_uri: str, + geometry_columns: list[str], + flush_threshold_bytes: int, + progress_callback: Callable[[int, int | None], None] | None = None, +) -> int: + """Pass 1: stream a view once into a flat Delta Lake; return the row count. + + The view query is executed exactly once. Geometry columns are decoded to + ``{col}_x/_y/_z`` so the materialized lake can be re-partitioned (including + Morton encoding) without re-running the view. The exact row count is + accumulated from the streamed batches — no ``COUNT(*)`` is issued. + """ + buffer: list[pa.RecordBatch] = [] + buffer_bytes = 0 + rows_processed = 0 + + for batch in stream_table_to_arrow( + connection_string, + source, + drop_columns=_DEFAULT_DROP_COLUMNS, + ): + buffer.append(batch) + buffer_bytes += batch.nbytes + rows_processed += batch.num_rows + + if progress_callback is not None: + progress_callback(rows_processed, None) + + if buffer_bytes >= flush_threshold_bytes: + _flush_flat_buffer(buffer, base_uri, geometry_columns) + buffer = [] + buffer_bytes = 0 + + if buffer: + _flush_flat_buffer(buffer, base_uri, geometry_columns) + + return rows_processed + + +def _lake_bytes_per_row(base_uri: str, row_count: int) -> int: + """Average on-disk bytes/row of a Delta Lake, from its add-action stats. + + Parquet is compressed, so this reflects the *output* file size — exactly + what :func:`resolve_n_partitions` needs to target a Delta output file size + (a better basis than the Postgres ``pg_class`` estimate for views). + """ + if row_count <= 0: + return _DEFAULT_BYTES_PER_ROW + from deltalake import DeltaTable + + try: + actions = DeltaTable(base_uri).get_add_actions(flatten=True) + total_bytes = sum(actions.column("size_bytes").to_pylist()) + except Exception: + return _DEFAULT_BYTES_PER_ROW + if total_bytes <= 0: + return _DEFAULT_BYTES_PER_ROW + return max(1, int(total_bytes / row_count)) + + +def _percentile_bounds_from_lake( + base_uri: str, + column: str, + n_partitions: int, +) -> list: + """Percentile breakpoints for *column*, computed from the materialized lake. + + Mirrors :func:`compute_partition_boundaries` but reads from the flat Delta + Lake (cheap, columnar) instead of querying the view. Returns ``n-1`` + breakpoints, or ``[]`` for the trivial / all-null cases. + """ + if n_partitions <= 1: + return [] + fractions = [i / n_partitions for i in range(1, n_partitions)] + lf = pl.scan_delta(base_uri) + row = ( + lf.select( + [ + pl.col(column) + .quantile(f, interpolation="nearest") + .alias(f"q{i}") + for i, f in enumerate(fractions) + ] + ) + .collect() + .row(0) + ) + bounds = [b for b in row if b is not None] + return bounds + + +def _morton_bounds_from_lake( + base_uri: str, + geometry_column: str, + n_partitions: int, +) -> list: + """Uniform Morton-range breakpoints, computed from the materialized lake. + + Mirrors :func:`compute_uniform_range_bounds` + the ``uniform_range`` branch + of :func:`resolve_bounds`: take the bounding box of the decoded + ``{col}_x/_y/_z`` columns, Morton-encode its corners, then ``linspace`` the + interior breakpoints. + """ + if n_partitions <= 1: + return [] + col = geometry_column + stats = ( + pl.scan_delta(base_uri) + .select( + pl.col(f"{col}_x").min().alias("xmin"), + pl.col(f"{col}_x").max().alias("xmax"), + pl.col(f"{col}_y").min().alias("ymin"), + pl.col(f"{col}_y").max().alias("ymax"), + pl.col(f"{col}_z").min().alias("zmin"), + pl.col(f"{col}_z").max().alias("zmax"), + ) + .collect() + .row(0, named=True) + ) + if stats["xmin"] is None: + return [] + corners = morton_encode_3d( + np.array([stats["xmin"], stats["xmax"]], dtype=np.uint64), + np.array([stats["ymin"], stats["ymax"]], dtype=np.uint64), + np.array([stats["zmin"], stats["zmax"]], dtype=np.uint64), + ) + col_min, col_max = float(corners.min()), float(corners.max()) + if col_min == col_max: + return [] + return np.linspace(col_min, col_max, n_partitions + 1)[1:-1].tolist() + + +def _repartition_view_spec_from_lake( + base_uri: str, + spec: DeltaLakeOutputSpec, + output_uri_base: str, + row_count: int, + bytes_per_row: int, + target_partition_size_mb: int, + optimize_max_concurrent_tasks: int = 1, + optimize_target_size: int | None = None, + optimize_max_spill_size: int | None = None, +) -> None: + """Pass 2: write one partitioned spec by reading back the flat base lake. + + Bounds are derived from the materialized data (now that the row count is + known) and rows are streamed batch-by-batch from the base lake — the view + is never touched again. + """ + from deltalake import DeltaTable, write_deltalake + + out_uri = f"{output_uri_base}/{spec.name}" + + n = spec.n_partitions + if n == "auto" or n is None: + effective_target = spec.target_file_size_mb or target_partition_size_mb + n = resolve_n_partitions( + "auto", + row_count, + target_file_size_mb=effective_target, + bytes_per_row=bytes_per_row, + ) + + if spec.partition_strategy == "percentile_range": + bounds = _percentile_bounds_from_lake(base_uri, spec.partition_by, n) + elif spec.partition_strategy == "uniform_range": + bounds = _morton_bounds_from_lake(base_uri, spec.source_geometry_column, n) + else: + bounds = spec.bounds + + partition_col = f"{spec.partition_by}_partition" + dataset = DeltaTable(base_uri).to_pyarrow_dataset() + for batch in dataset.to_batches(): + if batch.num_rows == 0: + continue + df = pl.from_arrow(batch) + if spec.source_geometry_column is not None: + df = add_morton_column(df, spec.source_geometry_column) + if spec.partition_strategy == "hash": + df = assign_hash_partition(df, spec.partition_by, n if isinstance(n, int) else 1) + else: + df = assign_partition(df, spec.partition_by, bounds or []) + write_deltalake( + out_uri, + df.to_arrow(), + mode="append", + partition_by=[partition_col], + ) + + optimize_deltalake( + out_uri, + zorder_columns=spec.zorder_columns or None, + bloom_filter_columns=spec.bloom_filter_columns or None, + fpp=spec.bloom_filter_fpp or 0.001, + max_concurrent_tasks=optimize_max_concurrent_tasks, + target_size=optimize_target_size, + max_spill_size=optimize_max_spill_size, + ) + + +def export_view_to_deltalake( + connection_string: str, + view_name: str, + output_specs: list[DeltaLakeOutputSpec], + output_uri_base: str, + flush_threshold_bytes: int = 2 * 1024 * 1024 * 1024, + target_partition_size_mb: int = 256, + progress_callback: Callable[[int, int | None], None] | None = None, + phase_callback: Callable[[str, str], None] | None = None, + optimize_callback: Callable[[str, str], None] | None = None, + optimize_max_concurrent_tasks: int = 1, + optimize_target_size: int | None = None, + optimize_max_spill_size: int | None = None, +) -> int: + """Two-pass, count-free export of a (regular) view to one or more Delta Lakes. + + A view cannot be cheaply counted or sampled — every upfront query executes + its full (often aggregating) definition. So instead of resolving partition + counts/bounds against the source first, this: + + 1. **Streams the view exactly once** into a flat (un-partitioned) base Delta + Lake, decoding geometry and counting rows as it goes. + 2. **Re-partitions from that materialized lake**: now that the true row + count is known, partition counts and bounds are derived from the cheap + columnar data, and each partitioned spec is written by reading the base + lake back. + + *output_specs* must contain exactly one un-partitioned spec (``partition_by`` + is ``None``) — the flat base; the rest are partitioned specs derived from it. + + Returns the exact streamed row count. + """ + source = TableSource(annotation_table=view_name) + geometry_columns = _get_geometry_columns(connection_string, view_name) + + flat_spec = next((s for s in output_specs if s.partition_by is None), None) + base_name = flat_spec.name if flat_spec is not None else "flat" + base_uri = f"{output_uri_base}/{base_name}" + partitioned_specs = [s for s in output_specs if s.partition_by is not None] + + # --- Pass 1: stream the view once into the flat base lake --- + if phase_callback is not None: + phase_callback("streaming", "Streaming view to flat Delta Lake...") + row_count = _stream_view_to_flat_lake( + connection_string, + source, + base_uri, + geometry_columns, + flush_threshold_bytes, + progress_callback=progress_callback, + ) + + if optimize_callback is not None: + optimize_callback(base_name, "z_order" if flat_spec and flat_spec.zorder_columns else "compact") + optimize_deltalake( + base_uri, + zorder_columns=(flat_spec.zorder_columns or None) if flat_spec else None, + bloom_filter_columns=(flat_spec.bloom_filter_columns or None) if flat_spec else None, + fpp=(flat_spec.bloom_filter_fpp or 0.001) if flat_spec else 0.001, + max_concurrent_tasks=optimize_max_concurrent_tasks, + target_size=optimize_target_size, + max_spill_size=optimize_max_spill_size, + ) + if optimize_callback is not None: + optimize_callback(base_name, "vacuum") + + # --- Pass 2: re-partition each partitioned spec from the base lake --- + bytes_per_row = _lake_bytes_per_row(base_uri, row_count) + for spec in partitioned_specs: + if phase_callback is not None: + phase_callback( + "repartitioning", + f"Re-partitioning '{spec.name}' from flat lake ({row_count:,} rows)...", + ) + if optimize_callback is not None: + optimize_callback(spec.name, "z_order" if spec.zorder_columns else "compact") + _repartition_view_spec_from_lake( + base_uri, + spec, + output_uri_base, + row_count, + bytes_per_row, + target_partition_size_mb, + optimize_max_concurrent_tasks=optimize_max_concurrent_tasks, + optimize_target_size=optimize_target_size, + optimize_max_spill_size=optimize_max_spill_size, + ) + if optimize_callback is not None: + optimize_callback(spec.name, "vacuum") + + return row_count + + def optimize_deltalake( uri: str, zorder_columns: list[str] | None = None, @@ -1651,7 +1987,10 @@ def _phase(phase: str, message: str, **status_kwargs) -> None: is_view = relation_kind == "view" if is_view: - row_count = estimate_view_rows(connection_string, table_name) + # A view's row count is unknown until it is streamed once — counting + # it upfront would execute the full (often aggregating) view query. + # export_view_to_deltalake fills this in from the streamed batches. + row_count = None segmentation_table_name = None else: # Determine if the table was merged and look up row count. @@ -1674,11 +2013,11 @@ def _phase(phase: str, message: str, **status_kwargs) -> None: segmentation_table_name = seg_table_name if has_seg_table else None celery_logger.info( - "%s %s (v%d): %d rows, segmentation=%s", + "%s %s (v%d): %s rows, segmentation=%s", "View" if is_view else "Table", table_name, version, - row_count, + row_count if row_count is not None else "unknown (streamed)", segmentation_table_name or "none", ) @@ -1746,7 +2085,11 @@ def _phase(phase: str, message: str, **status_kwargs) -> None: except Exception: existing_rows = None - if existing_rows is not None and existing_rows != row_count: + if ( + existing_rows is not None + and not is_view + and existing_rows != row_count + ): raise RuntimeError( f"Delta Lake for table {table_name!r} already exists at " f"{uri} but has {existing_rows} rows (expected {row_count}). " @@ -1756,33 +2099,12 @@ def _phase(phase: str, message: str, **status_kwargs) -> None: if existing_rows is not None: raise RuntimeError( - f"Delta Lake for table {table_name!r} already exists at " - f"{uri} with {existing_rows} rows (matches expected count). " + f"Delta Lake for {table_name!r} already exists at " + f"{uri} with {existing_rows} rows. " f"Delete the existing Delta Lake before re-exporting." ) - # --- Estimate bytes per row and resolve partition counts / bounds --- - _phase( - "computing_boundaries", - f"Resolved {len(resolved_specs)} output specs. " - "Computing partition boundaries...", - total_rows=row_count, - ) - bytes_per_row = estimate_bytes_per_row(connection_string, source) - - for spec in resolved_specs: - if spec.n_partitions == "auto" or spec.n_partitions is None: - effective_target = spec.target_file_size_mb or target_partition_size_mb - spec.n_partitions = resolve_n_partitions( - "auto", - row_count, - target_file_size_mb=effective_target, - bytes_per_row=bytes_per_row, - ) - - resolve_all_bounds(resolved_specs, connection_string, table_name) - - # --- Stream and write --- + # --- Progress / optimize callbacks (shared by both paths) --- _last_log_time = {"t": 0.0} def _log_progress(rows_so_far: int, total: int | None) -> None: @@ -1807,12 +2129,6 @@ def _progress(rows_so_far: int, total: int | None) -> None: _log_progress(rows_so_far, total) redis_callback(rows_so_far, total) - _phase( - "streaming", - f"Streaming {row_count:,} rows to Delta Lake...", - total_rows=row_count, - ) - def _optimize_callback(spec_name: str, action: str) -> None: msg = ( f"Vacuuming Delta Lake '{spec_name}'..." @@ -1820,26 +2136,68 @@ def _optimize_callback(spec_name: str, action: str) -> None: else f"Optimizing Delta Lake '{spec_name}' ({action})..." ) _log(msg) - _set_status( - status="exporting", + _set_status(status="exporting", phase="optimizing") + + if is_view: + # Views can't be counted/sampled cheaply, so stream once into a flat + # lake (which yields the exact row count) and re-partition from it. + row_count = export_view_to_deltalake( + connection_string=connection_string, + view_name=table_name, + output_specs=resolved_specs, + output_uri_base=output_uri_base, + flush_threshold_bytes=flush_threshold, + target_partition_size_mb=target_partition_size_mb, + progress_callback=_progress, + phase_callback=lambda phase, message: _phase(phase, message), + optimize_callback=_optimize_callback, + optimize_max_concurrent_tasks=optimize_max_concurrent_tasks, + optimize_target_size=optimize_target_size, + optimize_max_spill_size=optimize_max_spill_size, + ) + else: + # --- Estimate bytes per row and resolve partition counts / bounds --- + _phase( + "computing_boundaries", + f"Resolved {len(resolved_specs)} output specs. " + "Computing partition boundaries...", total_rows=row_count, - rows_processed=row_count, - phase="optimizing", ) + bytes_per_row = estimate_bytes_per_row(connection_string, source) - export_table_to_deltalake( - connection_string=connection_string, - source=source, - output_specs=resolved_specs, - output_uri_base=output_uri_base, - flush_threshold_bytes=flush_threshold, - total_rows=row_count, - progress_callback=_progress, - optimize_callback=_optimize_callback, - optimize_max_concurrent_tasks=optimize_max_concurrent_tasks, - optimize_target_size=optimize_target_size, - optimize_max_spill_size=optimize_max_spill_size, - ) + for spec in resolved_specs: + if spec.n_partitions == "auto" or spec.n_partitions is None: + effective_target = ( + spec.target_file_size_mb or target_partition_size_mb + ) + spec.n_partitions = resolve_n_partitions( + "auto", + row_count, + target_file_size_mb=effective_target, + bytes_per_row=bytes_per_row, + ) + + resolve_all_bounds(resolved_specs, connection_string, table_name) + + _phase( + "streaming", + f"Streaming {row_count:,} rows to Delta Lake...", + total_rows=row_count, + ) + + export_table_to_deltalake( + connection_string=connection_string, + source=source, + output_specs=resolved_specs, + output_uri_base=output_uri_base, + flush_threshold_bytes=flush_threshold, + total_rows=row_count, + progress_callback=_progress, + optimize_callback=_optimize_callback, + optimize_max_concurrent_tasks=optimize_max_concurrent_tasks, + optimize_target_size=optimize_target_size, + optimize_max_spill_size=optimize_max_spill_size, + ) except Exception as e: try: _log(f"Export failed: {e}") diff --git a/tests/test_deltalake_export.py b/tests/test_deltalake_export.py index 7803ba7f..7e1c4e5c 100644 --- a/tests/test_deltalake_export.py +++ b/tests/test_deltalake_export.py @@ -31,6 +31,7 @@ estimate_bytes_per_row, estimate_view_rows, export_table_to_deltalake, + export_view_to_deltalake, morton_encode_3d, optimize_deltalake, resolve_bounds, @@ -192,29 +193,46 @@ def test_missing_relation_returns_none(self, mock_fetchone): class TestEstimateViewRows: - """estimate_view_rows uses reltuples, falling back to COUNT(*) when <= 0.""" + """estimate_view_rows uses reltuples when positive, else the EXPLAIN + planner estimate — never a full COUNT(*).""" - @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") - def test_uses_reltuples_when_positive(self, mock_fetchone): - mock_fetchone.return_value = (12345.0,) - result = estimate_view_rows("postgresql://localhost/db", "my_view") - assert result == 12345 - # Only the reltuples query should run — no COUNT(*) fallback. - assert mock_fetchone.call_count == 1 + def _make_engine(self, fetch_results): + """Return a mock engine whose connection .execute().fetchone() + yields *fetch_results* in order.""" + results = list(fetch_results) + conn = MagicMock() - @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") - def test_falls_back_to_count_when_zero(self, mock_fetchone): - # First call: reltuples = 0 (stale stats). Second call: COUNT(*). - mock_fetchone.side_effect = [(0.0,), (777,)] - result = estimate_view_rows("postgresql://localhost/db", "my_view") - assert result == 777 - assert mock_fetchone.call_count == 2 + def _execute(*args, **kwargs): + res = MagicMock() + res.fetchone.return_value = results.pop(0) + return res - @patch("materializationengine.workflows.deltalake_export._adbc_fetchone") - def test_falls_back_when_no_stats_row(self, mock_fetchone): - mock_fetchone.side_effect = [None, (42,)] - result = estimate_view_rows("postgresql://localhost/db", "my_view") - assert result == 42 + conn.execute.side_effect = _execute + engine = MagicMock() + engine.connect.return_value.__enter__ = lambda s: conn + engine.connect.return_value.__exit__ = MagicMock(return_value=False) + return engine, conn + + def test_uses_reltuples_when_positive(self): + # reltuples query returns a positive value → used directly, no EXPLAIN. + engine, conn = self._make_engine([(12345.0,)]) + assert estimate_view_rows(engine, "my_view") == 12345 + assert conn.execute.call_count == 1 + + def test_explain_estimate_when_reltuples_negative(self): + # reltuples = -1 (never analyzed) → fall back to EXPLAIN planner estimate. + plan_json = [{"Plan": {"Plan Rows": 142358}}] + engine, conn = self._make_engine([(-1.0,), (plan_json,)]) + assert estimate_view_rows(engine, "my_view") == 142358 + assert conn.execute.call_count == 2 + + def test_explain_estimate_parses_json_string(self): + # Some drivers return the EXPLAIN JSON as a string. + import json as _json + + plan_str = _json.dumps([{"Plan": {"Plan Rows": 999}}]) + engine, _ = self._make_engine([(0.0,), (plan_str,)]) + assert estimate_view_rows(engine, "my_view") == 999 class TestDiscoverViewOutputSpecs: @@ -234,6 +252,19 @@ def _patch(self, columns, geometry_columns): ), ) + def test_always_includes_flat_base(self): + # The flat (un-partitioned) base is always the first spec — it doubles + # as the staging lake that partitioned specs are derived from. + source = TableSource(annotation_table="my_view") + cols_patch, geom_patch = self._patch( + ["id", "pt_position", "score"], ["pt_position"] + ) + with cols_patch, geom_patch: + specs = discover_view_output_specs(source, "postgresql://localhost/db") + + assert specs[0].name == "flat" + assert specs[0].partition_by is None + def test_id_and_geometry(self): source = TableSource(annotation_table="my_view") cols_patch, geom_patch = self._patch( @@ -242,7 +273,8 @@ def test_id_and_geometry(self): with cols_patch, geom_patch: specs = discover_view_output_specs(source, "postgresql://localhost/db") - assert len(specs) == 2 + # flat + id + pt_position_morton + assert {s.name for s in specs} == {"flat", "id", "pt_position_morton"} id_spec = next(s for s in specs if s.name == "id") assert id_spec.partition_by == "id" assert id_spec.partition_strategy == "percentile_range" @@ -266,7 +298,7 @@ def test_multiple_geometry_columns_use_prefix_priority(self): assert len(morton_specs) == 1 assert morton_specs[0].source_geometry_column == "pt_position" - def test_no_id_no_geometry_returns_flat(self): + def test_no_id_no_geometry_returns_flat_only(self): source = TableSource(annotation_table="my_view") cols_patch, geom_patch = self._patch(["score", "label"], []) with cols_patch, geom_patch: @@ -1455,3 +1487,143 @@ def test_get_progress_includes_log_entries(self, mock_redis_client): assert result["log_entries"] == ["12:00:01 Starting", "12:00:02 Streaming"] assert result["phase"] == "streaming" assert result["error"] is None + + +# --------------------------------------------------------------------------- +# export_view_to_deltalake — two-pass (stream-flat then re-partition) +# --------------------------------------------------------------------------- + + +class TestExportViewToDeltalake: + """End-to-end of the two-pass view exporter against a real local Delta + Lake, with only the Postgres stream mocked.""" + + def _make_batches(self, n_rows, n_batches=3): + """Yield real pyarrow RecordBatches: id, score, pt_position (3D WKB).""" + per = max(1, n_rows // n_batches) + made = 0 + batches = [] + i = 0 + while made < n_rows: + count = min(per, n_rows - made) + ids = list(range(made, made + count)) + scores = [float(x) * 1.5 for x in ids] + wkb = [ + shapely.to_wkb(shapely.Point(x % 1000, (x * 2) % 1000, (x * 3) % 1000)) + for x in ids + ] + batches.append( + pa.RecordBatch.from_arrays( + [ + pa.array(ids, type=pa.int64()), + pa.array(scores, type=pa.float64()), + pa.array(wkb, type=pa.binary()), + ], + names=["id", "score", "pt_position"], + ) + ) + made += count + i += 1 + return batches + + def test_two_pass_flat_and_partitioned(self, tmp_path): + from deltalake import DeltaTable + + n_rows = 30 + batches = self._make_batches(n_rows) + output_uri_base = str(tmp_path / "out") + + specs = [ + DeltaLakeOutputSpec(name="flat"), + DeltaLakeOutputSpec( + name="id", + partition_by="id", + partition_strategy="percentile_range", + n_partitions=3, + zorder_columns=["id"], + source_table="my_view", + ), + DeltaLakeOutputSpec( + name="pt_position_morton", + partition_by="pt_position_morton", + partition_strategy="uniform_range", + n_partitions=3, + zorder_columns=["pt_position_morton"], + source_geometry_column="pt_position", + source_table="my_view", + ), + ] + + with patch( + "materializationengine.workflows.deltalake_export.stream_table_to_arrow", + return_value=iter(batches), + ), patch( + "materializationengine.workflows.deltalake_export._get_geometry_columns", + return_value=["pt_position"], + ), patch( + # optimize/compact/vacuum is delta-rs internals — skip for the unit test. + "materializationengine.workflows.deltalake_export.optimize_deltalake" + ): + returned = export_view_to_deltalake( + connection_string="postgresql://ignored/db", + view_name="my_view", + output_specs=specs, + output_uri_base=output_uri_base, + flush_threshold_bytes=1, # force a flush per batch + ) + + # Exact row count comes from streaming, not a COUNT(*). + assert returned == n_rows + + # Flat base: decoded geometry, no partition column, all rows. + flat = DeltaTable(f"{output_uri_base}/flat") + assert flat.count() == n_rows + flat_cols = set(flat.schema().to_arrow().names) + assert {"id", "score", "pt_position_x", "pt_position_y", "pt_position_z"} <= flat_cols + assert "pt_position" not in flat_cols # WKB dropped after decode + + # id partition: same rows, gains the Hive partition column. + id_lake = DeltaTable(f"{output_uri_base}/id") + assert id_lake.count() == n_rows + assert "id_partition" in set(id_lake.schema().to_arrow().names) + + # morton partition: same rows, morton + partition columns present. + morton_lake = DeltaTable(f"{output_uri_base}/pt_position_morton") + assert morton_lake.count() == n_rows + m_cols = set(morton_lake.schema().to_arrow().names) + assert "pt_position_morton_partition" in m_cols + + def test_flat_only_view_no_second_pass(self, tmp_path): + from deltalake import DeltaTable + + n_rows = 12 + batches = self._make_batches(n_rows) + output_uri_base = str(tmp_path / "out") + + # A view with no id / geometry → single flat spec. + specs = [DeltaLakeOutputSpec(name="flat")] + + with patch( + "materializationengine.workflows.deltalake_export.stream_table_to_arrow", + return_value=iter(batches), + ), patch( + "materializationengine.workflows.deltalake_export._get_geometry_columns", + return_value=[], + ), patch( + "materializationengine.workflows.deltalake_export.optimize_deltalake" + ): + returned = export_view_to_deltalake( + connection_string="postgresql://ignored/db", + view_name="degree_view", + output_specs=specs, + output_uri_base=output_uri_base, + flush_threshold_bytes=1, + ) + + assert returned == n_rows + flat = DeltaTable(f"{output_uri_base}/flat") + assert flat.count() == n_rows + # No geometry to decode → original columns preserved, no partition col. + cols = set(flat.schema().to_arrow().names) + assert "pt_position" in cols # untouched binary column + assert not any(c.endswith("_partition") for c in cols) From 8ea75c536592ca208ba6041c6f9196ce6eac086a Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 5 Jun 2026 09:59:04 -0700 Subject: [PATCH 4/5] adding heartbeat for parquet dump of views --- .../workflows/deltalake_export.py | 108 +++++++++++++++--- static/js/deltalakeRunningExports.js | 11 ++ templates/deltalake/running_exports.html | 17 +++ tests/test_deltalake_export.py | 50 ++++++++ 4 files changed, 172 insertions(+), 14 deletions(-) diff --git a/materializationengine/workflows/deltalake_export.py b/materializationengine/workflows/deltalake_export.py index cfe94c3d..35843943 100644 --- a/materializationengine/workflows/deltalake_export.py +++ b/materializationengine/workflows/deltalake_export.py @@ -5,6 +5,7 @@ import re import time from collections.abc import Callable +from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Literal @@ -1775,6 +1776,37 @@ def _get_redis_client(): ) +@contextmanager +def _progress_heartbeat(write_fn: Callable[[], None], interval: float = 10.0): + """Call *write_fn* every *interval* seconds in a daemon thread. + + Used to keep an export's Redis status fresh while the worker is blocked on + a long, silent operation — e.g. an aggregating view where Postgres emits no + rows until the whole aggregation completes, so the per-batch progress + callback would otherwise not fire for minutes. *write_fn* must be safe to + call concurrently with the main thread's own status writes (last-writer-wins + on the same Redis key, both deriving from the same shared counter). + """ + import threading + + stop = threading.Event() + + def _run() -> None: + while not stop.wait(interval): + try: + write_fn() + except Exception: + celery_logger.warning("progress heartbeat failed", exc_info=True) + + thread = threading.Thread(target=_run, daemon=True) + thread.start() + try: + yield + finally: + stop.set() + thread.join(timeout=2) + + def make_redis_progress_callback( datastack: str, version: int, @@ -1815,8 +1847,15 @@ def set_deltalake_export_status( phase: str | None = None, error: str | None = None, job_id: str | None = None, + elapsed_seconds: float | None = None, + rows_per_second: float | None = None, ) -> None: - """Write a terminal status (``complete``, ``failed``, etc.) to Redis.""" + """Write a terminal status (``complete``, ``failed``, etc.) to Redis. + + *elapsed_seconds* and *rows_per_second* provide a live "heartbeat" for + exports whose total row count is unknown (e.g. views): the UI can show that + work is progressing — and how fast — even before/without a percentage. + """ client = _get_redis_client() key = deltalake_export_redis_key(datastack, version, table_name, job_id=job_id) pct = None @@ -1828,6 +1867,12 @@ def set_deltalake_export_status( "rows_processed": rows_processed, "total_rows": total_rows, "percent_complete": pct, + "elapsed_seconds": ( + round(elapsed_seconds, 1) if elapsed_seconds is not None else None + ), + "rows_per_second": ( + round(rows_per_second) if rows_per_second is not None else None + ), "error": error, "last_updated": datetime.now(timezone.utc).isoformat(), } @@ -2141,20 +2186,55 @@ def _optimize_callback(spec_name: str, action: str) -> None: if is_view: # Views can't be counted/sampled cheaply, so stream once into a flat # lake (which yields the exact row count) and re-partition from it. - row_count = export_view_to_deltalake( - connection_string=connection_string, - view_name=table_name, - output_specs=resolved_specs, - output_uri_base=output_uri_base, - flush_threshold_bytes=flush_threshold, - target_partition_size_mb=target_partition_size_mb, - progress_callback=_progress, - phase_callback=lambda phase, message: _phase(phase, message), - optimize_callback=_optimize_callback, - optimize_max_concurrent_tasks=optimize_max_concurrent_tasks, - optimize_target_size=optimize_target_size, - optimize_max_spill_size=optimize_max_spill_size, + # Postgres emits no rows until an aggregating view is fully computed, + # so a heartbeat keeps the UI live (elapsed + rows/s) during that wait + # — without it the per-batch callback wouldn't fire for minutes. + _view_state = {"rows": 0, "phase": "streaming"} + _view_start = time.monotonic() + + def _view_status_write() -> None: + elapsed = time.monotonic() - _view_start + rows = _view_state["rows"] + rate = rows / elapsed if elapsed > 0 else None + _set_status( + status="exporting", + phase=_view_state["phase"], + rows_processed=rows, + elapsed_seconds=elapsed, + rows_per_second=rate, + ) + + def _view_progress(rows_so_far: int, total: int | None) -> None: + _view_state["rows"] = rows_so_far + _log_progress(rows_so_far, total) + _view_status_write() + + def _view_phase(phase: str, message: str) -> None: + _view_state["phase"] = phase + celery_logger.info("%s (v%d): %s", table_name, version, message) + _log(message) + _view_status_write() + + _log( + "Executing view query in Postgres — for large aggregating views " + "the first rows can take several minutes." ) + + with _progress_heartbeat(_view_status_write, interval=10.0): + row_count = export_view_to_deltalake( + connection_string=connection_string, + view_name=table_name, + output_specs=resolved_specs, + output_uri_base=output_uri_base, + flush_threshold_bytes=flush_threshold, + target_partition_size_mb=target_partition_size_mb, + progress_callback=_view_progress, + phase_callback=_view_phase, + optimize_callback=_optimize_callback, + optimize_max_concurrent_tasks=optimize_max_concurrent_tasks, + optimize_target_size=optimize_target_size, + optimize_max_spill_size=optimize_max_spill_size, + ) else: # --- Estimate bytes per row and resolve partition counts / bounds --- _phase( diff --git a/static/js/deltalakeRunningExports.js b/static/js/deltalakeRunningExports.js index 4941715d..7e4d5c1b 100644 --- a/static/js/deltalakeRunningExports.js +++ b/static/js/deltalakeRunningExports.js @@ -22,6 +22,17 @@ document.addEventListener("alpine:init", () => { return this.progress[this.exportId(exp)] || {}; }, + fmtDuration(seconds) { + if (seconds == null) return ""; + const total = Math.floor(seconds); + const s = total % 60; + const m = Math.floor((total / 60) % 60); + const h = Math.floor(total / 3600); + if (h > 0) return `${h}h ${m}m ${s}s`; + if (m > 0) return `${m}m ${s}s`; + return `${s}s`; + }, + async pollAll() { await Promise.all(this.exports.map((exp) => this.pollOne(exp))); // Stop polling if all are terminal diff --git a/templates/deltalake/running_exports.html b/templates/deltalake/running_exports.html index c43955f7..fecb61fc 100644 --- a/templates/deltalake/running_exports.html +++ b/templates/deltalake/running_exports.html @@ -122,6 +122,23 @@

No Exports

+ +
+
+ + rows processed + + + +
+
+
+
+ + Postgres is computing the view; the first rows can take several minutes for large aggregating views. + +
+
Error: diff --git a/tests/test_deltalake_export.py b/tests/test_deltalake_export.py index 7e1c4e5c..8946e179 100644 --- a/tests/test_deltalake_export.py +++ b/tests/test_deltalake_export.py @@ -4,6 +4,7 @@ bucket boundary computation, and bucket assignment strategies. """ +import time from unittest.mock import MagicMock, patch import numpy as np @@ -1441,6 +1442,55 @@ def test_status_includes_phase_and_error(self, mock_redis_client): assert payload["error"] == "Connection reset" assert payload["percent_complete"] == 50.0 + @patch("materializationengine.workflows.deltalake_export._get_redis_client") + def test_status_includes_heartbeat_fields(self, mock_redis_client): + import json + + from materializationengine.workflows.deltalake_export import ( + set_deltalake_export_status, + ) + + mock_client = MagicMock() + mock_redis_client.return_value = mock_client + + # No total_rows (e.g. a view) — heartbeat carries elapsed + rate. + set_deltalake_export_status( + "minnie65", + 943, + "my_view", + "exporting", + rows_processed=12345, + phase="streaming", + elapsed_seconds=42.37, + rows_per_second=291.6, + ) + + payload = json.loads(mock_client.set.call_args[0][1]) + assert payload["rows_processed"] == 12345 + assert payload["total_rows"] is None + assert payload["percent_complete"] is None + assert payload["elapsed_seconds"] == 42.4 # rounded to 1 decimal + assert payload["rows_per_second"] == 292 # rounded to int + + +class TestProgressHeartbeat: + """_progress_heartbeat calls write_fn periodically until the block exits.""" + + def test_heartbeat_ticks_and_stops(self): + from materializationengine.workflows.deltalake_export import ( + _progress_heartbeat, + ) + + calls = [] + with _progress_heartbeat(lambda: calls.append(1), interval=0.05): + time.sleep(0.18) # expect ~3 ticks + ticks_at_exit = len(calls) + + assert ticks_at_exit >= 2 # fired periodically while inside the block + time.sleep(0.12) + # No further ticks after the context exits (thread stopped). + assert len(calls) == ticks_at_exit + @patch("materializationengine.workflows.deltalake_export._get_redis_client") def test_append_deltalake_log(self, mock_redis_client): from materializationengine.workflows.deltalake_export import ( From 5ebb9dcec7c0cce4c6a15efe79d058ae45131c4e Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Fri, 5 Jun 2026 11:31:36 -0700 Subject: [PATCH 5/5] resolve merge conflicts --- .../blueprints/deltalake/api.py | 36 ++++-------- .../workflows/deltalake_export.py | 58 ++++++++----------- 2 files changed, 35 insertions(+), 59 deletions(-) diff --git a/materializationengine/blueprints/deltalake/api.py b/materializationengine/blueprints/deltalake/api.py index afc65d08..a827444f 100644 --- a/materializationengine/blueprints/deltalake/api.py +++ b/materializationengine/blueprints/deltalake/api.py @@ -271,32 +271,18 @@ def discover_specs(datastack_name): bytes_per_row = estimate_bytes_per_row(connection_string, source) + # For a small table, collapse to a single output — partitioning a tiny + # table just produces many undersized files. (For views the first spec + # is the flat base, so this keeps the flat export.) + small_table_threshold_mb = int( + get_config_param("DELTALAKE_SMALL_TABLE_THRESHOLD_MB", 200) + ) + estimated_total_mb = row_count * bytes_per_row / (1024 * 1024) + if estimated_total_mb < small_table_threshold_mb and len(resolved_specs) > 1: + resolved_specs = resolved_specs[:1] + # Track which specs had "auto" before resolution (for caching). was_auto = [spec.n_partitions == "auto" for spec in resolved_specs] - # Discover specs. - resolved_specs = discover_default_output_specs(source, engine) - bytes_per_row = estimate_bytes_per_row(connection_string, source) - - small_table_threshold_mb = int( - get_config_param("DELTALAKE_SMALL_TABLE_THRESHOLD_MB", 200) - ) - estimated_total_mb = row_count * bytes_per_row / (1024 * 1024) - if estimated_total_mb < small_table_threshold_mb and len(resolved_specs) > 1: - resolved_specs = resolved_specs[:1] - - # Track which specs had "auto" before resolution (for caching). - was_auto = [spec.n_partitions == "auto" for spec in resolved_specs] - - # Resolve partition counts. - for spec in resolved_specs: - if spec.n_partitions == "auto": - effective_target = spec.target_file_size_mb or target_partition_size_mb - spec.n_partitions = resolve_n_partitions( - "auto", - row_count, - target_file_size_mb=effective_target, - bytes_per_row=bytes_per_row, - ) # Resolve partition counts. for spec in resolved_specs: @@ -369,7 +355,7 @@ def discover_specs(datastack_name): except Exception as e: current_app.logger.error( "discover_specs failed for %s v%s table %r: %s", - datastack, + datastack_name, version, table_name, e, diff --git a/materializationengine/workflows/deltalake_export.py b/materializationengine/workflows/deltalake_export.py index 94a42f44..2666502d 100644 --- a/materializationengine/workflows/deltalake_export.py +++ b/materializationengine/workflows/deltalake_export.py @@ -2149,40 +2149,11 @@ def _phase(phase: str, message: str, **status_kwargs) -> None: f"Delete the existing Delta Lake before re-exporting." ) - # --- Estimate bytes per row and resolve partition counts / bounds --- - _phase( - "computing_boundaries", - f"Resolved {len(resolved_specs)} output specs. " - "Computing partition boundaries...", - total_rows=row_count, - ) - bytes_per_row = estimate_bytes_per_row(connection_string, source) - - small_table_threshold_mb = get_config_param( - "DELTALAKE_SMALL_TABLE_THRESHOLD_MB", 200 - ) - estimated_total_mb = row_count * bytes_per_row / (1024 * 1024) - if estimated_total_mb < small_table_threshold_mb and len(resolved_specs) > 1: - celery_logger.info( - "Table %s estimated size %.1f MB < threshold %s MB — " - "trimming to single (id) spec", - table_name, - estimated_total_mb, - small_table_threshold_mb, - ) - resolved_specs = resolved_specs[:1] - - for spec in resolved_specs: - if spec.n_partitions == "auto" or spec.n_partitions is None: - effective_target = spec.target_file_size_mb or target_partition_size_mb - spec.n_partitions = resolve_n_partitions( - "auto", - row_count, - target_file_size_mb=effective_target, - bytes_per_row=bytes_per_row, - ) - - resolve_all_bounds(resolved_specs, connection_string, table_name) + # Boundary computation / partition sizing differs by relation kind and + # is handled per-branch below: views can't be counted or sampled + # upfront (it would execute the full view), so the table-only steps + # (estimate_bytes_per_row, small-table trim, resolve_all_bounds) must + # NOT run for views — they are done in the else branch only. # --- Stream and write --- _last_log_time = {"t": 0.0} @@ -2280,6 +2251,25 @@ def _view_phase(phase: str, message: str) -> None: ) bytes_per_row = estimate_bytes_per_row(connection_string, source) + # For a small table, collapse to a single output spec — partitioning + # a tiny table just produces many undersized files. + small_table_threshold_mb = get_config_param( + "DELTALAKE_SMALL_TABLE_THRESHOLD_MB", 200 + ) + estimated_total_mb = row_count * bytes_per_row / (1024 * 1024) + if ( + estimated_total_mb < small_table_threshold_mb + and len(resolved_specs) > 1 + ): + celery_logger.info( + "Table %s estimated size %.1f MB < threshold %s MB — " + "trimming to single spec", + table_name, + estimated_total_mb, + small_table_threshold_mb, + ) + resolved_specs = resolved_specs[:1] + for spec in resolved_specs: if spec.n_partitions == "auto" or spec.n_partitions is None: effective_target = (