diff --git a/packages/populace-build/src/populace/build/__init__.py b/packages/populace-build/src/populace/build/__init__.py index f59fdf2..06ead5f 100644 --- a/packages/populace-build/src/populace/build/__init__.py +++ b/packages/populace-build/src/populace/build/__init__.py @@ -63,6 +63,13 @@ def _assert_frame_compatible(version: str, required: tuple[int, int]) -> None: rotated_folds, summarize_rotations, ) +from populace.build.parity_reference import ( # noqa: E402 - after the compat gate + ReferenceSpec, + gather_candidate_shares, + judge_parity, + reference_layers, + run_parity_against_reference, +) from populace.build.plan import ( # noqa: E402 - after the compat gate DonorSpec, Stage, @@ -76,15 +83,20 @@ def _assert_frame_compatible(version: str, required: tuple[int, int]) -> None: "DonorSpec", "GateReport", "GateResult", + "ReferenceSpec", "Stage", "StagePlan", "StageRecord", "aggregate_admin_gate", "exported_nonzero_gate", + "gather_candidate_shares", + "judge_parity", "parity_gate", "per_family_fit_gate", + "reference_layers", "relative_error_loss", "rotated_folds", + "run_parity_against_reference", "summarize_rotations", "support_gate", "__version__", diff --git a/packages/populace-build/src/populace/build/parity_reference.py b/packages/populace-build/src/populace/build/parity_reference.py new file mode 100644 index 0000000..690694f --- /dev/null +++ b/packages/populace-build/src/populace/build/parity_reference.py @@ -0,0 +1,475 @@ +"""Reference-pinned, recorded, reconstructable simulation-level parity. + +The parity gate is the incumbent-replacement contract: every variable layer +the reference (the enhanced CPS) populates, the candidate (populace) must +populate too — letting the rules engine compute a layer the candidate drops is +parity, dropping it to zero is a gap that silently masks the engine's formulas. +The *judging* of that contract lives in :func:`populace.build.parity_gate` and +is reused verbatim here. 
What this module adds is the **process discipline** +that PolicyEngine/populace issue #19 found missing in the certified US build: + +1. **The reference was unpinned.** Parity ran against a working-copy eCPS, not + a recorded revision, so a ``gaps=0`` verdict silently rotted the moment the + eCPS changed and was not reproducible. :class:`ReferenceSpec` fixes this by + identifying the reference eCPS by **sha256** (mandatory) plus either a + Hugging Face ``repo`` + ``revision`` or a local ``path``. An unpinned + reference is refused at construction — it is the exact bug being fixed. + +2. **The verdict recorded nothing about what it was judged against.** The + certified release manifest carried no gap count, no reference identity, no + skipped-layer count. :func:`judge_parity` wraps :func:`parity_gate` and + records the reference identity and the layer/skip counts in the + :class:`~populace.build.GateResult` details, so the manifest that ships the + dataset also ships the evidence for its parity claim. + +3. **The runner was deleted.** ``packages/populace-data/build/us/check_parity`` + (which gathered the simulation-level shares and invoked ``parity_gate``) was + removed from HEAD in ``fda3838``; the gate library survived but nothing + exercised it against a reference, so the contract was only reconstructable + from git history. :func:`gather_candidate_shares` re-homes that + simulation-level logic into the package, with the ``Microsimulation`` + isolated behind an injected ``calculate`` callable so the skip rules and + structural-weight handling are unit-testable without ``policyengine_us`` or + a 355 MB dataset. :func:`run_parity_against_reference` is the thin + orchestrator that wires the real sim in. 
+ +**Release-manifest contract.** The :class:`GateResult` produced here is meant +to travel with the release: ``GateReport((result, ...)).to_manifest()`` lands +the parity verdict — reference identity, gaps, skipped count, populated-layer +counts — in the same ``gates`` block that :mod:`populace.build.trace` already +binds (by content hash) into the build's TRACE TRO. Recording it there is what +makes "parity gaps = 0" a reproducible, auditable claim rather than a +since-deleted log line. + +**Deferred to follow-up (NOT in this module).** A CI *drift-check* that +re-runs :func:`run_parity_against_reference` against the **latest published** +eCPS on every build — so reference drift (the #19 root cause: the certified +default fell behind a newer eCPS on 10 layers) fails loudly instead of going +invisible — is the natural companion and is tracked in #19. It is deliberately +not built here: this module pins, records, and reconstructs the runner; the CI +job and the data-gap closure (the missing PUF-derived credit inputs) are +separate work. +""" + +from __future__ import annotations + +import os +from collections.abc import Callable, Iterable, Mapping +from dataclasses import dataclass + +import numpy as np + +from populace.build.gates import GateResult, parity_gate +from populace.build.trace import sha256_file + +__all__ = [ + "ReferenceSpec", + "judge_parity", + "gather_candidate_shares", + "reference_layers", + "run_parity_against_reference", + "DEFAULT_PARITY_YEAR", + "STRUCTURAL_WEIGHTS", +] + +#: The build's reference year. The certified US default is a 2024 vintage; the +#: orchestrator and runner default to it, callers can override per build. +DEFAULT_PARITY_YEAR = 2024 + +#: Variables that are dataset *structure*, not value layers, and so are never +#: judged for parity: an entity weight populating is mechanical, not a signal +#: the candidate must reproduce. 
#: Popped from both share dicts before judging
#: (the deleted check_parity.py popped exactly these two).
STRUCTURAL_WEIGHTS: tuple[str, ...] = ("household_weight", "person_weight")


@dataclass(frozen=True)
class ReferenceSpec:
    """Pins the parity reference eCPS by content hash, recorded with the verdict.

    The reference is identified one of two ways, and **always** by sha256:

    - **Hugging Face**: ``repo`` + ``revision`` + ``sha256`` — an immutable,
      revision-pinned Hub artifact (``revision`` must be a commit sha, not a
      moving ref like ``main``, for the pin to be immutable; this class records
      whatever it is given and does not police that, but the build should pass
      a sha).
    - **Local**: ``path`` + ``sha256`` — a file on disk; use
      :meth:`from_local_file` to hash it.

    The sha256 is mandatory in every case. An unpinned reference — a working
    copy whose bytes are not recorded — is precisely the #19 bug: a ``gaps=0``
    verdict judged against it is neither reproducible nor drift-detectable.
    Blank values are treated as missing: a whitespace-only ``sha256`` pins
    nothing, and an empty ``repo``/``revision``/``path`` locates nothing, so
    both are refused rather than recorded.

    Attributes:
        sha256: The reference file's sha256 (64 hex chars in practice; not
            length-validated here so a caller may record a truncated/labelled
            digest, but it must be a non-blank string).
        repo: The Hugging Face dataset repo, for a Hub reference.
        revision: The Hub revision (commit sha), for a Hub reference.
        path: The local file path, for an on-disk reference.

    Raises:
        ValueError: On construction, if ``sha256`` is missing or blank, if a
            location field is given but blank, if only one of
            ``repo``/``revision`` is given, or if no location is given at all.
    """

    sha256: str
    repo: str | None = None
    revision: str | None = None
    path: str | None = None

    def __post_init__(self) -> None:
        # A blank (or non-string) digest pins nothing, exactly like a missing one.
        if not isinstance(self.sha256, str) or not self.sha256.strip():
            raise ValueError(
                "ReferenceSpec requires a sha256 — an unpinned parity "
                "reference is the bug issue #19 fixes (a verdict judged "
                "against unrecorded bytes is not reproducible). Hash the "
                "reference file (ReferenceSpec.from_local_file) or record the "
                "Hub revision's sha256."
            )
        # An empty string is "given" as far as ``is not None`` goes, but it
        # identifies no repo, revision or file; refuse it instead of recording it.
        for field_name in ("repo", "revision", "path"):
            value = getattr(self, field_name)
            if value is not None and not str(value).strip():
                raise ValueError(
                    f"ReferenceSpec {field_name} must be non-empty when given "
                    f"(got {value!r}); pass None to omit it."
                )
        has_hf = self.repo is not None or self.revision is not None
        has_local = self.path is not None
        if has_hf and (self.repo is None or self.revision is None):
            raise ValueError(
                "A Hugging Face ReferenceSpec needs both repo and revision "
                f"(got repo={self.repo!r}, revision={self.revision!r}); a "
                "repo without a pinned revision is a moving reference."
            )
        if not has_hf and not has_local:
            raise ValueError(
                "ReferenceSpec needs a location: either a Hugging Face "
                "repo+revision or a local path (plus, always, a sha256)."
            )

    @property
    def kind(self) -> str:
        """``"huggingface"`` for a Hub reference, ``"local"`` for a file.

        A spec that carries both a Hub identity and a local ``path`` reports
        ``"huggingface"``: the presence of ``repo`` decides.
        """
        return "huggingface" if self.repo is not None else "local"

    @classmethod
    def from_local_file(
        cls, path: str | os.PathLike[str], *, chunk_size: int = 1 << 20
    ) -> "ReferenceSpec":
        """Build a local :class:`ReferenceSpec` by streaming-hashing ``path``.

        Reuses :func:`populace.build.trace.sha256_file` so the reference is
        hashed by the same helper the build's TRACE provenance uses — one hash
        definition across the build, not two.

        Args:
            path: The reference file to hash and record.
            chunk_size: Forwarded to ``sha256_file`` as its ``chunk_size``.

        Returns:
            A spec whose ``sha256`` is the file's digest and whose ``path`` is
            ``path`` as a string.
        """
        digest = sha256_file(path, chunk_size=chunk_size)
        return cls(sha256=digest, path=str(os.fspath(path)))

    def to_dict(self) -> dict[str, object]:
        """JSON-ready identity for recording in a :class:`GateResult`/manifest.

        Includes only the fields that apply (so a local spec records ``path``,
        a Hub spec records ``repo``/``revision``), plus ``kind`` so a reader
        can tell which without inferring from presence.
        """
        identity: dict[str, object] = {"kind": self.kind, "sha256": self.sha256}
        optional_fields = (
            ("repo", self.repo),
            ("revision", self.revision),
            ("path", self.path),
        )
        identity.update(
            (key, value) for key, value in optional_fields if value is not None
        )
        return identity
def judge_parity(
    candidate_shares: Mapping[str, float],
    reference_shares: Mapping[str, float],
    reference_spec: ReferenceSpec,
    *,
    known_gaps: Iterable[str] = (),
    skipped: Iterable[str] = (),
) -> GateResult:
    """Judge parity against a pinned reference and record what it was judged on.

    Pure: takes precomputed non-zero share dicts (so it runs with no dataset
    and no ``policyengine_us``), delegates the verdict to
    :func:`populace.build.parity_gate` — the failure lines are that gate's,
    verbatim, not reinvented here — and returns a :class:`GateResult` whose
    ``details`` additionally carry:

    - ``reference``: the :meth:`ReferenceSpec.to_dict` identity (sha256 plus
      repo/revision or path). This is the #19 fix: the verdict records the
      exact reference it was judged against, so a later run can tell whether a
      changed verdict means the candidate changed or the reference drifted.
    - ``skipped`` / ``skipped_layers``: the reference layers not judged because
      the engine does not register them or they are not annual (gathered by
      :func:`gather_candidate_shares`), recorded so "169 of 237 layers checked"
      is in the manifest, not lost.
    - ``candidate_populated_layers``: how many *judged* layers — those the
      reference populates with a positive share — the candidate also populates
      with a positive share. Candidate-only extras are not counted, so the
      figure is directly comparable to ``reference_populated_layers`` from the
      base gate.

    Args:
        candidate_shares: Variable -> non-zero share in the candidate.
        reference_shares: Variable -> non-zero share in the reference. Only
            reference-populated layers are judged; candidate-only extras are
            not parity failures.
        reference_spec: The pinned reference identity, recorded in the result.
        known_gaps: Variables exempted by name (each a documented decision),
            passed straight through to :func:`parity_gate`.
        skipped: Reference layers excluded before judging (non-annual or not
            engine-registered), recorded for the manifest.

    Returns:
        A :class:`GateResult` carrying the base gate's name, verdict and
        failure lines, with the recorded reference identity + counts merged
        into its details.
    """
    base = parity_gate(candidate_shares, reference_shares, known_gaps=known_gaps)
    # Materialise once: ``skipped`` may be a one-shot iterable and is used twice.
    skipped_tuple = tuple(skipped)
    # Count over the reference's populated layers only, so a candidate-only
    # extra (never judged) cannot inflate the "populated" figure.
    candidate_populated = sum(
        1
        for var, ref_share in reference_shares.items()
        if ref_share > 0.0 and candidate_shares.get(var, 0.0) > 0.0
    )
    details: dict[str, object] = dict(base.details)
    details.update(
        {
            "reference": reference_spec.to_dict(),
            "skipped": list(skipped_tuple),
            "skipped_layers": len(skipped_tuple),
            "candidate_populated_layers": candidate_populated,
        }
    )
    # GateResult is frozen and refuses pass-with-failures / fail-without-reason;
    # carry the base verdict and lines through unchanged so those invariants and
    # the failure text are exactly parity_gate's.
    return GateResult(
        name=base.name,
        passed=base.passed,
        failures=base.failures,
        details=details,
    )
def _nonzero_share(values: np.ndarray) -> float:
    """Share of entries that are non-zero, as a float in [0, 1].

    ``values`` is coerced to ``float64`` (so non-numeric input raises) and an
    empty array yields ``0.0`` rather than a NaN mean.
    """
    arr = np.asarray(values, dtype=np.float64)
    if arr.size == 0:
        return 0.0
    return float((arr != 0).mean())


def gather_candidate_shares(
    reference_layers: Mapping[str, float],
    *,
    year: int,
    tax_benefit_system: object,
    calculate: Callable[[str, int], np.ndarray],
) -> tuple[dict[str, float], dict[str, float], list[str]]:
    """Gather candidate non-zero shares over the reference's populated layers.

    Ports the simulation-level loop from the deleted ``check_parity.py``: for
    every variable the reference stores, the candidate's *simulation* must
    produce a non-zero layer — stored or computed by an engine formula. A
    layer the candidate drops to zero would mask the engine's own formula, so
    dropping the column and letting the engine compute is parity, not a gap.

    The ``Microsimulation`` is isolated behind ``calculate`` (a plain
    ``(var, year) -> ndarray`` callable), so the skip rules and the
    structural-weight handling run with a fake sim, with no
    ``policyengine_us`` and no dataset.
    :func:`run_parity_against_reference` supplies the real
    ``sim.calculate(var, year).values`` closure.

    Skip rules (a *skip* is "not judged", distinct from a measured gap),
    applied in this order to the reference layers in sorted name order:

    - a variable the engine does not register (``var not in tbs.variables``)
      is skipped — the candidate's engine has no such layer to populate. A
      ``tax_benefit_system`` with no ``.variables`` attribute is treated as
      registering nothing, so every layer is skipped;
    - a variable whose ``definition_period`` is not ``"year"`` is skipped —
      parity is judged on annual layers (the deleted runner's rule);
    - the structural entity weights (:data:`STRUCTURAL_WEIGHTS`) appear in
      neither returned dict — a weight populating is mechanical, not signal.
      They are dropped *before* ``calculate`` is called, so the simulation is
      never asked for a layer that would only be discarded.

    A layer that cannot be measured is **not** a skip: it is recorded as a
    candidate share of ``0.0`` (a real gap). That covers both a ``calculate``
    that raises and a ``calculate`` whose return value cannot be reduced to a
    numeric share — the same outcome the orchestrator's closure produces,
    since it converts to ``float64`` inside ``calculate``.

    Args:
        reference_layers: Variable -> reference non-zero share (from
            :func:`reference_layers`).
        year: The simulation year to ``calculate`` each variable at.
        tax_benefit_system: An object exposing ``.variables`` as a mapping of
            variable name -> object with a ``.definition_period`` attribute
            (the real ``Microsimulation.tax_benefit_system``).
        calculate: ``(variable, year) -> ndarray`` — the candidate's simulated
            values for a variable (the injected sim seam).

    Returns:
        ``(candidate_shares, reference_shares, skipped)`` where
        ``candidate_shares`` and ``reference_shares`` are aligned over the
        judged (kept, non-structural) layers, and ``skipped`` lists the
        reference layers excluded by the first two rules above.
    """
    variables = getattr(tax_benefit_system, "variables", {})
    candidate_shares: dict[str, float] = {}
    reference_aligned: dict[str, float] = {}
    skipped: list[str] = []
    for var, ref_share in sorted(reference_layers.items()):
        if var not in variables:
            skipped.append(var)
            continue
        if getattr(variables[var], "definition_period", None) != "year":
            skipped.append(var)
            continue
        if var in STRUCTURAL_WEIGHTS:
            # Weights are structure, not layers: neither judged nor listed as
            # skipped, and not worth a simulation pass.
            continue
        try:
            share = _nonzero_share(calculate(var, year))
        except Exception:  # noqa: BLE001 - an unmeasurable layer is a gap, not a skip
            share = 0.0
        candidate_shares[var] = share
        reference_aligned[var] = ref_share

    return candidate_shares, reference_aligned, skipped
def reference_layers(
    path: str | os.PathLike[str], *, year: int = DEFAULT_PARITY_YEAR
) -> dict[str, float]:
    """Read the reference eCPS's stored layers as variable -> non-zero share.

    Ports ``check_parity.py``'s ``stored_layers``. The eCPS stores each layer
    in the flat ``variable/YEAR`` HDF5 layout (e.g. ``snap/2024``); this visits
    every dataset in the file, maps it to its entity-unprefixed variable name,
    and records the share of entries that are non-zero.

    Layout handling, year-correct so a multi-year file does not leak the wrong
    year (or a year token mistaken for a variable name):

    - a trailing path component that is a 4-digit year is a ``…/YEAR`` time
      layer — kept only when that year equals ``year``, recorded under the
      component just before it (``snap/2024`` -> ``snap``). A dataset whose
      whole name is a bare year has no variable component and is ignored;
    - any other dataset is recorded under its trailing component, which covers
      the ``entity/variable`` single-year layout the deleted runner also
      accepted (``person/employment_income`` -> ``employment_income``).

    String/object columns (geographic codes such as ``county_fips`` stored as
    fixed-width bytes) are always skipped — a non-zero share is undefined for
    them and they are never parity layers. The decision is made from the
    dataset's stored dtype, so such columns are not read at all.

    If two datasets map to the same variable name, the one visited later
    overwrites the earlier entry.

    Requires :mod:`h5py` (the ``us`` optional dependency of this package).

    Args:
        path: The reference HDF5 file.
        year: The year whose ``…/YEAR`` layers are kept.

    Returns:
        Variable name -> share of that layer's entries that are non-zero.
    """
    import h5py

    year_str = str(year)
    shares: dict[str, float] = {}

    def _is_year(token: str) -> bool:
        return len(token) == 4 and token.isdigit()

    def visit(name: str, node: object) -> None:
        # Always returns None: any other return value would stop visititems.
        if not isinstance(node, h5py.Dataset):
            return
        if node.dtype.kind in ("S", "O", "U"):
            # Text column: rejected on the stored dtype, before any read.
            return
        parts = name.split("/")
        leaf = parts[-1]
        if _is_year(leaf):
            # …/YEAR time layer: only the requested year, named by what
            # precedes the year (the entity-unprefixed variable).
            if leaf != year_str or len(parts) < 2:
                return
            var = parts[-2]
        else:
            # No year token to filter on: the trailing component is the name.
            var = leaf
        # ``[()]`` reads the whole dataset and, unlike ``[:]``, also works for
        # a scalar dataset.
        shares[var] = _nonzero_share(np.asarray(node[()]))

    with h5py.File(os.fspath(path), "r") as handle:
        handle.visititems(visit)
    return shares
def run_parity_against_reference(
    candidate_path: str | os.PathLike[str],
    reference: str | os.PathLike[str] | ReferenceSpec,
    *,
    year: int = DEFAULT_PARITY_YEAR,
    known_gaps: Iterable[str] = (),
) -> GateResult:
    """Orchestrate gather -> judge against a pinned reference, recording it.

    The thin sim-wiring layer that the pure pieces above deliberately exclude:

    1. resolve ``reference`` to a :class:`ReferenceSpec` — when given a path,
       hash it (:meth:`ReferenceSpec.from_local_file`) so the verdict is pinned
       even for an ad-hoc local reference; when given a pre-pinned spec with a
       full 64-hex ``sha256``, re-hash the file at its ``path`` and refuse a
       mismatch, so the recorded identity is the identity of the bytes judged;
    2. read the reference's stored layers (:func:`reference_layers`);
    3. build the candidate ``Microsimulation`` and gather its simulated shares
       (:func:`gather_candidate_shares`), injecting
       ``sim.calculate(var, year).values`` as the ``calculate`` seam;
    4. :func:`judge_parity` — verdict + recorded reference identity + counts.

    This is the only part that imports ``policyengine_us``, and it does so only
    after ``reference`` has been resolved and validated, so an unusable
    reference is reported as such even where the engine is not installed.
    The returned :class:`GateResult` is what the release manifest should carry
    (via ``GateReport.to_manifest()``), so the parity claim ships with its
    evidence.

    Args:
        candidate_path: The built populace dataset H5 to judge.
        reference: A reference eCPS H5 path (hashed here) or a pre-pinned
            :class:`ReferenceSpec`. A spec must carry a local ``path``; that
            path is read for layers. A Hub-only spec requires the bytes to be
            resolved by the caller and is not fetched here.
        year: Simulation/reference year (default :data:`DEFAULT_PARITY_YEAR`).
        known_gaps: Documented per-name exemptions, passed to
            :func:`judge_parity`.

    Returns:
        The recorded ``"parity"`` :class:`GateResult`.

    Raises:
        ValueError: If ``reference`` is a Hub-only :class:`ReferenceSpec` with
            no local path to read layers from (the bytes must be materialized
            first — this orchestrator does not download), or if the spec's
            full-length ``sha256`` does not match the file at its ``path``.
    """
    if isinstance(reference, ReferenceSpec):
        reference_spec = reference
        reference_layer_path = reference.path
        if reference_layer_path is None:
            raise ValueError(
                "run_parity_against_reference needs local bytes to read the "
                "reference layers; the given ReferenceSpec is Hub-only "
                f"(repo={reference.repo!r}). Download the pinned revision and "
                "pass its local path (or a ReferenceSpec with .path set)."
            )
        # Only a full hex digest can be checked; ReferenceSpec deliberately
        # allows truncated/labelled digests, which are recorded as given.
        recorded = reference_spec.sha256.strip().lower()
        if len(recorded) == 64 and all(ch in "0123456789abcdef" for ch in recorded):
            actual = ReferenceSpec.from_local_file(reference_layer_path).sha256
            if actual.lower() != recorded:
                raise ValueError(
                    "run_parity_against_reference refuses a ReferenceSpec "
                    "whose recorded sha256 does not match the bytes at its "
                    f"path (path={reference_layer_path!r}, "
                    f"recorded={reference_spec.sha256!r}, actual={actual!r}); "
                    "the verdict would be pinned to bytes it was not judged "
                    "against."
                )
    else:
        reference_layer_path = os.fspath(reference)
        reference_spec = ReferenceSpec.from_local_file(reference_layer_path)

    ref_layers = reference_layers(reference_layer_path, year=year)

    # Deferred until the reference is known to be usable: this import is the
    # module's only engine dependency.
    from policyengine_us import Microsimulation

    sim = Microsimulation(dataset=str(candidate_path))
    tbs = sim.tax_benefit_system

    def calculate(variable: str, period: int) -> np.ndarray:
        return np.asarray(sim.calculate(variable, period).values, dtype=np.float64)

    candidate_shares, reference_aligned, skipped = gather_candidate_shares(
        ref_layers,
        year=year,
        tax_benefit_system=tbs,
        calculate=calculate,
    )
    return judge_parity(
        candidate_shares=candidate_shares,
        reference_shares=reference_aligned,
        reference_spec=reference_spec,
        known_gaps=known_gaps,
        skipped=skipped,
    )
+ +The bug these tests pin down: parity ``gaps=0`` was certified against an +*unpinned* enhanced-CPS reference, the release manifest recorded no gate +result, and the runner that produced the verdict was later deleted — so the +verdict was neither reproducible nor auditable. The fixes under test: + +- a :class:`~populace.build.parity_reference.ReferenceSpec` that refuses to + identify a reference without a sha256 (an unpinned reference is the bug); +- :func:`~populace.build.parity_reference.judge_parity`, a pure function that + reuses the surviving :func:`populace.build.parity_gate` and **records the + reference identity** (repo/revision/sha256 or path/sha256) plus the + gap/skip/populated-layer counts in the :class:`GateResult` details — so the + verdict carries what it was judged against; +- :func:`~populace.build.parity_reference.gather_candidate_shares`, the + simulation-level share gathering ported from the deleted ``check_parity.py``, + with the ``Microsimulation`` injected as a plain callable so the skip logic + and structural-weight popping are testable **without** ``policyengine_us`` or + a 355 MB dataset; +- a round-trip: a judged :class:`GateResult` serializes into a manifest-style + dict that carries the reference identity, closing manifest-gap #2. + +Every test here runs with no network and no large files. +""" + +from __future__ import annotations + +import hashlib + +import numpy as np +import pytest + +from populace.build import GateResult +from populace.build.parity_reference import ( + ReferenceSpec, + gather_candidate_shares, + judge_parity, + reference_layers, +) + +# A real eCPS sha256 is 64 hex chars; use a fixed illustrative digest so the +# recorded identity is checkable byte-for-byte. 
_SHA = hashlib.sha256(b"enhanced_cps_2024.h5 reference bytes").hexdigest()


class TestReferenceSpec:
    """Construction rules and recorded identity of :class:`ReferenceSpec`."""

    def test_local_spec_records_path_and_sha(self) -> None:
        """A local spec exposes its digest and records path, sha256 and kind."""
        pinned = ReferenceSpec(path="storage/enhanced_cps_2024.h5", sha256=_SHA)
        assert pinned.sha256 == _SHA
        recorded = pinned.to_dict()
        expected = {
            "path": "storage/enhanced_cps_2024.h5",
            "sha256": _SHA,
            "kind": "local",
        }
        for key, value in expected.items():
            assert recorded[key] == value

    def test_hf_spec_records_repo_revision_and_sha(self) -> None:
        """A Hub spec records repo, revision, sha256 and the huggingface kind."""
        recorded = ReferenceSpec(
            repo="policyengine/policyengine-us-data",
            revision="4a8e7d39eb9e",
            sha256=_SHA,
        ).to_dict()
        expected = {
            "repo": "policyengine/policyengine-us-data",
            "revision": "4a8e7d39eb9e",
            "sha256": _SHA,
            "kind": "huggingface",
        }
        for key, value in expected.items():
            assert recorded[key] == value

    def test_missing_sha256_is_refused(self) -> None:
        """An empty digest is rejected: an unpinned reference is the #19 bug."""
        with pytest.raises(ValueError, match="sha256"):
            ReferenceSpec(path="storage/enhanced_cps_2024.h5", sha256="")

    def test_no_sha256_argument_is_refused(self) -> None:
        """``None`` in place of the digest is rejected just like an empty one."""
        with pytest.raises(ValueError, match="sha256"):
            ReferenceSpec(repo="r", revision="rev", sha256=None)  # type: ignore[arg-type]

    def test_hf_without_repo_or_revision_is_refused(self) -> None:
        """A repo with no pinned revision is a moving reference and is refused."""
        with pytest.raises(ValueError, match="repo.*revision|revision.*repo"):
            ReferenceSpec(repo="policyengine/x", sha256=_SHA)  # revision missing

    def test_local_without_path_or_hf_without_repo_is_refused(self) -> None:
        """A digest with no location at all (no path, no repo) is refused."""
        with pytest.raises(ValueError, match="path|repo"):
            ReferenceSpec(sha256=_SHA)  # neither local path nor HF repo

    def test_from_local_file_hashes_the_bytes(self, tmp_path) -> None:
        """``from_local_file`` records the file's real sha256 and its path."""
        contents = b"enhanced cps reference bytes" * 1000
        target = tmp_path / "enhanced_cps_2024.h5"
        target.write_bytes(contents)
        pinned = ReferenceSpec.from_local_file(target)
        assert pinned.kind == "local"
        assert pinned.path == str(target)
        assert pinned.sha256 == hashlib.sha256(contents).hexdigest()


class TestJudgeParityRecordsReference:
    """``judge_parity`` keeps the base gate's verdict and records its reference."""

    def _spec(self) -> ReferenceSpec:
        """A Hub-pinned reference shared by the tests below."""
        return ReferenceSpec(
            repo="policyengine/policyengine-us-data",
            revision="4a8e7d39eb9e",
            sha256=_SHA,
        )

    def test_gap_when_reference_populates_a_layer_candidate_is_zero(self) -> None:
        """A reference-populated layer the candidate zeroes is a named gap."""
        verdict = judge_parity(
            candidate_shares={"snap": 0.1, "general_business_credit": 0.0},
            reference_shares={"snap": 0.12, "general_business_credit": 0.054},
            reference_spec=self._spec(),
        )
        assert not verdict.passed
        assert "general_business_credit" in verdict.failures[0]

    def test_pass_when_candidate_populates_every_reference_layer(self) -> None:
        """Populating every reference layer passes with zero gaps."""
        verdict = judge_parity(
            candidate_shares={"snap": 0.1, "net_worth": 0.5},
            reference_shares={"snap": 0.12, "net_worth": 0.9},
            reference_spec=self._spec(),
        )
        assert verdict.passed
        assert verdict.details["gaps"] == 0

    def test_known_gap_exemption_is_honored_and_recorded(self) -> None:
        """A named exemption turns the gap into a pass and is listed."""
        verdict = judge_parity(
            candidate_shares={"snap": 0.1, "amt_foreign_tax_credit": 0.0},
            reference_shares={"snap": 0.12, "amt_foreign_tax_credit": 0.093},
            reference_spec=self._spec(),
            known_gaps=["amt_foreign_tax_credit"],
        )
        assert verdict.passed
        assert verdict.details["exempted"] == ["amt_foreign_tax_credit"]

    def test_reference_identity_is_recorded_in_details(self) -> None:
        """The Hub identity judged against travels in ``details``."""
        verdict = judge_parity(
            candidate_shares={"snap": 0.1},
            reference_shares={"snap": 0.12},
            reference_spec=self._spec(),
        )
        recorded = verdict.details["reference"]
        assert recorded["sha256"] == _SHA
        assert recorded["repo"] == "policyengine/policyengine-us-data"
        assert recorded["revision"] == "4a8e7d39eb9e"
        assert recorded["kind"] == "huggingface"

    def test_local_reference_identity_is_recorded(self) -> None:
        """A local identity (path + sha256) travels in ``details`` too."""
        local_spec = ReferenceSpec(path="storage/enhanced_cps_2024.h5", sha256=_SHA)
        verdict = judge_parity(
            candidate_shares={"snap": 0.1},
            reference_shares={"snap": 0.12},
            reference_spec=local_spec,
        )
        recorded = verdict.details["reference"]
        assert recorded["path"] == "storage/enhanced_cps_2024.h5"
        assert recorded["sha256"] == _SHA

    def test_layer_and_skipped_counts_are_recorded(self) -> None:
        """Populated-layer and skipped counts land in ``details``."""
        verdict = judge_parity(
            candidate_shares={"snap": 0.1, "net_worth": 0.5},
            reference_shares={"snap": 0.12, "net_worth": 0.9, "vacuous": 0.0},
            reference_spec=self._spec(),
            skipped=("non_annual_var", "unregistered_var"),
        )
        counts = verdict.details
        # populated reference layers: snap + net_worth (vacuous is 0%)
        assert counts["reference_populated_layers"] == 2
        # candidate layers we actually measured a non-zero share for
        assert counts["candidate_populated_layers"] == 2
        assert counts["skipped_layers"] == 2
        assert list(counts["skipped"]) == [
            "non_annual_var",
            "unregistered_var",
        ]

    def test_judge_returns_a_gateresult_named_parity(self) -> None:
        """The recorded verdict is a ``GateResult`` named ``parity``."""
        verdict = judge_parity(
            candidate_shares={"snap": 0.1},
            reference_shares={"snap": 0.12},
            reference_spec=self._spec(),
        )
        assert isinstance(verdict, GateResult)
        assert verdict.name == "parity"

    def test_judge_preserves_parity_gate_failure_text(self) -> None:
        """Failure lines are ``parity_gate``'s own, not reworded."""
        # The recorded result must not invent its own caveats: failures come
        # verbatim from the surviving parity_gate.
        from populace.build import parity_gate

        candidate = {"snap": 0.0}
        reference = {"snap": 0.12}
        expected_failures = parity_gate(candidate, reference).failures
        recorded = judge_parity(
            candidate_shares=candidate,
            reference_shares=reference,
            reference_spec=self._spec(),
        )
        assert recorded.failures == expected_failures
class TestGatherCandidateShares:
    """Port of check_parity.py's simulation loop, sim injected as a callable."""

    class _FakeVariable:
        """Stand-in for an engine variable: only ``definition_period`` matters."""

        def __init__(self, definition_period: str) -> None:
            self.definition_period = definition_period

    def _tbs(self):
        """A stub tax-benefit system: a few annual vars, one monthly, and
        deliberately *no* entry for the unregistered reference layer."""
        periods = {
            "snap": "year",
            "net_worth": "year",
            "household_weight": "year",
            "person_weight": "year",
            "monthly_thing": "month",
        }
        registry = {
            name: TestGatherCandidateShares._FakeVariable(period)
            for name, period in periods.items()
        }

        class _TBS:
            variables = registry

        return _TBS()

    def _calculate(self, shares_by_var):
        """A fake sim: returns arrays whose non-zero share matches the spec."""
        length = 10

        def calculate(var: str, year: int) -> np.ndarray:
            ones = round(shares_by_var[var] * length)
            return np.asarray([1.0] * ones + [0.0] * (length - ones))

        return calculate

    def test_unregistered_and_non_annual_layers_are_skipped(self) -> None:
        """Monthly and unregistered layers are skipped; the rest stay aligned."""
        reference = {
            "snap": 0.12,
            "net_worth": 0.9,
            "monthly_thing": 0.5,  # non-annual -> skipped
            "amt_foreign_tax_credit": 0.093,  # not in tbs.variables -> skipped
        }
        fake_sim = self._calculate({"snap": 0.1, "net_worth": 0.5})
        measured, aligned, not_judged = gather_candidate_shares(
            reference,
            year=2024,
            tax_benefit_system=self._tbs(),
            calculate=fake_sim,
        )
        assert set(not_judged) == {"monthly_thing", "amt_foreign_tax_credit"}
        assert set(measured) == {"snap", "net_worth"}
        # the reference shares returned are aligned to the kept (non-skipped,
        # non-structural) layers only
        assert set(aligned) == {"snap", "net_worth"}
        assert measured["snap"] == pytest.approx(0.1)
        assert measured["net_worth"] == pytest.approx(0.5)
        assert aligned["snap"] == pytest.approx(0.12)

    def test_structural_weights_are_popped(self) -> None:
        """Entity weights appear in neither returned share dict."""
        reference = {
            "snap": 0.12,
            "household_weight": 1.0,
            "person_weight": 1.0,
        }
        fake_sim = self._calculate(
            {"snap": 0.1, "household_weight": 1.0, "person_weight": 1.0}
        )
        measured, aligned, not_judged = gather_candidate_shares(
            reference,
            year=2024,
            tax_benefit_system=self._tbs(),
            calculate=fake_sim,
        )
        for weight in ("household_weight", "person_weight"):
            assert weight not in measured
        for weight in ("household_weight", "person_weight"):
            assert weight not in aligned
        assert "snap" in measured

    def test_calc_failure_records_zero_share_not_a_crash(self) -> None:
        """A raising ``calculate`` is a measured 0.0 gap, not a skip or crash."""

        # check_parity.py reported a failing sim.calculate as candidate 0.0
        # (a real gap) rather than masking it; the port keeps that contract.
        def calculate(var: str, year: int) -> np.ndarray:
            if var != "net_worth":
                return np.asarray([1.0, 0.0])
            raise RuntimeError("formula blew up")

        measured, aligned, not_judged = gather_candidate_shares(
            {"snap": 0.12, "net_worth": 0.9},
            year=2024,
            tax_benefit_system=self._tbs(),
            calculate=calculate,
        )
        assert measured["net_worth"] == 0.0
        assert aligned["net_worth"] == 0.9
        # and it is NOT counted as skipped — it is a measured (failed) layer
        assert "net_worth" not in not_judged

    def test_gather_then_judge_finds_the_gap(self) -> None:
        """Gather over a fake sim, judge with a pinned spec, get a recorded gap."""
        # End-to-end of the pure half: gather over a fake sim, judge with a
        # pinned spec, get a recorded gap.
        measured, aligned, not_judged = gather_candidate_shares(
            {"snap": 0.12, "net_worth": 0.9},
            year=2024,
            tax_benefit_system=self._tbs(),
            calculate=self._calculate({"snap": 0.1, "net_worth": 0.0}),
        )
        pinned = ReferenceSpec(path="x.h5", sha256=_SHA)
        verdict = judge_parity(
            candidate_shares=measured,
            reference_shares=aligned,
            reference_spec=pinned,
            skipped=not_judged,
        )
        assert not verdict.passed
        assert "net_worth" in verdict.failures[0]
        assert verdict.details["reference"]["sha256"] == _SHA
+ reference = {"snap": 0.12, "net_worth": 0.9} + candidate, ref_out, skipped = gather_candidate_shares( + reference, + year=2024, + tax_benefit_system=self._tbs(), + calculate=self._calculate({"snap": 0.1, "net_worth": 0.0}), + ) + result = judge_parity( + candidate_shares=candidate, + reference_shares=ref_out, + reference_spec=ReferenceSpec(path="x.h5", sha256=_SHA), + skipped=skipped, + ) + assert not result.passed + assert "net_worth" in result.failures[0] + assert result.details["reference"]["sha256"] == _SHA + + +class TestReferenceLayers: + """Port of check_parity.py's stored_layers: flat var/YEAR in an H5 file.""" + + def test_reads_flat_var_year_layout(self, tmp_path) -> None: + h5py = pytest.importorskip("h5py") + path = tmp_path / "ref.h5" + with h5py.File(path, "w") as f: + # var/YEAR layout, the real eCPS shape + f.create_dataset("snap/2024", data=np.asarray([0.0, 1.0, 2.0, 0.0])) + f.create_dataset("net_worth/2024", data=np.asarray([10.0, 20.0, 0.0, 5.0])) + f.create_dataset("all_zero/2024", data=np.zeros(4)) + layers = reference_layers(path, year=2024) + assert layers["snap"] == pytest.approx(0.5) # 2 of 4 non-zero + assert layers["net_worth"] == pytest.approx(0.75) # 3 of 4 + assert layers["all_zero"] == 0.0 + + def test_string_columns_are_skipped(self, tmp_path) -> None: + h5py = pytest.importorskip("h5py") + path = tmp_path / "ref.h5" + with h5py.File(path, "w") as f: + f.create_dataset("snap/2024", data=np.asarray([1.0, 0.0])) + f.create_dataset( + "county_fips/2024", + data=np.asarray([b"06037", b"36061"], dtype="S5"), + ) + layers = reference_layers(path, year=2024) + assert "snap" in layers + assert "county_fips" not in layers # |S5 string column dropped + + def test_only_the_requested_year_is_read(self, tmp_path) -> None: + h5py = pytest.importorskip("h5py") + path = tmp_path / "ref.h5" + with h5py.File(path, "w") as f: + f.create_dataset("snap/2024", data=np.asarray([1.0, 0.0])) + f.create_dataset("snap/2023", data=np.asarray([1.0, 1.0])) + 
layers = reference_layers(path, year=2024) + assert layers["snap"] == pytest.approx(0.5) # the 2024 layer, not 2023 + # The off-year layer must not leak a spurious variable: a multi-year + # file must not record the wrong year, nor a year token as a var name. + assert set(layers) == {"snap"} + assert "2023" not in layers + + def test_entity_variable_single_year_layout(self, tmp_path) -> None: + # The other layout the deleted runner accepted: entity/variable with + # no year token. The trailing component is the variable. + h5py = pytest.importorskip("h5py") + path = tmp_path / "ref.h5" + with h5py.File(path, "w") as f: + f.create_dataset( + "person/employment_income", data=np.asarray([1.0, 0.0, 2.0, 0.0]) + ) + layers = reference_layers(path, year=2024) + assert layers["employment_income"] == pytest.approx(0.5) + + +class TestPackageReexports: + def test_symbols_are_importable_from_populace_build(self) -> None: + # gates/holdout/plan are re-exported at the package root; the parity + # reference surface joins them so callers import one place. 
+ import populace.build as build + + for name in ( + "ReferenceSpec", + "judge_parity", + "gather_candidate_shares", + "reference_layers", + "run_parity_against_reference", + ): + assert hasattr(build, name) + assert name in build.__all__ + + +class TestManifestRoundTrip: + def test_recorded_result_serializes_into_a_manifest_dict(self) -> None: + from populace.build import GateReport + + spec = ReferenceSpec( + repo="policyengine/policyengine-us-data", + revision="4a8e7d39eb9e", + sha256=_SHA, + ) + result = judge_parity( + candidate_shares={"snap": 0.1, "net_worth": 0.5}, + reference_shares={"snap": 0.12, "net_worth": 0.9}, + reference_spec=spec, + skipped=("monthly_thing",), + ) + manifest = GateReport((result,)).to_manifest() + parity = manifest["gates"]["parity"] + assert parity["passed"] is True + # The reference identity survives the round-trip into the manifest: + # this is exactly the gap #19 calls out (the certified manifest + # recorded no reference identity). + assert parity["details"]["reference"]["sha256"] == _SHA + assert parity["details"]["reference"]["revision"] == "4a8e7d39eb9e" + assert parity["details"]["skipped_layers"] == 1 + # JSON-roundtrippable (no non-serializable objects leaked into details) + import json + + json.loads(json.dumps(manifest))