diff --git a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py
index d9a26bf4..ad210f61 100644
--- a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py
+++ b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py
@@ -49,9 +49,9 @@
from .._arrow import write_parquet
from ..ppdb import Ppdb, PpdbReplicaChunk
-from ..ppdb_config import PpdbConfig
from ..sql import PasswordProvider, PpdbSqlBase, PpdbSqlBaseConfig
from .manifest import Manifest, TableStats
+from .ppdb_bigquery_config import PpdbBigQueryConfig
from .ppdb_replica_chunk_extended import ChunkStatus, PpdbReplicaChunkExtended
from .updates.update_records import UpdateRecords
@@ -66,53 +66,6 @@
"""
-class PpdbBigQueryConfig(PpdbConfig):
- """Configuration for BigQuery-based PPDB."""
-
- project_id: str
- """Google Cloud project ID."""
-
- dataset_id: str
- """Target BigQuery dataset ID, without the project."""
-
- bucket_name: str
- """Name of Google Cloud Storage bucket for uploading chunks."""
-
- object_prefix: str
- """Base prefix for the object in cloud storage."""
-
- replication_dir: str
- """Directory where the exported chunks will be stored."""
-
- stage_chunk_topic: str = "stage-chunk-topic"
- """Pub/Sub topic name for triggering chunk staging process."""
-
- parq_batch_size: int = 10000
- """Number of rows to process in each batch when writing parquet files."""
-
- parq_compression: str = "snappy"
- """Compression format for Parquet files."""
-
- delete_existing_dirs: bool = False
- """If `True`, existing directories for chunks will be deleted before
- export. If `False`, an error will be raised if the directory already
- exists.
- """
-
- sql: PpdbSqlBaseConfig
- """SQL database configuration (`~lsst.dax.ppdb.sql.PpdbSqlBaseConfig`)."""
-
- @property
- def replication_path(self) -> Path:
- """Return path for writing replica chunk data (`pathlib.Path`)."""
- return Path(self.replication_dir)
-
- @property
- def fq_dataset_id(self) -> str:
- """Fully qualified BigQuery dataset ID, including project (`str`)."""
- return f"{self.project_id}:{self.dataset_id}"
-
-
class _SecretManagerPasswordProvider(PasswordProvider):
"""Retrieves a database password from Google Cloud Secret Manager.
diff --git a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py
new file mode 100644
index 00000000..919c7d1b
--- /dev/null
+++ b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py
@@ -0,0 +1,143 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from enum import StrEnum
+from pathlib import Path
+
+from pydantic import BaseModel
+
+from ..ppdb_config import PpdbConfig
+from ..sql import PpdbSqlBaseConfig
+
+PPDB_PREFIX = "ppdb"
+
+
+class DatasetType(StrEnum):
+ """Typed keys for BigQuery dataset categories."""
+
+ STAGING = "staging"
+ INTERNAL = "internal"
+ PUBLIC = "public"
+
+
+class Datasets(BaseModel):
+ """Mapping of dataset types to their target names in BigQuery."""
+
+ staging: str = f"{PPDB_PREFIX}_staging"
+ """Name of the staging dataset used for intermediate storage during
+ replication and promotion processes."""
+
+ internal: str = f"{PPDB_PREFIX}_internal"
+ """Name of the internal dataset containing the fully replicated and
+ promoted data."""
+
+ public: str = f"{PPDB_PREFIX}_public"
+ """Name of the public dataset which is presented through the TAP
+ interface."""
+
+ def name_for(self, dataset_type: DatasetType) -> str:
+ """Return the configured dataset name for a dataset type."""
+ match dataset_type:
+ case DatasetType.INTERNAL:
+ return self.internal
+ case DatasetType.PUBLIC:
+ return self.public
+ case DatasetType.STAGING:
+ return self.staging
+
+ # Defensive check in case a bad value was passed at runtime.
+ raise ValueError(f"Unsupported dataset type: {dataset_type!r}")
+
+
+DEFAULT_FELIS_SCHEMA_URI = "resource://lsst.sdm.schemas/ppdb.yaml"
+"""Default URI for the Felis PPDB schema in SDM Schemas."""
+
+
+class PpdbBigQueryConfig(PpdbConfig):
+ """Configuration for BigQuery-based PPDB."""
+
+ felis_schema_uri: str = DEFAULT_FELIS_SCHEMA_URI
+ """URI for the FELIS schema used in the PPDB."""
+
+ project_id: str
+ """Google Cloud project ID."""
+
+ dataset_id: str # TODO: This will need to be deprecated and removed (DM-54681).
+ """Target BigQuery dataset ID, without the project."""
+
+ bucket_name: str
+ """Name of Google Cloud Storage bucket for uploading chunks."""
+
+ object_prefix: str
+ """Base prefix for the object in cloud storage."""
+
+ replication_dir: str
+ """Directory where the exported chunks will be stored."""
+
+ stage_chunk_topic: str = "stage-chunk-topic"
+ """Pub/Sub topic name for triggering chunk staging process."""
+
+ parq_batch_size: int = 10000
+ """Number of rows to process in each batch when writing parquet files."""
+
+ parq_compression: str = "snappy"
+ """Compression format for Parquet files."""
+
+ delete_existing_dirs: bool = False
+ """If `True`, existing directories for chunks will be deleted before
+ export. If `False`, an error will be raised if the directory already
+ exists.
+ """
+
+ sql: PpdbSqlBaseConfig
+ """SQL database configuration (`~lsst.dax.ppdb.sql.PpdbSqlBaseConfig`)."""
+
+ datasets: Datasets = Datasets()
+ """Names of the BigQuery datasets used for different purposes
+ (`Datasets`).
+ """
+
+ @property
+ def replication_path(self) -> Path:
+ """Return path for writing replica chunk data (`pathlib.Path`)."""
+ return Path(self.replication_dir)
+
+ # TODO: This will need to be deprecated and removed (DM-54681).
+ @property
+ def fq_dataset_id(self) -> str:
+ """Fully qualified BigQuery dataset ID, including project (`str`)."""
+ return f"{self.project_id}:{self.dataset_id}"
+
+ def fqn_for(self, dataset_type: DatasetType) -> str:
+ """Return the fully qualified BigQuery dataset name for a dataset type.
+
+ Parameters
+ ----------
+ dataset_type
+ Type of dataset to get the name for.
+
+ Returns
+ -------
+ str
+ Fully qualified BigQuery dataset name (project.dataset).
+ """
+ dataset_name = self.datasets.name_for(dataset_type)
+ return f"{self.project_id}.{dataset_name}"
diff --git a/python/lsst/dax/ppdb/bigquery/schema/__init__.py b/python/lsst/dax/ppdb/bigquery/schema/__init__.py
new file mode 100644
index 00000000..f6afdfc2
--- /dev/null
+++ b/python/lsst/dax/ppdb/bigquery/schema/__init__.py
@@ -0,0 +1,23 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from .dataset_builder import *
+from .felis_converter import *
diff --git a/python/lsst/dax/ppdb/bigquery/schema/dataset_builder.py b/python/lsst/dax/ppdb/bigquery/schema/dataset_builder.py
new file mode 100644
index 00000000..d66ac0fe
--- /dev/null
+++ b/python/lsst/dax/ppdb/bigquery/schema/dataset_builder.py
@@ -0,0 +1,476 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from __future__ import annotations
+
+__all__ = [
+ "DatasetBuildManager",
+ "DatasetBuilder",
+ "DatasetBuilderError",
+ "InternalDatasetBuilder",
+ "PublicDatasetBuilder",
+ "StagingDatasetBuilder",
+]
+
+import logging
+from abc import ABC, abstractmethod
+from collections.abc import Mapping
+from types import MappingProxyType
+from typing import ClassVar
+
+from felis import Schema
+from google.api_core.exceptions import Conflict
+from google.cloud import bigquery
+
+from lsst.dax.apdb import ApdbTables
+
+from ..ppdb_bigquery_config import DatasetType, PpdbBigQueryConfig
+from .felis_converter import FelisConverter
+
+_LOG = logging.getLogger(__name__)
+
+_DIA_TABLES: tuple[str, ...] = (
+ ApdbTables.DiaObject.value,
+ ApdbTables.DiaSource.value,
+ ApdbTables.DiaForcedSource.value,
+)
+
+
+class DatasetBuilderError(RuntimeError):
+ """Raised when dataset builder operations fail."""
+
+
+class DatasetBuilder(ABC):
+ """Build a specific type of BigQuery dataset for the PPDB."""
+
+ dataset_type: ClassVar[DatasetType]
+ """Dataset type this builder handles."""
+
+ @abstractmethod
+ def create_tables(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ """Create BigQuery table objects for a dataset type.
+
+ Parameters
+ ----------
+ config
+ The PPDB BigQuery configuration containing dataset names and other
+ settings.
+ converter
+ A FelisConverter initialized with the PPDB schema for converting
+ Felis tables to BigQuery tables.
+
+ Returns
+ -------
+ `list` [`google.cloud.bigquery.Table`]
+ BigQuery table objects to create for the dataset.
+ """
+ ...
+
+ @abstractmethod
+ def create_views(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ """Create BigQuery views for a dataset type.
+
+ Parameters
+ ----------
+ config
+ The PPDB BigQuery configuration containing dataset names and other
+ settings.
+ converter
+ A FelisConverter initialized with the PPDB schema for converting
+ Felis tables to BigQuery tables, which may be needed to define the
+ view queries.
+
+ Returns
+ -------
+ `list` [`google.cloud.bigquery.Table`]
+ BigQuery view objects to create for the dataset.
+ """
+ ...
+
+
+class StagingDatasetBuilder(DatasetBuilder):
+ """Builder for the staging dataset type."""
+
+ dataset_type = DatasetType.STAGING
+
+ def create_tables(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ """Create BigQuery tables for the staging dataset type."""
+ # Convert the base DIA tables.
+ dia_object_table, dia_source_table, dia_forced_source_table = converter.convert_tables(
+ _DIA_TABLES,
+ dataset_fqn=config.fqn_for(self.dataset_type),
+ )
+ tables = [dia_object_table, dia_source_table, dia_forced_source_table]
+
+ # Add the apdb_replica_chunk field to every staging table.
+ for table in tables:
+ schema_fields = list(table.schema)
+ schema_fields.append(
+ bigquery.SchemaField(
+ "apdb_replica_chunk",
+ "INT64",
+ mode="REQUIRED",
+ description="APDB replica chunk to which this row belongs.",
+ )
+ )
+ table.schema = schema_fields
+
+ return tables
+
+ def create_views(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ return []
+
+
+class InternalDatasetBuilder(DatasetBuilder):
+ """Builder for the internal dataset type."""
+
+ dataset_type = DatasetType.INTERNAL
+
+ def create_tables(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ """Create BigQuery tables for the internal dataset type."""
+ # Convert the base DIA tables.
+ dia_object_table, dia_source_table, dia_forced_source_table = converter.convert_tables(
+ _DIA_TABLES,
+ dataset_fqn=config.fqn_for(self.dataset_type),
+ )
+ tables = [dia_object_table, dia_source_table, dia_forced_source_table]
+
+ # Add an internal geography column for spatial query optimization to
+ # every internal table and set clustering on that column.
+ for table in tables:
+ schema_fields = list(table.schema)
+ schema_fields.append(
+ bigquery.SchemaField(
+ "geo_point",
+ "GEOGRAPHY",
+ mode="REQUIRED",
+ description="Internal geography column for optimization of spatial queries.",
+ )
+ )
+ table.schema = schema_fields
+ table.clustering_fields = ["geo_point"]
+
+ return tables
+
+ def create_views(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ return []
+
+
+class PublicDatasetBuilder(DatasetBuilder):
+ """Builder for the public dataset type."""
+
+ dataset_type = DatasetType.PUBLIC
+
+ def create_tables(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ """Create BigQuery tables for the public dataset type.
+
+ Notes
+ -----
+ DiaObject is defined using a table rather than a view, because using
+ ``validityEndMjdTai IS NULL`` as a filter in a view definition would be
+ inefficient. A materialized view would be cumbersome, as its references
+ would be invalidated by table swaps during the promotion process.
+ """
+ (dia_object_table,) = converter.convert_tables(
+ [ApdbTables.DiaObject.value],
+ dataset_fqn=config.fqn_for(self.dataset_type),
+ )
+
+ # Omit the validityEndMjdTai column from the table definition, as it
+ # will always be null.
+ dia_object_table.schema = [
+ field for field in dia_object_table.schema if field.name != "validityEndMjdTai"
+ ]
+
+ return [dia_object_table]
+
+ def create_views(
+ self,
+ config: PpdbBigQueryConfig,
+ converter: FelisConverter,
+ ) -> list[bigquery.Table]:
+ """Create BigQuery views for the public dataset type."""
+ public_dataset_fqn = config.fqn_for(self.dataset_type)
+ internal_dataset_fqn = config.fqn_for(DatasetType.INTERNAL)
+
+ return [
+ self._create_simple_view(name, public_dataset_fqn, internal_dataset_fqn, converter)
+ for name in (ApdbTables.DiaSource.value, ApdbTables.DiaForcedSource.value)
+ ]
+
+ @staticmethod
+ def _create_simple_view(
+ table_name: str,
+ target_dataset_fqn: str,
+ source_dataset_fqn: str,
+ converter: FelisConverter,
+ ) -> bigquery.Table:
+ """Create a view selecting all Felis-defined columns from a source
+ table.
+ """
+ column_list = converter.to_bigquery_column_list(converter.find_table(table_name))
+ view = bigquery.Table(f"{target_dataset_fqn}.{table_name}")
+ view._properties["id"] = f"{target_dataset_fqn}.{table_name}"
+ view.view_query = f"SELECT {column_list} FROM `{source_dataset_fqn}.{table_name}`"
+ return view
+
+
+class DatasetBuildManager:
+ """Manage dataset builders and dispatch build operations by dataset type.
+
+ Parameters
+ ----------
+ config
+ The PPDB BigQuery configuration containing dataset names and other
+ settings.
+ schema
+ The Felis schema object containing table definitions.
+ dataset_types
+ Optional list of dataset types to build from the config. If
+ None, all supported datasets in the config will be built.
+ dry_run
+ If True, build BigQuery objects but do not execute operations to create
+ them.
+ exists_ok
+ If True, do not fail if a BigQuery table or view already exists; skip
+ creating it.
+ configure_authorized_views
+ If True, configure internal dataset access entries for public
+ authorized views after build.
+ """
+
+ _BUILDER_TYPES: Mapping[DatasetType, type[DatasetBuilder]] = MappingProxyType(
+ {
+ StagingDatasetBuilder.dataset_type: StagingDatasetBuilder,
+ InternalDatasetBuilder.dataset_type: InternalDatasetBuilder,
+ PublicDatasetBuilder.dataset_type: PublicDatasetBuilder,
+ }
+ )
+
+ def __init__(
+ self,
+ config: PpdbBigQueryConfig,
+ schema: Schema,
+ dataset_types: list[DatasetType] | None = None,
+ dry_run: bool = False,
+ exists_ok: bool = False,
+ configure_authorized_views: bool = False,
+ ) -> None:
+ self._config = config
+ self._schema = schema
+ self._dry_run = dry_run
+ self._exists_ok = exists_ok
+ self._configure_authorized_views_enabled = configure_authorized_views
+
+ # Initialize the builders for all supported dataset types.
+ self._builders: dict[DatasetType, DatasetBuilder] = {
+ dataset_type: builder_type() for dataset_type, builder_type in self._BUILDER_TYPES.items()
+ }
+
+ # Set the dataset types to build from the input list or default to all
+ # builders if not specified.
+ self._dataset_types: tuple[DatasetType, ...] = (
+ tuple(dataset_types) if dataset_types else tuple(self._builders)
+ )
+
+ # Initialize BigQuery client early in non-dry-run mode so any
+ # credential/configuration errors fail fast.
+ self._client: bigquery.Client | None = None
+ if not self._dry_run:
+ try:
+ self._client = bigquery.Client(project=self._config.project_id)
+ except Exception as e:
+ raise DatasetBuilderError(f"Failed to initialize BigQuery client: {e}") from e
+
+ def build_datasets(self) -> None:
+ """Create BigQuery objects for the datasets."""
+ converter = FelisConverter(schema=self._schema)
+ public_views: list[bigquery.Table] = []
+
+ for dataset_type in self._dataset_types:
+ builder = self._builders[dataset_type]
+
+ # Build the tables.
+ _LOG.info("Building tables for %s", dataset_type.value)
+ tables = builder.create_tables(config=self._config, converter=converter)
+ self._create_resources(
+ resources=tables,
+ resource_kind="table",
+ )
+
+ # Build the views.
+ _LOG.info("Building views for %s", dataset_type.value)
+ views = builder.create_views(config=self._config, converter=converter)
+ self._create_resources(
+ resources=views,
+ resource_kind="view",
+ )
+ if dataset_type is DatasetType.PUBLIC:
+ public_views.extend(views)
+
+ # After all datasets are built, optionally configure authorized view
+ # entries for the public views on the internal dataset.
+ if self._configure_authorized_views_enabled:
+ self._configure_authorized_views(public_views)
+
+ def _create_resources(
+ self,
+ resources: list[bigquery.Table],
+ resource_kind: str,
+ ) -> None:
+ """Create BigQuery resources.
+
+ Parameters
+ ----------
+ resources
+ List of BigQuery table or view objects to create.
+ resource_kind
+ String label for the kind of resource being created, used in log
+ messages.
+ """
+ if not resources:
+ _LOG.info("No %ss to create", resource_kind)
+ return
+
+ if not self._dry_run and self._client is None:
+ raise DatasetBuilderError("BigQuery client is not initialized.")
+
+ for resource in resources:
+ if self._dry_run:
+ _LOG.info("Dry run would create %s: %s", resource_kind, resource.to_api_repr())
+ continue
+ _LOG.info("Creating %s: %s", resource_kind, resource.to_api_repr())
+ try:
+ self._client.create_table(resource, exists_ok=self._exists_ok)
+ except Conflict as e:
+ raise DatasetBuilderError(
+ f"{resource_kind.capitalize()} {resource.table_id!r} already exists."
+ ) from e
+ _LOG.info("Created %s: %s", resource_kind, resource.table_id)
+
+ def _configure_authorized_views(self, public_views: list[bigquery.Table]) -> None:
+ """Configure authorized view access entries on the internal dataset for
+ the public views.
+
+ Parameters
+ ----------
+ public_views
+ List of public view objects to configure as authorized views on the
+ internal dataset.
+
+ Notes
+ -----
+ Managed view entries are owned by this tool and are reconciled to the
+ current desired state on every run. Existing non-view entries and
+ unrelated view entries are retained.
+ """
+ if not public_views:
+ _LOG.info("No public views found for authorized-view configuration")
+ return
+
+ if self._dry_run:
+ for view in public_views:
+ _LOG.info(
+ "Dry run would authorize view %s for dataset %s",
+ view.full_table_id,
+ self._config.fqn_for(DatasetType.INTERNAL),
+ )
+ return
+
+ if self._client is None:
+ raise DatasetBuilderError("BigQuery client is not initialized.")
+
+ internal_dataset_fqn = self._config.fqn_for(DatasetType.INTERNAL)
+ internal_dataset = self._client.get_dataset(internal_dataset_fqn)
+
+ # Build a set of public view references managed by this tool.
+ managed_view_refs: set[tuple[str, str, str]] = set()
+ for view in public_views:
+ managed_view_refs.add((view.project, view.dataset_id, view.table_id))
+
+ # Keep any entries that are not managed by this tool.
+ retained_entries: list[bigquery.AccessEntry] = []
+ for entry in internal_dataset.access_entries:
+ # Not a view entry, so keep it.
+ if entry.entity_type != "view":
+ retained_entries.append(entry)
+ continue
+
+ # If an existing view entry is not in the managed set, keep it.
+ existing_view_ref = (
+ entry.entity_id.get("projectId"),
+ entry.entity_id.get("datasetId"),
+ entry.entity_id.get("tableId"),
+ )
+ if existing_view_ref not in managed_view_refs:
+ retained_entries.append(entry)
+
+ # Rebuild managed entries from the current set of public views.
+ managed_view_entries: list[bigquery.AccessEntry] = []
+ for project_id, dataset_id, table_id in sorted(managed_view_refs):
+ managed_view_entries.append(
+ bigquery.AccessEntry(
+ role=None,
+ entity_type="view",
+ entity_id={
+ "projectId": project_id,
+ "datasetId": dataset_id,
+ "tableId": table_id,
+ },
+ )
+ )
+
+ # Combine the retained and managed entries and update the dataset.
+ internal_dataset.access_entries = retained_entries + managed_view_entries
+ self._client.update_dataset(internal_dataset, ["access_entries"])
+ _LOG.info(
+ "Configured %d authorized PPDB view entries on dataset %s",
+ len(managed_view_entries),
+ internal_dataset_fqn,
+ )
diff --git a/python/lsst/dax/ppdb/bigquery/schema/felis_converter.py b/python/lsst/dax/ppdb/bigquery/schema/felis_converter.py
new file mode 100644
index 00000000..eb95dbe2
--- /dev/null
+++ b/python/lsst/dax/ppdb/bigquery/schema/felis_converter.py
@@ -0,0 +1,205 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from __future__ import annotations
+
+__all__ = [
+ "FelisConverter",
+ "FelisConverterError",
+]
+
+from collections.abc import Sequence
+from typing import ClassVar
+
+from felis import Schema
+from felis.datamodel import DataType, Table
+from google.cloud import bigquery
+
+
+class FelisConverterError(RuntimeError):
+ """Raised when Felis to BigQuery conversion fails."""
+
+
+class FelisConverter:
+ """Convert Felis objects to BigQuery.
+
+ Parameters
+ ----------
+ schema
+ Felis schema containing database objects to convert.
+ """
+
+ _TYPE_MAP: ClassVar[dict[DataType, str]] = {
+ DataType.boolean: "BOOL",
+ DataType.byte: "INT64",
+ DataType.short: "INT64",
+ DataType.int: "INT64",
+ DataType.long: "INT64",
+ DataType.float: "FLOAT64",
+ DataType.double: "FLOAT64",
+ DataType.char: "STRING",
+ DataType.string: "STRING",
+ DataType.unicode: "STRING",
+ DataType.text: "STRING",
+ DataType.binary: "BYTES",
+ DataType.timestamp: "TIMESTAMP",
+ }
+
+ def __init__(self, schema: Schema) -> None:
+ self._tables_by_name: dict[str, Table] = {table.name: table for table in schema.tables}
+
+ def find_table(self, name: str) -> Table:
+ """Find a Felis table by name.
+
+ Parameters
+ ----------
+ name
+ Name of the Felis table.
+
+ Returns
+ -------
+ `felis.datamodel.Table`
+ Matching Felis table.
+
+ Raises
+ ------
+ FelisConverterError
+ Raised if the table is not present in the schema.
+ """
+ table = self._tables_by_name.get(name)
+ if table is None:
+ raise FelisConverterError(f"Table {name!r} not found in Felis schema")
+ return table
+
+ def convert_tables(self, names: Sequence[str], dataset_fqn: str) -> list[bigquery.Table]:
+ """Create BigQuery table objects for selected Felis table names.
+
+ Parameters
+ ----------
+ names
+ Sequence of Felis table names to convert.
+ dataset_fqn
+ Fully qualified BigQuery dataset name (project.dataset) used for
+ table references.
+
+ Returns
+ -------
+ list[`google.cloud.bigquery.Table`]
+ Converted BigQuery table objects in the same order as ``names``.
+
+ """
+ return [self._to_bigquery_table(self.find_table(name), dataset_fqn=dataset_fqn) for name in names]
+
+ def to_bigquery_column_list(self, table: Table) -> str:
+ """Return a comma-separated, backtick-quoted column list.
+
+ Parameters
+ ----------
+ table
+ Felis table object.
+
+ Returns
+ -------
+ str
+ Comma-separated list of column names formatted for BigQuery SQL.
+ """
+ return ", ".join(f"`{column.name}`" for column in table.columns)
+
+ def _to_bigquery_table(self, table: Table, dataset_fqn: str) -> bigquery.Table:
+ """Convert a Felis table to a BigQuery table object.
+
+ Parameters
+ ----------
+ table
+ Felis table definition.
+ dataset_fqn
+ Fully qualified BigQuery dataset name (project.dataset) used for
+ table references.
+
+ Returns
+ -------
+ `google.cloud.bigquery.Table`
+ BigQuery table object, not yet created remotely.
+
+ Raises
+ ------
+ FelisConverterError
+ Raised if any of the table's columns have unsupported datatypes for
+ conversion.
+ """
+ schema_fields: list[bigquery.SchemaField] = []
+ for column in table.columns:
+ try:
+ field_type = self._to_bigquery_type(column.datatype)
+ except FelisConverterError as e:
+ raise FelisConverterError(
+ f"Failed to map datatype for {table.name}.{column.name}: {e}"
+ ) from e
+
+ schema_fields.append(
+ bigquery.SchemaField(
+ name=column.name,
+ field_type=field_type,
+ mode=self._to_bigquery_mode(column.nullable),
+ )
+ )
+
+ table_fqn = f"{dataset_fqn}.{table.name}"
+ return bigquery.Table(table_fqn, schema=schema_fields)
+
+ @classmethod
+ def _to_bigquery_type(cls, datatype: DataType) -> str:
+ """Map a Felis datatype to a BigQuery type string.
+
+ Parameters
+ ----------
+ datatype
+ Felis datatype to map.
+
+ Returns
+ -------
+ str
+ Corresponding BigQuery type string.
+
+ Raises
+ ------
+ FelisConverterError
+ Raised if the datatype is not supported for conversion.
+ """
+ if datatype in cls._TYPE_MAP:
+ return cls._TYPE_MAP[datatype]
+ raise FelisConverterError(f"Unsupported Felis type {datatype!r}")
+
+ @staticmethod
+ def _to_bigquery_mode(nullable: bool) -> str:
+ """Return the BigQuery mode string for a nullable column.
+
+ Parameters
+ ----------
+ nullable
+ Whether the column is nullable.
+
+ Returns
+ -------
+ str
+ "NULLABLE" if the column is nullable, otherwise "REQUIRED".
+ """
+ return "NULLABLE" if nullable else "REQUIRED"
diff --git a/python/lsst/dax/ppdb/cli/ppdb_cli.py b/python/lsst/dax/ppdb/cli/ppdb_cli.py
index 60a75a9c..2366b89c 100644
--- a/python/lsst/dax/ppdb/cli/ppdb_cli.py
+++ b/python/lsst/dax/ppdb/cli/ppdb_cli.py
@@ -28,6 +28,7 @@
from lsst.dax.apdb.cli.logging_cli import LoggingCli
from .. import scripts
+from ..bigquery.ppdb_bigquery_config import DatasetType
from . import options
@@ -39,6 +40,7 @@ def main() -> None:
subparsers = parser.add_subparsers(title="available subcommands", required=True)
_create_sql_subcommand(subparsers)
_create_bigquery(subparsers)
+ _build_bigquery_dataset(subparsers)
args = parser.parse_args()
log_cli.process_args(args)
@@ -77,3 +79,40 @@ def _create_bigquery(subparsers: argparse._SubParsersAction) -> None:
options.felis_schema_options(parser)
options.bigquery_options(parser)
parser.set_defaults(method=scripts.create_bigquery)
+
+
+def _build_bigquery_dataset(subparsers: argparse._SubParsersAction) -> None:
+ parser = subparsers.add_parser(
+ "build-bq-datasets", help="Build one or more BigQuery datasets for the PPDB from a Felis schema file."
+ )
+ parser.add_argument("--config", help="URI to the PPDB configuration file.", required=True)
+ parser.add_argument(
+ "--dry-run",
+ help="Build BigQuery objects but do not execute operations to create them.",
+ default=False,
+ action="store_true",
+ )
+ parser.add_argument(
+ "--exists-ok",
+ help="Do not fail if a BigQuery table or view already exists; skip creating it.",
+ default=False,
+ action="store_true",
+ )
+ parser.add_argument(
+ "--configure-authorized-views",
+ help="Configure internal dataset access entries for public authorized views after build.",
+ default=False,
+ action="store_true",
+ )
+ parser.add_argument(
+ "datasets",
+ nargs="*",
+ choices=[dataset_type.value for dataset_type in DatasetType],
+ help=(
+ "Key(s) of the BigQuery dataset(s) to create. Valid keys are: "
+ f"{', '.join(dataset_type.value for dataset_type in DatasetType)}. If not provided, all "
+ "datasets will be created."
+ ),
+ )
+
+ parser.set_defaults(method=scripts.build_bigquery_datasets)
diff --git a/python/lsst/dax/ppdb/scripts/__init__.py b/python/lsst/dax/ppdb/scripts/__init__.py
index bfce287b..cb91e4e7 100644
--- a/python/lsst/dax/ppdb/scripts/__init__.py
+++ b/python/lsst/dax/ppdb/scripts/__init__.py
@@ -19,6 +19,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
+from .build_bigquery_datasets import build_bigquery_datasets
from .create_bigquery import create_bigquery
from .create_sql import create_sql
from .replication_list_chunks_apdb import replication_list_chunks_apdb
diff --git a/python/lsst/dax/ppdb/scripts/build_bigquery_datasets.py b/python/lsst/dax/ppdb/scripts/build_bigquery_datasets.py
new file mode 100644
index 00000000..9049ad51
--- /dev/null
+++ b/python/lsst/dax/ppdb/scripts/build_bigquery_datasets.py
@@ -0,0 +1,72 @@
+# This file is part of dax_ppdb
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from felis import Schema
+
+from ..bigquery import PpdbBigQueryConfig
+from ..bigquery.ppdb_bigquery_config import DatasetType
+from ..bigquery.schema import DatasetBuildManager
+from ..ppdb_config import PpdbConfig
+
+
+def build_bigquery_datasets(
+ config: str,
+ datasets: list[str],
+ dry_run: bool = False,
+ exists_ok: bool = False,
+ configure_authorized_views: bool = False,
+) -> None:
+ """Build BigQuery datasets for the PPDB."""
+ # Load the PPDB config.
+ try:
+ ppdb_config = PpdbConfig.from_uri(config)
+ except Exception as e:
+ raise RuntimeError(f"Failed to load configuration from {config}: {e}") from e
+
+ # Check the config type.
+ if not isinstance(ppdb_config, PpdbBigQueryConfig):
+ raise TypeError(
+ f"Configuration loaded from '{config}' has wrong type, "
+ f"expected PpdbBigQueryConfig, got: {type(ppdb_config)}"
+ )
+
+ # Load the Felis schema from the URI in the config.
+ try:
+ schema = Schema.from_uri(ppdb_config.felis_schema_uri, context={"id_generation": True})
+ except Exception as e:
+ raise RuntimeError(f"Failed to load schema from {ppdb_config.felis_schema_uri}: {e}") from e
+
+ # Build the requested datasets.
+ try:
+ dataset_types = [DatasetType(dataset) for dataset in datasets]
+ except ValueError as e:
+ valid = ", ".join(dataset_type.value for dataset_type in DatasetType)
+ raise RuntimeError(f"Invalid dataset key in {datasets!r}. Valid keys are: {valid}.") from e
+
+ build_manager = DatasetBuildManager(
+ ppdb_config,
+ schema,
+ dataset_types or None,
+ dry_run=dry_run,
+ exists_ok=exists_ok,
+ configure_authorized_views=configure_authorized_views,
+ )
+ build_manager.build_datasets()
diff --git a/requirements.txt b/requirements.txt
index b167d759..abc38c3b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,4 +10,4 @@ lsst-utils @ git+https://github.com/lsst/utils@main
lsst-resources[s3] @ git+https://github.com/lsst/resources@main
lsst-felis @ git+https://github.com/lsst/felis@main
lsst-dax-ppdbx-gcp @ git+https://github.com/lsst-dm/dax_ppdbx_gcp@main
-lsst-sdm-schemas @ git+https://github.com/lsst/sdm_schemas@main
+lsst-sdm-schemas @ git+https://github.com/lsst/sdm_schemas@tickets/DM-51914
diff --git a/tests/test_dataset_builder.py b/tests/test_dataset_builder.py
new file mode 100644
index 00000000..31a58c7d
--- /dev/null
+++ b/tests/test_dataset_builder.py
@@ -0,0 +1,405 @@
+# This file is part of dax_ppdb.
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (http://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+
+from __future__ import annotations
+
+import unittest
+import uuid
+from unittest.mock import Mock, patch
+
+from felis import Schema
+from google.api_core.exceptions import Conflict
+from google.cloud import bigquery
+
+from lsst.dax.apdb import ApdbTables
+from lsst.dax.ppdb.bigquery.ppdb_bigquery_config import (
+ DEFAULT_FELIS_SCHEMA_URI,
+ Datasets,
+ DatasetType,
+ PpdbBigQueryConfig,
+)
+from lsst.dax.ppdb.bigquery.schema.dataset_builder import (
+ DatasetBuilder,
+ DatasetBuilderError,
+ DatasetBuildManager,
+ InternalDatasetBuilder,
+ PublicDatasetBuilder,
+ StagingDatasetBuilder,
+)
+from lsst.dax.ppdb.bigquery.schema.felis_converter import FelisConverter
+from lsst.dax.ppdb.sql import PpdbSqlBaseConfig
+from lsst.dax.ppdb.tests._bigquery import have_valid_google_credentials
+
+
+def _load_test_schema() -> Schema:
+ return Schema.from_uri(DEFAULT_FELIS_SCHEMA_URI, context={"id_generation": True})
+
+
+def _make_test_config(project_id: str, datasets: Datasets) -> PpdbBigQueryConfig:
+ return PpdbBigQueryConfig(
+ project_id=project_id,
+ dataset_id="test_dataset_builder",
+ bucket_name="test-bucket",
+ object_prefix="data/test",
+ replication_dir="/tmp",
+ sql=PpdbSqlBaseConfig(db_url="sqlite:///:memory:"),
+ datasets=datasets,
+ )
+
+
+class _InvalidViewBuilder(DatasetBuilder):
+ """Builder used to exercise manager validation of malformed views."""
+
+ dataset_type = DatasetType.PUBLIC
+
+ def create_tables(self, config: PpdbBigQueryConfig, converter: FelisConverter) -> list[bigquery.Table]:
+ return []
+
+ def create_views(self, config: PpdbBigQueryConfig, converter: FelisConverter) -> list[bigquery.Table]:
+ return [bigquery.Table(f"{config.fqn_for(DatasetType.PUBLIC)}.BrokenView")]
+
+
+class DatasetBuilderTestCase(unittest.TestCase):
+ """Tests for dataset builder object generation and manager behavior."""
+
+ def setUp(self) -> None:
+ self.schema = _load_test_schema()
+ self.converter = FelisConverter(self.schema)
+ self.config = _make_test_config(
+ project_id="test-project",
+ datasets=Datasets(
+ internal="test_dataset_builder_internal",
+ public="test_dataset_builder_public",
+ staging="test_dataset_builder_staging",
+ ),
+ )
+
+ def test_public_builder_creates_diaobject_without_validity_end(self) -> None:
+ builder = PublicDatasetBuilder()
+
+ tables = builder.create_tables(
+ config=self.config,
+ converter=self.converter,
+ )
+
+ self.assertEqual(len(tables), 1)
+ table = tables[0]
+ self.assertEqual(table.table_id, ApdbTables.DiaObject.value)
+
+ field_names = [field.name for field in table.schema]
+ self.assertIn("diaObjectId", field_names)
+ self.assertNotIn("validityEndMjdTai", field_names)
+
+ source_table = self.converter.find_table(ApdbTables.DiaObject.value)
+ self.assertEqual(len(field_names), len(source_table.columns) - 1)
+
+ def test_public_builder_creates_explicit_views(self) -> None:
+ builder = PublicDatasetBuilder()
+ public_dataset_fqn = self.config.fqn_for(DatasetType.PUBLIC)
+ internal_dataset_fqn = self.config.fqn_for(DatasetType.INTERNAL)
+
+ views = builder.create_views(config=self.config, converter=self.converter)
+
+ self.assertEqual([view.table_id for view in views], ["DiaSource", "DiaForcedSource"])
+ for view_name, view in zip(("DiaSource", "DiaForcedSource"), views, strict=True):
+ assert view.view_query is not None
+ self.assertNotIn("SELECT *", view.view_query)
+ self.assertIn(f"FROM `{internal_dataset_fqn}.{view_name}`", view.view_query)
+ expected_columns = self.converter.to_bigquery_column_list(self.converter.find_table(view_name))
+ self.assertEqual(
+ view.view_query,
+ f"SELECT {expected_columns} FROM `{internal_dataset_fqn}.{view_name}`",
+ )
+ self.assertEqual(view.full_table_id, f"{public_dataset_fqn}.{view_name}")
+
+ def test_internal_builder_adds_geo_point_and_clustering(self) -> None:
+ builder = InternalDatasetBuilder()
+
+ tables = builder.create_tables(
+ config=self.config,
+ converter=self.converter,
+ )
+
+ self.assertEqual({table.table_id for table in tables}, {"DiaObject", "DiaSource", "DiaForcedSource"})
+ for table in tables:
+ geo_point = next(field for field in table.schema if field.name == "geo_point")
+ self.assertEqual(geo_point.field_type, "GEOGRAPHY")
+ self.assertEqual(geo_point.mode, "REQUIRED")
+ self.assertEqual(table.clustering_fields, ["geo_point"])
+
+ def test_staging_builder_adds_replica_chunk(self) -> None:
+ builder = StagingDatasetBuilder()
+
+ tables = builder.create_tables(
+ config=self.config,
+ converter=self.converter,
+ )
+
+ self.assertEqual({table.table_id for table in tables}, {"DiaObject", "DiaSource", "DiaForcedSource"})
+ for table in tables:
+ chunk_field = next(field for field in table.schema if field.name == "apdb_replica_chunk")
+ self.assertEqual(chunk_field.field_type, "INT64")
+ self.assertEqual(chunk_field.mode, "REQUIRED")
+ self.assertNotIn("geo_point", [field.name for field in table.schema])
+
+ def test_manager_dry_run_does_not_initialize_client(self) -> None:
+ with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client") as client_type:
+ manager = DatasetBuildManager(config=self.config, schema=self.schema, dry_run=True)
+ manager.build_datasets()
+
+ client_type.assert_not_called()
+
+ def test_manager_raises_on_client_init_failure(self) -> None:
+ with patch(
+ "lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client",
+ side_effect=RuntimeError("boom"),
+ ):
+ with self.assertRaisesRegex(DatasetBuilderError, r"Failed to initialize BigQuery client"):
+ DatasetBuildManager(config=self.config, schema=self.schema)
+
+ def test_manager_passes_exists_ok_when_creating_resources(self) -> None:
+ with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client"):
+ manager = DatasetBuildManager(
+ config=self.config,
+ schema=self.schema,
+ dataset_types=[DatasetType.PUBLIC],
+ exists_ok=True,
+ )
+ client = Mock()
+ manager._client = client
+ table = bigquery.Table(f"{self.config.project_id}.{self.config.datasets.public}.DiaObject")
+
+ manager._create_resources([table], "table")
+
+ client.create_table.assert_called_once_with(table, exists_ok=True)
+
+ def test_manager_wraps_conflict_as_builder_error(self) -> None:
+ with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client"):
+ manager = DatasetBuildManager(
+ config=self.config,
+ schema=self.schema,
+ dataset_types=[DatasetType.PUBLIC],
+ )
+ client = Mock()
+ manager._client = client
+ client.create_table.side_effect = Conflict("already exists")
+ table = bigquery.Table(f"{self.config.project_id}.{self.config.datasets.public}.DiaObject")
+
+ with self.assertRaises(DatasetBuilderError) as cm:
+ manager._create_resources([table], "table")
+
+ self.assertIn("already exists", str(cm.exception))
+
+ def test_manager_allows_view_without_query_in_dry_run(self) -> None:
+ manager = DatasetBuildManager(
+ config=self.config,
+ schema=self.schema,
+ dataset_types=[DatasetType.PUBLIC],
+ dry_run=True,
+ )
+ manager._builders[DatasetType.PUBLIC] = _InvalidViewBuilder()
+
+ manager.build_datasets()
+
+ def test_configure_authorized_views_preserves_unrelated_view_permissions(self) -> None:
+ with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client"):
+ manager = DatasetBuildManager(
+ config=self.config,
+ schema=self.schema,
+ dataset_types=[DatasetType.PUBLIC],
+ configure_authorized_views=True,
+ )
+
+ non_view_entry = bigquery.AccessEntry(
+ role="READER",
+ entity_type="userByEmail",
+ entity_id="reader@example.org",
+ )
+ managed_existing_entry = bigquery.AccessEntry(
+ role=None,
+ entity_type="view",
+ entity_id={
+ "projectId": self.config.project_id,
+ "datasetId": self.config.datasets.public,
+ "tableId": ApdbTables.DiaSource.value,
+ },
+ )
+ unrelated_view_entry = bigquery.AccessEntry(
+ role=None,
+ entity_type="view",
+ entity_id={
+ "projectId": "unrelated-project",
+ "datasetId": "unrelated_dataset",
+ "tableId": "UnrelatedView",
+ },
+ )
+
+ dataset = Mock()
+ dataset.access_entries = [non_view_entry, managed_existing_entry, unrelated_view_entry]
+
+ client = Mock()
+ client.get_dataset.return_value = dataset
+ manager._client = client
+
+ public_views = [
+ bigquery.Table(f"{self.config.fqn_for(DatasetType.PUBLIC)}.{ApdbTables.DiaSource.value}"),
+ bigquery.Table(f"{self.config.fqn_for(DatasetType.PUBLIC)}.{ApdbTables.DiaForcedSource.value}"),
+ ]
+
+ manager._configure_authorized_views(public_views)
+
+ client.update_dataset.assert_called_once_with(dataset, ["access_entries"])
+
+ updated_entries = dataset.access_entries
+ self.assertIn(non_view_entry, updated_entries)
+ self.assertIn(unrelated_view_entry, updated_entries)
+
+ updated_view_refs = {
+ (
+ entry.entity_id["projectId"],
+ entry.entity_id["datasetId"],
+ entry.entity_id["tableId"],
+ )
+ for entry in updated_entries
+ if entry.entity_type == "view" and isinstance(entry.entity_id, dict)
+ }
+ self.assertEqual(
+ updated_view_refs,
+ {
+ (
+ self.config.project_id,
+ self.config.datasets.public,
+ ApdbTables.DiaSource.value,
+ ),
+ (
+ self.config.project_id,
+ self.config.datasets.public,
+ ApdbTables.DiaForcedSource.value,
+ ),
+ (
+ "unrelated-project",
+ "unrelated_dataset",
+ "UnrelatedView",
+ ),
+ },
+ )
+
+
+@unittest.skipIf(not have_valid_google_credentials(), "Missing valid Google credentials")
+class DatasetBuilderBigQueryTestCase(unittest.TestCase):
+ """Integration tests for creating BigQuery tables and views."""
+
+ def setUp(self) -> None:
+ self.schema = _load_test_schema()
+ self.client = bigquery.Client()
+ self.project_id = self.client.project
+ suffix = uuid.uuid4().hex[:8]
+ self.datasets = Datasets(
+ internal=f"test_dataset_builder_internal_{suffix}",
+ public=f"test_dataset_builder_public_{suffix}",
+ staging=f"test_dataset_builder_staging_{suffix}",
+ )
+ self.config = _make_test_config(project_id=self.project_id, datasets=self.datasets)
+
+ for dataset_type in DatasetType:
+ dataset_name = self.datasets.name_for(dataset_type)
+ dataset = bigquery.Dataset(f"{self.project_id}.{dataset_name}")
+ dataset.default_table_expiration_ms = 3600000
+ self.client.create_dataset(dataset)
+
+ def tearDown(self) -> None:
+ delete_errors: list[str] = []
+ for dataset_type in DatasetType:
+ dataset_name = self.datasets.name_for(dataset_type)
+ dataset_fqn = f"{self.project_id}.{dataset_name}"
+ try:
+ self.client.delete_dataset(dataset_fqn, delete_contents=True, not_found_ok=True)
+ except Exception as e:
+ delete_errors.append(f"{dataset_fqn}: {type(e).__name__}: {e}")
+
+ if delete_errors:
+ self.fail(
+ "Failed to delete one or more BigQuery datasets in tearDown:\n"
+ + "\n".join(delete_errors)
+ )
+
+ def test_build_datasets_creates_expected_bigquery_objects(self) -> None:
+ manager = DatasetBuildManager(config=self.config, schema=self.schema)
+
+ manager.build_datasets()
+
+ internal_tables = list(self.client.list_tables(f"{self.project_id}.{self.datasets.internal}"))
+ public_tables = list(self.client.list_tables(f"{self.project_id}.{self.datasets.public}"))
+ staging_tables = list(self.client.list_tables(f"{self.project_id}.{self.datasets.staging}"))
+
+ self.assertEqual(len(internal_tables), 3)
+ self.assertEqual(len(public_tables), 3)
+ self.assertEqual(len(staging_tables), 3)
+
+ internal_dia_object = self.client.get_table(
+ f"{self.project_id}.{self.datasets.internal}.{ApdbTables.DiaObject.value}"
+ )
+ internal_geo_point = next(field for field in internal_dia_object.schema if field.name == "geo_point")
+ self.assertEqual(internal_geo_point.field_type, "GEOGRAPHY")
+ self.assertEqual(internal_geo_point.mode, "REQUIRED")
+ self.assertEqual(internal_dia_object.clustering_fields, ["geo_point"])
+
+ public_dia_object = self.client.get_table(
+ f"{self.project_id}.{self.datasets.public}.{ApdbTables.DiaObject.value}"
+ )
+ self.assertEqual(public_dia_object.table_type, "TABLE")
+ self.assertNotIn("validityEndMjdTai", [field.name for field in public_dia_object.schema])
+
+ public_dia_source = self.client.get_table(
+ f"{self.project_id}.{self.datasets.public}.{ApdbTables.DiaSource.value}"
+ )
+ self.assertEqual(public_dia_source.table_type, "VIEW")
+ assert public_dia_source.view_query is not None
+ self.assertNotIn("SELECT *", public_dia_source.view_query)
+ self.assertIn(
+ f"FROM `{self.project_id}.{self.datasets.internal}.{ApdbTables.DiaSource.value}`",
+ public_dia_source.view_query,
+ )
+
+ staging_dia_source = self.client.get_table(
+ f"{self.project_id}.{self.datasets.staging}.{ApdbTables.DiaSource.value}"
+ )
+ chunk_field = next(field for field in staging_dia_source.schema if field.name == "apdb_replica_chunk")
+ self.assertEqual(chunk_field.field_type, "INTEGER")
+ self.assertEqual(chunk_field.mode, "REQUIRED")
+
+ def test_build_datasets_exists_ok_allows_repeated_creation(self) -> None:
+ manager = DatasetBuildManager(config=self.config, schema=self.schema, exists_ok=True)
+
+ manager.build_datasets()
+ manager.build_datasets()
+
+ def test_build_datasets_without_exists_ok_fails_on_repeated_creation(self) -> None:
+ manager = DatasetBuildManager(config=self.config, schema=self.schema)
+
+ manager.build_datasets()
+
+ with self.assertRaisesRegex(DatasetBuilderError, r"already exists"):
+ manager.build_datasets()
+
+
+if __name__ == "__main__":
+ unittest.main()