diff --git a/AGENTS.md b/AGENTS.md index 46b7f9c85..87dea6fac 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -26,6 +26,24 @@ When adding, changing, or reviewing calibration target definitions, read When adding, changing, or reviewing donor-survey imputations, read `docs/engineering/skills/imputation.md`. +## Weighting / population aggregates (CRITICAL) + +Never read or sum a weight array directly, and never report unweighted record +counts or raw HDF5 column sums as population figures — both are wrong. To get any +population aggregate from a dataset, load it as a `Microsimulation` and aggregate +the result; microdf auto-weights with the household weight, so you never touch a +weight: + +```python +from policyengine_us import Microsimulation +sim = Microsimulation(dataset=path) +total = sim.calculate("taxable_private_pension_income", 2024).sum() # weighted $ +recipients = (sim.calculate("taxable_private_pension_income", 2024) > 0).sum() # weighted count +``` + +If you ever must reference a weight at all, it is `household_weight` ONLY; the +person/tax_unit/family/marital weights are derived and must never be used directly. + ## Calibration targets Manually sourced national or local-file calibration targets must be registered diff --git a/Makefile b/Makefile index 88c35899a..135407376 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: all format lint test test-unit test-integration install download upload docker documentation data validate-data calibrate calibrate-build publish-local-area upload-calibration upload-dataset push-to-modal build-data-modal build-matrices calibrate-modal calibrate-modal-national calibrate-both stage-h5s stage-national-h5 stage-all-h5s pipeline validate-staging validate-staging-full upload-validation check-staging check-sanity clean build paper clean-paper presentations database database-refresh promote-dataset promote build-h5s validate-local refresh-soi-targets push-pr-branch +.PHONY: all format lint test test-unit test-integration install download upload production-ecps docker documentation data validate-data calibrate calibrate-build publish-local-area upload-calibration upload-dataset push-to-modal build-data-modal build-matrices calibrate-modal calibrate-modal-national calibrate-both stage-h5s stage-national-h5 stage-all-h5s pipeline validate-staging validate-staging-full upload-validation check-staging check-sanity clean build paper clean-paper presentations database database-refresh promote-dataset promote build-h5s validate-local refresh-soi-targets push-pr-branch SOI_SOURCE_YEAR ?= 2021 SOI_TARGET_YEAR ?= 2023 @@ -49,6 +49,9 @@ download: upload: python -m policyengine_us_data.storage.upload_completed_datasets +production-ecps: + python -m policyengine_us_data.utils.production_baseline --json + docker: docker buildx build --platform linux/amd64 . -t policyengine-us-data:latest diff --git a/changelog.d/1164.added b/changelog.d/1164.added new file mode 100644 index 000000000..949210bf2 --- /dev/null +++ b/changelog.d/1164.added @@ -0,0 +1 @@ +Production eCPS baseline resolver (`policyengine_us_data.utils.production_baseline`) that fetches the Hugging Face-published enhanced CPS pinned to the package version, records provenance, and fails loudly when a required column is missing or all-zero -- so a comparison can never silently run against a stale or column-dropped local rebuild. Also requires `social_security_disability` alongside `social_security_retirement` in the `enhanced_cps_2024.h5` upload validator. diff --git a/policyengine_us_data/storage/upload_completed_datasets.py b/policyengine_us_data/storage/upload_completed_datasets.py index f55df0374..48802de1f 100644 --- a/policyengine_us_data/storage/upload_completed_datasets.py +++ b/policyengine_us_data/storage/upload_completed_datasets.py @@ -93,7 +93,17 @@ class MicrosimulationAggregateCheck: # not formula outputs; they are source or imputed inputs used by model formulas. REQUIRED_VARIABLES_BY_FILENAME = { "enhanced_cps_2024.h5": ( + # Require the robustly-populated Social Security components. Only + # `retirement` was guarded before; `disability` is also always + # populated (the imputation fallback assigns both), so requiring it + # catches a dropped column -- as the extended-CPS step once dropped + # `retirement`, leaving the published baseline 64% short on total SS. + # These entries are checked by _check_group_has_data (present with + # length > 0). `survivors`/`dependents` are sparse and can be + # legitimately all-zero under the imputation fallback, so they are + # intentionally not required here. "social_security_retirement", + "social_security_disability", "takes_up_snap_if_eligible", "takes_up_ssi_if_eligible", "takes_up_tanf_if_eligible", diff --git a/policyengine_us_data/utils/production_baseline.py b/policyengine_us_data/utils/production_baseline.py new file mode 100644 index 000000000..78d474736 --- /dev/null +++ b/policyengine_us_data/utils/production_baseline.py @@ -0,0 +1,305 @@ +"""Resolve the canonical *production* enhanced CPS baseline. + +The "production eCPS" is the ``enhanced_cps_2024.h5`` published to Hugging +Face under :data:`DEFAULT_REPO`, tagged with the released +``policyengine-us-data`` version. Uploads create that tag +(:func:`policyengine_us_data.utils.data_upload.upload_files_to_hf`), so a +package version pins a reproducible, byte-identical dataset. + +This module exists so that anything comparing against "the eCPS" (microplex, +audits, replacement diagnostics) can never silently use a stale or broken +*local* rebuild. It fetches the pinned release into the local Hugging Face +cache, records provenance (repo, revision, sha256), and runs an integrity +gate that fails loudly if a required column is missing or all-zero -- the +recurring failure mode where a build step quietly drops a column (e.g. the +extended-CPS step dropping ``social_security_retirement``). + +The recorded ``sha256`` is provenance only: byte-level integrity of the +download is already guaranteed by ``hf_hub_download`` against the Hub's content +hash, so the hash here is for audit/traceability, not an independent integrity +check. The gate this module adds is about *content* -- the required columns +being present and non-zero, which the Hub hash cannot tell you. + +Typical use:: + + from policyengine_us_data.utils.production_baseline import ( + resolve_production_ecps, + ) + + baseline = resolve_production_ecps() # pinned, verified + run_comparison(baseline.path) + record_provenance(baseline.to_dict()) +""" + +from __future__ import annotations + +import hashlib +import importlib.metadata as importlib_metadata +import json +import os +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional, Sequence + +DEFAULT_REPO = "policyengine/policyengine-us-data" +DEFAULT_FILENAME = "enhanced_cps_2024.h5" +PACKAGE_NAME = "policyengine-us-data" + +#: Environment override for the pinned revision (commit, tag, or branch). +VERSION_ENV_VAR = "POLICYENGINE_US_DATA_ECPS_VERSION" + +#: Columns that MUST be present and non-zero in a healthy production eCPS. +#: This is the guardrail against the recurring "a build step silently dropped +#: a column" failure -- most recently ``social_security_retirement`` vanishing +#: in the extended-CPS step, which left the published comparison baseline 64% +#: short on total Social Security. Extend as new must-have inputs are added. +#: +#: Only the *robustly-populated* Social Security components are gated. +#: ``social_security_retirement`` and ``social_security_disability`` are always +#: populated -- the imputation fallback ``_age_heuristic_ss_shares`` assigns +#: both even when the QRF share model is unavailable -- so an all-zero value +#: reliably means the column was dropped. ``social_security_survivors`` and +#: ``social_security_dependents`` are sparse and stay zero under that fallback, +#: so gating them would reject builds the pipeline intentionally allows; they +#: are deliberately excluded. +REQUIRED_NONZERO_COLUMNS: tuple[str, ...] = ( + "social_security_retirement", + "social_security_disability", + "employment_income_before_lsr", +) + + +class BaselineIntegrityError(RuntimeError): + """Raised when a resolved baseline fails the integrity gate.""" + + +@dataclass +class ProductionBaseline: + """A resolved, verified production baseline and its provenance.""" + + path: str + repo: str + revision: str + sha256: str + checks: dict + fetched_at: str + + def to_dict(self) -> dict: + """Return a JSON-serialisable provenance record.""" + return asdict(self) + + +def production_pin(version: Optional[str] = None) -> str: + """Return the revision to pin the production eCPS to. + + Args: + version: Explicit revision (commit, tag, or branch). When ``None``, + falls back to the ``POLICYENGINE_US_DATA_ECPS_VERSION`` environment + variable and then to the installed ``policyengine-us-data`` + package version. + + Returns: + The revision string to request from Hugging Face. + """ + if version: + return version + env_version = os.environ.get(VERSION_ENV_VAR) + if env_version: + return env_version + return importlib_metadata.version(PACKAGE_NAME) + + +def _sha256(path: str | Path) -> str: + """Return the SHA-256 hex digest of a file, read in chunks.""" + digest = hashlib.sha256() + with open(path, "rb") as handle: + for chunk in iter(lambda: handle.read(1 << 20), b""): + digest.update(chunk) + return digest.hexdigest() + + +def _column_nonzero_count(group_or_dataset) -> int: + """Return the number of non-zero entries in an h5py node. + + Handles both a bare dataset and a group keyed by period (the latest + period is used). + """ + import h5py + import numpy as np + + node = group_or_dataset + if isinstance(node, h5py.Group): + node = node[sorted(node.keys())[-1]] + values = np.asarray(node[()], dtype=float) + return int((values != 0).sum()) + + +def assert_baseline_intact( + path: str | Path, + required_nonzero_columns: Sequence[str] = REQUIRED_NONZERO_COLUMNS, +) -> dict: + """Assert required columns are present and non-zero in a dataset. + + Args: + path: Path to an HDF5 dataset (PolicyEngine ``Dataset`` layout). + required_nonzero_columns: Columns that must exist and have at least + one non-zero value. + + Returns: + A checks dict describing what was inspected. + + Raises: + BaselineIntegrityError: If any required column is missing or all-zero. + """ + import h5py + + path = Path(path) + missing: list[str] = [] + all_zero: list[str] = [] + nonzero_counts: dict[str, int] = {} + + with h5py.File(path, "r") as handle: + present_keys = set(handle.keys()) + for column in required_nonzero_columns: + if column not in present_keys: + missing.append(column) + continue + count = _column_nonzero_count(handle[column]) + nonzero_counts[column] = count + if count == 0: + all_zero.append(column) + + passed = not missing and not all_zero + checks = { + "required_nonzero_columns": list(required_nonzero_columns), + "missing": missing, + "all_zero": all_zero, + "nonzero_counts": nonzero_counts, + "passed": passed, + } + if not passed: + raise BaselineIntegrityError( + f"Production eCPS baseline at {path} failed the integrity gate: " + f"missing={missing or 'none'}, all_zero={all_zero or 'none'}. " + "This usually means a build step silently dropped a column (e.g. " + "the extended-CPS step dropping social_security_retirement). " + "Refusing to use a broken baseline." + ) + return checks + + +def resolve_production_ecps( + version: Optional[str] = None, + *, + repo: str = DEFAULT_REPO, + filename: str = DEFAULT_FILENAME, + token: Optional[str] = None, + cache_dir: Optional[str | Path] = None, + verify: bool = True, + required_nonzero_columns: Sequence[str] = REQUIRED_NONZERO_COLUMNS, +) -> ProductionBaseline: + """Download (or reuse from cache) and verify the production eCPS. + + The file is fetched from Hugging Face pinned to a specific revision (the + installed package version by default) into the local Hugging Face cache, + so repeat calls reuse the byte-identical file. Integrity is checked unless + ``verify`` is ``False``. + + Args: + version: Revision to pin to. Defaults to + :func:`production_pin`. + repo: Hugging Face repository id. + filename: File to fetch from the repository. + token: Hugging Face token. Defaults to ``HUGGING_FACE_TOKEN``. + cache_dir: Optional override for the Hugging Face cache directory. + verify: Whether to run the integrity gate. + required_nonzero_columns: Columns the integrity gate requires. + + Returns: + A :class:`ProductionBaseline` with the local path and provenance. + + Raises: + BaselineIntegrityError: If ``verify`` is set and the gate fails. + """ + from huggingface_hub import hf_hub_download + from huggingface_hub.errors import HfHubHTTPError + + revision = production_pin(version) + token = token or os.environ.get("HUGGING_FACE_TOKEN") + try: + local_path = hf_hub_download( + repo_id=repo, + repo_type="model", + filename=filename, + revision=revision, + token=token, + cache_dir=str(cache_dir) if cache_dir is not None else None, + ) + except (HfHubHTTPError, FileNotFoundError) as exc: + # RepositoryNotFoundError / RevisionNotFoundError / + # RemoteEntryNotFoundError / GatedRepoError all subclass HfHubHTTPError; + # LocalEntryNotFoundError (offline, uncached) is a FileNotFoundError. + # Turn any of them into a clear, actionable failure rather than an + # opaque Hub traceback. + raise BaselineIntegrityError( + f"Could not fetch the production eCPS '{filename}' at revision " + f"'{revision}' from '{repo}': {exc}. The pin defaults to the " + f"installed {PACKAGE_NAME} version, which may never have been " + f"published to Hugging Face (e.g. a local/dev/editable install). " + f"Set {VERSION_ENV_VAR} to a published revision (tag or commit), " + f"or install a released {PACKAGE_NAME}." + ) from exc + checks: dict + if verify: + checks = assert_baseline_intact(local_path, required_nonzero_columns) + else: + checks = {"passed": None, "skipped": True} + return ProductionBaseline( + path=str(local_path), + repo=repo, + revision=revision, + sha256=_sha256(local_path), + checks=checks, + fetched_at=datetime.now(timezone.utc).isoformat(), + ) + + +def main(argv: Optional[Sequence[str]] = None) -> int: + """CLI: print the resolved production eCPS path (or JSON provenance).""" + import argparse + + parser = argparse.ArgumentParser( + description=( + "Resolve and verify the production eCPS baseline " + "(HF-published, pinned to the package version)." + ) + ) + parser.add_argument( + "--version", + default=None, + help=( + "Revision/pin to fetch (default: installed policyengine-us-data version)." + ), + ) + parser.add_argument( + "--no-verify", + action="store_true", + help="Skip the integrity gate.", + ) + parser.add_argument( + "--json", + action="store_true", + help="Print full provenance as JSON instead of just the path.", + ) + args = parser.parse_args(argv) + baseline = resolve_production_ecps(version=args.version, verify=not args.no_verify) + if args.json: + print(json.dumps(baseline.to_dict(), indent=2)) + else: + print(baseline.path) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/unit/utils/test_production_baseline.py b/tests/unit/utils/test_production_baseline.py new file mode 100644 index 000000000..4148fdbd4 --- /dev/null +++ b/tests/unit/utils/test_production_baseline.py @@ -0,0 +1,159 @@ +"""Tests for the production eCPS baseline resolver and integrity gate.""" + +from __future__ import annotations + +import h5py +import numpy as np +import pytest + +from policyengine_us_data.utils import production_baseline as pb + + +def _write_h5(path, columns): + """Write a minimal PolicyEngine-style HDF5 dataset. + + Args: + path: Output path. + columns: Mapping of column name -> 1-D array. A value that is itself + a mapping is written as a period-keyed group. + """ + with h5py.File(path, "w") as handle: + for name, value in columns.items(): + if isinstance(value, dict): + group = handle.create_group(name) + for period, arr in value.items(): + group.create_dataset(period, data=np.asarray(arr)) + else: + handle.create_dataset(name, data=np.asarray(value)) + + +def _healthy_columns(): + # Real enhanced_cps_2024.h5 stores every variable as a period-keyed group + # (e.g. social_security_retirement -> {"2024": ...}), so use that layout by + # default and exercise the realistic failure modes against it. + return { + col: {"2024": np.array([0.0, 1.0, 2.0])} + for col in pb.REQUIRED_NONZERO_COLUMNS + } + + +def test_assert_intact_passes_on_healthy_dataset(tmp_path): + path = tmp_path / "good.h5" + _write_h5(path, _healthy_columns()) + + checks = pb.assert_baseline_intact(path) + + assert checks["passed"] is True + assert checks["missing"] == [] + assert checks["all_zero"] == [] + assert checks["nonzero_counts"]["social_security_retirement"] == 2 + + +def test_assert_intact_raises_on_missing_column(tmp_path): + columns = _healthy_columns() + del columns["social_security_retirement"] + path = tmp_path / "missing.h5" + _write_h5(path, columns) + + with pytest.raises(pb.BaselineIntegrityError) as excinfo: + pb.assert_baseline_intact(path) + + assert "social_security_retirement" in str(excinfo.value) + + +def test_assert_intact_raises_on_all_zero_column(tmp_path): + # The exact recurring failure: the column exists but is all zeros. + columns = _healthy_columns() + columns["social_security_retirement"] = {"2024": np.zeros(3)} + path = tmp_path / "zeroed.h5" + _write_h5(path, columns) + + with pytest.raises(pb.BaselineIntegrityError): + pb.assert_baseline_intact(path) + + +def test_assert_intact_handles_flat_datasets(tmp_path): + # Some layouts store a variable as a bare dataset rather than a period + # group; the gate must handle both. + columns = { + col: np.array([0.0, 3.0]) for col in pb.REQUIRED_NONZERO_COLUMNS + } + path = tmp_path / "flat.h5" + _write_h5(path, columns) + + checks = pb.assert_baseline_intact(path) + + assert checks["passed"] is True + + +def test_production_pin_prefers_explicit_then_env(monkeypatch): + monkeypatch.setenv(pb.VERSION_ENV_VAR, "from-env") + assert pb.production_pin("explicit") == "explicit" + assert pb.production_pin() == "from-env" + + monkeypatch.delenv(pb.VERSION_ENV_VAR, raising=False) + # Falls back to the installed package version (a non-empty string). + assert pb.production_pin() + + +def test_resolve_returns_provenance_and_verifies(tmp_path, monkeypatch): + path = tmp_path / "resolved.h5" + _write_h5(path, _healthy_columns()) + + import huggingface_hub + + def fake_download(**kwargs): + assert kwargs["revision"] == "1.2.3" + return str(path) + + monkeypatch.setattr(huggingface_hub, "hf_hub_download", fake_download) + + baseline = pb.resolve_production_ecps(version="1.2.3") + + assert baseline.path == str(path) + assert baseline.revision == "1.2.3" + assert baseline.repo == pb.DEFAULT_REPO + assert len(baseline.sha256) == 64 + assert baseline.checks["passed"] is True + assert baseline.to_dict()["sha256"] == baseline.sha256 + + +def test_resolve_propagates_integrity_failure(tmp_path, monkeypatch): + columns = _healthy_columns() + columns["social_security_retirement"] = np.zeros(3) + path = tmp_path / "broken.h5" + _write_h5(path, columns) + + import huggingface_hub + + monkeypatch.setattr(huggingface_hub, "hf_hub_download", lambda **kw: str(path)) + + with pytest.raises(pb.BaselineIntegrityError): + pb.resolve_production_ecps(version="1.2.3") + + +def test_resolve_wraps_unpublished_revision_in_clear_error(monkeypatch): + # A pin with no matching HF tag (e.g. a dev/editable install whose version + # was never published) must fail loudly and actionably, not with an opaque + # Hub traceback. + import huggingface_hub + from unittest.mock import MagicMock + + from huggingface_hub.errors import HfHubHTTPError + + response = MagicMock() + response.headers = {} + + def boom(**kwargs): + raise HfHubHTTPError( + "404 Client Error: revision not found", response=response + ) + + monkeypatch.setattr(huggingface_hub, "hf_hub_download", boom) + + with pytest.raises(pb.BaselineIntegrityError) as excinfo: + pb.resolve_production_ecps(version="9.9.9-never-published") + + message = str(excinfo.value) + assert "9.9.9-never-published" in message + assert pb.VERSION_ENV_VAR in message