Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ database:
python policyengine_us_data/db/etl_snap.py --year $(YEAR)
python policyengine_us_data/db/etl_state_income_tax.py --year $(YEAR)
python policyengine_us_data/db/etl_irs_soi.py --year $(YEAR)
python policyengine_us_data/db/etl_aca_agi_state_targets.py --year $(YEAR)
python policyengine_us_data/db/etl_pregnancy.py --year $(YEAR)
python policyengine_us_data/db/validate_database.py

Expand Down
1 change: 1 addition & 0 deletions policyengine_us_data/db/create_field_valid_values.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def populate_field_valid_values(session: Session) -> None:
source_values = [
("source", "Census ACS S0101", "survey"),
("source", "IRS SOI", "administrative"),
("source", "CMS Marketplace", "administrative"),
("source", "CMS Medicaid", "administrative"),
("source", "Census ACS S2704", "survey"),
("source", "USDA FNS SNAP", "administrative"),
Expand Down
5 changes: 4 additions & 1 deletion policyengine_us_data/db/create_initial_strata.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def fetch_congressional_districts(year):


def main():
_, year = etl_argparser("Create initial geographic strata for calibration")
_, year = etl_argparser(
"Create initial geographic strata for calibration",
allow_year=True,
)

# State FIPS to name/abbreviation mapping
STATE_NAMES = {
Expand Down
301 changes: 301 additions & 0 deletions policyengine_us_data/db/etl_aca_agi_state_targets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,301 @@
"""ETL for ACA spending/enrollment and AGI state targets into policy_data.db."""

from __future__ import annotations

import logging
import hashlib

import pandas as pd
from sqlmodel import Session, create_engine, select

from policyengine_us_data.db.create_database_tables import (
Stratum,
StratumConstraint,
Target,
)
from policyengine_us_data.storage import STORAGE_FOLDER
from policyengine_us_data.utils.census import STATE_ABBREV_TO_FIPS
from policyengine_us_data.utils.db import etl_argparser, get_geographic_strata

logger = logging.getLogger(__name__)

# National annual ACA premium tax credit spending total, in dollars, used to
# rescale the state-level spending column so states sum to this figure.
# NOTE(review): presumably the 2024 national APTC total (~$98B) from CMS —
# confirm against the figure used in loss.py.
ACA_SPENDING_2024 = 9.8e10


def _definition_hash(parent_stratum_id: int, constraints: list[StratumConstraint]) -> str:
    """Return a stable SHA-256 fingerprint for a stratum definition.

    The fingerprint combines the parent stratum id with the sorted
    ``variable|operation|value`` triples, so logically identical definitions
    hash identically regardless of the order constraints were supplied in.
    """
    triples = sorted(
        f"{c.constraint_variable}|{c.operation}|{c.value}" for c in constraints
    )
    fingerprint = f"{parent_stratum_id}\n" + "\n".join(triples)
    return hashlib.sha256(fingerprint.encode("utf-8")).hexdigest()


def _get_or_create_stratum(
    session: Session,
    parent_stratum_id: int,
    note: str,
    constraints: list[StratumConstraint],
) -> Stratum:
    """Fetch the stratum matching this definition, creating it if absent.

    Identity is the definition hash of ``parent_stratum_id`` plus the
    constraint set, so repeated ETL runs reuse existing strata rather
    than inserting duplicates.
    """
    fingerprint = _definition_hash(parent_stratum_id, constraints)
    match = session.exec(
        select(Stratum).where(Stratum.definition_hash == fingerprint)
    ).first()
    if match is not None:
        return match

    # NOTE(review): definition_hash is not assigned on the new Stratum here.
    # Presumably the model derives it from constraints_rel on flush —
    # otherwise reruns would never find this row and would duplicate strata.
    # Confirm against the Stratum model.
    created = Stratum(parent_stratum_id=parent_stratum_id, notes=note)
    created.constraints_rel = constraints
    session.add(created)
    return created


def _upsert_target(
    session: Session,
    stratum: Stratum,
    *,
    variable: str,
    period: int,
    value: float,
    source: str,
    notes: str | None = None,
) -> None:
    """Insert or update the baseline target for (stratum, variable, period).

    A stratum that has not been flushed yet (no primary key) gets the target
    attached through its relationship so the ORM persists both together.
    Otherwise the existing baseline row (``reform_id == 0``) is updated in
    place, or a new row inserted when none exists. ``notes`` only overwrites
    an existing row's notes when explicitly provided.
    """

    def _new_target(**extra) -> Target:
        # Every target written by this ETL is an active baseline row.
        return Target(
            variable=variable,
            period=period,
            value=value,
            active=True,
            source=source,
            notes=notes,
            **extra,
        )

    if stratum.stratum_id is None:
        # Pending insert: cascade the target with the stratum.
        stratum.targets_rel.append(_new_target())
        return

    current = session.exec(
        select(Target).where(
            Target.stratum_id == stratum.stratum_id,
            Target.variable == variable,
            Target.period == period,
            Target.reform_id == 0,
        )
    ).first()

    if current is None:
        session.add(_new_target(stratum_id=stratum.stratum_id))
        return

    current.value = value
    current.active = True
    current.source = source
    if notes is not None:
        current.notes = notes


def _load_aca_targets(session: Session, year: int, geo_strata: dict) -> None:
    """Load state-level ACA PTC spending and enrollment targets.

    Spending is annualized (monthly x 12), then rescaled so the state sum
    matches the national ``ACA_SPENDING_2024`` figure, mirroring loss.py.
    Enrollment targets count eligible persons with positive PTC.
    """
    df = pd.read_csv(
        STORAGE_FOLDER / "calibration_targets" / "aca_spending_and_enrollment_2024.csv"
    )

    # Monthly -> annual, then normalize to the national target (as loss.py does).
    annual = df["spending"] * 12
    df["spending"] = annual * (ACA_SPENDING_2024 / annual.sum())

    for _, record in df.iterrows():
        abbrev = str(record["state"]).strip()
        fips = STATE_ABBREV_TO_FIPS.get(abbrev)
        if fips is None:
            logger.warning("Skipping ACA target for unknown state %s", abbrev)
            continue
        fips = int(fips)

        parent_id = geo_strata["state"].get(fips)
        if parent_id is None:
            logger.warning("No geo stratum for state %s (%s)", abbrev, fips)
            continue

        # Spending stratum: everyone in the state.
        spending_stratum = _get_or_create_stratum(
            session,
            parent_id,
            f"State FIPS {fips} ACA PTC spending",
            [
                StratumConstraint(
                    constraint_variable="state_fips",
                    operation="==",
                    value=str(fips),
                ),
            ],
        )
        _upsert_target(
            session,
            spending_stratum,
            variable="aca_ptc",
            period=year,
            value=float(record["spending"]),
            source="CMS Marketplace",
            notes="Annualized state ACA PTC spending scaled to national total",
        )

        # Enrollment stratum: eligible persons in the state with positive PTC.
        enrollment_stratum = _get_or_create_stratum(
            session,
            parent_id,
            f"State FIPS {fips} ACA PTC enrollment",
            [
                StratumConstraint(
                    constraint_variable="state_fips",
                    operation="==",
                    value=str(fips),
                ),
                StratumConstraint(
                    constraint_variable="aca_ptc",
                    operation=">",
                    value="0",
                ),
                StratumConstraint(
                    constraint_variable="is_aca_ptc_eligible",
                    operation="==",
                    value="True",
                ),
            ],
        )
        _upsert_target(
            session,
            enrollment_stratum,
            variable="person_count",
            period=year,
            value=float(record["enrollment"]),
            source="CMS Marketplace",
            notes="State ACA enrollment (eligible with positive PTC)",
        )


def _load_agi_state_targets(session: Session, year: int, geo_strata: dict) -> None:
    """Load IRS SOI state AGI-bracket targets (counts and dollar totals).

    Reads agi_state.csv and writes one target per row:

    * ``IS_COUNT`` rows become ``tax_unit_count`` targets.
    * Other rows become ``adjusted_gross_income`` totals.

    Bracket strata are keyed on state FIPS plus AGI bounds, so repeated runs
    reuse the same strata and update targets in place.
    """
    soi_targets = pd.read_csv(
        STORAGE_FOLDER / "calibration_targets" / "agi_state.csv"
    )

    for _, row in soi_targets.iterrows():
        state = str(row["GEO_NAME"]).strip()
        state_fips = STATE_ABBREV_TO_FIPS.get(state)
        if state_fips is None:
            logger.warning("Skipping AGI target for unknown state %s", state)
            continue
        state_fips = int(state_fips)

        parent_stratum_id = geo_strata["state"].get(state_fips)
        if parent_stratum_id is None:
            logger.warning("No geo stratum for state %s (%s)", state, state_fips)
            continue

        lower = float(row["AGI_LOWER_BOUND"])
        upper = float(row["AGI_UPPER_BOUND"])
        is_count = bool(row["IS_COUNT"])
        # Fix: dropped the unused `variable = str(row["VARIABLE"]).strip()`
        # local — the target variable is derived solely from IS_COUNT below.
        # NOTE(review): confirm ignoring the CSV's VARIABLE column is intended.

        if is_count:
            target_variable = "tax_unit_count"
            note = (
                f"State FIPS {state_fips} AGI tax-unit count "
                f"({lower} <= AGI < {upper})"
            )
        else:
            target_variable = "adjusted_gross_income"
            note = (
                f"State FIPS {state_fips} AGI total "
                f"({lower} <= AGI < {upper})"
            )

        # NOTE(review): the note text describes a half-open bracket
        # (AGI < upper) but the constraint below uses "<=" — confirm which
        # bound convention the calibration expects.
        constraints = [
            StratumConstraint(
                constraint_variable="state_fips",
                operation="==",
                value=str(state_fips),
            ),
            StratumConstraint(
                constraint_variable="adjusted_gross_income",
                operation="<=",
                value=str(upper),
            ),
        ]
        if is_count:
            # Count brackets with a non-positive lower bound are restricted to
            # strictly positive AGI, while amount brackets always use the raw
            # lower bound. NOTE(review): confirm this asymmetry is intended.
            if lower > 0:
                constraints.append(
                    StratumConstraint(
                        constraint_variable="adjusted_gross_income",
                        operation=">=",
                        value=str(lower),
                    )
                )
            else:
                constraints.append(
                    StratumConstraint(
                        constraint_variable="adjusted_gross_income",
                        operation=">",
                        value="0",
                    )
                )
        else:
            constraints.append(
                StratumConstraint(
                    constraint_variable="adjusted_gross_income",
                    operation=">=",
                    value=str(lower),
                )
            )
        stratum = _get_or_create_stratum(
            session,
            parent_stratum_id,
            note,
            constraints,
        )
        _upsert_target(
            session,
            stratum,
            variable=target_variable,
            period=year,
            value=float(row["VALUE"]),
            source="IRS SOI",
        )


def main() -> int:
    """CLI entry point: load ACA and AGI state targets into policy_data.db.

    Returns 0 on success (used as the process exit code).
    """
    _, year = etl_argparser(
        "ETL for ACA spending/enrollment and AGI state targets",
        allow_year=True,
    )

    db_path = STORAGE_FOLDER / "calibration" / "policy_data.db"
    engine = create_engine(f"sqlite:///{db_path}")

    with Session(engine) as session:
        strata_by_geography = get_geographic_strata(session)
        _load_aca_targets(session, year, strata_by_geography)
        _load_agi_state_targets(session, year, strata_by_geography)
        session.commit()

    logger.info("Loaded ACA and AGI state targets for %s", year)
    return 0


if __name__ == "__main__":
raise SystemExit(main())
5 changes: 4 additions & 1 deletion policyengine_us_data/db/etl_age.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,10 @@ def load_age_data(df_long, geo, year):


def main():
_, year = etl_argparser("ETL for age calibration targets")
_, year = etl_argparser(
"ETL for age calibration targets",
allow_year=True,
)

# --- ETL: Extract, Transform, Load ----

Expand Down
1 change: 1 addition & 0 deletions policyengine_us_data/db/etl_irs_soi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ def add_lag_arg(parser):
args, dataset_year = etl_argparser(
"ETL for IRS SOI calibration targets",
extra_args_fn=add_lag_arg,
allow_year=True,
)
lagged_year = dataset_year - args.lag
geography_year = min(lagged_year, LATEST_PUBLISHED_GEOGRAPHIC_SOI_YEAR)
Expand Down
5 changes: 4 additions & 1 deletion policyengine_us_data/db/etl_medicaid.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,10 @@ def load_medicaid_data(long_state, long_cd, year):


def main():
_, year = etl_argparser("ETL for Medicaid calibration targets")
_, year = etl_argparser(
"ETL for Medicaid calibration targets",
allow_year=True,
)

# Extract ------------------------------
state_admin_df = extract_administrative_medicaid_data(year)
Expand Down
5 changes: 4 additions & 1 deletion policyengine_us_data/db/etl_pregnancy.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,10 @@ def get_state_pregnancy_rates(


def main():
_, year = etl_argparser("ETL for pregnancy calibration targets")
_, year = etl_argparser(
"ETL for pregnancy calibration targets",
allow_year=True,
)

# CDC VSRR has provisional data for the most recent 1-2 years.
# ACS releases lag by ~1 year (e.g. ACS 2023 released Sep 2024).
Expand Down
5 changes: 4 additions & 1 deletion policyengine_us_data/db/etl_snap.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,10 @@ def load_survey_snap_data(survey_df, year, snap_stratum_lookup):


def main():
_, year = etl_argparser("ETL for SNAP calibration targets")
_, year = etl_argparser(
"ETL for SNAP calibration targets",
allow_year=True,
)

# Extract ---------
zip_file_admin = extract_administrative_snap_data()
Expand Down
5 changes: 4 additions & 1 deletion policyengine_us_data/db/etl_state_income_tax.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ def main():
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
_, year = etl_argparser("ETL for state income tax calibration targets")
_, year = etl_argparser(
"ETL for state income tax calibration targets",
allow_year=True,
)

data_year = min(year, LATEST_STC_YEAR)
if data_year != year:
Expand Down
Loading
Loading