Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/1170.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Impute PUF-only variables onto positive-weight CPS records with a PUF-weighted, income-conditioned QRF draw instead of reusing the unweighted, demographic-only draw built for the zero-weight clone half. The old shared draw oversampled rich PUF donors onto real CPS records, inflating charitable donations and other PUF-sourced deduction inputs by an order of magnitude. The clone half keeps its rich-preserving draw, and the new real-half training frame respects the Forbes/top-tail donor exclusions.
157 changes: 147 additions & 10 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,34 @@
"is_tax_unit_dependent",
]

# Income-related variables offered as extra QRF predictors, on top of
# DEMOGRAPHIC_PREDICTORS, when imputing onto the real (positive-weight)
# CPS half. At use time, entries that are themselves imputation targets
# are dropped, and only predictors available on the CPS test frame are
# kept, so this list is an upper bound on what actually conditions the draw.
REAL_HALF_INCOME_PREDICTORS = [
"adjusted_gross_income",
"employment_income",
"self_employment_income",
"taxable_interest_income",
"tax_exempt_interest_income",
"qualified_dividend_income",
"non_qualified_dividend_income",
"short_term_capital_gains",
"long_term_capital_gains",
"rental_income",
"farm_income",
"taxable_pension_income",
"tax_exempt_pension_income",
"taxable_private_pension_income",
"tax_exempt_private_pension_income",
"taxable_ira_distributions",
"tax_exempt_ira_distributions",
"taxable_unemployment_compensation",
"social_security",
"social_security_retirement",
"social_security_disability",
"social_security_survivors",
"social_security_dependents",
]

# Name of the column added to the real-half training frame to carry the
# PUF weight (household_weight mapped to person). It is used to drop
# non-positive-weight training rows and is passed to the QRF as weight_col.
PUF_WEIGHT_COLUMN = "puf_sample_weight"

