diff --git a/bedrock/publish/README.md b/bedrock/publish/README.md index fbe4a64b..3b381671 100644 --- a/bedrock/publish/README.md +++ b/bedrock/publish/README.md @@ -19,10 +19,11 @@ shareable file formats, mirroring the shape of `useeior`'s configs via `bedrock.publish.emission_factors`. Emits a long-form CO2e table (`CornerstoneSupplyChainGHG_CO2e_USD.csv`) with three supply-chain factor columns (without margins, margins, with margins). - Purchaser-price adjustment uses industry price ratios from - `model_base_year` to `--dollar_year`. Phi and margin factors are - placeholders (identity Phi, zero margins) until real valuation matrices - land. + Purchaser-price adjustment applies PRO:PUR (Phi) from + `derive_phi_cornerstone_usa()` and rebases denominators from + `model_base_year` to `--dollar_year` via commodity price indices. + Margin SEF column remains zero until `N_margin` is wired. + CLI: `--purchaser_price` / `--no-purchaser_price` (default on). - **Supply-chain factors (R repo)**: not ported. Upstream counterpart: [cornerstone-data/supply-chain-factors](https://github.com/cornerstone-data/supply-chain-factors). - **Import-emissions matrices** (`A_m`, `M_m`, `N_m`): require real @@ -30,10 +31,13 @@ shareable file formats, mirroring the shape of `useeior`'s produce. Registered in `excel/writer.py` as `lambda: None` placeholders; sheets are omitted via the "skip if NULL" rule until `B_imp` lands. -- **Useeior-only valuation matrices** (`Rho`, `Phi`, `Tau`) and - long-form metadata (`demands`, `SectorCrosswalk`): registered as - placeholders. Each carries an inline `TODO` pointing at the design - call or missing derivation. +- **`Phi` sheet**: emitted when `useeio_margins` or + `cornerstone_industry_avg_margins` is active (`get_Phi()` in + `model_objects.py`). Values are at **`model_base_year` only** (one + column per config, e.g. 2017 for `useeio_phoebe_23`); not a full + year panel like useeior's `Phi` matrix yet. 
+- **Useeior-only valuation matrices** (`Rho`, `Tau`) and long-form + metadata (`demands`, `SectorCrosswalk`): registered as placeholders. ## Known divergence from useeior (B units) @@ -165,3 +169,13 @@ factor. Commodity codes are emitted with a `/US` suffix. Shared cached getters live in `bedrock/publish/model_objects.py` (also used by the XLSX publisher). + +### Phi / SEF validation (what is compared) + +| Artifact | Automated test | Year / basis | +|----------|----------------|--------------| +| `Phi` sheet vs pinned phoebe USEEIO workbook | `test_published_phi_matches_useeio_workbook` (`eeio_integration`) | Workbook **`Phi` column `model_base_year`** only (2017 on phoebe pin) | +| SEF CSV wiring (`N` ÷ commodity price ratio × Phi) | `test_sef_applies_phi_and_dollar_year` in `test_sef_phi_wiring.py` (`eeio_integration`) | Export at `--dollar_year` (test uses 2024); **Phi stays at `model_base_year` (2017)** | +| SEF vs [supply-chain-factors](https://github.com/cornerstone-data/supply-chain-factors) or Zenodo NAICS publish | None in bedrock | Not in scope for Phi PR (#449) | + +Default `uv run pytest` excludes `eeio_integration`; run `-m eeio_integration` for workbook Phi parity and SEF wiring tests. 
diff --git a/bedrock/publish/__tests__/test_sef_phi_wiring.py b/bedrock/publish/__tests__/test_sef_phi_wiring.py new file mode 100644 index 00000000..7f6598bf --- /dev/null +++ b/bedrock/publish/__tests__/test_sef_phi_wiring.py @@ -0,0 +1,56 @@ +"""Tests that emission-factor publish applies Phi and dollar-year rebasing.""" + +from __future__ import annotations + +import numpy as np +import pytest + +from bedrock.publish.__tests__._helpers import setup_config, teardown +from bedrock.publish.emission_factors.table import ( + COL_WITHOUT, + build_emission_factor_table, + finalize_cornerstone_ef_table, +) +from bedrock.publish.model_objects import get_N +from bedrock.transform.iot.derive_PRO_to_PUR_ratio import phi_for_sectors +from bedrock.utils.config.usa_config import get_usa_config +from bedrock.utils.economic.inflation_helpers_cornerstone import ( + get_vnorm_adjusted_commodity_price_ratio, +) +from bedrock.utils.emissions.characterization import GREENHOUSE_GASES_INDICATOR + + +@pytest.mark.eeio_integration +def test_sef_applies_phi_and_dollar_year() -> None: + """without_margins / (N_producer / cpi) equals phi per sector.""" + setup_config('useeio_phoebe_23') + try: + dollar_year = 2024 + cfg = get_usa_config() + table = finalize_cornerstone_ef_table( + build_emission_factor_table( + dollar_year=dollar_year, + purchaser_price=True, + ) + ) + n_producer = get_N().loc[GREENHOUSE_GASES_INDICATOR].astype(float) + pi = get_vnorm_adjusted_commodity_price_ratio(cfg.model_base_year, dollar_year) + n_producer_cpi = n_producer / pi.reindex(n_producer.index, fill_value=1.0) + phi = phi_for_sectors(n_producer.index) + + for _, row in table.iterrows(): + code = str(row['Cornerstone Commodity Code']).removesuffix('/US') + without = float(row[COL_WITHOUT]) + if code not in n_producer_cpi.index: + continue + denom = float(n_producer_cpi[code]) + if denom == 0.0: + continue + np.testing.assert_allclose( + without / denom, + float(phi[code]), + rtol=1e-9, + err_msg=f'sector 
{code}', + ) + finally: + teardown() diff --git a/bedrock/publish/__tests__/test_sef_vs_useeio_baseline.py b/bedrock/publish/__tests__/test_sef_vs_useeio_baseline.py new file mode 100644 index 00000000..569229cf --- /dev/null +++ b/bedrock/publish/__tests__/test_sef_vs_useeio_baseline.py @@ -0,0 +1,102 @@ +"""eeio_integration: published Phi vs pinned phoebe USEEIO workbook.""" + +from __future__ import annotations + +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +import bedrock.utils.config.common as common +from bedrock.publish.__tests__._helpers import clear_all_caches, teardown +from bedrock.publish.model_objects import PUBLISH_LOCATION, get_Phi +from bedrock.utils.config.usa_config import ( + get_usa_config, + reset_usa_config, + set_global_usa_config, +) +from bedrock.utils.validation.useeio_excel_baseline import ( + _local_cache_path, + ensure_useeio_xlsx_local, + load_useeio_baseline_pin_overrides, +) + +_PIN_JSON = ( + Path(__file__).resolve().parents[2] + / 'utils' + / 'snapshots' + / 'useeio_baseline_pin.json' +) + +_PHI_RTOL = 0.01 + + +def _setup_phoebe_with_useeio_pin() -> dict[str, str]: + pin = load_useeio_baseline_pin_overrides(str(_PIN_JSON)) + overrides: dict[str, object] = { + **pin, + 'diagnostics_baseline_source': 'gcs_useeio_xlsx', + } + clear_all_caches() + reset_usa_config(should_reset_env_var=True) + set_global_usa_config( + 'useeio_phoebe_23', + diagnostics_cli_overrides=overrides, + ) + common.download_fba_on_api_error = True + return pin + + +def _strip_loc_suffix(values: pd.Index) -> pd.Index: + suffix = f'/{PUBLISH_LOCATION}' + return pd.Index( + [str(v)[: -len(suffix)] if str(v).endswith(suffix) else str(v) for v in values], + name=values.name, + ) + + +def _load_workbook_phi(xlsx_path: str, year: int) -> pd.Series: + raw = pd.read_excel(xlsx_path, sheet_name='Phi', header=None, engine='openpyxl') + headers = ( + raw.iloc[0, 1:].astype(str).str.strip().str.replace(r'\.0$', '', regex=True) + ) + 
sectors = raw.iloc[1:, 0].astype(str).str.strip() + values = raw.iloc[1:, 1:].copy() + values.columns = pd.Index(headers) + values.index = pd.Index(sectors) + year_str = str(year) + phi = values[year_str].astype(float) + phi.index = pd.Index( + [s[:-3] if s.endswith('/US') else s for s in phi.index], name='sector' + ) + return phi.dropna() + + +@pytest.mark.eeio_integration +def test_published_phi_matches_useeio_workbook() -> None: + try: + pin = _setup_phoebe_with_useeio_pin() + cfg = get_usa_config() + xlsx = _local_cache_path(pin['useeio_baseline_xlsx_gs_uri']) + ensure_useeio_xlsx_local( + pin['useeio_baseline_xlsx_gs_uri'], + pin['useeio_baseline_xlsx_sha256'], + xlsx, + ) + bedrock_phi_df = get_Phi() + assert bedrock_phi_df is not None + bedrock_phi = bedrock_phi_df[str(cfg.model_base_year)].astype(float) + bedrock_phi.index = _strip_loc_suffix(bedrock_phi.index) + + ref_phi = _load_workbook_phi(xlsx, cfg.model_base_year) + common_sectors = bedrock_phi.index.intersection(ref_phi.index) + assert len(common_sectors) > 100 + np.testing.assert_allclose( + bedrock_phi.reindex(common_sectors).astype(float), + ref_phi.reindex(common_sectors).astype(float), + rtol=_PHI_RTOL, + atol=1e-12, + ) + finally: + teardown() diff --git a/bedrock/publish/cache_reset.py b/bedrock/publish/cache_reset.py index d803d9b0..480b5bd3 100644 --- a/bedrock/publish/cache_reset.py +++ b/bedrock/publish/cache_reset.py @@ -4,6 +4,10 @@ from collections.abc import Callable +from bedrock.extract.iot.io_2017 import ( + load_2017_margins_after_redef_usa, + load_2017_margins_before_redef_usa, +) from bedrock.publish.model_objects import clear_publish_caches from bedrock.transform.eeio.cornerstone_disagg_pipeline import ( cornerstone_sector_disagg_active, @@ -55,6 +59,8 @@ derive_cornerstone_Aq_scaled, derive_cornerstone_B_non_finetuned, derive_cornerstone_y_nab, + load_2017_margins_before_redef_usa, + load_2017_margins_after_redef_usa, ] diff --git a/bedrock/publish/emission_factors/cli.py 
b/bedrock/publish/emission_factors/cli.py index 998333a6..dacf1d09 100644 --- a/bedrock/publish/emission_factors/cli.py +++ b/bedrock/publish/emission_factors/cli.py @@ -50,11 +50,17 @@ def _default_output_dir(config_name: str) -> str: default=False, help='Also write M_pur and N_pur CSVs under matrices/.', ) +@click.option( + '--purchaser_price/--no-purchaser_price', + default=True, + help='Apply PRO:PUR (Phi) purchaser-price adjustment (default: on).', +) def publish( config_name: str, dollar_year: int, output_dir: str | None, write_matrices: bool, + purchaser_price: bool, ) -> None: from bedrock.publish.emission_factors.writer import write_emission_factors @@ -68,6 +74,7 @@ def publish( local_dir, config_name=config_name, dollar_year=dollar_year, + purchaser_price=purchaser_price, write_matrices=write_matrices, ) diff --git a/bedrock/publish/emission_factors/placeholders.py b/bedrock/publish/emission_factors/placeholders.py index 7103426a..57bc8849 100644 --- a/bedrock/publish/emission_factors/placeholders.py +++ b/bedrock/publish/emission_factors/placeholders.py @@ -4,17 +4,13 @@ import pandas as pd +from bedrock.transform.iot.derive_PRO_to_PUR_ratio import phi_for_sectors from bedrock.utils.config.usa_config import get_usa_config from bedrock.utils.economic.inflation_helpers_cornerstone import ( get_vnorm_adjusted_commodity_price_ratio, ) -def placeholder_phi(sector_index: pd.Index) -> pd.Series[float]: - """PRO:PUR price ratio; identity until real Phi is wired.""" - return pd.Series(1.0, index=sector_index, dtype=float) - - def placeholder_margin_ef(without_margins: pd.Series) -> pd.Series[float]: """Per-sector margin supply-chain factors; zeros until N_margin is wired.""" return pd.Series(0.0, index=without_margins.index, dtype=float) @@ -39,8 +35,7 @@ def adjust_publish_matrix( out = out.div(price_ratio_for_columns.values, axis=1) if purchaser_price: - phi = placeholder_phi(out.columns) - aligned_phi = phi.reindex(out.columns, fill_value=1.0) - out = 
out.mul(aligned_phi.values, axis=1) + phi = phi_for_sectors(out.columns) + out = out.mul(phi.reindex(out.columns, fill_value=1.0).values, axis=1) return out diff --git a/bedrock/publish/emission_factors/table.py b/bedrock/publish/emission_factors/table.py index d072a0b5..69461c54 100644 --- a/bedrock/publish/emission_factors/table.py +++ b/bedrock/publish/emission_factors/table.py @@ -36,8 +36,9 @@ def _greenhouse_gases_row(n: pd.DataFrame) -> pd.Series: return cast(pd.Series, row) -def _unit_label(dollar_year: int) -> str: - return f'kg CO2e / {dollar_year} USD, purchaser price' +def _unit_label(dollar_year: int, *, purchaser_price: bool) -> str: + price_type = 'purchaser price' if purchaser_price else 'producer price' + return f'kg CO2e / {dollar_year} USD, {price_type}' def _commodity_base_code(code: str) -> str: @@ -55,12 +56,14 @@ def _is_excluded_commodity(code: str) -> bool: return False -def build_emission_factor_table(*, dollar_year: int) -> pd.DataFrame: - """Long-form CO2e supply-chain factors at purchaser price in ``dollar_year``.""" +def build_emission_factor_table( + *, dollar_year: int, purchaser_price: bool = True +) -> pd.DataFrame: + """Long-form CO2e supply-chain factors in ``dollar_year`` USD.""" n_pur = adjust_publish_matrix( get_N(), dollar_year=dollar_year, - purchaser_price=True, + purchaser_price=purchaser_price, ) without = _greenhouse_gases_row(n_pur) margins = placeholder_margin_ef(without) @@ -73,7 +76,7 @@ def build_emission_factor_table(*, dollar_year: int) -> pd.DataFrame: COL_CODE: code, COL_NAME: COMMODITY_DESC.get(code, ''), COL_GHG: GHG_LABEL, - COL_UNIT: _unit_label(dollar_year), + COL_UNIT: _unit_label(dollar_year, purchaser_price=purchaser_price), COL_WITHOUT: float(without[code]), COL_MARGINS: float(margins[code]), COL_WITH: float(with_margins[code]), @@ -95,16 +98,18 @@ def finalize_cornerstone_ef_table(table: pd.DataFrame) -> pd.DataFrame: return out.reset_index(drop=True) -def build_purchaser_matrices(*, dollar_year: int) 
-> tuple[pd.DataFrame, pd.DataFrame]: - """Return M and N at purchaser price in ``dollar_year`` (raw sector labels).""" +def build_purchaser_matrices( + *, dollar_year: int, purchaser_price: bool = True +) -> tuple[pd.DataFrame, pd.DataFrame]: + """Return M and N adjusted to ``dollar_year`` (raw sector labels).""" m_pur = adjust_publish_matrix( get_M(), dollar_year=dollar_year, - purchaser_price=True, + purchaser_price=purchaser_price, ) n_pur = adjust_publish_matrix( get_N(), dollar_year=dollar_year, - purchaser_price=True, + purchaser_price=purchaser_price, ) return m_pur, n_pur diff --git a/bedrock/publish/emission_factors/writer.py b/bedrock/publish/emission_factors/writer.py index b17addef..e23baa73 100644 --- a/bedrock/publish/emission_factors/writer.py +++ b/bedrock/publish/emission_factors/writer.py @@ -20,6 +20,7 @@ def write_emission_factors( *, config_name: str, dollar_year: int, + purchaser_price: bool = True, write_matrices: bool = False, ) -> dict[str, str]: """Write CO2e SEF CSV (and optional M/N purchaser matrices) under ``output_dir``.""" @@ -27,7 +28,10 @@ def write_emission_factors( os.makedirs(output_dir, exist_ok=True) table = finalize_cornerstone_ef_table( - build_emission_factor_table(dollar_year=dollar_year) + build_emission_factor_table( + dollar_year=dollar_year, + purchaser_price=purchaser_price, + ) ) co2e_name = f'CornerstoneSupplyChainGHG_CO2e_USD{dollar_year}.csv' co2e_path = os.path.join(output_dir, co2e_name) @@ -44,7 +48,10 @@ def write_emission_factors( if write_matrices: matrices_dir = os.path.join(output_dir, 'matrices') os.makedirs(matrices_dir, exist_ok=True) - m_pur, n_pur = build_purchaser_matrices(dollar_year=dollar_year) + m_pur, n_pur = build_purchaser_matrices( + dollar_year=dollar_year, + purchaser_price=purchaser_price, + ) m_path = os.path.join(matrices_dir, f'M_pur_{dollar_year}.csv') n_path = os.path.join(matrices_dir, f'N_pur_{dollar_year}.csv') apply_loc_suffix(m_pur).to_csv(m_path) diff --git 
a/bedrock/publish/excel/writer.py b/bedrock/publish/excel/writer.py index 897054b2..2896164a 100644 --- a/bedrock/publish/excel/writer.py +++ b/bedrock/publish/excel/writer.py @@ -83,6 +83,7 @@ get_Mdom, get_N, get_Ndom, + get_Phi, get_q, get_U, get_Udom, @@ -294,10 +295,8 @@ def _build_matrix_registry(config_name: str) -> list[SheetSpec]: SheetSpec('N', get_N), SheetSpec('N_d', get_Ndom), SheetSpec('N_m', lambda: None), # requires B_imp - # Rho, Phi, Tau are useeior valuation-adjustment matrices with no - # direct bedrock analogue. Leave as TODO until a design call is made. SheetSpec('Rho', lambda: None), - SheetSpec('Phi', lambda: None), + SheetSpec('Phi', get_Phi), SheetSpec('Tau', lambda: None), # --- outputs (useeior writes these after the matrices block) --- SheetSpec('q', get_q), diff --git a/bedrock/publish/model_objects.py b/bedrock/publish/model_objects.py index 1f9ccc1a..a0f52e1d 100644 --- a/bedrock/publish/model_objects.py +++ b/bedrock/publish/model_objects.py @@ -208,7 +208,28 @@ def get_q() -> pd.Series: return pd.Series(derive_Aq_usa().scaled_q, name='q') -_CACHED_GETTERS: tuple[Callable[[], pd.DataFrame | pd.Series], ...] = ( +@functools.cache +def get_Phi() -> pd.DataFrame | None: + """Producer-to-purchaser ratio per sector for ``model_base_year`` (Excel ``Phi`` sheet).""" + from bedrock.transform.iot.derive_PRO_to_PUR_ratio import ( + margins_phi_active, + phi_for_sectors, + ) + + if not margins_phi_active(): + return None + cfg = get_usa_config() + sectors = get_N().columns + phi = phi_for_sectors(sectors) + out = pd.DataFrame( + {str(cfg.model_base_year): phi.astype(float).values}, + index=phi.index, + ) + out.index.name = 'sector' + return out + + +_CACHED_GETTERS: tuple[Callable[[], pd.DataFrame | pd.Series | None], ...] 
= ( get_V, get_U, get_Udom, @@ -224,6 +245,7 @@ def get_q() -> pd.Series: get_Mdom, get_N, get_Ndom, + get_Phi, get_q, ) diff --git a/bedrock/transform/iot/__tests__/test_phi_helpers.py b/bedrock/transform/iot/__tests__/test_phi_helpers.py new file mode 100644 index 00000000..71f29c46 --- /dev/null +++ b/bedrock/transform/iot/__tests__/test_phi_helpers.py @@ -0,0 +1,48 @@ +"""Tests for PRO:PUR (Phi) helper functions.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +import pandas as pd + +from bedrock.transform.iot.derive_PRO_to_PUR_ratio import ( + apply_phi_to_ef_vector, + margins_phi_active, + phi_for_sectors, +) +from bedrock.utils.config.usa_config import USAConfig + + +class TestMarginsPhiActive: + def test_active_when_useeio_margins(self) -> None: + cfg = USAConfig(useeio_margins=True) + assert margins_phi_active(cfg) is True + + def test_inactive_when_no_margins_flag(self) -> None: + cfg = USAConfig() + assert margins_phi_active(cfg) is False + + +@patch( + 'bedrock.transform.iot.derive_PRO_to_PUR_ratio.margins_phi_active', + return_value=True, +) +@patch('bedrock.transform.iot.derive_PRO_to_PUR_ratio.derive_phi_cornerstone_usa') +def test_apply_phi_to_ef_vector(mock_phi: MagicMock, _mock_active: MagicMock) -> None: + mock_phi.return_value = pd.Series({'1111A0': 0.5, '221100': 0.8}) + ef = pd.Series({'1111A0': 10.0, '221100': 20.0, '311111': 2.0}) + got = apply_phi_to_ef_vector(ef) + assert got['1111A0'] == 5.0 + assert got['221100'] == 16.0 + assert got['311111'] == 2.0 + + +@patch( + 'bedrock.transform.iot.derive_PRO_to_PUR_ratio.margins_phi_active', + return_value=False, +) +def test_phi_for_sectors_identity_when_inactive(_mock_active: MagicMock) -> None: + idx = pd.Index(['1111A0', '221100'], name='sector') + got = phi_for_sectors(idx) + pd.testing.assert_series_equal(got, pd.Series(1.0, index=idx, dtype=float)) diff --git a/bedrock/transform/iot/derive_PRO_to_PUR_ratio.py 
b/bedrock/transform/iot/derive_PRO_to_PUR_ratio.py index a0492c98..a8150726 100644 --- a/bedrock/transform/iot/derive_PRO_to_PUR_ratio.py +++ b/bedrock/transform/iot/derive_PRO_to_PUR_ratio.py @@ -19,7 +19,7 @@ from bedrock.extract.iot.io_2017 import load_2017_margins_usa from bedrock.transform.eeio.derived_2017_helpers import EXPANDED_SECTORS_2012_TO_2017 -from bedrock.utils.config.usa_config import get_usa_config +from bedrock.utils.config.usa_config import USAConfig, get_usa_config from bedrock.utils.economic.inflation_helpers_cornerstone import ( get_sector_commodity_price_ratio, get_vnorm_adjusted_commodity_price_ratio, @@ -305,3 +305,21 @@ def derive_phi_cornerstone_usa() -> pd.Series: return (margins["Producers' Value"] / margins["Purchasers' Value"]).replace( [np.inf, -np.inf, np.nan], 1.0 ) + + +def margins_phi_active(cfg: USAConfig | None = None) -> bool: + """Return whether margins-based Phi should be applied for *cfg*.""" + c = cfg or get_usa_config() + return bool(c.useeio_margins or c.cornerstone_industry_avg_margins) + + +def phi_for_sectors(sector_index: pd.Index) -> pd.Series[float]: + """Phi aligned to *sector_index*; identity when margins methodology is inactive.""" + if not margins_phi_active(): + return pd.Series(1.0, index=sector_index, dtype=float) + return derive_phi_cornerstone_usa().reindex(sector_index, fill_value=1.0) + + +def apply_phi_to_ef_vector(ef: pd.Series[float]) -> pd.Series[float]: + """Convert producer-price EFs to purchaser price via sector Phi.""" + return ef * phi_for_sectors(ef.index) diff --git a/bedrock/utils/validation/analysis/README.md b/bedrock/utils/validation/analysis/README.md index 2a87be96..f6ecbb30 100644 --- a/bedrock/utils/validation/analysis/README.md +++ b/bedrock/utils/validation/analysis/README.md @@ -59,7 +59,9 @@ back, caches them locally as parquet, and renders analysis figures. 
`pinned_useeio_baseline` column into `D_and_diffs_merged` / `N_and_diffs_merged` / `config_summary_merged`, sourced from the first input run's - `D_old_inflated` / `N_old_inflated` and pin metadata. This lets a + `D_old_inflated` and `N_old_purchaser` (falling back to + `N_old_inflated`) and pin metadata. Per-config N columns prefer + `N_new_purchaser`, then `N_new_inflated`, then `N_new`. This lets a USEEIO-rebuild combo's net-diff show `run.D_new − pinned_baseline` instead of the default self vs self. Combos that don't opt in (e.g. the v0.2 release-vs-release setup) keep their original output schema @@ -101,8 +103,10 @@ uv run python -m bedrock.utils.validation.analysis.combine_ef_diagnostics \ ``` `--combo` picks a `ComboSpec` from `combinations.COMBINATIONS`. Merged -`D_and_diffs` / `N_and_diffs` tabs use each run's `D_new_inflated` / -`N_new_inflated` column when present, otherwise `D_new` / `N_new`. The local +`D_and_diffs` / `N_and_diffs` tabs use each run's `D_new_inflated` when +present, otherwise `D_new`. N columns prefer `N_new_purchaser` (when the +producer emitted purchaser-price columns), then `N_new_inflated`, then +`N_new`. The local workbook is always written, defaulting to `analysis/output//ef_diagnostics_merged.xlsx`; pass `--output-xlsx ` to override or `--output-xlsx ""` to skip. The Google Sheets push diff --git a/bedrock/utils/validation/analysis/combinations.py b/bedrock/utils/validation/analysis/combinations.py index b8583585..29f6a965 100644 --- a/bedrock/utils/validation/analysis/combinations.py +++ b/bedrock/utils/validation/analysis/combinations.py @@ -62,8 +62,9 @@ class ComboSpec: # ``useeio_phoebe_23`` is the Bedrock attempt to rebuild the pinned # USEEIO model and is itself imperfect, so its net-diff column # subtracts the synthetic ``pinned_useeio_baseline`` (sourced from - # this Sheet's ``D_old_inflated`` / ``N_old_inflated``, i.e. the - # pinned USEEIO Excel baseline). 
Every restoration step then + # this Sheet's ``D_old_inflated`` and ``N_old_purchaser``, i.e. + # the pinned USEEIO Excel baseline at purchaser price). Every + # restoration step then # compares against the rebuild so the chain of divergence reads # naturally. 'useeio_phoebe_23': 'pinned_useeio_baseline', diff --git a/bedrock/utils/validation/analysis/combine_ef_diagnostics.py b/bedrock/utils/validation/analysis/combine_ef_diagnostics.py index b5a1657c..53f349fb 100644 --- a/bedrock/utils/validation/analysis/combine_ef_diagnostics.py +++ b/bedrock/utils/validation/analysis/combine_ef_diagnostics.py @@ -25,8 +25,11 @@ an extra ``pinned_useeio_baseline`` column sourced from that run's ``D_old_inflated`` is inserted before the per-config columns. 2) ``N_and_diffs_merged`` -- ``sector``, ``sector_name``, one column per - ``config_name`` containing ``N_new_inflated`` when present, otherwise - ``N_new``. Mirrors (1) for ``pinned_useeio_baseline``. + ``config_name`` containing ``N_new_purchaser`` when present (USEEIO + Excel-baseline runs with active margins), otherwise ``N_new_inflated``, + otherwise ``N_new``. When ``pinned_useeio_baseline`` is included, + that column uses ``N_old_purchaser`` when present, otherwise + ``N_old_inflated``. 3) ``D_net_diff`` / ``N_net_diff`` -- per-config net differences computed as ``config - target_config`` using the combo's target mapping. 4) ``totals`` -- one USA-level row per ``config_name``. Source depends @@ -107,8 +110,9 @@ # Synthetic baseline column, injected into merged_d / merged_n / # config_summary_merged ONLY when a combo's target_mapping references it. -# Values come from the FIRST input run's ``D_old_inflated`` / ``N_old_inflated`` -# columns -- i.e. whatever pinned baseline that run was compared against in +# Values come from the FIRST input run's ``D_old_inflated`` and +# ``N_old_purchaser`` (falling back to ``N_old_inflated``) -- i.e. whatever +# pinned baseline that run was compared against in # its own diagnostics. 
Only meaningful when that first run is a USEEIO # comparison (see USEEIO_EXCEL_BASELINE_SOURCE above): then the column # represents the pinned USEEIO Excel baseline and its pin metadata @@ -237,24 +241,24 @@ def _resolve_metric_column( label: str, sheet_id: str, preferred_col: str, - fallback_col: str, + fallback_cols: tuple[str, ...], ) -> str: - """Return ``preferred_col`` when present on the tab, else ``fallback_col``.""" - if preferred_col in df.columns: - return preferred_col - if fallback_col in df.columns: - logger.info( - "Tab '%s' in Sheet '%s' (%s): no `%s`; using `%s`.", - tab, - label, - sheet_id, - preferred_col, - fallback_col, - ) - return fallback_col + """Return the first matching column from ``preferred_col`` then *fallback_cols*.""" + for col in (preferred_col, *fallback_cols): + if col in df.columns: + if col != preferred_col: + logger.info( + "Tab '%s' in Sheet '%s' (%s): no `%s`; using `%s`.", + tab, + label, + sheet_id, + preferred_col, + col, + ) + return col raise ValueError( f"Tab '{tab}' in Sheet '{label}' ({sheet_id}) must have `{preferred_col}` " - f'or `{fallback_col}`. Found: {list(df.columns)}' + f'or one of {fallback_cols!r}. Found: {list(df.columns)}' ) @@ -264,7 +268,7 @@ def _read_sector_and_values( tab: str, *, preferred_metric_col: str, - fallback_metric_col: str | None = None, + fallback_metric_cols: tuple[str, ...] = (), refresh: bool = False, ) -> tuple[pd.DataFrame, pd.Series]: """Read one ``D_and_diffs`` / ``N_and_diffs`` tab and return ``(sector_meta, values)``.""" @@ -274,7 +278,7 @@ def _read_sector_and_values( f"Tab '{tab}' in Sheet '{label}' ({sheet_id}) must have a `sector_name` " f'column. 
Found: {list(df.columns)}' ) - if fallback_metric_col is None: + if not fallback_metric_cols: if preferred_metric_col not in df.columns: raise ValueError( f"Tab '{tab}' in Sheet '{label}' ({sheet_id}) must have a " @@ -288,7 +292,7 @@ def _read_sector_and_values( label=label, sheet_id=sheet_id, preferred_col=preferred_metric_col, - fallback_col=fallback_metric_col, + fallback_cols=fallback_metric_cols, ) sector_code_col = _find_sector_code_column(df, tab=tab) @@ -479,7 +483,7 @@ def merge_sheets( # Opt-in synthetic baseline column. Gated by the combo's target_mapping # referencing PINNED_USEEIO_BASELINE_COLUMN, which is only a sensible # thing to do when the first input run is a USEEIO Excel-baseline - # comparison (its D_old_inflated / N_old_inflated then reflect the + # comparison (its D_old_inflated and N_old_purchaser then reflect the # pinned baseline, and the pin metadata in its config_summary is what # feeds the pinned_useeio_baseline column of config_summary_merged). # Combos that don't reference it (e.g. v0.2-style release-vs-release @@ -495,8 +499,9 @@ def merge_sheets( 'target_mapping references %r but the first input run %r is not ' 'a USEEIO Excel-baseline comparison ' '(diagnostics_baseline_source != %r). 
The pinned_useeio_baseline ' - "column will carry that run's D_old_inflated / N_old_inflated " - 'values but its pin metadata will be empty.', + "column will carry that run's D_old_inflated and N_old_purchaser " + '(or N_old_inflated fallback) values but its pin metadata will ' + 'be empty.', PINNED_USEEIO_BASELINE_COLUMN, sheet_inputs_in_order[0][1], USEEIO_EXCEL_BASELINE_SOURCE, @@ -513,7 +518,8 @@ def merge_sheets( sheet_id=first_sheet_id, label=first_label, tab='N_and_diffs', - preferred_metric_col='N_old_inflated', + preferred_metric_col='N_old_purchaser', + fallback_metric_cols=('N_old_inflated',), refresh=refresh, ) @@ -523,15 +529,15 @@ def merge_sheets( label=label, tab='D_and_diffs', preferred_metric_col='D_new_inflated', - fallback_metric_col='D_new', + fallback_metric_cols=('D_new',), refresh=refresh, ) _, n_new = _read_sector_and_values( sheet_id=sheet_id, label=label, tab='N_and_diffs', - preferred_metric_col='N_new_inflated', - fallback_metric_col='N_new', + preferred_metric_col='N_new_purchaser', + fallback_metric_cols=('N_new_inflated', 'N_new'), refresh=refresh, ) diff --git a/bedrock/utils/validation/calculate_ef_diagnostics.py b/bedrock/utils/validation/calculate_ef_diagnostics.py index 29fa7376..dd18c8ba 100644 --- a/bedrock/utils/validation/calculate_ef_diagnostics.py +++ b/bedrock/utils/validation/calculate_ef_diagnostics.py @@ -60,6 +60,47 @@ def _merge_ef_new_inflated_into_comparison( return out[ordered] +def _merge_ef_new_purchaser_into_comparison( + comparison: pd.DataFrame, + purchaser_new: pd.DataFrame, + *, + ef_name: str, + purchaser_old: pd.DataFrame | None = None, +) -> pd.DataFrame: + """Insert purchaser columns; ``{ef}_perc_diff`` uses new vs old purchaser price.""" + new_col = f'{ef_name}_new' + inflated_col = f'{ef_name}_new_inflated' + old_inflated_col = f'{ef_name}_old_inflated' + purchaser_new_col = f'{ef_name}_new_purchaser' + purchaser_old_col = f'{ef_name}_old_purchaser' + perc_col = f'{ef_name}_perc_diff' + + new_ser = 
_ef_vector_as_series(purchaser_new).reindex(comparison.index) + if purchaser_old is not None: + old_ser = _ef_vector_as_series(purchaser_old).reindex(comparison.index) + else: + old_ser = ta.cast('pd.Series[float]', comparison[old_inflated_col]) + + out = comparison.copy() + out[purchaser_new_col] = new_ser + if purchaser_old is not None: + out[purchaser_old_col] = old_ser + out[perc_col] = _vector_perc_diff(new_ser, old_ser) + + cols = comparison.columns.tolist() + if inflated_col in cols: + insert_at = cols.index(inflated_col) + else: + insert_at = cols.index(new_col) + ordered = cols[:insert_at] + [purchaser_new_col] + cols[insert_at:] + if purchaser_old is not None: + old_insert_at = ordered.index(old_inflated_col) + ordered = ( + ordered[:old_insert_at] + [purchaser_old_col] + ordered[old_insert_at:] + ) + return out[ordered] + + def _add_comparison_type_column( df: pd.DataFrame, mapped_sectors: ta.Dict[str, str], @@ -189,6 +230,20 @@ def calculate_ef_diagnostics(sheet_id: str) -> None: time.time() - t0, ) + if efs.N_new_purchaser is not None: + t0 = time.time() + N_comparison = _merge_ef_new_purchaser_into_comparison( + N_comparison, + efs.N_new_purchaser, + ef_name='N', + purchaser_old=efs.N_old_purchaser, + ) + logger.info( + '[TIMING] attach N_new_purchaser/N_old_purchaser columns to N_and_diffs ' + 'in %.1fs', + time.time() - t0, + ) + t0 = time.time() update_sheet_tab( sheet_id, diff --git a/bedrock/utils/validation/diagnostics_helpers.py b/bedrock/utils/validation/diagnostics_helpers.py index d794c233..58f09c8a 100644 --- a/bedrock/utils/validation/diagnostics_helpers.py +++ b/bedrock/utils/validation/diagnostics_helpers.py @@ -80,11 +80,28 @@ class EfsForDiagnostics(BaseModel): N_old: OldEfSet D_new_inflated: ta.Optional[pd.DataFrame] = None N_new_inflated: ta.Optional[pd.DataFrame] = None + N_new_purchaser: ta.Optional[pd.DataFrame] = None + N_old_purchaser: ta.Optional[pd.DataFrame] = None class Config: arbitrary_types_allowed = True +def 
n_purchaser_adjustment_eligibility(cfg: USAConfig) -> tuple[bool, str]: + """Whether purchaser-price N columns should be computed for USEEIO baseline comparisons. + + When eligible, both ``N_new_purchaser`` and ``N_old_purchaser`` are emitted so + ``N_perc_diff`` compares purchaser vs purchaser. + """ + if cfg.diagnostics_baseline_source != 'gcs_useeio_xlsx': + return (False, 'diagnostics_baseline_source is not gcs_useeio_xlsx') + from bedrock.transform.iot.derive_PRO_to_PUR_ratio import margins_phi_active + + if not margins_phi_active(cfg): + return (False, 'no active margins methodology flag (useeio or cornerstone avg)') + return (True, '') + + def d_n_new_inflated_eligibility(cfg: USAConfig) -> tuple[bool, str]: """Whether ``D_new_inflated`` / ``N_new_inflated`` should be computed for *cfg*. @@ -327,6 +344,16 @@ def align_efs_across_schemas( if efs.N_new_inflated is not None else None ), + N_new_purchaser=( + _reindex_and_fill(efs.N_new_purchaser, full_index, new_fill) + if efs.N_new_purchaser is not None + else None + ), + N_old_purchaser=( + _reindex_and_fill(efs.N_old_purchaser, full_index, old_fill) + if efs.N_old_purchaser is not None + else None + ), ) return aligned_efs, active_mappings @@ -645,6 +672,27 @@ def pull_efs_for_diagnostics() -> EfsForDiagnostics: ) logger.info(f'[TIMING] Inflation adjustment completed in {time.time() - t0:.1f}s') + n_new_purchaser: pd.DataFrame | None = None + n_old_purchaser: pd.DataFrame | None = None + emit_purchaser, purchaser_skip_reason = n_purchaser_adjustment_eligibility(config) + if emit_purchaser: + from bedrock.transform.iot.derive_PRO_to_PUR_ratio import apply_phi_to_ef_vector + + if n_new_inflated is not None: + n_base = ta.cast('pd.Series[float]', n_new_inflated.squeeze()) + else: + n_base = ta.cast('pd.Series[float]', N_new.squeeze()) + n_new_purchaser = apply_phi_to_ef_vector(n_base).to_frame() + n_old_purchaser = apply_phi_to_ef_vector( + ta.cast('pd.Series[float]', N_old_inflated.squeeze()) + ).to_frame() + 
logger.info( + '[TIMING] N_new_purchaser and N_old_purchaser (Phi on new/old N) ' + 'computed for USEEIO baseline' + ) + else: + logger.info('Skipping N_new_purchaser: %s', purchaser_skip_reason) + return EfsForDiagnostics( D_new=D_new.to_frame(), N_new=N_new.to_frame(), @@ -658,6 +706,8 @@ def pull_efs_for_diagnostics() -> EfsForDiagnostics: ), D_new_inflated=d_new_inflated, N_new_inflated=n_new_inflated, + N_new_purchaser=n_new_purchaser, + N_old_purchaser=n_old_purchaser, ) diff --git a/bedrock/utils/validation/generate_diagnostics.py b/bedrock/utils/validation/generate_diagnostics.py index 532dd70f..9ec67f47 100644 --- a/bedrock/utils/validation/generate_diagnostics.py +++ b/bedrock/utils/validation/generate_diagnostics.py @@ -133,6 +133,11 @@ def generate_diagnostics( ) # Run metadata only — diagnostics_baseline_source and useeio_* already appear # once in config_df from USAConfig.to_dataframe(). + from bedrock.utils.validation.diagnostics_helpers import ( + n_purchaser_adjustment_eligibility, + ) + + purchaser_applied, _ = n_purchaser_adjustment_eligibility(cfg) git_rows: list[dict[str, str]] = [ {'config_field': 'git_commit', 'value': GIT_HASH_LONG or 'unknown'}, { @@ -141,6 +146,10 @@ def generate_diagnostics( }, {'config_field': 'git_pr_url', 'value': pr_url or GIT_PR_URL or 'N/A'}, {'config_field': 'baseline_snapshot_key_used', 'value': baseline_snap}, + { + 'config_field': 'purchaser_price_adjustment_applied', + 'value': str(purchaser_applied), + }, ] git_metadata = pd.DataFrame(git_rows) config_df = pd.concat([git_metadata, config_df], ignore_index=True)