Skip to content
1 change: 1 addition & 0 deletions changelog.d/703.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added explicit clone-origin flags to extended/enhanced CPS datasets and saved ECPS clone diagnostics for clone weight share, modeled-only-poor share, and extreme childcare/tax checks.
22 changes: 22 additions & 0 deletions policyengine_us_data/calibration/puf_impute.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@

logger = logging.getLogger(__name__)

# Entity-level indicator variables marking clone origin.  When the dataset is
# doubled (CPS half followed by a PUF-clone half), each flag is written as a
# concatenation of zeros (original records) then ones (clone records) — see
# the loop that builds these arrays further down in this module.
CLONE_ORIGIN_FLAGS = {
    "person": "person_is_puf_clone",
    "tax_unit": "tax_unit_is_puf_clone",
    "spm_unit": "spm_unit_is_puf_clone",
    "family": "family_is_puf_clone",
    "household": "household_is_puf_clone",
}

# NOTE(review): neither constant is referenced in the visible portion of this
# file — presumably the PUF subsample size and the top-income percentile
# cutoff used elsewhere in the imputation pipeline; confirm against callers.
PUF_SUBSAMPLE_TARGET = 20_000
PUF_TOP_PERCENTILE = 99.5

Expand Down Expand Up @@ -531,6 +539,20 @@ def _map_to_entity(pred_values, variable_name):
time_period: np.concatenate([state_fips, state_fips]).astype(np.int32)
}

for entity_key, flag_name in CLONE_ORIGIN_FLAGS.items():
id_variable = f"{entity_key}_id"
if id_variable not in data:
continue
n_entities = len(data[id_variable][time_period])
new_data[flag_name] = {
time_period: np.concatenate(
[
np.zeros(n_entities, dtype=np.int8),
np.ones(n_entities, dtype=np.int8),
]
)
}

if y_full:
for var in IMPUTED_VARIABLES:
if var not in data:
Expand Down
265 changes: 265 additions & 0 deletions policyengine_us_data/datasets/cps/enhanced_cps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import json
from pathlib import Path

