Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1776.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Added direct `gs://` dataset loading for UK simulations, including support for GCS generations and PolicyEngine data-version metadata.
13 changes: 13 additions & 0 deletions docs/book/usage/simulations.md
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,19 @@ sim = Simulation(dataset=dataset)
print(sim.calculate("household_net_income", 2026))
```

`Simulation` and `Microsimulation` can also load H5 files from local paths,
Hugging Face URLs, or Google Cloud Storage URLs:

```python
sim = Microsimulation(
dataset="gs://policyengine-uk-data-private/enhanced_frs_2023_24.h5@1.55.10"
)
```

For `gs://` URLs, a numeric suffix after `@` pins an exact GCS generation. A
non-numeric suffix pins the PolicyEngine data version stored in the object's
GCS metadata.

### From survey datasets

For population-level analysis, use survey data:
Expand Down
140 changes: 140 additions & 0 deletions policyengine_uk/data/dataset_sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import hashlib
import os
import tempfile
from pathlib import Path
from typing import Optional, Union

from policyengine_core.tools.google_cloud import parse_gs_url


def materialize_gcs_dataset_url(
dataset_url: str,
*,
cache_dir: Optional[Union[str, os.PathLike]] = None,
) -> str:
"""Download a GCS dataset URL to a local H5 path and return that path."""
bucket_name, file_path, revision = parse_gs_url(dataset_url)
storage_client = _get_storage_client()
blob = _resolve_gcs_blob(storage_client, bucket_name, file_path, revision)
generation = _blob_generation(blob)

local_path = _cached_dataset_path(
bucket_name=bucket_name,
file_path=file_path,
generation=generation,
cache_dir=cache_dir,
)
if not local_path.exists():
_download_blob(blob, local_path)
return str(local_path)


def _get_storage_client():
try:
import google.auth
from google.auth import exceptions as auth_exceptions
from google.cloud import storage
except ImportError as exc:
raise ImportError(
"google-cloud-storage is required for gs:// dataset URLs. "
"Install it with: pip install google-cloud-storage"
) from exc

try:
credentials, project_id = google.auth.default()
except auth_exceptions.DefaultCredentialsError as exc:
raise RuntimeError(
"Google Cloud credentials are required for gs:// dataset URLs. "
"Set application default credentials or GOOGLE_APPLICATION_CREDENTIALS."
) from exc

return storage.Client(credentials=credentials, project=project_id)


def _resolve_gcs_blob(
storage_client,
bucket_name: str,
file_path: str,
revision: Optional[str],
):
bucket = storage_client.bucket(bucket_name)

if revision is not None and revision.isdigit():
blob = bucket.blob(file_path, generation=int(revision))
blob.reload()
return blob

current_blob = bucket.blob(file_path)
current_blob.reload()
if revision is None or _blob_metadata_version(current_blob) == revision:
return current_blob

matching_blobs = []
for blob in storage_client.list_blobs(
bucket_name,
prefix=file_path,
versions=True,
):
if blob.name != file_path:
continue
if _blob_metadata_version(blob) == revision:
matching_blobs.append(blob)

if not matching_blobs:
raise ValueError(
f"No GCS object version for gs://{bucket_name}/{file_path} has "
f"metadata version {revision!r}."
)

return max(matching_blobs, key=lambda blob: int(_blob_generation(blob)))


def _blob_metadata_version(blob) -> Optional[str]:
if getattr(blob, "metadata", None) is None:
blob.reload()
metadata = getattr(blob, "metadata", None) or {}
return metadata.get("version")


def _blob_generation(blob) -> str:
generation = getattr(blob, "generation", None)
if generation is None:
blob.reload()
generation = getattr(blob, "generation", None)
if generation is None:
raise ValueError(f"GCS object {blob.name!r} does not expose a generation.")
return str(generation)


def _cached_dataset_path(
*,
bucket_name: str,
file_path: str,
generation: str,
cache_dir: Optional[Union[str, os.PathLike]],
) -> Path:
if cache_dir is None:
cache_dir = Path(tempfile.gettempdir()) / "policyengine-uk-datasets"
else:
cache_dir = Path(cache_dir)

cache_key = hashlib.sha256(
f"{bucket_name}\0{file_path}\0{generation}".encode()
).hexdigest()
return cache_dir / cache_key / Path(file_path).name


def _download_blob(blob, local_path: Path) -> None:
local_path.parent.mkdir(parents=True, exist_ok=True)
fd, temporary_path_name = tempfile.mkstemp(
prefix=f".{local_path.name}.",
suffix=".tmp",
dir=local_path.parent,
)
os.close(fd)
temporary_path = Path(temporary_path_name)
try:
blob.download_to_filename(str(temporary_path))
os.replace(temporary_path, local_path)
finally:
temporary_path.unlink(missing_ok=True)
15 changes: 12 additions & 3 deletions policyengine_uk/simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
extend_single_year_dataset,
reset_growthfactor_uprating,
)
from policyengine_uk.data.dataset_sources import materialize_gcs_dataset_url
from policyengine_uk.utils.dependencies import get_variable_dependencies
from policyengine_uk.reforms import create_structural_reforms_from_parameters
from policyengine_uk.parameters.gov.simulation.labour_supply_responses.aliases import (
Expand Down Expand Up @@ -274,11 +275,19 @@ def build_from_dataset_source(
if dataset_source.startswith("hf://"):
self.build_from_url(dataset_source)
return
if dataset_source.startswith("gs://"):
if dataset_source in _url_dataset_cache:
multi_year_dataset = _url_dataset_cache[dataset_source]
self.build_from_multi_year_dataset(multi_year_dataset)
self.dataset = multi_year_dataset
return
dataset_file = materialize_gcs_dataset_url(dataset_source)
self.build_from_file(dataset_file, cache_key=dataset_source)
return
if "://" in dataset_source:
raise ValueError(
"Only HuggingFace dataset URLs are supported directly by "
"policyengine-uk. Download or materialize other dataset "
"sources to a local file path before passing them to Simulation."
"Only HuggingFace, Google Cloud Storage, and local dataset "
"sources are supported by policyengine-uk."
)
self.build_from_file(dataset_source)

Expand Down
Loading