IMPUTED_VARIABLES = [
"employment_income",
"partnership_s_corp_income",
Expand Down Expand Up @@ -548,13 +576,21 @@ def puf_clone_dataset(

y_full = None
y_override = None
y_real_full = None
y_real_override = None
if not skip_qrf and puf_dataset is not None:
y_full, y_override = _run_qrf_imputation(
qrf_result = _run_qrf_imputation(
data,
time_period,
puf_dataset,
dataset_path=dataset_path,
)
if len(qrf_result) == 2:
# Backwards-compatible shape for older tests/callers.
y_full, y_override = qrf_result
y_real_full, y_real_override = y_full, y_override
else:
y_full, y_override, y_real_full, y_real_override = qrf_result

cps_sim = None
tbs = None
Expand Down Expand Up @@ -596,8 +632,14 @@ def _map_to_entity(pred_values, variable_name):
values = time_dict[time_period]

if variable in OVERRIDDEN_IMPUTED_VARIABLES and y_override:
pred = _map_to_entity(y_override[variable], variable)
new_data[variable] = {time_period: np.concatenate([pred, pred])}
real_source = (
y_real_override
if y_real_override and variable in y_real_override
else y_override
)
real_pred = _map_to_entity(real_source[variable], variable)
puf_pred = _map_to_entity(y_override[variable], variable)
new_data[variable] = {time_period: np.concatenate([real_pred, puf_pred])}
elif variable in IMPUTED_VARIABLES and y_full:
pred = _map_to_entity(y_full[variable], variable)
new_data[variable] = {time_period: np.concatenate([values, pred])}
Expand Down Expand Up @@ -655,8 +697,21 @@ def _map_to_entity(pred_values, variable_name):
if y_full:
for var in IMPUTED_VARIABLES:
if var not in data:
pred = _map_to_entity(y_full[var], var)
new_data[var] = {time_period: np.concatenate([pred, pred])}
if var in OVERRIDDEN_IMPUTED_VARIABLES and y_override:
real_source = (
y_real_override
if y_real_override and var in y_real_override
else y_override
)
puf_source = y_override
else:
real_source = (
y_real_full if y_real_full and var in y_real_full else y_full
)
puf_source = y_full
real_pred = _map_to_entity(real_source[var], var)
puf_pred = _map_to_entity(puf_source[var], var)
new_data[var] = {time_period: np.concatenate([real_pred, puf_pred])}

if cps_sim is not None:
del cps_sim
Expand Down Expand Up @@ -944,8 +999,14 @@ def _run_qrf_imputation(
demographic predictors via Microsimulation.

Returns:
Tuple of (y_full_imputations, y_override_imputations)
as dicts of {variable: np.ndarray}.
Tuple of:
* y_full_imputations: old rich-preserving ghost-half draws
* y_override_imputations: old rich-preserving ghost-half draws
for variables that override CPS values
* y_real_full_imputations: weighted, income-conditioned draws
for positive-weight CPS records
* y_real_override_imputations: weighted, income-conditioned draws
for override variables on positive-weight CPS records
"""
from policyengine_us import Microsimulation

Expand All @@ -955,6 +1016,7 @@ def _run_qrf_imputation(

puf_agi = puf_sim.calculate("adjusted_gross_income", map_to="person").values
puf_data = puf_sim.dataset.load_dataset()
puf_weight = puf_sim.calculate("household_weight", map_to="person").values

X_train_full = puf_sim.calculate_dataframe(
DEMOGRAPHIC_PREDICTORS + IMPUTED_VARIABLES
Expand All @@ -964,6 +1026,22 @@ def _run_qrf_imputation(
DEMOGRAPHIC_PREDICTORS + OVERRIDDEN_IMPUTED_VARIABLES
)

real_full_targets = [var for var in IMPUTED_VARIABLES if var not in data]
real_override_targets = list(OVERRIDDEN_IMPUTED_VARIABLES)
real_income_predictors = [
var
for var in REAL_HALF_INCOME_PREDICTORS
if var not in set(real_full_targets) | set(real_override_targets)
]
real_predictors = DEMOGRAPHIC_PREDICTORS + real_income_predictors
real_training_cols = list(
dict.fromkeys(real_predictors + real_full_targets + real_override_targets)
)
X_train_real = puf_sim.calculate_dataframe(real_training_cols)
# Keep all person rows for now so the Forbes/top-tail training mask
# below stays index-aligned; the positive-weight filter happens after.
X_train_real[PUF_WEIGHT_COLUMN] = np.asarray(puf_weight, dtype=np.float64)

del puf_sim

tax_unit_ids = _period_array(puf_data, "tax_unit_id", time_period)
Expand Down Expand Up @@ -1002,8 +1080,10 @@ def _run_qrf_imputation(
>= top_tail_threshold
)
if len(forbes_person_mask) == len(puf_agi) and forbes_person_mask.any():
if len(X_train_full) != len(forbes_person_mask) or len(X_train_override) != len(
forbes_person_mask
if (
len(X_train_full) != len(forbes_person_mask)
or len(X_train_override) != len(forbes_person_mask)
or len(X_train_real) != len(forbes_person_mask)
):
logger.warning(
"Skipping Forbes donor exclusion because QRF training "
Expand All @@ -1022,6 +1102,7 @@ def _run_qrf_imputation(
X_train_override = X_train_override.loc[non_forbes_mask].reset_index(
drop=True
)
X_train_real = X_train_real.loc[non_forbes_mask].reset_index(drop=True)

sub_idx = _stratified_subsample_index(puf_agi)
_log_stratified_subsample(
Expand All @@ -1033,15 +1114,35 @@ def _run_qrf_imputation(
X_train_full = X_train_full.iloc[sub_idx].reset_index(drop=True)
X_train_override = X_train_override.iloc[sub_idx].reset_index(drop=True)

X_train_real = X_train_real.loc[
np.asarray(X_train_real[PUF_WEIGHT_COLUMN], dtype=np.float64) > 0
].reset_index(drop=True)

if dataset_path is not None:
cps_sim = Microsimulation(dataset=dataset_path)
X_test = cps_sim.calculate_dataframe(DEMOGRAPHIC_PREDICTORS)
valid_real_predictors = [
predictor
for predictor in real_predictors
if predictor in cps_sim.tax_benefit_system.variables
]
X_test_real = cps_sim.calculate_dataframe(valid_real_predictors)
del cps_sim
else:
X_test = pd.DataFrame()
for pred in DEMOGRAPHIC_PREDICTORS:
if pred in data:
X_test[pred] = data[pred][time_period].astype(np.float32)
X_test_real = pd.DataFrame(index=X_test.index)
for pred in real_predictors:
if pred in data:
X_test_real[pred] = data[pred][time_period].astype(np.float32)

for pred in DEMOGRAPHIC_PREDICTORS:
if pred not in X_test_real and pred in X_test:
X_test_real[pred] = X_test[pred]

real_predictors = [pred for pred in real_predictors if pred in X_test_real]

logger.info("Imputing %d PUF variables (full)", len(IMPUTED_VARIABLES))
y_full = _sequential_qrf(
Expand All @@ -1059,7 +1160,39 @@ def _run_qrf_imputation(
OVERRIDDEN_IMPUTED_VARIABLES,
)

return y_full, y_override
logger.info(
"Imputing %d PUF variables on real CPS half with %d predictors",
len(real_full_targets),
len(real_predictors),
)
y_real_full = (
_sequential_qrf(
X_train_real,
X_test_real,
real_predictors,
real_full_targets,
weight_col=PUF_WEIGHT_COLUMN,
max_train_samples=PUF_SUBSAMPLE_TARGET,
)
if real_full_targets
else {}
)

logger.info(
"Imputing %d override variables on real CPS half with %d predictors",
len(real_override_targets),
len(real_predictors),
)
y_real_override = _sequential_qrf(
X_train_real,
X_test_real,
real_predictors,
real_override_targets,
weight_col=PUF_WEIGHT_COLUMN,
max_train_samples=PUF_SUBSAMPLE_TARGET,
)

return y_full, y_override, y_real_full, y_real_override


def _period_array(
Expand Down Expand Up @@ -1187,6 +1320,8 @@ def _sequential_qrf(
X_test: pd.DataFrame,
predictors: List[str],
output_vars: List[str],
weight_col: Optional[str] = None,
max_train_samples: Optional[int] = None,
) -> Dict[str, np.ndarray]:
"""Run a single sequential QRF preserving covariance.

Expand All @@ -1209,12 +1344,14 @@ def _sequential_qrf(
qrf = QRF(
log_level="INFO",
memory_efficient=True,
max_train_samples=max_train_samples,
)
predictions = qrf.fit_predict(
X_train=X_train,
X_test=X_test,
predictors=predictors,
imputed_variables=output_vars,
weight_col=weight_col,
n_jobs=1,
)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers = [
"Programming Language :: Python :: 3.14",
]
dependencies = [
"policyengine-us==1.715.3",
"policyengine-us==1.726.0",
# policyengine-core 3.26.1 is the current 3.26.x runtime and includes the fix for
# PolicyEngine/policyengine-core#482 (user-set ETERNITY inputs lost
# after _invalidate_all_caches) and is required by policyengine-us 1.682.1+.
Expand Down
Loading
Loading