import h5py
from policyengine_core.data import Dataset
import pandas as pd
from policyengine_us_data.utils import (
Expand Down Expand Up @@ -28,6 +32,250 @@
torch = None


def _to_numpy(value) -> np.ndarray:
return np.asarray(getattr(value, "values", value))


def _weighted_share(mask, weights) -> float:
weights = np.asarray(weights, dtype=np.float64)
total_weight = float(weights.sum())
if total_weight <= 0:
return 0.0
mask = np.asarray(mask, dtype=bool)
return 100 * float(weights[mask].sum()) / total_weight


def compute_clone_diagnostics_summary(
    *,
    household_is_puf_clone,
    household_weight,
    person_is_puf_clone,
    person_weight,
    person_in_poverty,
    person_reported_in_poverty,
    spm_unit_is_puf_clone,
    spm_unit_weight,
    spm_unit_capped_work_childcare_expenses,
    spm_unit_pre_subsidy_childcare_expenses,
    spm_unit_taxes,
    spm_unit_market_income,
) -> dict[str, float]:
    """Summarize clone-record diagnostics as weighted percentage shares.

    Inputs are parallel per-entity arrays (households, persons, SPM units);
    clone flags mark records originating from the PUF-clone half.  Returns a
    dict of percentages (0-100) covering: clone weight share per entity,
    persons poor under the modeled measure but not the reported one (overall
    and within the clone half), and within-clone SPM units with suspicious
    childcare or tax values.

    Note: the inline ``<= 0`` guards in the earlier version were redundant —
    ``_weighted_share`` already returns 0.0 whenever its weight total is
    non-positive, which is exactly the condition those guards tested.
    """
    household_is_puf_clone = np.asarray(household_is_puf_clone, dtype=bool)
    household_weight = np.asarray(household_weight, dtype=np.float64)
    person_is_puf_clone = np.asarray(person_is_puf_clone, dtype=bool)
    person_weight = np.asarray(person_weight, dtype=np.float64)
    person_in_poverty = np.asarray(person_in_poverty, dtype=bool)
    person_reported_in_poverty = np.asarray(
        person_reported_in_poverty, dtype=bool
    )
    spm_unit_is_puf_clone = np.asarray(spm_unit_is_puf_clone, dtype=bool)
    spm_unit_weight = np.asarray(spm_unit_weight, dtype=np.float64)
    capped_childcare = np.asarray(
        spm_unit_capped_work_childcare_expenses, dtype=np.float64
    )
    pre_subsidy_childcare = np.asarray(
        spm_unit_pre_subsidy_childcare_expenses, dtype=np.float64
    )
    spm_unit_taxes = np.asarray(spm_unit_taxes, dtype=np.float64)
    spm_unit_market_income = np.asarray(
        spm_unit_market_income, dtype=np.float64
    )

    # Poor under the modeled income measure but not the reported measure.
    poor_modeled_only = person_in_poverty & ~person_reported_in_poverty

    # Slice the clone-only views once instead of re-indexing per metric.
    clone_person_weight = person_weight[person_is_puf_clone]
    clone_spm_weight = spm_unit_weight[spm_unit_is_puf_clone]
    clone_capped_childcare = capped_childcare[spm_unit_is_puf_clone]
    clone_pre_subsidy = pre_subsidy_childcare[spm_unit_is_puf_clone]
    clone_taxes = spm_unit_taxes[spm_unit_is_puf_clone]
    clone_market_income = spm_unit_market_income[spm_unit_is_puf_clone]

    return {
        "clone_household_weight_share_pct": _weighted_share(
            household_is_puf_clone, household_weight
        ),
        "clone_person_weight_share_pct": _weighted_share(
            person_is_puf_clone, person_weight
        ),
        "clone_poor_modeled_only_person_weight_share_pct": _weighted_share(
            person_is_puf_clone & poor_modeled_only,
            person_weight,
        ),
        "poor_modeled_only_within_clone_person_weight_share_pct": (
            _weighted_share(
                poor_modeled_only[person_is_puf_clone],
                clone_person_weight,
            )
        ),
        # The "+ 1" gives a $1 tolerance so rounding-level differences are
        # not flagged as inconsistencies.
        "clone_childcare_exceeds_pre_subsidy_share_pct": _weighted_share(
            clone_capped_childcare > clone_pre_subsidy + 1,
            clone_spm_weight,
        ),
        "clone_childcare_above_5000_share_pct": _weighted_share(
            clone_capped_childcare > 5_000,
            clone_spm_weight,
        ),
        "clone_taxes_exceed_market_income_share_pct": _weighted_share(
            clone_taxes > clone_market_income + 1,
            clone_spm_weight,
        ),
    }


def _load_saved_period_array(
    file_path: str | Path,
    variable_name: str,
    period: int,
) -> np.ndarray:
    """Read one period's array for *variable_name* from a saved HDF5 file.

    The variable may be stored either as a flat dataset (the array itself) or
    as a group keyed by period, with either a string or an int key.

    Raises:
        KeyError: if the variable is a group and has no entry for *period*.
    """
    with h5py.File(file_path, "r") as h5_file:
        node = h5_file[variable_name]
        # Flat storage: the variable itself holds the array.
        if isinstance(node, h5py.Dataset):
            return np.asarray(node[...])
        # Group storage: prefer the string key, then fall back to the raw int.
        for key in (str(period), period):
            if key in node:
                return np.asarray(node[key][...])
    raise KeyError(f"{variable_name} missing period {period}")


def clone_diagnostics_path(file_path: str | Path) -> Path:
return Path(file_path).with_suffix(".clone_diagnostics.json")


def build_clone_diagnostics_payload(
    period_to_diagnostics: dict[int, dict[str, float]],
) -> dict:
    """Shape per-period diagnostics into the on-disk JSON payload.

    A single period is flattened into one dict carrying a ``period`` key;
    multiple periods are nested under ``periods`` with string keys, sorted
    by period.

    Raises:
        ValueError: if *period_to_diagnostics* is empty.
    """
    if not period_to_diagnostics:
        raise ValueError("Expected at least one period of clone diagnostics")

    if len(period_to_diagnostics) == 1:
        ((period, period_diagnostics),) = period_to_diagnostics.items()
        payload = dict(period_diagnostics)
        payload["period"] = int(period)
        return payload

    nested = {
        str(period): period_to_diagnostics[period]
        for period in sorted(period_to_diagnostics)
    }
    return {"periods": nested}


def write_clone_diagnostics_report(file_path: str | Path, diagnostics: dict) -> Path:
    """Serialize *diagnostics* as pretty-printed JSON beside *file_path*.

    Returns the path of the written report.
    """
    report_path = clone_diagnostics_path(file_path)
    serialized = json.dumps(diagnostics, indent=2, sort_keys=True)
    report_path.write_text(serialized + "\n")
    return report_path


def refresh_clone_diagnostics_report(
    file_path: str | Path,
    diagnostics_builder,
) -> Path:
    """Rebuild the diagnostics report for *file_path* from scratch.

    Any existing report is removed *before* *diagnostics_builder* runs, so a
    failed rebuild cannot leave a stale report sitting beside the dataset.
    """
    stale_path = clone_diagnostics_path(file_path)
    if stale_path.exists():
        stale_path.unlink()
    return write_clone_diagnostics_report(file_path, diagnostics_builder())


def save_clone_diagnostics_report(
    dataset_cls: Type[Dataset],
    *,
    start_year: int,
    end_year: int,
) -> tuple[Path, dict]:
    """Compute, write, and read back clone diagnostics for a saved dataset.

    Diagnostics cover every period from *start_year* through *end_year*
    inclusive.  Returns the report path together with the JSON payload
    round-tripped from disk.
    """

    def _build_payload() -> dict:
        # Built lazily so any existing stale report is removed first.
        per_period = {
            period: build_clone_diagnostics_for_saved_dataset(
                dataset_cls, period
            )
            for period in range(start_year, end_year + 1)
        }
        return build_clone_diagnostics_payload(per_period)

    output_path = refresh_clone_diagnostics_report(
        dataset_cls.file_path, _build_payload
    )
    return output_path, json.loads(output_path.read_text())


def build_clone_diagnostics_for_saved_dataset(
    dataset_cls: Type[Dataset], period: int
) -> dict[str, float]:
    """Run a microsimulation over *dataset_cls* and summarize its clone half."""
    # Imported lazily inside the function, as in the rest of this module.
    from policyengine_us import Microsimulation

    simulation = Microsimulation(dataset=dataset_cls)
    return build_clone_diagnostics_for_simulation(
        simulation,
        dataset_path=Path(dataset_cls.file_path),
        period=period,
    )


def build_clone_diagnostics_for_simulation(
    sim,
    *,
    dataset_path: str | Path,
    period: int,
) -> dict[str, float]:
    """Build clone diagnostics from a simulation and saved clone-flag arrays.

    The enhanced CPS save path preserves zeroed person/spm-unit weight inputs on
    the clone half. For diagnostics, always map the calibrated household weights
    to persons/SPM units explicitly instead of reading those stale entity-level
    weight inputs back from disk.
    """

    def _calc(variable: str, **kwargs) -> np.ndarray:
        # Simulation result for *variable* at the diagnostics period.
        return _to_numpy(sim.calculate(variable, period=period, **kwargs))

    def _saved_flag(variable: str) -> np.ndarray:
        # Clone flag read back from the saved HDF5 dataset.
        return _load_saved_period_array(dataset_path, variable, period)

    person_reported_in_poverty = _calc(
        "spm_unit_net_income_reported", map_to="person"
    ) < _calc("spm_unit_spm_threshold", map_to="person")

    return compute_clone_diagnostics_summary(
        household_is_puf_clone=_saved_flag("household_is_puf_clone"),
        household_weight=_calc("household_weight"),
        person_is_puf_clone=_saved_flag("person_is_puf_clone"),
        # Calibrated household weight mapped down to persons/SPM units —
        # see the docstring for why the saved entity weights are not used.
        person_weight=_calc("household_weight", map_to="person"),
        person_in_poverty=_calc("person_in_poverty"),
        person_reported_in_poverty=person_reported_in_poverty,
        spm_unit_is_puf_clone=_saved_flag("spm_unit_is_puf_clone"),
        spm_unit_weight=_calc("household_weight", map_to="spm_unit"),
        spm_unit_capped_work_childcare_expenses=_calc(
            "spm_unit_capped_work_childcare_expenses"
        ),
        spm_unit_pre_subsidy_childcare_expenses=_calc(
            "spm_unit_pre_subsidy_childcare_expenses"
        ),
        spm_unit_taxes=_calc("spm_unit_taxes"),
        spm_unit_market_income=_calc("spm_unit_market_income"),
    )


def _get_period_array(period_values: dict, period: int) -> np.ndarray:
"""Get a period array from a TIME_PERIOD_ARRAYS variable dict."""
value = period_values.get(period)
Expand Down Expand Up @@ -313,6 +561,23 @@ def generate(self):
logging.info("Post-generation weight validation passed")

self.save_dataset(data)
try:
output_path, diagnostics_payload = save_clone_diagnostics_report(
type(self),
start_year=self.start_year,
end_year=self.end_year,
)
logging.info("Saved clone diagnostics to %s", output_path)
logging.info(
"Clone diagnostics summary: %s",
diagnostics_payload,
)
except Exception:
logging.warning(
"Unable to compute clone diagnostics for %s",
self.file_path,
exc_info=True,
)


class ReweightedCPS_2024(Dataset):
Expand Down
Loading
Loading