diff --git a/Makefile b/Makefile index 829c62bf0..eaf75c8df 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,8 @@ documentation-dev: myst clean && \ myst start +DATABASE_YEAR ?= 2024 + database: rm -f policyengine_us_data/storage/calibration/policy_data.db python policyengine_us_data/db/create_database_tables.py @@ -85,6 +87,7 @@ database: python policyengine_us_data/db/etl_tanf.py --year $(YEAR) python policyengine_us_data/db/etl_state_income_tax.py --year $(YEAR) python policyengine_us_data/db/etl_irs_soi.py --year $(YEAR) + python policyengine_us_data/db/etl_aca_agi_state_targets.py --year $(YEAR) python policyengine_us_data/db/etl_pregnancy.py --year $(YEAR) python policyengine_us_data/db/validate_database.py diff --git a/changelog.d/743.added.md b/changelog.d/743.added.md new file mode 100644 index 000000000..1fb5b53c1 --- /dev/null +++ b/changelog.d/743.added.md @@ -0,0 +1 @@ +Added ACA Marketplace spending and enrollment targets plus state AGI targets to the database build. diff --git a/policyengine_us_data/calibration/source_impute.py b/policyengine_us_data/calibration/source_impute.py index ff0de922e..20a988c8e 100644 --- a/policyengine_us_data/calibration/source_impute.py +++ b/policyengine_us_data/calibration/source_impute.py @@ -290,7 +290,9 @@ def _impute_acs( acs = Microsimulation(dataset=ACS_2022) predictors = ACS_PREDICTORS + ["state_fips"] - acs_df = acs.calculate_dataframe(ACS_PREDICTORS + ACS_IMPUTED_VARIABLES) + acs_df = acs.calculate_dataframe( + ACS_PREDICTORS + ACS_IMPUTED_VARIABLES, map_to="person" + ) acs_df["state_fips"] = acs.calculate("state_fips", map_to="person").values.astype( np.float32 ) @@ -301,7 +303,7 @@ def _impute_acs( if dataset_path is not None: cps_sim = Microsimulation(dataset=dataset_path) - cps_df = cps_sim.calculate_dataframe(ACS_PREDICTORS) + cps_df = cps_sim.calculate_dataframe(ACS_PREDICTORS, map_to="person") del cps_sim else: cps_df = pd.DataFrame() diff --git a/policyengine_us_data/datasets/cps/cps.py b/policyengine_us_data/datasets/cps/cps.py index 17a63eca4..2e1d1484e 100644 --- a/policyengine_us_data/datasets/cps/cps.py +++ b/policyengine_us_data/datasets/cps/cps.py @@ -238,7 +238,7 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): "household_size", ] IMPUTATIONS = ["rent", "real_estate_taxes"] - train_df = acs.calculate_dataframe(PREDICTORS + IMPUTATIONS) + train_df = acs.calculate_dataframe(PREDICTORS + IMPUTATIONS, map_to="person") train_df.tenure_type = train_df.tenure_type.map( { "OWNED_OUTRIGHT": "OWNED_WITH_MORTGAGE", @@ -246,7 +246,7 @@ def add_rent(self, cps: h5py.File, person: DataFrame, household: DataFrame): na_action="ignore", ).fillna(train_df.tenure_type) train_df = train_df[train_df.is_household_head].sample(10_000) - inference_df = cps_sim.calculate_dataframe(PREDICTORS) + inference_df = cps_sim.calculate_dataframe(PREDICTORS, map_to="person") mask = inference_df.is_household_head.values inference_df = inference_df[mask] @@ -1872,24 +1872,73 @@ def _update_documentation_with_numbers(log_df, docs_dir): def add_tips(self, cps: h5py.File): self.save_dataset(cps) - from policyengine_us import Microsimulation - sim = Microsimulation(dataset=self) - cps = sim.calculate_dataframe( - [ - "person_id", - "household_id", - "employment_income", - "interest_income", - "dividend_income", - "rental_income", - "age", - "household_weight", - "is_female", - ], - 2025, + existing_data = self.load_dataset() + person_household_id = np.asarray( + existing_data.get( + "person_household_id", + existing_data.get("household_id"), + ) ) - cps = pd.DataFrame(cps) + interest_income = existing_data.get("interest_income") + if interest_income is None: + interest_income = np.asarray( + existing_data.get( + "taxable_interest_income", + np.zeros(len(person_household_id), dtype=np.float32), + ) + ) + np.asarray( + existing_data.get( + "tax_exempt_interest_income", + np.zeros(len(person_household_id), dtype=np.float32), + ) + ) + dividend_income = existing_data.get("dividend_income") + if dividend_income is None: + dividend_income = np.asarray( + existing_data.get( + "qualified_dividend_income", + np.zeros(len(person_household_id), dtype=np.float32), + ) + ) + np.asarray( + existing_data.get( + "non_qualified_dividend_income", + np.zeros(len(person_household_id), dtype=np.float32), + ) + ) + cps = pd.DataFrame( + { + "person_id": np.asarray(existing_data["person_id"]), + "household_id": person_household_id, + "employment_income": np.asarray(existing_data["employment_income"]), + "interest_income": np.asarray(interest_income), + "dividend_income": np.asarray(dividend_income), + "rental_income": np.asarray( + existing_data.get( + "rental_income", + np.zeros(len(person_household_id), dtype=np.float32), + ) + ), + "age": np.asarray(existing_data["age"]), + "is_female": np.asarray(existing_data["is_female"]), + } + ) + household_weight = existing_data.get("household_weight") + if household_weight is not None: + household_weight = np.asarray(household_weight) + if len(household_weight) == len(cps): + cps["household_weight"] = household_weight + else: + household_ids = np.asarray(existing_data["household_id"]) + household_weight_map = dict(zip(household_ids, household_weight)) + cps["household_weight"] = ( + pd.Series(person_household_id) + .map(household_weight_map) + .fillna(0) + .values + ) + else: + cps["household_weight"] = 0.0 # Get is_married from raw CPS data (A_MARITL codes: 1,2 = married) # Note: is_married in policyengine-us is Family-level, but we need diff --git a/policyengine_us_data/db/create_field_valid_values.py b/policyengine_us_data/db/create_field_valid_values.py index 520090120..4dd1b54c6 100644 --- a/policyengine_us_data/db/create_field_valid_values.py +++ b/policyengine_us_data/db/create_field_valid_values.py @@ -69,6 +69,7 @@ def populate_field_valid_values(session: Session) -> None: source_values = [ ("source", "Census ACS S0101", "survey"), ("source", "IRS SOI", "administrative"), + ("source", "CMS Marketplace", "administrative"), ("source", "CMS Medicaid", "administrative"), ("source", "Census ACS S2704", "survey"), ("source", "USDA FNS SNAP", "administrative"), diff --git a/policyengine_us_data/db/create_initial_strata.py b/policyengine_us_data/db/create_initial_strata.py index 8f7b320fc..9e4d98ab3 100644 --- a/policyengine_us_data/db/create_initial_strata.py +++ b/policyengine_us_data/db/create_initial_strata.py @@ -69,7 +69,10 @@ def fetch_congressional_districts(year): def main(): - _, year = etl_argparser("Create initial geographic strata for calibration") + _, year = etl_argparser( + "Create initial geographic strata for calibration", + allow_year=True, + ) # State FIPS to name/abbreviation mapping STATE_NAMES = { diff --git a/policyengine_us_data/db/etl_aca_agi_state_targets.py b/policyengine_us_data/db/etl_aca_agi_state_targets.py new file mode 100644 index 000000000..6dffab7de --- /dev/null +++ b/policyengine_us_data/db/etl_aca_agi_state_targets.py @@ -0,0 +1,293 @@ +"""ETL for ACA spending/enrollment and AGI state targets into policy_data.db.""" + +from __future__ import annotations + +import logging +import hashlib + +import pandas as pd +from sqlmodel import Session, create_engine, select + +from policyengine_us_data.db.create_database_tables import ( + Stratum, + StratumConstraint, + Target, +) +from policyengine_us_data.storage import STORAGE_FOLDER +from policyengine_us_data.utils.census import STATE_ABBREV_TO_FIPS +from policyengine_us_data.utils.db import etl_argparser, get_geographic_strata + +logger = logging.getLogger(__name__) + +ACA_SPENDING_2024 = 9.8e10 + + +def _definition_hash( + parent_stratum_id: int, constraints: list[StratumConstraint] +) -> str: + constraint_strings = [ + f"{c.constraint_variable}|{c.operation}|{c.value}" for c in constraints + ] + constraint_strings.sort() + fingerprint_text = f"{parent_stratum_id}\n" + "\n".join(constraint_strings) + return hashlib.sha256(fingerprint_text.encode("utf-8")).hexdigest() + + +def _get_or_create_stratum( + session: Session, + parent_stratum_id: int, + note: str, + constraints: list[StratumConstraint], +) -> Stratum: + definition_hash = _definition_hash(parent_stratum_id, constraints) + existing = session.exec( + select(Stratum).where(Stratum.definition_hash == definition_hash) + ).first() + if existing is not None: + return existing + + stratum = Stratum( + parent_stratum_id=parent_stratum_id, + notes=note, + ) + stratum.constraints_rel = constraints + session.add(stratum) + return stratum + + +def _upsert_target( + session: Session, + stratum: Stratum, + *, + variable: str, + period: int, + value: float, + source: str, + notes: str | None = None, +) -> None: + if stratum.stratum_id is None: + stratum.targets_rel.append( + Target( + variable=variable, + period=period, + value=value, + active=True, + source=source, + notes=notes, + ) + ) + return + + existing = session.exec( + select(Target).where( + Target.stratum_id == stratum.stratum_id, + Target.variable == variable, + Target.period == period, + Target.reform_id == 0, + ) + ).first() + if existing is None: + session.add( + Target( + variable=variable, + period=period, + value=value, + active=True, + source=source, + notes=notes, + stratum_id=stratum.stratum_id, + ) + ) + return + + existing.value = value + existing.active = True + existing.source = source + if notes is not None: + existing.notes = notes + + +def _load_aca_targets(session: Session, year: int, geo_strata: dict) -> None: + data = pd.read_csv( + STORAGE_FOLDER / "calibration_targets" / "aca_spending_and_enrollment_2024.csv" + ) + + # Monthly to yearly and normalize to national target to match loss.py. + data["spending"] = data["spending"] * 12 + data["spending"] = data["spending"] * (ACA_SPENDING_2024 / data["spending"].sum()) + + for _, row in data.iterrows(): + state = str(row["state"]).strip() + state_fips = STATE_ABBREV_TO_FIPS.get(state) + if state_fips is None: + logger.warning("Skipping ACA target for unknown state %s", state) + continue + state_fips = int(state_fips) + + parent_stratum_id = geo_strata["state"].get(state_fips) + if parent_stratum_id is None: + logger.warning("No geo stratum for state %s (%s)", state, state_fips) + continue + + spending_note = f"State FIPS {state_fips} ACA PTC spending" + enrollment_note = f"State FIPS {state_fips} ACA PTC enrollment" + + spending_constraints = [ + StratumConstraint( + constraint_variable="state_fips", + operation="==", + value=str(state_fips), + ), + ] + spending_stratum = _get_or_create_stratum( + session, + parent_stratum_id, + spending_note, + spending_constraints, + ) + _upsert_target( + session, + spending_stratum, + variable="aca_ptc", + period=year, + value=float(row["spending"]), + source="CMS Marketplace", + notes="Annualized state ACA PTC spending scaled to national total", + ) + + enrollment_constraints = [ + StratumConstraint( + constraint_variable="state_fips", + operation="==", + value=str(state_fips), + ), + StratumConstraint( + constraint_variable="aca_ptc", + operation=">", + value="0", + ), + StratumConstraint( + constraint_variable="is_aca_ptc_eligible", + operation="==", + value="True", + ), + ] + enrollment_stratum = _get_or_create_stratum( + session, + parent_stratum_id, + enrollment_note, + enrollment_constraints, + ) + _upsert_target( + session, + enrollment_stratum, + variable="person_count", + period=year, + value=float(row["enrollment"]), + source="CMS Marketplace", + notes="State ACA enrollment (eligible with positive PTC)", + ) + + +def _load_agi_state_targets(session: Session, year: int, geo_strata: dict) -> None: + soi_targets = pd.read_csv(STORAGE_FOLDER / "calibration_targets" / "agi_state.csv") + + for _, row in soi_targets.iterrows(): + state = str(row["GEO_NAME"]).strip() + state_fips = STATE_ABBREV_TO_FIPS.get(state) + if state_fips is None: + logger.warning("Skipping AGI target for unknown state %s", state) + continue + state_fips = int(state_fips) + + parent_stratum_id = geo_strata["state"].get(state_fips) + if parent_stratum_id is None: + logger.warning("No geo stratum for state %s (%s)", state, state_fips) + continue + + lower = float(row["AGI_LOWER_BOUND"]) + upper = float(row["AGI_UPPER_BOUND"]) + is_count = bool(row["IS_COUNT"]) + if is_count: + target_variable = "tax_unit_count" + note = ( + f"State FIPS {state_fips} AGI tax-unit count ({lower} <= AGI < {upper})" + ) + else: + target_variable = "adjusted_gross_income" + note = f"State FIPS {state_fips} AGI total ({lower} <= AGI < {upper})" + + constraints = [ + StratumConstraint( + constraint_variable="state_fips", + operation="==", + value=str(state_fips), + ), + StratumConstraint( + constraint_variable="adjusted_gross_income", + operation="<=", + value=str(upper), + ), + ] + if is_count: + if lower > 0: + constraints.append( + StratumConstraint( + constraint_variable="adjusted_gross_income", + operation=">=", + value=str(lower), + ) + ) + else: + constraints.append( + StratumConstraint( + constraint_variable="adjusted_gross_income", + operation=">", + value="0", + ) + ) + else: + constraints.append( + StratumConstraint( + constraint_variable="adjusted_gross_income", + operation=">=", + value=str(lower), + ) + ) + stratum = _get_or_create_stratum( + session, + parent_stratum_id, + note, + constraints, + ) + _upsert_target( + session, + stratum, + variable=target_variable, + period=year, + value=float(row["VALUE"]), + source="IRS SOI", + ) + + +def main() -> int: + _, year = etl_argparser( + "ETL for ACA spending/enrollment and AGI state targets", + allow_year=True, + ) + + database_url = f"sqlite:///{STORAGE_FOLDER / 'calibration' / 'policy_data.db'}" + engine = create_engine(database_url) + + with Session(engine) as session: + geo_strata = get_geographic_strata(session) + _load_aca_targets(session, year, geo_strata) + _load_agi_state_targets(session, year, geo_strata) + session.commit() + + logger.info("Loaded ACA and AGI state targets for %s", year) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/policyengine_us_data/db/etl_age.py b/policyengine_us_data/db/etl_age.py index 9ae148337..0d01d7e0a 100644 --- a/policyengine_us_data/db/etl_age.py +++ b/policyengine_us_data/db/etl_age.py @@ -239,7 +239,10 @@ def load_age_data(df_long, geo, year): def main(): - _, year = etl_argparser("ETL for age calibration targets") + _, year = etl_argparser( + "ETL for age calibration targets", + allow_year=True, + ) # --- ETL: Extract, Transform, Load ---- diff --git a/policyengine_us_data/db/etl_irs_soi.py b/policyengine_us_data/db/etl_irs_soi.py index b75cded12..ae5a553e8 100644 --- a/policyengine_us_data/db/etl_irs_soi.py +++ b/policyengine_us_data/db/etl_irs_soi.py @@ -1603,6 +1603,7 @@ def add_lag_arg(parser): args, dataset_year = etl_argparser( "ETL for IRS SOI calibration targets", extra_args_fn=add_lag_arg, + allow_year=True, ) lagged_year = dataset_year - args.lag geography_year = min(lagged_year, LATEST_PUBLISHED_GEOGRAPHIC_SOI_YEAR) diff --git a/policyengine_us_data/db/etl_medicaid.py b/policyengine_us_data/db/etl_medicaid.py index 9be880876..49a4a4af0 100644 --- a/policyengine_us_data/db/etl_medicaid.py +++ b/policyengine_us_data/db/etl_medicaid.py @@ -263,7 +263,10 @@ def load_medicaid_data(long_state, long_cd, year): def main(): - _, year = etl_argparser("ETL for Medicaid calibration targets") + _, year = etl_argparser( + "ETL for Medicaid calibration targets", + allow_year=True, + ) # Extract ------------------------------ state_admin_df = extract_administrative_medicaid_data(year) diff --git a/policyengine_us_data/db/etl_pregnancy.py b/policyengine_us_data/db/etl_pregnancy.py index e8756cfb5..b5b5567cd 100644 --- a/policyengine_us_data/db/etl_pregnancy.py +++ b/policyengine_us_data/db/etl_pregnancy.py @@ -333,7 +333,10 @@ def get_state_pregnancy_rates( def main(): - _, year = etl_argparser("ETL for pregnancy calibration targets") + _, year = etl_argparser( + "ETL for pregnancy calibration targets", + allow_year=True, + ) # CDC VSRR has provisional data for the most recent 1-2 years. # ACS releases lag by ~1 year (e.g. ACS 2023 released Sep 2024). diff --git a/policyengine_us_data/db/etl_snap.py b/policyengine_us_data/db/etl_snap.py index df791c408..def8a6890 100644 --- a/policyengine_us_data/db/etl_snap.py +++ b/policyengine_us_data/db/etl_snap.py @@ -293,7 +293,10 @@ def load_survey_snap_data(survey_df, year, snap_stratum_lookup): def main(): - _, year = etl_argparser("ETL for SNAP calibration targets") + _, year = etl_argparser( + "ETL for SNAP calibration targets", + allow_year=True, + ) # Extract --------- zip_file_admin = extract_administrative_snap_data() diff --git a/policyengine_us_data/db/etl_state_income_tax.py b/policyengine_us_data/db/etl_state_income_tax.py index f9035f74d..e36a37434 100644 --- a/policyengine_us_data/db/etl_state_income_tax.py +++ b/policyengine_us_data/db/etl_state_income_tax.py @@ -264,7 +264,10 @@ def main(): level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) - _, year = etl_argparser("ETL for state income tax calibration targets") + _, year = etl_argparser( + "ETL for state income tax calibration targets", + allow_year=True, + ) data_year = min(year, LATEST_STC_YEAR) if data_year != year: diff --git a/policyengine_us_data/utils/db.py b/policyengine_us_data/utils/db.py index 0991dada0..6fad2f3df 100644 --- a/policyengine_us_data/utils/db.py +++ b/policyengine_us_data/utils/db.py @@ -15,6 +15,8 @@ def etl_argparser( description: str, extra_args_fn=None, + *, + allow_year: bool = False, ) -> Tuple[argparse.Namespace, int]: """Shared argument parsing for ETL scripts. @@ -22,22 +24,32 @@ def etl_argparser( description: Description for the argparse help text. extra_args_fn: Optional callable that receives the parser to add extra arguments before parsing. + allow_year: If True, allow --year to be explicitly optional for ETLs + that can fall back to the default year. Returns: (args, year) tuple. """ parser = argparse.ArgumentParser(description=description) - parser.add_argument( - "--year", - type=int, - default=DEFAULT_YEAR, - help="Target year for calibration data. Default: %(default)s", - ) + if allow_year: + parser.add_argument( + "--year", + type=int, + default=None, + help="Target year for calibration data. Defaults to %(default)s.", + ) + else: + parser.add_argument( + "--year", + type=int, + default=DEFAULT_YEAR, + help="Target year for calibration data. Default: %(default)s", + ) if extra_args_fn is not None: extra_args_fn(parser) args = parser.parse_args() - year = args.year + year = args.year if args.year is not None else DEFAULT_YEAR print(f"Using year: {year}") return args, year @@ -78,7 +90,7 @@ def get_simple_stratum_by_ucgid(session: Session, ucgid: str) -> Optional[Stratu def get_root_strata(session: Session) -> List[Stratum]: """Finds all strata that do not have a parent""" - statement = select(Stratum).where(Stratum.parent_stratum_id == None) + statement = select(Stratum).where(Stratum.parent_stratum_id.is_(None)) return session.exec(statement).all() diff --git a/tests/integration/test_cps_generation.py b/tests/integration/test_cps_generation.py index 78621e723..5d60761d6 100644 --- a/tests/integration/test_cps_generation.py +++ b/tests/integration/test_cps_generation.py @@ -1,8 +1,9 @@ +import h5py +import numpy as np import pandas as pd def test_add_tips_derives_tipped_status_from_raw_cps(monkeypatch): - import policyengine_us import policyengine_us_data.datasets.sipp as sipp_module from policyengine_us_data.datasets.cps.cps import add_tips @@ -34,29 +35,31 @@ class FakeDataset: def __init__(self): self.raw_cps = FakeRawCPS() self.saved_dataset = None + self.base_dataset = { + "person_id": [1, 2], + "person_household_id": [10, 20], + "employment_income": [25_000.0, 30_000.0], + "taxable_interest_income": [100.0, 0.0], + "tax_exempt_interest_income": [25.0, 0.0], + "qualified_dividend_income": [40.0, 0.0], + "non_qualified_dividend_income": [10.0, 0.0], + "rental_income": [0.0, 0.0], + "age": [30, 45], + "household_weight": [1.0, 1.0], + "is_female": [False, True], + } def save_dataset(self, data): - self.saved_dataset = data - - class FakeMicrosimulation: - def __init__(self, dataset): - self.dataset = dataset + if self.saved_dataset is None: + self.saved_dataset = {} + if hasattr(data, "items"): + for key, value in data.items(): + self.saved_dataset[key] = ( + value.values if hasattr(value, "values") else value + ) - def calculate_dataframe(self, columns, year): - base = pd.DataFrame( - { - "person_id": [1, 2], - "household_id": [10, 20], - "employment_income": [25_000, 30_000], - "interest_income": [0.0, 0.0], - "dividend_income": [0.0, 0.0], - "rental_income": [0.0, 0.0], - "age": [30, 45], - "household_weight": [1.0, 1.0], - "is_female": [False, True], - } - ) - return base[columns] + def load_dataset(self): + return self.base_dataset class FakeTipModel: def predict(self, X_test, mean_quantile): @@ -65,6 +68,8 @@ def predict(self, X_test, mean_quantile): class FakeAssetModel: def predict(self, X_test, mean_quantile): + assert X_test["interest_income"].tolist() == [125.0, 0.0] + assert X_test["dividend_income"].tolist() == [50.0, 0.0] return pd.DataFrame( { "bank_account_assets": [0.0, 0.0], @@ -73,7 +78,6 @@ def predict(self, X_test, mean_quantile): } ) - monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) monkeypatch.setattr(sipp_module, "get_tip_model", lambda: FakeTipModel()) monkeypatch.setattr(sipp_module, "get_asset_model", lambda: FakeAssetModel()) @@ -90,3 +94,105 @@ def predict(self, X_test, mean_quantile): assert dataset.saved_dataset["bank_account_assets"].tolist() == [0.0, 0.0] assert dataset.saved_dataset["stock_assets"].tolist() == [0.0, 0.0] assert dataset.saved_dataset["bond_assets"].tolist() == [0.0, 0.0] + + +def test_add_rent_requests_person_level_frames(monkeypatch, tmp_path): + import policyengine_us + import policyengine_us_data.datasets.acs.acs as acs_module + from policyengine_us_data.datasets.cps.cps import add_rent + + fake_acs_dataset = object() + monkeypatch.setattr(acs_module, "ACS_2022", fake_acs_dataset) + + class FakeDataset: + def __init__(self): + self.file_path = tmp_path / "cps_2024.h5" + self.saved_datasets = [] + + def save_dataset(self, data): + self.saved_datasets.append(data.copy()) + + class FakeMicrosimulation: + calls = [] + + def __init__(self, dataset): + self.dataset = dataset + + def calculate_dataframe( + self, columns, period=None, map_to=None, use_weights=True + ): + FakeMicrosimulation.calls.append((self.dataset, tuple(columns), map_to)) + if self.dataset is fake_acs_dataset: + rows = 10_050 + return pd.DataFrame( + { + "is_household_head": [True] * rows, + "age": np.full(rows, 45, dtype=np.int32), + "is_male": np.ones(rows, dtype=bool), + "tenure_type": np.array(["RENTED"] * rows), + "employment_income": np.full(rows, 50_000, dtype=np.int32), + "self_employment_income": np.zeros(rows, dtype=np.int32), + "social_security": np.zeros(rows, dtype=np.int32), + "pension_income": np.zeros(rows, dtype=np.int32), + "state_code_str": np.array(["CA"] * rows), + "household_size": np.full(rows, 2, dtype=np.int32), + "rent": np.full(rows, 1_500, dtype=np.int32), + "real_estate_taxes": np.zeros(rows, dtype=np.int32), + } + )[list(columns)] + + return pd.DataFrame( + { + "is_household_head": [True, False, True], + "age": [40, 12, 70], + "is_male": [True, False, False], + "tenure_type": ["RENTED", "NONE", "OWNED_WITH_MORTGAGE"], + "employment_income": [60_000, 0, 10_000], + "self_employment_income": [0, 0, 0], + "social_security": [0, 0, 8_000], + "pension_income": [0, 0, 2_000], + "state_code_str": ["CA", "CA", "NY"], + "household_size": [2, 2, 1], + } + )[list(columns)] + + class FakeQRFModel: + def predict(self, X_test): + assert len(X_test) == 2 + return pd.DataFrame( + { + "rent": [1_200.0, 0.0], + "real_estate_taxes": [0.0, 4_000.0], + } + ) + + class FakeQRF: + def fit(self, X_train, predictors, imputed_variables): + assert len(X_train) == 10_000 + assert predictors[-1] == "household_size" + assert imputed_variables == ["rent", "real_estate_taxes"] + return FakeQRFModel() + + monkeypatch.setattr(policyengine_us, "Microsimulation", FakeMicrosimulation) + monkeypatch.setattr("policyengine_us_data.datasets.cps.cps.QRF", FakeQRF) + + dataset = FakeDataset() + with h5py.File(dataset.file_path, "w") as stale: + stale.create_dataset("stale_var", data=np.array([1], dtype=np.int8)) + + cps = { + "age": np.array([40, 12, 70], dtype=np.int32), + "spm_unit_capped_housing_subsidy_reported": np.zeros(3, dtype=np.float32), + } + person = pd.DataFrame({"P_SEQ": [1, 2, 1]}) + household = pd.DataFrame({"H_TENURE": [2, 1]}) + + add_rent(dataset, cps, person, household) + + assert [call[2] for call in FakeMicrosimulation.calls] == ["person", "person"] + np.testing.assert_array_equal(cps["rent"], np.array([1200, 0, 0], dtype=np.int32)) + np.testing.assert_array_equal( + cps["real_estate_taxes"], + np.array([0, 0, 4000], dtype=np.int32), + ) + assert not dataset.file_path.exists() diff --git a/tests/integration/test_database_build.py b/tests/integration/test_database_build.py index 37f9f5f6c..5ea0cb24d 100644 --- a/tests/integration/test_database_build.py +++ b/tests/integration/test_database_build.py @@ -31,6 +31,7 @@ ("db/etl_tanf.py", ["--year", "2024"]), ("db/etl_state_income_tax.py", ["--year", "2024"]), ("db/etl_irs_soi.py", ["--year", "2024"]), + ("db/etl_aca_agi_state_targets.py", ["--year", "2024"]), ("db/etl_pregnancy.py", ["--year", "2024"]), ("db/validate_database.py", []), ] @@ -196,6 +197,52 @@ def test_state_income_tax_targets(built_db): assert tn_val == 2_926_000 +def test_state_aca_and_agi_targets_loaded(built_db): + """ACA spending/enrollment and AGI state targets should be present.""" + conn = sqlite3.connect(str(built_db)) + aca_spending = conn.execute( + """ + SELECT COUNT(*) + FROM target_overview + WHERE variable = 'aca_ptc' + AND geo_level = 'state' + """ + ).fetchone()[0] + aca_enrollment = conn.execute( + """ + SELECT COUNT(*) + FROM target_overview + WHERE variable = 'person_count' + AND geo_level = 'state' + AND domain_variable LIKE '%aca_ptc%' + """ + ).fetchone()[0] + agi_amount = conn.execute( + """ + SELECT COUNT(*) + FROM target_overview + WHERE variable = 'adjusted_gross_income' + AND geo_level = 'state' + AND domain_variable LIKE '%adjusted_gross_income%' + """ + ).fetchone()[0] + agi_count = conn.execute( + """ + SELECT COUNT(*) + FROM target_overview + WHERE variable = 'tax_unit_count' + AND geo_level = 'state' + AND domain_variable LIKE '%adjusted_gross_income%' + """ + ).fetchone()[0] + conn.close() + + assert aca_spending > 0, "Missing ACA spending targets by state" + assert aca_enrollment > 0, "Missing ACA enrollment targets by state" + assert agi_amount > 0, "Missing state AGI amount targets" + assert agi_count > 0, "Missing state AGI count targets" + + def test_tanf_targets(built_db): """TANF recipient-family and spending targets should load from ACF files.""" conn = sqlite3.connect(str(built_db))