From 08bb87a255a8639327e05cf6cf49acb240cd9454 Mon Sep 17 00:00:00 2001 From: Jeremy McCormick Date: Fri, 29 May 2026 14:54:09 -0500 Subject: [PATCH 1/3] DO NOT MERGE: Use sdm_schemas branch for development --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b167d759..abc38c3b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ lsst-utils @ git+https://github.com/lsst/utils@main lsst-resources[s3] @ git+https://github.com/lsst/resources@main lsst-felis @ git+https://github.com/lsst/felis@main lsst-dax-ppdbx-gcp @ git+https://github.com/lsst-dm/dax_ppdbx_gcp@main -lsst-sdm-schemas @ git+https://github.com/lsst/sdm_schemas@main +lsst-sdm-schemas @ git+https://github.com/lsst/sdm_schemas@tickets/DM-51914 From 5e0d83c1b62033b7803120e5517739c29787cf1d Mon Sep 17 00:00:00 2001 From: Jeremy McCormick Date: Fri, 29 May 2026 11:46:22 -0700 Subject: [PATCH 2/3] Move BigQuery config class to separate module --- .../lsst/dax/ppdb/bigquery/ppdb_bigquery.py | 49 +------------ .../dax/ppdb/bigquery/ppdb_bigquery_config.py | 72 +++++++++++++++++++ 2 files changed, 73 insertions(+), 48 deletions(-) create mode 100644 python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py diff --git a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py index d9a26bf4..ad210f61 100644 --- a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py +++ b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py @@ -49,9 +49,9 @@ from .._arrow import write_parquet from ..ppdb import Ppdb, PpdbReplicaChunk -from ..ppdb_config import PpdbConfig from ..sql import PasswordProvider, PpdbSqlBase, PpdbSqlBaseConfig from .manifest import Manifest, TableStats +from .ppdb_bigquery_config import PpdbBigQueryConfig from .ppdb_replica_chunk_extended import ChunkStatus, PpdbReplicaChunkExtended from .updates.update_records import UpdateRecords @@ -66,53 +66,6 @@ """ -class PpdbBigQueryConfig(PpdbConfig): - """Configuration for BigQuery-based PPDB.""" - - project_id: str - """Google Cloud project ID.""" - - dataset_id: str - """Target BigQuery dataset ID, without the project.""" - - bucket_name: str - """Name of Google Cloud Storage bucket for uploading chunks.""" - - object_prefix: str - """Base prefix for the object in cloud storage.""" - - replication_dir: str - """Directory where the exported chunks will be stored.""" - - stage_chunk_topic: str = "stage-chunk-topic" - """Pub/Sub topic name for triggering chunk staging process.""" - - parq_batch_size: int = 10000 - """Number of rows to process in each batch when writing parquet files.""" - - parq_compression: str = "snappy" - """Compression format for Parquet files.""" - - delete_existing_dirs: bool = False - """If `True`, existing directories for chunks will be deleted before - export. If `False`, an error will be raised if the directory already - exists. - """ - - sql: PpdbSqlBaseConfig - """SQL database configuration (`~lsst.dax.ppdb.sql.PpdbSqlBaseConfig`).""" - - @property - def replication_path(self) -> Path: - """Return path for writing replica chunk data (`pathlib.Path`).""" - return Path(self.replication_dir) - - @property - def fq_dataset_id(self) -> str: - """Fully qualified BigQuery dataset ID, including project (`str`).""" - return f"{self.project_id}:{self.dataset_id}" - - class _SecretManagerPasswordProvider(PasswordProvider): """Retrieves a database password from Google Cloud Secret Manager. diff --git a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py new file mode 100644 index 00000000..a1df35c8 --- /dev/null +++ b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py @@ -0,0 +1,72 @@ + +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from pathlib import Path + +from ..ppdb_config import PpdbConfig +from ..sql import PpdbSqlBaseConfig + +class PpdbBigQueryConfig(PpdbConfig): + """Configuration for BigQuery-based PPDB.""" + + project_id: str + """Google Cloud project ID.""" + + dataset_id: str + """Target BigQuery dataset ID, without the project.""" + + bucket_name: str + """Name of Google Cloud Storage bucket for uploading chunks.""" + + object_prefix: str + """Base prefix for the object in cloud storage.""" + + replication_dir: str + """Directory where the exported chunks will be stored.""" + + stage_chunk_topic: str = "stage-chunk-topic" + """Pub/Sub topic name for triggering chunk staging process.""" + + parq_batch_size: int = 10000 + """Number of rows to process in each batch when writing parquet files.""" + + parq_compression: str = "snappy" + """Compression format for Parquet files.""" + + delete_existing_dirs: bool = False + """If `True`, existing directories for chunks will be deleted before + export. If `False`, an error will be raised if the directory already + exists. + """ + + sql: PpdbSqlBaseConfig + """SQL database configuration (`~lsst.dax.ppdb.sql.PpdbSqlBaseConfig`).""" + + @property + def replication_path(self) -> Path: + """Return path for writing replica chunk data (`pathlib.Path`).""" + return Path(self.replication_dir) + + @property + def fq_dataset_id(self) -> str: + """Fully qualified BigQuery dataset ID, including project (`str`).""" + return f"{self.project_id}:{self.dataset_id}" \ No newline at end of file From 2ff10120783921f70e816647bd1267957cf2bd06 Mon Sep 17 00:00:00 2001 From: Jeremy McCormick Date: Fri, 29 May 2026 11:46:35 -0700 Subject: [PATCH 3/3] Add a command line tool for creating tables and views in BigQuery This converts tables from the PPDB Felis schema into BigQuery, allowing arbitrary changes or extensions, and also provides a hook for creating views from SQL. There is an additional option to configure public view access on the internal dataset. --- .../dax/ppdb/bigquery/ppdb_bigquery_config.py | 77 ++- .../lsst/dax/ppdb/bigquery/schema/__init__.py | 23 + .../ppdb/bigquery/schema/dataset_builder.py | 476 ++++++++++++++++++ .../ppdb/bigquery/schema/felis_converter.py | 205 ++++++++ python/lsst/dax/ppdb/cli/ppdb_cli.py | 39 ++ python/lsst/dax/ppdb/scripts/__init__.py | 1 + .../ppdb/scripts/build_bigquery_datasets.py | 72 +++ tests/test_dataset_builder.py | 405 +++++++++++++++ 8 files changed, 1295 insertions(+), 3 deletions(-) create mode 100644 python/lsst/dax/ppdb/bigquery/schema/__init__.py create mode 100644 python/lsst/dax/ppdb/bigquery/schema/dataset_builder.py create mode 100644 python/lsst/dax/ppdb/bigquery/schema/felis_converter.py create mode 100644 python/lsst/dax/ppdb/scripts/build_bigquery_datasets.py create mode 100644 tests/test_dataset_builder.py diff --git a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py index a1df35c8..919c7d1b 100644 --- a/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py +++ b/python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py @@ -1,4 +1,3 @@ - # This file is part of dax_ppdb # # Developed for the LSST Data Management System. @@ -20,18 +19,68 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from enum import StrEnum from pathlib import Path +from pydantic import BaseModel + from ..ppdb_config import PpdbConfig from ..sql import PpdbSqlBaseConfig +PPDB_PREFIX = "ppdb" + + +class DatasetType(StrEnum): + """Typed keys for BigQuery dataset categories.""" + + STAGING = "staging" + INTERNAL = "internal" + PUBLIC = "public" + + +class Datasets(BaseModel): + """Mapping of dataset types to their target names in BigQuery.""" + + staging: str = f"{PPDB_PREFIX}_staging" + """Name of the staging dataset used for intermediate storage during + replication and promotion processes.""" + + internal: str = f"{PPDB_PREFIX}_internal" + """Name of the internal dataset containing the fully replicated and + promoted data.""" + + public: str = f"{PPDB_PREFIX}_public" + """Name of the public dataset which is presented through the TAP + interface.""" + + def name_for(self, dataset_type: DatasetType) -> str: + """Return the configured dataset name for a dataset type.""" + match dataset_type: + case DatasetType.INTERNAL: + return self.internal + case DatasetType.PUBLIC: + return self.public + case DatasetType.STAGING: + return self.staging + + # Defensive check in case a bad value was passed at runtime. + raise ValueError(f"Unsupported dataset type: {dataset_type!r}") + + +DEFAULT_FELIS_SCHEMA_URI = "resource://lsst.sdm.schemas/ppdb.yaml" +"""Default URI for the Felis PPDB schema in SDM Schemas.""" + + class PpdbBigQueryConfig(PpdbConfig): """Configuration for BigQuery-based PPDB.""" + felis_schema_uri: str = DEFAULT_FELIS_SCHEMA_URI + """URI for the FELIS schema used in the PPDB.""" + project_id: str """Google Cloud project ID.""" - dataset_id: str + dataset_id: str # TODO: This will need to be deprecated and removed (DM-54681). """Target BigQuery dataset ID, without the project.""" bucket_name: str @@ -61,12 +110,34 @@ class PpdbBigQueryConfig(PpdbConfig): sql: PpdbSqlBaseConfig """SQL database configuration (`~lsst.dax.ppdb.sql.PpdbSqlBaseConfig`).""" + datasets: Datasets = Datasets() + """Names of the BigQuery datasets used for different purposes + (`Datasets`). + """ + @property def replication_path(self) -> Path: """Return path for writing replica chunk data (`pathlib.Path`).""" return Path(self.replication_dir) + # TODO: This will need to be deprecated and removed (DM-54681). @property def fq_dataset_id(self) -> str: """Fully qualified BigQuery dataset ID, including project (`str`).""" - return f"{self.project_id}:{self.dataset_id}" \ No newline at end of file + return f"{self.project_id}:{self.dataset_id}" + + def fqn_for(self, dataset_type: DatasetType) -> str: + """Return the fully qualified BigQuery dataset name for a dataset type. + + Parameters + ---------- + dataset_type + Type of dataset to get the name for. + + Returns + ------- + str + Fully qualified BigQuery dataset name (project.dataset). + """ + dataset_name = self.datasets.name_for(dataset_type) + return f"{self.project_id}.{dataset_name}" diff --git a/python/lsst/dax/ppdb/bigquery/schema/__init__.py b/python/lsst/dax/ppdb/bigquery/schema/__init__.py new file mode 100644 index 00000000..f6afdfc2 --- /dev/null +++ b/python/lsst/dax/ppdb/bigquery/schema/__init__.py @@ -0,0 +1,23 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from .dataset_builder import * +from .felis_converter import * diff --git a/python/lsst/dax/ppdb/bigquery/schema/dataset_builder.py b/python/lsst/dax/ppdb/bigquery/schema/dataset_builder.py new file mode 100644 index 00000000..d66ac0fe --- /dev/null +++ b/python/lsst/dax/ppdb/bigquery/schema/dataset_builder.py @@ -0,0 +1,476 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = [ + "DatasetBuildManager", + "DatasetBuilder", + "DatasetBuilderError", + "InternalDatasetBuilder", + "PublicDatasetBuilder", + "StagingDatasetBuilder", +] + +import logging +from abc import ABC, abstractmethod +from collections.abc import Mapping +from types import MappingProxyType +from typing import ClassVar + +from felis import Schema +from google.api_core.exceptions import Conflict +from google.cloud import bigquery + +from lsst.dax.apdb import ApdbTables + +from ..ppdb_bigquery_config import DatasetType, PpdbBigQueryConfig +from .felis_converter import FelisConverter + +_LOG = logging.getLogger(__name__) + +_DIA_TABLES: tuple[str, ...] = ( + ApdbTables.DiaObject.value, + ApdbTables.DiaSource.value, + ApdbTables.DiaForcedSource.value, +) + + +class DatasetBuilderError(RuntimeError): + """Raised when dataset builder operations fail.""" + + +class DatasetBuilder(ABC): + """Build a specific type of BigQuery dataset for the PPDB.""" + + dataset_type: ClassVar[DatasetType] + """Dataset type this builder handles.""" + + @abstractmethod + def create_tables( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + """Create BigQuery table objects for a dataset type. + + Parameters + ---------- + config + The PPDB BigQuery configuration containing dataset names and other + settings. + converter + A FelisConverter initialized with the PPDB schema for converting + Felis tables to BigQuery tables. + + Returns + ------- + `list` [`google.cloud.bigquery.Table`] + BigQuery table objects to create for the dataset. + """ + ... + + @abstractmethod + def create_views( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + """Create BigQuery views for a dataset type. + + Parameters + ---------- + config + The PPDB BigQuery configuration containing dataset names and other + settings. + converter + A FelisConverter initialized with the PPDB schema for converting + Felis tables to BigQuery tables, which may be needed to define the + view queries. + + Returns + ------- + `list` [`google.cloud.bigquery.Table`] + BigQuery view objects to create for the dataset. + """ + ... + + +class StagingDatasetBuilder(DatasetBuilder): + """Builder for the staging dataset type.""" + + dataset_type = DatasetType.STAGING + + def create_tables( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + """Create BigQuery tables for the staging dataset type.""" + # Convert the base DIA tables. + dia_object_table, dia_source_table, dia_forced_source_table = converter.convert_tables( + _DIA_TABLES, + dataset_fqn=config.fqn_for(self.dataset_type), + ) + tables = [dia_object_table, dia_source_table, dia_forced_source_table] + + # Add the apdb_replica_chunk field to every staging table. + for table in tables: + schema_fields = list(table.schema) + schema_fields.append( + bigquery.SchemaField( + "apdb_replica_chunk", + "INT64", + mode="REQUIRED", + description="APDB replica chunk to which this row belongs.", + ) + ) + table.schema = schema_fields + + return tables + + def create_views( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + return [] + + +class InternalDatasetBuilder(DatasetBuilder): + """Builder for the internal dataset type.""" + + dataset_type = DatasetType.INTERNAL + + def create_tables( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + """Create BigQuery tables for the internal dataset type.""" + # Convert the base DIA tables. + dia_object_table, dia_source_table, dia_forced_source_table = converter.convert_tables( + _DIA_TABLES, + dataset_fqn=config.fqn_for(self.dataset_type), + ) + tables = [dia_object_table, dia_source_table, dia_forced_source_table] + + # Add an internal geography column for spatial query optimization to + # every internal table and set clustering on that column. + for table in tables: + schema_fields = list(table.schema) + schema_fields.append( + bigquery.SchemaField( + "geo_point", + "GEOGRAPHY", + mode="REQUIRED", + description="Internal geography column for optimization of spatial queries.", + ) + ) + table.schema = schema_fields + table.clustering_fields = ["geo_point"] + + return tables + + def create_views( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + return [] + + +class PublicDatasetBuilder(DatasetBuilder): + """Builder for the public dataset type.""" + + dataset_type = DatasetType.PUBLIC + + def create_tables( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + """Create BigQuery tables for the public dataset type. + + Notes + ----- + DiaObject is defined using a table rather than a view, because using + ``validityEndMjdTai IS NULL`` as a filter in a view definition would be + inefficient. A materialized view would be cumbersome, as its references + would be invalidated by table swaps during the promotion process. + """ + (dia_object_table,) = converter.convert_tables( + [ApdbTables.DiaObject.value], + dataset_fqn=config.fqn_for(self.dataset_type), + ) + + # Omit the validityEndMjdTai column from the table definition, as it + # will always be null. + dia_object_table.schema = [ + field for field in dia_object_table.schema if field.name != "validityEndMjdTai" + ] + + return [dia_object_table] + + def create_views( + self, + config: PpdbBigQueryConfig, + converter: FelisConverter, + ) -> list[bigquery.Table]: + """Create BigQuery views for the public dataset type.""" + public_dataset_fqn = config.fqn_for(self.dataset_type) + internal_dataset_fqn = config.fqn_for(DatasetType.INTERNAL) + + return [ + self._create_simple_view(name, public_dataset_fqn, internal_dataset_fqn, converter) + for name in (ApdbTables.DiaSource.value, ApdbTables.DiaForcedSource.value) + ] + + @staticmethod + def _create_simple_view( + table_name: str, + target_dataset_fqn: str, + source_dataset_fqn: str, + converter: FelisConverter, + ) -> bigquery.Table: + """Create a view selecting all Felis-defined columns from a source + table. + """ + column_list = converter.to_bigquery_column_list(converter.find_table(table_name)) + view = bigquery.Table(f"{target_dataset_fqn}.{table_name}") + view._properties["id"] = f"{target_dataset_fqn}.{table_name}" + view.view_query = f"SELECT {column_list} FROM `{source_dataset_fqn}.{table_name}`" + return view + + +class DatasetBuildManager: + """Manage dataset builders and dispatch build operations by dataset type. + + Parameters + ---------- + config + The PPDB BigQuery configuration containing dataset names and other + settings. + schema + The Felis schema object containing table definitions. + dataset_types + Optional list of dataset types to build from the config. If + None, all supported datasets in the config will be built. + dry_run + If True, build BigQuery objects but do not execute operations to create + them. + exists_ok + If True, do not fail if a BigQuery table or view already exists; skip + creating it. + configure_authorized_views + If True, configure internal dataset access entries for public + authorized views after build. + """ + + _BUILDER_TYPES: Mapping[DatasetType, type[DatasetBuilder]] = MappingProxyType( + { + StagingDatasetBuilder.dataset_type: StagingDatasetBuilder, + InternalDatasetBuilder.dataset_type: InternalDatasetBuilder, + PublicDatasetBuilder.dataset_type: PublicDatasetBuilder, + } + ) + + def __init__( + self, + config: PpdbBigQueryConfig, + schema: Schema, + dataset_types: list[DatasetType] | None = None, + dry_run: bool = False, + exists_ok: bool = False, + configure_authorized_views: bool = False, + ) -> None: + self._config = config + self._schema = schema + self._dry_run = dry_run + self._exists_ok = exists_ok + self._configure_authorized_views_enabled = configure_authorized_views + + # Initialize the builders for all supported dataset types. + self._builders: dict[DatasetType, DatasetBuilder] = { + dataset_type: builder_type() for dataset_type, builder_type in self._BUILDER_TYPES.items() + } + + # Set the dataset types to build from the input list or default to all + # builders if not specified. + self._dataset_types: tuple[DatasetType, ...] = ( + tuple(dataset_types) if dataset_types else tuple(self._builders) + ) + + # Initialize BigQuery client early in non-dry-run mode so any + # credential/configuration errors fail fast. + self._client: bigquery.Client | None = None + if not self._dry_run: + try: + self._client = bigquery.Client(project=self._config.project_id) + except Exception as e: + raise DatasetBuilderError(f"Failed to initialize BigQuery client: {e}") from e + + def build_datasets(self) -> None: + """Create BigQuery objects for the datasets.""" + converter = FelisConverter(schema=self._schema) + public_views: list[bigquery.Table] = [] + + for dataset_type in self._dataset_types: + builder = self._builders[dataset_type] + + # Build the tables. + _LOG.info("Building tables for %s", dataset_type.value) + tables = builder.create_tables(config=self._config, converter=converter) + self._create_resources( + resources=tables, + resource_kind="table", + ) + + # Build the views. + _LOG.info("Building views for %s", dataset_type.value) + views = builder.create_views(config=self._config, converter=converter) + self._create_resources( + resources=views, + resource_kind="view", + ) + if dataset_type is DatasetType.PUBLIC: + public_views.extend(views) + + # After all datasets are built, optionally configure authorized view + # entries for the public views on the internal dataset. + if self._configure_authorized_views_enabled: + self._configure_authorized_views(public_views) + + def _create_resources( + self, + resources: list[bigquery.Table], + resource_kind: str, + ) -> None: + """Create BigQuery resources. + + Parameters + ---------- + resources + List of BigQuery table or view objects to create. + resource_kind + String label for the kind of resource being created, used in log + messages. + """ + if not resources: + _LOG.info("No %ss to create", resource_kind) + return + + if not self._dry_run and self._client is None: + raise DatasetBuilderError("BigQuery client is not initialized.") + + for resource in resources: + if self._dry_run: + _LOG.info("Dry run would create %s: %s", resource_kind, resource.to_api_repr()) + continue + _LOG.info("Creating %s: %s", resource_kind, resource.to_api_repr()) + try: + self._client.create_table(resource, exists_ok=self._exists_ok) + except Conflict as e: + raise DatasetBuilderError( + f"{resource_kind.capitalize()} {resource.table_id!r} already exists." + ) from e + _LOG.info("Created %s: %s", resource_kind, resource.table_id) + + def _configure_authorized_views(self, public_views: list[bigquery.Table]) -> None: + """Configure authorized view access entries on the internal dataset for + the public views. + + Parameters + ---------- + public_views + List of public view objects to configure as authorized views on the + internal dataset. + + Notes + ----- + Managed view entries are owned by this tool and are reconciled to the + current desired state on every run. Existing non-view entries and + unrelated view entries are retained. + """ + if not public_views: + _LOG.info("No public views found for authorized-view configuration") + return + + if self._dry_run: + for view in public_views: + _LOG.info( + "Dry run would authorize view %s for dataset %s", + view.full_table_id, + self._config.fqn_for(DatasetType.INTERNAL), + ) + return + + if self._client is None: + raise DatasetBuilderError("BigQuery client is not initialized.") + + internal_dataset_fqn = self._config.fqn_for(DatasetType.INTERNAL) + internal_dataset = self._client.get_dataset(internal_dataset_fqn) + + # Build a set of public view references managed by this tool. + managed_view_refs: set[tuple[str, str, str]] = set() + for view in public_views: + managed_view_refs.add((view.project, view.dataset_id, view.table_id)) + + # Keep any entries that are not managed by this tool. + retained_entries: list[bigquery.AccessEntry] = [] + for entry in internal_dataset.access_entries: + # Not a view entry, so keep it. + if entry.entity_type != "view": + retained_entries.append(entry) + continue + + # If an existing view entry is not in the managed set, keep it. + existing_view_ref = ( + entry.entity_id.get("projectId"), + entry.entity_id.get("datasetId"), + entry.entity_id.get("tableId"), + ) + if existing_view_ref not in managed_view_refs: + retained_entries.append(entry) + + # Rebuild managed entries from the current set of public views. + managed_view_entries: list[bigquery.AccessEntry] = [] + for project_id, dataset_id, table_id in sorted(managed_view_refs): + managed_view_entries.append( + bigquery.AccessEntry( + role=None, + entity_type="view", + entity_id={ + "projectId": project_id, + "datasetId": dataset_id, + "tableId": table_id, + }, + ) + ) + + # Combine the retained and managed entries and update the dataset. + internal_dataset.access_entries = retained_entries + managed_view_entries + self._client.update_dataset(internal_dataset, ["access_entries"]) + _LOG.info( + "Configured %d authorized PPDB view entries on dataset %s", + len(managed_view_entries), + internal_dataset_fqn, + ) diff --git a/python/lsst/dax/ppdb/bigquery/schema/felis_converter.py b/python/lsst/dax/ppdb/bigquery/schema/felis_converter.py new file mode 100644 index 00000000..eb95dbe2 --- /dev/null +++ b/python/lsst/dax/ppdb/bigquery/schema/felis_converter.py @@ -0,0 +1,205 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = [ + "FelisConverter", + "FelisConverterError", +] + +from collections.abc import Sequence +from typing import ClassVar + +from felis import Schema +from felis.datamodel import DataType, Table +from google.cloud import bigquery + + +class FelisConverterError(RuntimeError): + """Raised when Felis to BigQuery conversion fails.""" + + +class FelisConverter: + """Convert Felis objects to BigQuery. + + Parameters + ---------- + schema + Felis schema containing database objects to convert. + """ + + _TYPE_MAP: ClassVar[dict[DataType, str]] = { + DataType.boolean: "BOOL", + DataType.byte: "INT64", + DataType.short: "INT64", + DataType.int: "INT64", + DataType.long: "INT64", + DataType.float: "FLOAT64", + DataType.double: "FLOAT64", + DataType.char: "STRING", + DataType.string: "STRING", + DataType.unicode: "STRING", + DataType.text: "STRING", + DataType.binary: "BYTES", + DataType.timestamp: "TIMESTAMP", + } + + def __init__(self, schema: Schema) -> None: + self._tables_by_name: dict[str, Table] = {table.name: table for table in schema.tables} + + def find_table(self, name: str) -> Table: + """Find a Felis table by name. + + Parameters + ---------- + name + Name of the Felis table. + + Returns + ------- + `felis.datamodel.Table` + Matching Felis table. + + Raises + ------ + FelisConverterError + Raised if the table is not present in the schema. + """ + table = self._tables_by_name.get(name) + if table is None: + raise FelisConverterError(f"Table {name!r} not found in Felis schema") + return table + + def convert_tables(self, names: Sequence[str], dataset_fqn: str) -> list[bigquery.Table]: + """Create BigQuery table objects for selected Felis table names. + + Parameters + ---------- + names + Sequence of Felis table names to convert. + dataset_fqn + Fully qualified BigQuery dataset name (project.dataset) used for + table references. + + Returns + ------- + list[`google.cloud.bigquery.Table`] + Converted BigQuery table objects in the same order as ``names``. + + """ + return [self._to_bigquery_table(self.find_table(name), dataset_fqn=dataset_fqn) for name in names] + + def to_bigquery_column_list(self, table: Table) -> str: + """Return a comma-separated, backtick-quoted column list. + + Parameters + ---------- + table + Felis table object. + + Returns + ------- + str + Comma-separated list of column names formatted for BigQuery SQL. + """ + return ", ".join(f"`{column.name}`" for column in table.columns) + + def _to_bigquery_table(self, table: Table, dataset_fqn: str) -> bigquery.Table: + """Convert a Felis table to a BigQuery table object. + + Parameters + ---------- + table + Felis table definition. + dataset_fqn + Fully qualified BigQuery dataset name (project.dataset) used for + table references. + + Returns + ------- + `google.cloud.bigquery.Table` + BigQuery table object, not yet created remotely. + + Raises + ------ + FelisConverterError + Raised if any of the table's columns have unsupported datatypes for + conversion. + """ + schema_fields: list[bigquery.SchemaField] = [] + for column in table.columns: + try: + field_type = self._to_bigquery_type(column.datatype) + except FelisConverterError as e: + raise FelisConverterError( + f"Failed to map datatype for {table.name}.{column.name}: {e}" + ) from e + + schema_fields.append( + bigquery.SchemaField( + name=column.name, + field_type=field_type, + mode=self._to_bigquery_mode(column.nullable), + ) + ) + + table_fqn = f"{dataset_fqn}.{table.name}" + return bigquery.Table(table_fqn, schema=schema_fields) + + @classmethod + def _to_bigquery_type(cls, datatype: DataType) -> str: + """Map a Felis datatype to a BigQuery type string. + + Parameters + ---------- + datatype + Felis datatype to map. + + Returns + ------- + str + Corresponding BigQuery type string. + + Raises + ------ + FelisConverterError + Raised if the datatype is not supported for conversion. + """ + if datatype in cls._TYPE_MAP: + return cls._TYPE_MAP[datatype] + raise FelisConverterError(f"Unsupported Felis type {datatype!r}") + + @staticmethod + def _to_bigquery_mode(nullable: bool) -> str: + """Return the BigQuery mode string for a nullable column. + + Parameters + ---------- + nullable + Whether the column is nullable. + + Returns + ------- + str + "NULLABLE" if the column is nullable, otherwise "REQUIRED". + """ + return "NULLABLE" if nullable else "REQUIRED" diff --git a/python/lsst/dax/ppdb/cli/ppdb_cli.py b/python/lsst/dax/ppdb/cli/ppdb_cli.py index 60a75a9c..2366b89c 100644 --- a/python/lsst/dax/ppdb/cli/ppdb_cli.py +++ b/python/lsst/dax/ppdb/cli/ppdb_cli.py @@ -28,6 +28,7 @@ from lsst.dax.apdb.cli.logging_cli import LoggingCli from .. import scripts +from ..bigquery.ppdb_bigquery_config import DatasetType from . import options @@ -39,6 +40,7 @@ def main() -> None: subparsers = parser.add_subparsers(title="available subcommands", required=True) _create_sql_subcommand(subparsers) _create_bigquery(subparsers) + _build_bigquery_dataset(subparsers) args = parser.parse_args() log_cli.process_args(args) @@ -77,3 +79,40 @@ def _create_bigquery(subparsers: argparse._SubParsersAction) -> None: options.felis_schema_options(parser) options.bigquery_options(parser) parser.set_defaults(method=scripts.create_bigquery) + + +def _build_bigquery_dataset(subparsers: argparse._SubParsersAction) -> None: + parser = subparsers.add_parser( + "build-bq-datasets", help="Build one or more BigQuery datasets for the PPDB from a Felis schema file." + ) + parser.add_argument("--config", help="URI to the PPDB configuration file.", required=True) + parser.add_argument( + "--dry-run", + help="Build BigQuery objects but do not execute operations to create them.", + default=False, + action="store_true", + ) + parser.add_argument( + "--exists-ok", + help="Do not fail if a BigQuery table or view already exists; skip creating it.", + default=False, + action="store_true", + ) + parser.add_argument( + "--configure-authorized-views", + help="Configure internal dataset access entries for public authorized views after build.", + default=False, + action="store_true", + ) + parser.add_argument( + "datasets", + nargs="*", + choices=[dataset_type.value for dataset_type in DatasetType], + help=( + "Key(s) of the BigQuery dataset(s) to create. Valid keys are: " + f"{', '.join(dataset_type.value for dataset_type in DatasetType)}. If not provided, all " + "datasets will be created." + ), + ) + + parser.set_defaults(method=scripts.build_bigquery_datasets) diff --git a/python/lsst/dax/ppdb/scripts/__init__.py b/python/lsst/dax/ppdb/scripts/__init__.py index bfce287b..cb91e4e7 100644 --- a/python/lsst/dax/ppdb/scripts/__init__.py +++ b/python/lsst/dax/ppdb/scripts/__init__.py @@ -19,6 +19,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +from .build_bigquery_datasets import build_bigquery_datasets from .create_bigquery import create_bigquery from .create_sql import create_sql from .replication_list_chunks_apdb import replication_list_chunks_apdb diff --git a/python/lsst/dax/ppdb/scripts/build_bigquery_datasets.py b/python/lsst/dax/ppdb/scripts/build_bigquery_datasets.py new file mode 100644 index 00000000..9049ad51 --- /dev/null +++ b/python/lsst/dax/ppdb/scripts/build_bigquery_datasets.py @@ -0,0 +1,72 @@ +# This file is part of dax_ppdb +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from felis import Schema + +from ..bigquery import PpdbBigQueryConfig +from ..bigquery.ppdb_bigquery_config import DatasetType +from ..bigquery.schema import DatasetBuildManager +from ..ppdb_config import PpdbConfig + + +def build_bigquery_datasets( + config: str, + datasets: list[str], + dry_run: bool = False, + exists_ok: bool = False, + configure_authorized_views: bool = False, +) -> None: + """Build BigQuery datasets for the PPDB.""" + # Load the PPDB config. + try: + ppdb_config = PpdbConfig.from_uri(config) + except Exception as e: + raise RuntimeError(f"Failed to load configuration from {config}: {e}") from e + + # Check the config type. + if not isinstance(ppdb_config, PpdbBigQueryConfig): + raise TypeError( + f"Configuration loaded from '{config}' has wrong type, " + f"expected PpdbBigQueryConfig, got: {type(ppdb_config)}" + ) + + # Load the Felis schema from the URI in the config. + try: + schema = Schema.from_uri(ppdb_config.felis_schema_uri, context={"id_generation": True}) + except Exception as e: + raise RuntimeError(f"Failed to load schema from {ppdb_config.felis_schema_uri}: {e}") from e + + # Build the requested datasets. + try: + dataset_types = [DatasetType(dataset) for dataset in datasets] + except ValueError as e: + valid = ", ".join(dataset_type.value for dataset_type in DatasetType) + raise RuntimeError(f"Invalid dataset key in {datasets!r}. Valid keys are: {valid}.") from e + + build_manager = DatasetBuildManager( + ppdb_config, + schema, + dataset_types or None, + dry_run=dry_run, + exists_ok=exists_ok, + configure_authorized_views=configure_authorized_views, + ) + build_manager.build_datasets() diff --git a/tests/test_dataset_builder.py b/tests/test_dataset_builder.py new file mode 100644 index 00000000..31a58c7d --- /dev/null +++ b/tests/test_dataset_builder.py @@ -0,0 +1,405 @@ +# This file is part of dax_ppdb. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +import unittest +import uuid +from unittest.mock import Mock, patch + +from felis import Schema +from google.api_core.exceptions import Conflict +from google.cloud import bigquery + +from lsst.dax.apdb import ApdbTables +from lsst.dax.ppdb.bigquery.ppdb_bigquery_config import ( + DEFAULT_FELIS_SCHEMA_URI, + Datasets, + DatasetType, + PpdbBigQueryConfig, +) +from lsst.dax.ppdb.bigquery.schema.dataset_builder import ( + DatasetBuilder, + DatasetBuilderError, + DatasetBuildManager, + InternalDatasetBuilder, + PublicDatasetBuilder, + StagingDatasetBuilder, +) +from lsst.dax.ppdb.bigquery.schema.felis_converter import FelisConverter +from lsst.dax.ppdb.sql import PpdbSqlBaseConfig +from lsst.dax.ppdb.tests._bigquery import have_valid_google_credentials + + +def _load_test_schema() -> Schema: + return Schema.from_uri(DEFAULT_FELIS_SCHEMA_URI, context={"id_generation": True}) + + +def _make_test_config(project_id: str, datasets: Datasets) -> PpdbBigQueryConfig: + return PpdbBigQueryConfig( + project_id=project_id, + dataset_id="test_dataset_builder", + bucket_name="test-bucket", + object_prefix="data/test", + replication_dir="/tmp", + sql=PpdbSqlBaseConfig(db_url="sqlite:///:memory:"), + datasets=datasets, + ) + + +class _InvalidViewBuilder(DatasetBuilder): + """Builder used to exercise manager validation of malformed views.""" + + dataset_type = DatasetType.PUBLIC + + def create_tables(self, config: PpdbBigQueryConfig, converter: FelisConverter) -> list[bigquery.Table]: + return [] + + def create_views(self, config: PpdbBigQueryConfig, converter: FelisConverter) -> list[bigquery.Table]: + return [bigquery.Table(f"{config.fqn_for(DatasetType.PUBLIC)}.BrokenView")] + + +class DatasetBuilderTestCase(unittest.TestCase): + """Tests for dataset builder object generation and manager behavior.""" + + def setUp(self) -> None: + self.schema = _load_test_schema() + self.converter = FelisConverter(self.schema) + self.config = _make_test_config( + project_id="test-project", + datasets=Datasets( + internal="test_dataset_builder_internal", + public="test_dataset_builder_public", + staging="test_dataset_builder_staging", + ), + ) + + def test_public_builder_creates_diaobject_without_validity_end(self) -> None: + builder = PublicDatasetBuilder() + + tables = builder.create_tables( + config=self.config, + converter=self.converter, + ) + + self.assertEqual(len(tables), 1) + table = tables[0] + self.assertEqual(table.table_id, ApdbTables.DiaObject.value) + + field_names = [field.name for field in table.schema] + self.assertIn("diaObjectId", field_names) + self.assertNotIn("validityEndMjdTai", field_names) + + source_table = self.converter.find_table(ApdbTables.DiaObject.value) + self.assertEqual(len(field_names), len(source_table.columns) - 1) + + def test_public_builder_creates_explicit_views(self) -> None: + builder = PublicDatasetBuilder() + public_dataset_fqn = self.config.fqn_for(DatasetType.PUBLIC) + internal_dataset_fqn = self.config.fqn_for(DatasetType.INTERNAL) + + views = builder.create_views(config=self.config, converter=self.converter) + + self.assertEqual([view.table_id for view in views], ["DiaSource", "DiaForcedSource"]) + for view_name, view in zip(("DiaSource", "DiaForcedSource"), views, strict=True): + assert view.view_query is not None + self.assertNotIn("SELECT *", view.view_query) + self.assertIn(f"FROM `{internal_dataset_fqn}.{view_name}`", view.view_query) + expected_columns = self.converter.to_bigquery_column_list(self.converter.find_table(view_name)) + self.assertEqual( + view.view_query, + f"SELECT {expected_columns} FROM `{internal_dataset_fqn}.{view_name}`", + ) + self.assertEqual(view.full_table_id, f"{public_dataset_fqn}.{view_name}") + + def test_internal_builder_adds_geo_point_and_clustering(self) -> None: + builder = InternalDatasetBuilder() + + tables = builder.create_tables( + config=self.config, + converter=self.converter, + ) + + self.assertEqual({table.table_id for table in tables}, {"DiaObject", "DiaSource", "DiaForcedSource"}) + for table in tables: + geo_point = next(field for field in table.schema if field.name == "geo_point") + self.assertEqual(geo_point.field_type, "GEOGRAPHY") + self.assertEqual(geo_point.mode, "REQUIRED") + self.assertEqual(table.clustering_fields, ["geo_point"]) + + def test_staging_builder_adds_replica_chunk(self) -> None: + builder = StagingDatasetBuilder() + + tables = builder.create_tables( + config=self.config, + converter=self.converter, + ) + + self.assertEqual({table.table_id for table in tables}, {"DiaObject", "DiaSource", "DiaForcedSource"}) + for table in tables: + chunk_field = next(field for field in table.schema if field.name == "apdb_replica_chunk") + self.assertEqual(chunk_field.field_type, "INT64") + self.assertEqual(chunk_field.mode, "REQUIRED") + self.assertNotIn("geo_point", [field.name for field in table.schema]) + + def test_manager_dry_run_does_not_initialize_client(self) -> None: + with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client") as client_type: + manager = DatasetBuildManager(config=self.config, schema=self.schema, dry_run=True) + manager.build_datasets() + + client_type.assert_not_called() + + def test_manager_raises_on_client_init_failure(self) -> None: + with patch( + "lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client", + side_effect=RuntimeError("boom"), + ): + with self.assertRaisesRegex(DatasetBuilderError, r"Failed to initialize BigQuery client"): + DatasetBuildManager(config=self.config, schema=self.schema) + + def test_manager_passes_exists_ok_when_creating_resources(self) -> None: + with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client"): + manager = DatasetBuildManager( + config=self.config, + schema=self.schema, + dataset_types=[DatasetType.PUBLIC], + exists_ok=True, + ) + client = Mock() + manager._client = client + table = bigquery.Table(f"{self.config.project_id}.{self.config.datasets.public}.DiaObject") + + manager._create_resources([table], "table") + + client.create_table.assert_called_once_with(table, exists_ok=True) + + def test_manager_wraps_conflict_as_builder_error(self) -> None: + with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client"): + manager = DatasetBuildManager( + config=self.config, + schema=self.schema, + dataset_types=[DatasetType.PUBLIC], + ) + client = Mock() + manager._client = client + client.create_table.side_effect = Conflict("already exists") + table = bigquery.Table(f"{self.config.project_id}.{self.config.datasets.public}.DiaObject") + + with self.assertRaises(DatasetBuilderError) as cm: + manager._create_resources([table], "table") + + self.assertIn("already exists", str(cm.exception)) + + def test_manager_allows_view_without_query_in_dry_run(self) -> None: + manager = DatasetBuildManager( + config=self.config, + schema=self.schema, + dataset_types=[DatasetType.PUBLIC], + dry_run=True, + ) + manager._builders[DatasetType.PUBLIC] = _InvalidViewBuilder() + + manager.build_datasets() + + def test_configure_authorized_views_preserves_unrelated_view_permissions(self) -> None: + with patch("lsst.dax.ppdb.bigquery.schema.dataset_builder.bigquery.Client"): + manager = DatasetBuildManager( + config=self.config, + schema=self.schema, + dataset_types=[DatasetType.PUBLIC], + configure_authorized_views=True, + ) + + non_view_entry = bigquery.AccessEntry( + role="READER", + entity_type="userByEmail", + entity_id="reader@example.org", + ) + managed_existing_entry = bigquery.AccessEntry( + role=None, + entity_type="view", + entity_id={ + "projectId": self.config.project_id, + "datasetId": self.config.datasets.public, + "tableId": ApdbTables.DiaSource.value, + }, + ) + unrelated_view_entry = bigquery.AccessEntry( + role=None, + entity_type="view", + entity_id={ + "projectId": "unrelated-project", + "datasetId": "unrelated_dataset", + "tableId": "UnrelatedView", + }, + ) + + dataset = Mock() + dataset.access_entries = [non_view_entry, managed_existing_entry, unrelated_view_entry] + + client = Mock() + client.get_dataset.return_value = dataset + manager._client = client + + public_views = [ + bigquery.Table(f"{self.config.fqn_for(DatasetType.PUBLIC)}.{ApdbTables.DiaSource.value}"), + bigquery.Table(f"{self.config.fqn_for(DatasetType.PUBLIC)}.{ApdbTables.DiaForcedSource.value}"), + ] + + manager._configure_authorized_views(public_views) + + client.update_dataset.assert_called_once_with(dataset, ["access_entries"]) + + updated_entries = dataset.access_entries + self.assertIn(non_view_entry, updated_entries) + self.assertIn(unrelated_view_entry, updated_entries) + + updated_view_refs = { + ( + entry.entity_id["projectId"], + entry.entity_id["datasetId"], + entry.entity_id["tableId"], + ) + for entry in updated_entries + if entry.entity_type == "view" and isinstance(entry.entity_id, dict) + } + self.assertEqual( + updated_view_refs, + { + ( + self.config.project_id, + self.config.datasets.public, + ApdbTables.DiaSource.value, + ), + ( + self.config.project_id, + self.config.datasets.public, + ApdbTables.DiaForcedSource.value, + ), + ( + "unrelated-project", + "unrelated_dataset", + "UnrelatedView", + ), + }, + ) + + +@unittest.skipIf(not have_valid_google_credentials(), "Missing valid Google credentials") +class DatasetBuilderBigQueryTestCase(unittest.TestCase): + """Integration tests for creating BigQuery tables and views.""" + + def setUp(self) -> None: + self.schema = _load_test_schema() + self.client = bigquery.Client() + self.project_id = self.client.project + suffix = uuid.uuid4().hex[:8] + self.datasets = Datasets( + internal=f"test_dataset_builder_internal_{suffix}", + public=f"test_dataset_builder_public_{suffix}", + staging=f"test_dataset_builder_staging_{suffix}", + ) + self.config = _make_test_config(project_id=self.project_id, datasets=self.datasets) + + for dataset_type in DatasetType: + dataset_name = self.datasets.name_for(dataset_type) + dataset = bigquery.Dataset(f"{self.project_id}.{dataset_name}") + dataset.default_table_expiration_ms = 3600000 + self.client.create_dataset(dataset) + + def tearDown(self) -> None: + delete_errors: list[str] = [] + for dataset_type in DatasetType: + dataset_name = self.datasets.name_for(dataset_type) + dataset_fqn = f"{self.project_id}.{dataset_name}" + try: + self.client.delete_dataset(dataset_fqn, delete_contents=True, not_found_ok=True) + except Exception as e: + delete_errors.append(f"{dataset_fqn}: {type(e).__name__}: {e}") + + if delete_errors: + self.fail( + "Failed to delete one or more BigQuery datasets in tearDown:\n" + + "\n".join(delete_errors) + ) + + def test_build_datasets_creates_expected_bigquery_objects(self) -> None: + manager = DatasetBuildManager(config=self.config, schema=self.schema) + + manager.build_datasets() + + internal_tables = list(self.client.list_tables(f"{self.project_id}.{self.datasets.internal}")) + public_tables = list(self.client.list_tables(f"{self.project_id}.{self.datasets.public}")) + staging_tables = list(self.client.list_tables(f"{self.project_id}.{self.datasets.staging}")) + + self.assertEqual(len(internal_tables), 3) + self.assertEqual(len(public_tables), 3) + self.assertEqual(len(staging_tables), 3) + + internal_dia_object = self.client.get_table( + f"{self.project_id}.{self.datasets.internal}.{ApdbTables.DiaObject.value}" + ) + internal_geo_point = next(field for field in internal_dia_object.schema if field.name == "geo_point") + self.assertEqual(internal_geo_point.field_type, "GEOGRAPHY") + self.assertEqual(internal_geo_point.mode, "REQUIRED") + self.assertEqual(internal_dia_object.clustering_fields, ["geo_point"]) + + public_dia_object = self.client.get_table( + f"{self.project_id}.{self.datasets.public}.{ApdbTables.DiaObject.value}" + ) + self.assertEqual(public_dia_object.table_type, "TABLE") + self.assertNotIn("validityEndMjdTai", [field.name for field in public_dia_object.schema]) + + public_dia_source = self.client.get_table( + f"{self.project_id}.{self.datasets.public}.{ApdbTables.DiaSource.value}" + ) + self.assertEqual(public_dia_source.table_type, "VIEW") + assert public_dia_source.view_query is not None + self.assertNotIn("SELECT *", public_dia_source.view_query) + self.assertIn( + f"FROM `{self.project_id}.{self.datasets.internal}.{ApdbTables.DiaSource.value}`", + public_dia_source.view_query, + ) + + staging_dia_source = self.client.get_table( + f"{self.project_id}.{self.datasets.staging}.{ApdbTables.DiaSource.value}" + ) + chunk_field = next(field for field in staging_dia_source.schema if field.name == "apdb_replica_chunk") + self.assertEqual(chunk_field.field_type, "INTEGER") + self.assertEqual(chunk_field.mode, "REQUIRED") + + def test_build_datasets_exists_ok_allows_repeated_creation(self) -> None: + manager = DatasetBuildManager(config=self.config, schema=self.schema, exists_ok=True) + + manager.build_datasets() + manager.build_datasets() + + def test_build_datasets_without_exists_ok_fails_on_repeated_creation(self) -> None: + manager = DatasetBuildManager(config=self.config, schema=self.schema) + + manager.build_datasets() + + with self.assertRaisesRegex(DatasetBuilderError, r"already exists"): + manager.build_datasets() + + +if __name__ == "__main__": + unittest.main()