diff --git a/changelog.d/1170.fixed.md b/changelog.d/1170.fixed.md new file mode 100644 index 000000000..886154e2d --- /dev/null +++ b/changelog.d/1170.fixed.md @@ -0,0 +1 @@ +Impute PUF-only variables onto positive-weight CPS records with a PUF-weighted, income-conditioned QRF draw instead of reusing the unweighted, demographic-only draw built for the zero-weight clone half. The old shared draw oversampled rich PUF donors onto real CPS records, inflating charitable donations and other PUF-sourced deduction inputs by an order of magnitude. The clone half keeps its rich-preserving draw, and the new real-half training frame respects the Forbes/top-tail donor exclusions. diff --git a/policyengine_us_data/calibration/puf_impute.py b/policyengine_us_data/calibration/puf_impute.py index de7498178..ed6e4db7a 100644 --- a/policyengine_us_data/calibration/puf_impute.py +++ b/policyengine_us_data/calibration/puf_impute.py @@ -80,6 +80,34 @@ "is_tax_unit_dependent", ] +REAL_HALF_INCOME_PREDICTORS = [ + "adjusted_gross_income", + "employment_income", + "self_employment_income", + "taxable_interest_income", + "tax_exempt_interest_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + "short_term_capital_gains", + "long_term_capital_gains", + "rental_income", + "farm_income", + "taxable_pension_income", + "tax_exempt_pension_income", + "taxable_private_pension_income", + "tax_exempt_private_pension_income", + "taxable_ira_distributions", + "tax_exempt_ira_distributions", + "taxable_unemployment_compensation", + "social_security", + "social_security_retirement", + "social_security_disability", + "social_security_survivors", + "social_security_dependents", +] + +PUF_WEIGHT_COLUMN = "puf_sample_weight" + IMPUTED_VARIABLES = [ "employment_income", "partnership_s_corp_income", @@ -548,13 +576,21 @@ def puf_clone_dataset( y_full = None y_override = None + y_real_full = None + y_real_override = None if not skip_qrf and puf_dataset is not None: - y_full, y_override = _run_qrf_imputation( + qrf_result = _run_qrf_imputation( data, time_period, puf_dataset, dataset_path=dataset_path, ) + if len(qrf_result) == 2: + # Backwards-compatible shape for older tests/callers. + y_full, y_override = qrf_result + y_real_full, y_real_override = y_full, y_override + else: + y_full, y_override, y_real_full, y_real_override = qrf_result cps_sim = None tbs = None @@ -596,8 +632,14 @@ def _map_to_entity(pred_values, variable_name): values = time_dict[time_period] if variable in OVERRIDDEN_IMPUTED_VARIABLES and y_override: - pred = _map_to_entity(y_override[variable], variable) - new_data[variable] = {time_period: np.concatenate([pred, pred])} + real_source = ( + y_real_override + if y_real_override and variable in y_real_override + else y_override + ) + real_pred = _map_to_entity(real_source[variable], variable) + puf_pred = _map_to_entity(y_override[variable], variable) + new_data[variable] = {time_period: np.concatenate([real_pred, puf_pred])} elif variable in IMPUTED_VARIABLES and y_full: pred = _map_to_entity(y_full[variable], variable) new_data[variable] = {time_period: np.concatenate([values, pred])} @@ -655,8 +697,21 @@ def _map_to_entity(pred_values, variable_name): if y_full: for var in IMPUTED_VARIABLES: if var not in data: - pred = _map_to_entity(y_full[var], var) - new_data[var] = {time_period: np.concatenate([pred, pred])} + if var in OVERRIDDEN_IMPUTED_VARIABLES and y_override: + real_source = ( + y_real_override + if y_real_override and var in y_real_override + else y_override + ) + puf_source = y_override + else: + real_source = ( + y_real_full if y_real_full and var in y_real_full else y_full + ) + puf_source = y_full + real_pred = _map_to_entity(real_source[var], var) + puf_pred = _map_to_entity(puf_source[var], var) + new_data[var] = {time_period: np.concatenate([real_pred, puf_pred])} if cps_sim is not None: del cps_sim @@ -944,8 +999,14 @@ def _run_qrf_imputation( demographic predictors via Microsimulation. Returns: - Tuple of (y_full_imputations, y_override_imputations) - as dicts of {variable: np.ndarray}. + Tuple of: + * y_full_imputations: old rich-preserving ghost-half draws + * y_override_imputations: old rich-preserving ghost-half draws + for variables that override CPS values + * y_real_full_imputations: weighted, income-conditioned draws + for positive-weight CPS records + * y_real_override_imputations: weighted, income-conditioned draws + for override variables on positive-weight CPS records """ from policyengine_us import Microsimulation @@ -955,6 +1016,7 @@ def _run_qrf_imputation( puf_agi = puf_sim.calculate("adjusted_gross_income", map_to="person").values puf_data = puf_sim.dataset.load_dataset() + puf_weight = puf_sim.calculate("household_weight", map_to="person").values X_train_full = puf_sim.calculate_dataframe( DEMOGRAPHIC_PREDICTORS + IMPUTED_VARIABLES @@ -964,6 +1026,22 @@ def _run_qrf_imputation( DEMOGRAPHIC_PREDICTORS + OVERRIDDEN_IMPUTED_VARIABLES ) + real_full_targets = [var for var in IMPUTED_VARIABLES if var not in data] + real_override_targets = list(OVERRIDDEN_IMPUTED_VARIABLES) + real_income_predictors = [ + var + for var in REAL_HALF_INCOME_PREDICTORS + if var not in set(real_full_targets) | set(real_override_targets) + ] + real_predictors = DEMOGRAPHIC_PREDICTORS + real_income_predictors + real_training_cols = list( + dict.fromkeys(real_predictors + real_full_targets + real_override_targets) + ) + X_train_real = puf_sim.calculate_dataframe(real_training_cols) + # Keep all person rows for now so the Forbes/top-tail training mask + # below stays index-aligned; the positive-weight filter happens after. + X_train_real[PUF_WEIGHT_COLUMN] = np.asarray(puf_weight, dtype=np.float64) + del puf_sim tax_unit_ids = _period_array(puf_data, "tax_unit_id", time_period) @@ -1002,8 +1080,10 @@ def _run_qrf_imputation( >= top_tail_threshold ) if len(forbes_person_mask) == len(puf_agi) and forbes_person_mask.any(): - if len(X_train_full) != len(forbes_person_mask) or len(X_train_override) != len( - forbes_person_mask + if ( + len(X_train_full) != len(forbes_person_mask) + or len(X_train_override) != len(forbes_person_mask) + or len(X_train_real) != len(forbes_person_mask) ): logger.warning( "Skipping Forbes donor exclusion because QRF training " @@ -1022,6 +1102,7 @@ def _run_qrf_imputation( X_train_override = X_train_override.loc[non_forbes_mask].reset_index( drop=True ) + X_train_real = X_train_real.loc[non_forbes_mask].reset_index(drop=True) sub_idx = _stratified_subsample_index(puf_agi) _log_stratified_subsample( @@ -1033,15 +1114,35 @@ def _run_qrf_imputation( X_train_full = X_train_full.iloc[sub_idx].reset_index(drop=True) X_train_override = X_train_override.iloc[sub_idx].reset_index(drop=True) + X_train_real = X_train_real.loc[ + np.asarray(X_train_real[PUF_WEIGHT_COLUMN], dtype=np.float64) > 0 + ].reset_index(drop=True) + if dataset_path is not None: cps_sim = Microsimulation(dataset=dataset_path) X_test = cps_sim.calculate_dataframe(DEMOGRAPHIC_PREDICTORS) + valid_real_predictors = [ + predictor + for predictor in real_predictors + if predictor in cps_sim.tax_benefit_system.variables + ] + X_test_real = cps_sim.calculate_dataframe(valid_real_predictors) del cps_sim else: X_test = pd.DataFrame() for pred in DEMOGRAPHIC_PREDICTORS: if pred in data: X_test[pred] = data[pred][time_period].astype(np.float32) + X_test_real = pd.DataFrame(index=X_test.index) + for pred in real_predictors: + if pred in data: + X_test_real[pred] = data[pred][time_period].astype(np.float32) + + for pred in DEMOGRAPHIC_PREDICTORS: + if pred not in X_test_real and pred in X_test: + X_test_real[pred] = X_test[pred] + + real_predictors = [pred for pred in real_predictors if pred in X_test_real] logger.info("Imputing %d PUF variables (full)", len(IMPUTED_VARIABLES)) y_full = _sequential_qrf( @@ -1059,7 +1160,39 @@ def _run_qrf_imputation( OVERRIDDEN_IMPUTED_VARIABLES, ) - return y_full, y_override + logger.info( + "Imputing %d PUF variables on real CPS half with %d predictors", + len(real_full_targets), + len(real_predictors), + ) + y_real_full = ( + _sequential_qrf( + X_train_real, + X_test_real, + real_predictors, + real_full_targets, + weight_col=PUF_WEIGHT_COLUMN, + max_train_samples=PUF_SUBSAMPLE_TARGET, + ) + if real_full_targets + else {} + ) + + logger.info( + "Imputing %d override variables on real CPS half with %d predictors", + len(real_override_targets), + len(real_predictors), + ) + y_real_override = _sequential_qrf( + X_train_real, + X_test_real, + real_predictors, + real_override_targets, + weight_col=PUF_WEIGHT_COLUMN, + max_train_samples=PUF_SUBSAMPLE_TARGET, + ) + + return y_full, y_override, y_real_full, y_real_override def _period_array( @@ -1187,6 +1320,8 @@ def _sequential_qrf( X_test: pd.DataFrame, predictors: List[str], output_vars: List[str], + weight_col: Optional[str] = None, + max_train_samples: Optional[int] = None, ) -> Dict[str, np.ndarray]: """Run a single sequential QRF preserving covariance. @@ -1209,12 +1344,14 @@ def _sequential_qrf( qrf = QRF( log_level="INFO", memory_efficient=True, + max_train_samples=max_train_samples, ) predictions = qrf.fit_predict( X_train=X_train, X_test=X_test, predictors=predictors, imputed_variables=output_vars, + weight_col=weight_col, n_jobs=1, ) diff --git a/pyproject.toml b/pyproject.toml index f9dff0c7b..f6028b35b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ classifiers = [ "Programming Language :: Python :: 3.14", ] dependencies = [ - "policyengine-us==1.715.3", + "policyengine-us==1.726.0", # policyengine-core 3.26.1 is the current 3.26.x runtime and includes the fix for # PolicyEngine/policyengine-core#482 (user-set ETERNITY inputs lost # after _invalidate_all_caches) and is required by policyengine-us 1.682.1+. diff --git a/tests/unit/calibration/test_calibration_puf_impute.py b/tests/unit/calibration/test_calibration_puf_impute.py index de6fa4c90..d3d95e5fe 100644 --- a/tests/unit/calibration/test_calibration_puf_impute.py +++ b/tests/unit/calibration/test_calibration_puf_impute.py @@ -12,6 +12,8 @@ DEMOGRAPHIC_PREDICTORS, IMPUTED_VARIABLES, OVERRIDDEN_IMPUTED_VARIABLES, + PUF_WEIGHT_COLUMN, + REAL_HALF_INCOME_PREDICTORS, _forbes_person_training_mask, _impute_retirement_contributions, _impute_weeks_unemployed, @@ -269,17 +271,22 @@ def fake_run_qrf_imputation(*args, **kwargs): for var in PUF_REPORTED_CALCULATED_TAX_OUTPUT_VARIABLES: assert var not in result - def test_puf_only_variables_are_imputed_onto_cps_half(self, monkeypatch): + def test_puf_only_variables_use_separate_real_and_clone_draws(self, monkeypatch): data = _make_mock_data(n_persons=20, n_households=5) assert "partnership_s_corp_income" not in data - predictions = np.arange(20, dtype=np.float32) + 100 + real_predictions = np.arange(20, dtype=np.float32) + 10 + clone_predictions = np.arange(20, dtype=np.float32) + 100 y_full = {var: np.ones(20, dtype=np.float32) for var in IMPUTED_VARIABLES} - y_full["partnership_s_corp_income"] = predictions + y_full["partnership_s_corp_income"] = clone_predictions y_full["employment_income"] = np.full(20, 999_999, dtype=np.float32) + y_real_full = { + var: np.ones(20, dtype=np.float32) * 2 for var in IMPUTED_VARIABLES + } + y_real_full["partnership_s_corp_income"] = real_predictions def fake_run_qrf_imputation(*args, **kwargs): - return y_full, {} + return y_full, {}, y_real_full, {} monkeypatch.setattr( puf_impute_module, @@ -296,13 +303,50 @@ def fake_run_qrf_imputation(*args, **kwargs): ) partnership = result["partnership_s_corp_income"][2024] - np.testing.assert_array_equal(partnership[:20], predictions) - np.testing.assert_array_equal(partnership[20:], predictions) + np.testing.assert_array_equal(partnership[:20], real_predictions) + np.testing.assert_array_equal(partnership[20:], clone_predictions) employment = result["employment_income"][2024] np.testing.assert_array_equal(employment[:20], data["employment_income"][2024]) np.testing.assert_array_equal(employment[20:], y_full["employment_income"]) + def test_overridden_variables_use_separate_real_and_clone_draws(self, monkeypatch): + data = _make_mock_data(n_persons=20, n_households=5) + data["charitable_cash_donations"] = {2024: np.zeros(20, dtype=np.float32)} + + y_full = {var: np.ones(20, dtype=np.float32) for var in IMPUTED_VARIABLES} + y_override = { + var: np.ones(20, dtype=np.float32) * 100 + for var in OVERRIDDEN_IMPUTED_VARIABLES + } + y_real_override = { + var: np.ones(20, dtype=np.float32) * 10 + for var in OVERRIDDEN_IMPUTED_VARIABLES + } + y_override["charitable_cash_donations"] = np.arange(20) + 1_000 + y_real_override["charitable_cash_donations"] = np.arange(20) + 100 + + def fake_run_qrf_imputation(*args, **kwargs): + return y_full, y_override, {}, y_real_override + + monkeypatch.setattr( + puf_impute_module, + "_run_qrf_imputation", + fake_run_qrf_imputation, + ) + + result = puf_clone_dataset( + data=data, + state_fips=np.array([1, 2, 36, 6, 48]), + time_period=2024, + puf_dataset=object(), + skip_qrf=False, + ) + + donations = result["charitable_cash_donations"][2024] + np.testing.assert_array_equal(donations[:20], np.arange(20) + 100) + np.testing.assert_array_equal(donations[20:], np.arange(20) + 1_000) + def test_sstb_qbi_split_variables_imputed(self): expected = { "sstb_self_employment_income", @@ -430,6 +474,8 @@ def __init__(self, dataset): def calculate(self, variable, map_to=None): assert map_to == "person" + if variable == "household_weight": + return pd.Series([100.0, 100.0, 100.0, 100.0]) assert variable == "adjusted_gross_income" return pd.Series([10.0, 30_000_000.0, 20.0, 30.0]) @@ -442,7 +488,14 @@ def calculate_dataframe(self, columns): train_frames = [] - def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + def fake_sequential_qrf( + X_train, + X_test, + predictors, + output_vars, + weight_col=None, + max_train_samples=None, + ): train_frames.append(X_train.copy()) return {variable: np.zeros(len(X_test)) for variable in output_vars} @@ -464,7 +517,7 @@ def fake_sequential_qrf(X_train, X_test, predictors, output_vars): dataset_path=None, ) - assert len(train_frames) == 2 + assert len(train_frames) == 4 assert all(len(frame) == 3 for frame in train_frames) assert all(99.0 not in set(frame["age"]) for frame in train_frames) @@ -486,6 +539,8 @@ def calculate(self, variable, map_to=None): assert map_to == "person" if variable == "person_weight": return pd.Series([100.0, 0.13, 100.0, 100.0]) + if variable == "household_weight": + return pd.Series([100.0, 100.0, 100.0, 100.0]) assert variable == "adjusted_gross_income" return pd.Series([10.0, 1_000_000_000.0, 20.0, 30.0]) @@ -498,7 +553,14 @@ def calculate_dataframe(self, columns): train_frames = [] - def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + def fake_sequential_qrf( + X_train, + X_test, + predictors, + output_vars, + weight_col=None, + max_train_samples=None, + ): train_frames.append(X_train.copy()) return {variable: np.zeros(len(X_test)) for variable in output_vars} @@ -520,7 +582,7 @@ def fake_sequential_qrf(X_train, X_test, predictors, output_vars): dataset_path=None, ) - assert len(train_frames) == 2 + assert len(train_frames) == 4 assert all(len(frame) == 3 for frame in train_frames) assert all(99.0 not in set(frame["age"]) for frame in train_frames) @@ -540,6 +602,8 @@ def __init__(self, dataset): def calculate(self, variable, map_to=None): assert map_to == "person" + if variable == "household_weight": + return pd.Series([100.0, 100.0, 100.0, 100.0]) assert variable == "adjusted_gross_income" return pd.Series([10.0, 20.0, 30.0, 40.0]) @@ -562,7 +626,14 @@ def calculate_dataframe(self, columns): train_frames = [] - def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + def fake_sequential_qrf( + X_train, + X_test, + predictors, + output_vars, + weight_col=None, + max_train_samples=None, + ): train_frames.append(X_train.copy()) return {variable: np.zeros(len(X_test)) for variable in output_vars} @@ -584,7 +655,7 @@ def fake_sequential_qrf(X_train, X_test, predictors, output_vars): dataset_path=None, ) - assert len(train_frames) == 2 + assert len(train_frames) == 4 assert all(len(frame) == 3 for frame in train_frames) assert all(99.0 not in set(frame["age"]) for frame in train_frames) @@ -604,6 +675,8 @@ def __init__(self, dataset): def calculate(self, variable, map_to=None): assert map_to == "person" + if variable == "household_weight": + return pd.Series([100.0, 100.0, 100.0, 100.0]) assert variable == "adjusted_gross_income" return pd.Series([10.0, 30_000_000.0, 20.0, 30.0]) @@ -616,7 +689,14 @@ def calculate_dataframe(self, columns): train_frames = [] - def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + def fake_sequential_qrf( + X_train, + X_test, + predictors, + output_vars, + weight_col=None, + max_train_samples=None, + ): train_frames.append(X_train.copy()) return {variable: np.zeros(len(X_test)) for variable in output_vars} @@ -638,7 +718,7 @@ def fake_sequential_qrf(X_train, X_test, predictors, output_vars): dataset_path=None, ) - assert len(train_frames) == 2 + assert len(train_frames) == 4 assert all(len(frame) == 3 for frame in train_frames) assert all(99.0 not in set(frame["age"]) for frame in train_frames) @@ -659,6 +739,8 @@ def __init__(self, dataset): def calculate(self, variable, map_to=None): assert map_to == "person" + if variable == "household_weight": + return pd.Series([100.0, 100.0, 100.0, 100.0]) assert variable == "adjusted_gross_income" return pd.Series([10.0, 30_000_000.0, 20.0, 30.0]) @@ -671,7 +753,14 @@ def calculate_dataframe(self, columns): train_frames = [] - def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + def fake_sequential_qrf( + X_train, + X_test, + predictors, + output_vars, + weight_col=None, + max_train_samples=None, + ): train_frames.append(X_train.copy()) return {variable: np.zeros(len(X_test)) for variable in output_vars} @@ -693,7 +782,7 @@ def fake_sequential_qrf(X_train, X_test, predictors, output_vars): dataset_path=None, ) - assert len(train_frames) == 2 + assert len(train_frames) == 4 assert all(len(frame) == 3 for frame in train_frames) assert all(99.0 not in set(frame["age"]) for frame in train_frames) @@ -714,6 +803,8 @@ def __init__(self, dataset): def calculate(self, variable, map_to=None): assert map_to == "person" + if variable == "household_weight": + return pd.Series([100.0, 100.0, 100.0, 100.0]) assert variable == "adjusted_gross_income" return pd.Series([10.0, 20.0, 30.0, 40.0]) @@ -736,7 +827,14 @@ def calculate_dataframe(self, columns): train_frames = [] - def fake_sequential_qrf(X_train, X_test, predictors, output_vars): + def fake_sequential_qrf( + X_train, + X_test, + predictors, + output_vars, + weight_col=None, + max_train_samples=None, + ): train_frames.append(X_train.copy()) return {variable: np.zeros(len(X_test)) for variable in output_vars} @@ -758,11 +856,101 @@ def fake_sequential_qrf(X_train, X_test, predictors, output_vars): dataset_path=None, ) - assert len(train_frames) == 2 + assert len(train_frames) == 4 assert all(len(frame) == 2 for frame in train_frames) assert all(99.0 in set(frame["age"]) for frame in train_frames) +def test_run_qrf_imputation_splits_weighted_real_half_from_clone_half(monkeypatch): + calls = [] + + class FakeDataset: + def load_dataset(self): + return {} + + class FakeMicrosimulation: + def __init__(self, dataset): + self.dataset = FakeDataset() + self.tax_benefit_system = type( + "TBS", + (), + { + "variables": { + var: object() + for var in DEMOGRAPHIC_PREDICTORS + REAL_HALF_INCOME_PREDICTORS + } + }, + )() + + def calculate(self, variable, map_to=None): + if variable == "adjusted_gross_income": + return pd.Series([10_000.0, 50_000.0, 5_000_000.0]) + if variable == "household_weight": + return pd.Series([40_000.0, 4_000.0, 20.0]) + raise ValueError(variable) + + def calculate_dataframe(self, columns): + n = 3 + return pd.DataFrame( + { + column: np.arange(n, dtype=np.float32) + i + for i, column in enumerate(columns) + } + ) + + def fake_sequential_qrf( + X_train, + X_test, + predictors, + output_vars, + weight_col=None, + max_train_samples=None, + ): + calls.append( + { + "predictors": list(predictors), + "output_vars": list(output_vars), + "weight_col": weight_col, + "max_train_samples": max_train_samples, + "train_columns": list(X_train.columns), + "test_columns": list(X_test.columns), + } + ) + return { + var: np.full(len(X_test), len(calls), dtype=np.float32) + for var in output_vars + } + + monkeypatch.setattr("policyengine_us.Microsimulation", FakeMicrosimulation) + monkeypatch.setattr( + puf_impute_module, + "_sequential_qrf", + fake_sequential_qrf, + ) + + data = _make_mock_data(n_persons=3, n_households=1) + for var in REAL_HALF_INCOME_PREDICTORS: + data[var] = {2024: np.arange(3, dtype=np.float32)} + + result = _run_qrf_imputation( + data=data, + time_period=2024, + puf_dataset="puf", + dataset_path="cps.h5", + ) + + assert len(result) == 4 + assert calls[0]["weight_col"] is None + assert calls[1]["weight_col"] is None + assert calls[2]["weight_col"] == PUF_WEIGHT_COLUMN + assert calls[3]["weight_col"] == PUF_WEIGHT_COLUMN + assert calls[2]["max_train_samples"] == puf_impute_module.PUF_SUBSAMPLE_TARGET + assert PUF_WEIGHT_COLUMN in calls[2]["train_columns"] + assert "employment_income" in calls[2]["predictors"] + assert "employment_income" in calls[2]["test_columns"] + assert "employment_income" not in calls[2]["output_vars"] + + def test_retirement_imputation_uses_sstb_income_for_se_eligibility(monkeypatch): class FakeMicrosimulation: def __init__(self, dataset): diff --git a/uv.lock b/uv.lock index 66f1ed9d7..83d35727d 100644 --- a/uv.lock +++ b/uv.lock @@ -2164,7 +2164,7 @@ wheels = [ [[package]] name = "policyengine-us" -version = "1.715.3" +version = "1.726.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "microdf-python" }, @@ -2174,9 +2174,9 @@ dependencies = [ { name = "tables" }, { name = "tqdm" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/60/bc/ea8cf84d7653d4d76d1f7b05feb74722ff903637c616357610de1fd3b431/policyengine_us-1.715.3.tar.gz", hash = "sha256:5b41b22be90ef155a9440bcae7dd26115c887cad92ae8a51d9080a9692053b66", size = 10014788, upload-time = "2026-05-29T21:33:02.993Z" } +sdist = { url = "https://files.pythonhosted.org/packages/af/da/1153e6e05e152bf24387e4e160da25267b0056f9f7418f3c36b6ccf04a83/policyengine_us-1.726.0.tar.gz", hash = "sha256:6383099e52f23be0a69a7f0e02b3f9abd3947c8613225b3f7d06cb24b7d0e614", size = 10199011, upload-time = "2026-06-11T00:41:07.901Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f4/0f/e6b594d46fffeb6e40db3a51441cec6a6e76ade2b178eab3836528dbc15c/policyengine_us-1.715.3-py3-none-any.whl", hash = "sha256:a34f305871f702d94f7a4d220bfd5312f11d83a417e793566892541871dfded3", size = 11037631, upload-time = "2026-05-29T21:32:59.464Z" }, + { url = "https://files.pythonhosted.org/packages/27/dd/a6a62280c6289e6da669410e97c90da99e6585c39d4fa6fe81b9910a1e3c/policyengine_us-1.726.0-py3-none-any.whl", hash = "sha256:1acc74d1f5431f7872c5500ad93604455aa7b98bcc87399218bc459b393386be", size = 11518993, upload-time = "2026-06-11T00:41:04.561Z" }, ] [[package]] @@ -2246,7 +2246,7 @@ requires-dist = [ { name = "pandas", specifier = ">=2.3.1" }, { name = "pip-system-certs", specifier = ">=3.0" }, { name = "policyengine-core", specifier = ">=3.26.1,<3.27" }, - { name = "policyengine-us", specifier = "==1.715.3" }, + { name = "policyengine-us", specifier = "==1.726.0" }, { name = "requests", specifier = ">=2.25.0" }, { name = "scipy", specifier = ">=1.15.3" }, { name = "setuptools", specifier = ">=60" }, diff --git a/validation/puf_qrf_real_half_smoke.py b/validation/puf_qrf_real_half_smoke.py new file mode 100644 index 000000000..a512d763f --- /dev/null +++ b/validation/puf_qrf_real_half_smoke.py @@ -0,0 +1,315 @@ +"""Smoke test the PUF QRF real-half imputation change. + +This deliberately avoids `Microsimulation(dataset=PUF_2015)`, because local CPS +artifacts can be stale relative to the installed PE-US schema. It uses the raw +2015 PUF CSV plus the 2024 CPS H5 to test the core failure mode: + +* old path: unweighted, demographic-only QRF draws +* fixed path: PUF-weighted, income-conditioned QRF draws + +The numbers are diagnostic, not release validation totals. +""" + +from __future__ import annotations + +import argparse +from pathlib import Path + +import h5py +import numpy as np +import pandas as pd +from microimpute.models.qrf import QRF + +from policyengine_us_data.calibration.puf_impute import ( + DEMOGRAPHIC_PREDICTORS, + PUF_SUBSAMPLE_TARGET, + PUF_WEIGHT_COLUMN, + REAL_HALF_INCOME_PREDICTORS, + _stratified_subsample_index, +) +from policyengine_us_data.storage import STORAGE_FOLDER + + +TARGETS = [ + "charitable_cash_donations", + "charitable_non_cash_donations", +] + +PUF_RENAMES = { + "adjusted_gross_income": "E00100", + "charitable_cash_donations": "E19800", + "charitable_non_cash_donations": "E20100", + "employment_income": "E00200", + "self_employment_income": "E00900", + "taxable_interest_income": "E00300", + "tax_exempt_interest_income": "E00400", + "qualified_dividend_income": "E00650", + "short_term_capital_gains": "P22250", + "long_term_capital_gains": "P23250", + "farm_income": "T27800", + "taxable_pension_income": "E01700", + "taxable_private_pension_income": "E01700", + "taxable_ira_distributions": "E01400", + "taxable_unemployment_compensation": "E02300", + "social_security": "E02400", + "social_security_retirement": "E02400", +} + + +def _decode_age(age_range: pd.Series) -> np.ndarray: + midpoints = { + 0: 40, + 1: 22, + 2: 30, + 3: 40, + 4: 50, + 5: 60, + 6: 72, + 7: 82, + } + return age_range.fillna(0).round().astype(int).map(midpoints).fillna(40).values + + +def _load_puf_tax_units(storage_folder: Path) -> pd.DataFrame: + puf_cols = [ + "RECID", + "S006", + "E00100", + "E00200", + "E00300", + "E00400", + "E00600", + "E00650", + "E00900", + "E01400", + "E01500", + "E01700", + "E02300", + "E02400", + "E19800", + "E20100", + "E25850", + "E25860", + "P22250", + "P23250", + "T27800", + "MARS", + "XTOT", + ] + demo_cols = ["RECID", "AGERANGE", "GENDER"] + puf = pd.read_csv(storage_folder / "puf_2015.csv", usecols=puf_cols) + demographics = pd.read_csv( + storage_folder / "demographics_2015.csv", usecols=demo_cols + ) + puf = puf.merge(demographics, on="RECID", how="left") + frame = pd.DataFrame(index=puf.index) + frame[PUF_WEIGHT_COLUMN] = puf["S006"].fillna(0).astype(float) / 100 + frame["age"] = _decode_age(puf["AGERANGE"]) + frame["is_male"] = (puf["GENDER"].fillna(0) == 1).astype(float) + frame["tax_unit_is_joint"] = (puf["MARS"].fillna(0) == 2).astype(float) + frame["tax_unit_count_dependents"] = np.maximum( + puf["XTOT"].fillna(1).astype(float) - 1 - frame["tax_unit_is_joint"], + 0, + ) + frame["is_tax_unit_head"] = 1.0 + frame["is_tax_unit_spouse"] = 0.0 + frame["is_tax_unit_dependent"] = 0.0 + for target, source in PUF_RENAMES.items(): + frame[target] = puf[source].fillna(0).astype(float) + frame["non_qualified_dividend_income"] = puf["E00600"].fillna(0).astype( + float + ) - puf["E00650"].fillna(0).astype(float) + frame["rental_income"] = puf["E25850"].fillna(0).astype(float) - puf[ + "E25860" + ].fillna(0).astype(float) + frame["tax_exempt_pension_income"] = puf["E01500"].fillna(0).astype(float) - puf[ + "E01700" + ].fillna(0).astype(float) + frame["tax_exempt_private_pension_income"] = frame["tax_exempt_pension_income"] + frame["tax_exempt_ira_distributions"] = 0.0 + frame["social_security_disability"] = 0.0 + frame["social_security_survivors"] = 0.0 + frame["social_security_dependents"] = 0.0 + return frame + + +def _series_by_id(values: np.ndarray, ids: np.ndarray) -> pd.Series: + return pd.Series(values).groupby(ids).sum() + + +def _load_cps_tax_units(cps_path: Path) -> tuple[pd.DataFrame, np.ndarray]: + with h5py.File(cps_path, "r") as f: + tax_unit_ids = f["tax_unit_id"][:] + person_tax_unit_ids = f["person_tax_unit_id"][:] + person_household_ids = f["person_household_id"][:] + household_weights = pd.Series( + f["household_weight"][:], + index=f["household_id"][:], + ) + + first_person_by_tax_unit = ( + pd.Series(np.arange(len(person_tax_unit_ids))) + .groupby(person_tax_unit_ids) + .first() + .reindex(tax_unit_ids) + ) + first_person_idx = first_person_by_tax_unit.values.astype(int) + tax_unit_household_ids = person_household_ids[first_person_idx] + weights = household_weights.reindex(tax_unit_household_ids).fillna(0).values + + frame = pd.DataFrame(index=np.arange(len(tax_unit_ids))) + person_age = f["age"][:].astype(float) + adult_count = _series_by_id( + (person_age >= 18).astype(float), person_tax_unit_ids + ) + child_count = _series_by_id( + (person_age < 19).astype(float), person_tax_unit_ids + ) + frame["age"] = person_age[first_person_idx] + frame["is_male"] = 1.0 - f["is_female"][:][first_person_idx].astype(float) + frame["tax_unit_is_joint"] = ( + adult_count.reindex(tax_unit_ids).fillna(0).values >= 2 + ).astype(float) + frame["tax_unit_count_dependents"] = ( + child_count.reindex(tax_unit_ids).fillna(0).values + ) + frame["is_tax_unit_head"] = 1.0 + frame["is_tax_unit_spouse"] = 0.0 + frame["is_tax_unit_dependent"] = 0.0 + + for variable in REAL_HALF_INCOME_PREDICTORS: + if variable not in f: + continue + by_tax_unit = _series_by_id(f[variable][:], person_tax_unit_ids) + frame[variable] = by_tax_unit.reindex(tax_unit_ids).fillna(0).values + + return frame, weights + + +def _weighted_total(values: pd.Series | np.ndarray, weights: np.ndarray) -> float: + return float(np.asarray(values, dtype=float) @ np.asarray(weights, dtype=float)) + + +def _run_qrf( + *, + train: pd.DataFrame, + test: pd.DataFrame, + predictors: list[str], + weight_col: str | None, + max_train_samples: int | None, +) -> pd.DataFrame: + qrf = QRF( + log_level="WARNING", + memory_efficient=True, + max_train_samples=max_train_samples, + ) + return qrf.fit_predict( + X_train=train, + X_test=test, + predictors=predictors, + imputed_variables=TARGETS, + weight_col=weight_col, + n_jobs=1, + ) + + +def _format_billions(value: float) -> str: + return f"${value / 1e9:,.1f}B" + + +def main() -> None: + parser = argparse.ArgumentParser() + parser.add_argument( + "--cps-path", + type=Path, + default=STORAGE_FOLDER / "cps_2024.h5", + ) + parser.add_argument( + "--max-train-samples", + type=int, + default=PUF_SUBSAMPLE_TARGET, + ) + args = parser.parse_args() + + puf = _load_puf_tax_units(STORAGE_FOLDER) + cps, cps_weights = _load_cps_tax_units(args.cps_path) + if "adjusted_gross_income" not in cps: + cps["adjusted_gross_income"] = cps[ + [column for column in REAL_HALF_INCOME_PREDICTORS if column in cps] + ].sum(axis=1) + income_predictors = [ + predictor + for predictor in REAL_HALF_INCOME_PREDICTORS + if predictor in cps and predictor in puf + ] + weighted_predictors = DEMOGRAPHIC_PREDICTORS + income_predictors + + old_idx = _stratified_subsample_index( + puf["adjusted_gross_income"].values, + target_n=args.max_train_samples, + ) + old_train = puf.iloc[old_idx].reset_index(drop=True) + weighted_train = puf.loc[puf[PUF_WEIGHT_COLUMN] > 0].reset_index(drop=True) + + print(f"PUF rows: {len(puf):,}; CPS tax units: {len(cps):,}") + print(f"Old unweighted train rows: {len(old_train):,}") + print(f"Weighted train rows before QRF sampling: {len(weighted_train):,}") + print(f"Income predictors used: {', '.join(income_predictors)}") + + old_pred = _run_qrf( + train=old_train, + test=cps, + predictors=DEMOGRAPHIC_PREDICTORS, + weight_col=None, + max_train_samples=None, + ) + fixed_pred = _run_qrf( + train=weighted_train, + test=cps, + predictors=weighted_predictors, + weight_col=PUF_WEIGHT_COLUMN, + max_train_samples=args.max_train_samples, + ) + + rows = [] + for target in TARGETS: + puf_total = _weighted_total(puf[target], puf[PUF_WEIGHT_COLUMN].values) + old_total = _weighted_total(old_pred[target], cps_weights) + fixed_total = _weighted_total(fixed_pred[target], cps_weights) + rows.append( + { + "target": target, + "puf_weighted": _format_billions(puf_total), + "old_qrf": _format_billions(old_total), + "old_vs_puf": f"{old_total / puf_total:,.1f}x", + "fixed_qrf": _format_billions(fixed_total), + "fixed_vs_puf": f"{fixed_total / puf_total:,.1f}x", + } + ) + + puf_total = sum( + _weighted_total(puf[target], puf[PUF_WEIGHT_COLUMN].values) + for target in TARGETS + ) + old_total = sum( + _weighted_total(old_pred[target], cps_weights) for target in TARGETS + ) + fixed_total = sum( + _weighted_total(fixed_pred[target], cps_weights) for target in TARGETS + ) + rows.append( + { + "target": "combined_charitable", + "puf_weighted": _format_billions(puf_total), + "old_qrf": _format_billions(old_total), + "old_vs_puf": f"{old_total / puf_total:,.1f}x", + "fixed_qrf": _format_billions(fixed_total), + "fixed_vs_puf": f"{fixed_total / puf_total:,.1f}x", + } + ) + + print(pd.DataFrame(rows).to_string(index=False)) + + +if __name__ == "__main__": + main()