Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 1 addition & 48 deletions python/lsst/dax/ppdb/bigquery/ppdb_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@

from .._arrow import write_parquet
from ..ppdb import Ppdb, PpdbReplicaChunk
from ..ppdb_config import PpdbConfig
from ..sql import PasswordProvider, PpdbSqlBase, PpdbSqlBaseConfig
from .manifest import Manifest, TableStats
from .ppdb_bigquery_config import PpdbBigQueryConfig
from .ppdb_replica_chunk_extended import ChunkStatus, PpdbReplicaChunkExtended
from .updates.update_records import UpdateRecords

Expand All @@ -66,53 +66,6 @@
"""


class PpdbBigQueryConfig(PpdbConfig):
"""Configuration for BigQuery-based PPDB."""

project_id: str
"""Google Cloud project ID."""

dataset_id: str
"""Target BigQuery dataset ID, without the project."""

bucket_name: str
"""Name of Google Cloud Storage bucket for uploading chunks."""

object_prefix: str
"""Base prefix for the object in cloud storage."""

replication_dir: str
"""Directory where the exported chunks will be stored."""

stage_chunk_topic: str = "stage-chunk-topic"
"""Pub/Sub topic name for triggering chunk staging process."""

parq_batch_size: int = 10000
"""Number of rows to process in each batch when writing parquet files."""

parq_compression: str = "snappy"
"""Compression format for Parquet files."""

delete_existing_dirs: bool = False
"""If `True`, existing directories for chunks will be deleted before
export. If `False`, an error will be raised if the directory already
exists.
"""

sql: PpdbSqlBaseConfig
"""SQL database configuration (`~lsst.dax.ppdb.sql.PpdbSqlBaseConfig`)."""

@property
def replication_path(self) -> Path:
"""Return path for writing replica chunk data (`pathlib.Path`)."""
return Path(self.replication_dir)

@property
def fq_dataset_id(self) -> str:
"""Fully qualified BigQuery dataset ID, including project (`str`)."""
return f"{self.project_id}:{self.dataset_id}"


class _SecretManagerPasswordProvider(PasswordProvider):
"""Retrieves a database password from Google Cloud Secret Manager.

Expand Down
143 changes: 143 additions & 0 deletions python/lsst/dax/ppdb/bigquery/ppdb_bigquery_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
# This file is part of dax_ppdb
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from enum import StrEnum
from pathlib import Path

from pydantic import BaseModel

from ..ppdb_config import PpdbConfig
from ..sql import PpdbSqlBaseConfig

PPDB_PREFIX = "ppdb"


class DatasetType(StrEnum):
"""Typed keys for BigQuery dataset categories."""

STAGING = "staging"
INTERNAL = "internal"
PUBLIC = "public"


class Datasets(BaseModel):
"""Mapping of dataset types to their target names in BigQuery."""

staging: str = f"{PPDB_PREFIX}_staging"
"""Name of the staging dataset used for intermediate storage during
replication and promotion processes."""

internal: str = f"{PPDB_PREFIX}_internal"
"""Name of the internal dataset containing the fully replicated and
promoted data."""

public: str = f"{PPDB_PREFIX}_public"
"""Name of the public dataset which is presented through the TAP
interface."""

def name_for(self, dataset_type: DatasetType) -> str:
"""Return the configured dataset name for a dataset type."""
match dataset_type:
case DatasetType.INTERNAL:
return self.internal
case DatasetType.PUBLIC:
return self.public
case DatasetType.STAGING:
return self.staging

# Defensive check in case a bad value was passed at runtime.
raise ValueError(f"Unsupported dataset type: {dataset_type!r}")


DEFAULT_FELIS_SCHEMA_URI = "resource://lsst.sdm.schemas/ppdb.yaml"
"""Default URI for the Felis PPDB schema in SDM Schemas."""


class PpdbBigQueryConfig(PpdbConfig):
"""Configuration for BigQuery-based PPDB."""

felis_schema_uri: str = DEFAULT_FELIS_SCHEMA_URI
"""URI for the FELIS schema used in the PPDB."""

project_id: str
"""Google Cloud project ID."""

dataset_id: str # TODO: This will need to be deprecated and removed (DM-54681).
"""Target BigQuery dataset ID, without the project."""

bucket_name: str
"""Name of Google Cloud Storage bucket for uploading chunks."""

object_prefix: str
"""Base prefix for the object in cloud storage."""

replication_dir: str
"""Directory where the exported chunks will be stored."""

stage_chunk_topic: str = "stage-chunk-topic"
"""Pub/Sub topic name for triggering chunk staging process."""

parq_batch_size: int = 10000
"""Number of rows to process in each batch when writing parquet files."""

parq_compression: str = "snappy"
"""Compression format for Parquet files."""

delete_existing_dirs: bool = False
"""If `True`, existing directories for chunks will be deleted before
export. If `False`, an error will be raised if the directory already
exists.
"""

sql: PpdbSqlBaseConfig
"""SQL database configuration (`~lsst.dax.ppdb.sql.PpdbSqlBaseConfig`)."""

datasets: Datasets = Datasets()
"""Names of the BigQuery datasets used for different purposes
(`Datasets`).
"""

@property
def replication_path(self) -> Path:
"""Return path for writing replica chunk data (`pathlib.Path`)."""
return Path(self.replication_dir)

# TODO: This will need to be deprecated and removed (DM-54681).
@property
def fq_dataset_id(self) -> str:
"""Fully qualified BigQuery dataset ID, including project (`str`)."""
return f"{self.project_id}:{self.dataset_id}"

def fqn_for(self, dataset_type: DatasetType) -> str:
"""Return the fully qualified BigQuery dataset name for a dataset type.

Parameters
----------
dataset_type
Type of dataset to get the name for.

Returns
-------
str
Fully qualified BigQuery dataset name (project.dataset).
"""
dataset_name = self.datasets.name_for(dataset_type)
return f"{self.project_id}.{dataset_name}"
23 changes: 23 additions & 0 deletions python/lsst/dax/ppdb/bigquery/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# This file is part of dax_ppdb
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from .dataset_builder import *
from .felis_converter import *
Loading
Loading