
feat: Support configurable aggregations setup #582

Open
SandeepTuniki wants to merge 17 commits into master from config-options-for-aggregations


@SandeepTuniki SandeepTuniki commented Jun 24, 2026

Contributor

This PR adds support to specify the aggregations to run through a config file.

Notes:

  • All the aggregations can be listed in a single aggregation.yaml file.
  • It supports running aggregations in stages, which helps order aggregations that depend on one another. By default, every aggregation has stage: 1. This can be overridden.
  • Within a stage, all aggregations are run in parallel.
  • The workflow calls the /initiate and /poll APIs on the ingestion helper. It polls the /poll endpoint repeatedly, passing the state back and forth, until all stages finish.
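
To illustrate the staging rules in the notes above, here is a minimal Python sketch. It is not the PR's AggregationOrchestrator, which submits jobs and is polled through /poll; this version uses local threads only to show the ordering. The config shape (dicts with `name` and an optional `stage`), the helpers `group_by_stage` and `run_stages`, and the `run_one` callback are all assumptions made for illustration.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

DEFAULT_STAGE = 1  # per the notes: an aggregation without a stage runs in stage 1


def group_by_stage(aggregations):
    """Group aggregation entries by stage number, in ascending stage order."""
    stages = defaultdict(list)
    for agg in aggregations:
        stages[agg.get("stage", DEFAULT_STAGE)].append(agg)
    return dict(sorted(stages.items()))


def run_stages(aggregations, run_one):
    """Run stages one after another; entries within a stage run in parallel."""
    results = []
    for stage, entries in group_by_stage(aggregations).items():
        with ThreadPoolExecutor(max_workers=len(entries)) as pool:
            # pool.map keeps input order; leaving the `with` block waits
            # for every entry of this stage before the next stage starts.
            results.append((stage, list(pool.map(run_one, entries))))
    return results


# Hypothetical entries, as they might look after loading aggregation.yaml.
example = [
    {"name": "a"},
    {"name": "c", "stage": 2},
    {"name": "b"},
]
```

Here `a` and `b` fall into the default stage 1 and run together, and `c` starts only after both have finished.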

- Reorganized aggregation logic into a cohesive 'aggregation' package.
- Implemented AggregationOrchestrator supporting stage-based parallel execution and wildcards.
- Implemented validator utility (CLI & programmatic) validating against schema.json.
- Added new stateless /aggregation/initiate and /poll FastAPI endpoints.
- Retained legacy /run and /status wrappers for backward compatibility.
- Updated spanner-ingestion-workflow.yaml to use the state-passing loop.
- Added comprehensive unit and integration test suites (25 tests total, 100% passing).

codacy-production Bot commented Jun 24, 2026


Not up to standards ⛔

🔴 Issues 1 high · 4 medium · 81 minor

Alerts:
⚠ 86 issues (≤ 0 issues of at least minor severity)

Results:
86 new issues

Category Results
UnusedCode 1 medium
Documentation 41 minor
ErrorProne 1 high
CodeStyle 40 minor
Complexity 3 medium


🟢 Metrics 104 complexity

Metric Results
Complexity 104


@gemini-code-assist gemini-code-assist Bot left a comment

Contributor


Code Review

This pull request refactors the Data Commons aggregation workflow by introducing a stateless, stage-based AggregationOrchestrator and a YAML configuration validator. It adds a JSON schema for validation, updates the FastAPI endpoints to support stage-based initiation and polling, and updates the Spanner ingestion workflow to use this new polling mechanism. The review feedback highlights several critical improvements: resolving potential state transition bugs in the polling endpoint by explicitly checking for completed jobs, replacing hardcoded stage limits (such as capping stages at 10) with dynamic configuration-based limits, and avoiding global side effects by moving module-level root logger configurations into the CLI entry point.

Comment thread pipeline/workflow/ingestion-helper/routes/aggregation.py
Comment thread pipeline/workflow/ingestion-helper/routes/aggregation.py Outdated
Comment thread pipeline/workflow/ingestion-helper/routes/aggregation.py Outdated
Comment thread pipeline/workflow/ingestion-helper/routes/aggregation.py Outdated
Comment thread pipeline/workflow/ingestion-helper/aggregation/validator.py Outdated
Comment thread pipeline/workflow/ingestion-helper/aggregation/validator.py
Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py Outdated
…mpty job list

- Fixed a bug where a PENDING BigQuery job could cause premature transition to the next stage.
- Changed the transition check to strictly require a DONE status.
- Added short-circuiting for empty active job lists.
- Added a new integration test 'test_aggregation_poll_still_running' to verify PENDING handling.
- Replaced all hardcoded upper limits of stage '10' in API routes and legacy wrappers.
- Dynamically calculate the maximum stage from the loaded aggregations config.
- Updated integration tests in app_test.py to mock the aggregations list for correct stage limit evaluation.
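
A minimal sketch of the two fixes described above, under stated assumptions: job states are plain strings using BigQuery's PENDING, RUNNING, and DONE values, an empty active-job list is treated as "nothing left to wait for", and the function names `stage_is_complete` and `max_configured_stage` are hypothetical rather than taken from the PR.

```python
def stage_is_complete(job_states):
    """Return True only when every active job reports DONE.

    PENDING and RUNNING both count as "still in progress", so a job that
    has not started yet can no longer trigger a stage transition.
    """
    if not job_states:
        # Assumed short-circuit behavior: no active jobs means no waiting.
        return True
    return all(state == "DONE" for state in job_states)


def max_configured_stage(aggregations, default_stage=1):
    """Derive the highest stage from the config instead of hardcoding 10."""
    return max((agg.get("stage", default_stage) for agg in aggregations),
               default=default_stage)
```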
… side effects

- Removed module-level logging.basicConfig call from validator.py.
- Added logging.basicConfig inside validator's CLI main() function to ensure logging is configured only during standalone script execution.
- Removed global logging.getLogger().setLevel(logging.INFO) call from orchestrator.py to prevent altering root logger levels on module import.
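
The logging change above follows a common pattern, sketched here. The `validate` function is a hypothetical stand-in for the validator's programmatic entry point, and the default file name is only an example.

```python
import logging

# Module-level logger only: importing this module never touches the root logger.
logger = logging.getLogger(__name__)


def validate(path):
    """Hypothetical stand-in for the validator's programmatic API."""
    logger.info("Validating %s", path)
    return True


def main(argv=None):
    """CLI entry point: the only place where root logging is configured."""
    logging.basicConfig(level=logging.INFO)
    args = argv if argv else ["aggregation.yaml"]
    return 0 if validate(args[0]) else 1
```

A standalone script would call `main()` under an `if __name__ == "__main__":` guard, so callers that import `validate` keep whatever logging setup their application already has.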
@SandeepTuniki

Contributor Author

/gemini review

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor


Code Review

This pull request introduces a new stateless, stage-based aggregation orchestration mechanism for Data Commons ingestion, replacing the legacy sequential execution with a more robust multi-stage workflow. It adds a YAML-based configuration, a JSON schema validator, and new API endpoints (/initiate and /poll) in the ingestion helper, while updating the GCP Cloud Workflow to support this polling model. Feedback on the changes highlights two key issues: first, the 'entity' aggregation type defined in the schema is not handled in the orchestrator's routing logic, which could lead to silent failures; second, the validator bypasses schema validation by returning early when the 'aggregations' key is missing. Suggestions have been provided to raise a ValueError for unsupported step types and to let jsonschema handle the missing key validation.

Comment thread pipeline/workflow/ingestion-helper/aggregation/orchestrator.py Outdated
Comment thread pipeline/workflow/ingestion-helper/aggregation/validator.py Outdated
…or schema bypass

- Added an else block in orchestrator's execute_stage() that raises ValueError for unsupported/unimplemented step types (like 'entity'), preventing silent failures.
- Removed the early-return validation bypass in validator.py, ensuring that jsonschema strictly validates missing 'aggregations' keys.
- Added unit test 'test_execute_stage_unsupported_type' in orchestrator_test.py to verify the ValueError fail-fast routing.
- Added unit tests 'test_validate_config_missing_aggregations_key' and 'test_validate_config_empty_file' in validator_test.py to cover the validation fixes.
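
The fail-fast routing described above might look roughly like the sketch below. Only the 'entity' type and the ValueError behavior come from this PR conversation; the 'query' type, the `execute_step` and `handle_query` names, and the step dict shape are hypothetical.

```python
def handle_query(step):
    """Hypothetical handler for a supported step type."""
    return f"submitted query step {step['name']}"


def execute_step(step):
    """Route one step by type and raise on anything without a handler."""
    step_type = step.get("type")
    if step_type == "query":
        return handle_query(step)
    else:
        # Without this branch, a type that the schema allows but the router
        # does not implement (such as 'entity') would be skipped silently.
        raise ValueError(
            f"Unsupported aggregation step type: {step_type!r} "
            f"(step {step.get('name')!r})"
        )
```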
…loop performance issue

- Added a new get_active_stages() helper in orchestrator.py that returns a sorted list of unique active and enabled stage numbers.
- Replaced all sequential while and range loops in routes/aggregation.py with direct list-comprehension jumps using get_active_stages().
- This completely resolves the performance spike and timeout risk when a very large stage number (e.g. 100 million) is configured.
- Added unit test 'test_get_active_stages' in orchestrator_test.py.
- Updated and significantly simplified integration test mocks in app_test.py to mock get_active_stages() instead of aggregations/has_stage.
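
A rough sketch of why jumping between configured stages avoids the large-stage-number problem. The name get_active_stages comes from the commit message above, but its signature here, the `enabled` flag, the default stage of 1, and the `next_stage` helper are assumptions.

```python
def get_active_stages(aggregations):
    """Return the sorted, de-duplicated stage numbers of enabled aggregations."""
    return sorted({
        agg.get("stage", 1)
        for agg in aggregations
        if agg.get("enabled", True)
    })


def next_stage(active_stages, current):
    """Jump directly to the next configured stage, or None when finished."""
    return next((s for s in active_stages if s > current), None)


# Hypothetical config with a very large stage number.
config = [
    {"name": "a", "stage": 1},
    {"name": "b", "stage": 100_000_000},
    {"name": "c", "stage": 5, "enabled": False},
]
```

With this config, moving on from stage 1 takes a single lookup into a two-element list rather than a loop over every integer up to 100 million.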
… and add docstring

- Renamed the generic StateObject class to AggregationWorkflowState to better reflect its purpose in representing the state of the multi-stage aggregation workflow.
- Added a comprehensive, professional docstring to AggregationWorkflowState explaining its role in the stateless polling loop coordinated by Google Cloud Workflows.
- Updated all type annotations, route definitions, and return statements inside routes/aggregation.py.
- Verified that no other files in the workspace referenced the old class name directly, and that all 30 tests continue to pass 100%.
…temporary TODOs

- Reverted all 'Legacy' prefix names on compatibility Pydantic models (e.g. AggregationRequest, AggregationResponse) back to their original names, ensuring perfect backward compatibility for client-side code generators.
- Reverted compatibility route method names back to their original names (run_aggregation and get_aggregation_status).
- Added deprecated=True to the FastAPI route decorators for /run and /status to natively flag them in the OpenAPI/Swagger documentation UI.
- Added clear TODO comments and docstrings advising that these are temporary compatibility components to be removed once all consumers migrate.
- Renamed the test case in app_test.py to test_aggregation_run to match the method name.
- Deleted redundant ASCII box section dividers and decorative headers (e.g. Pydantic Models for the New Stateless API, Router Definition) from routes/aggregation.py.
- Simplified backward compatibility section markers to clean, single-line TODO comments.
- This removes visual noise and aligns the file with clean pythonic commenting best practices.
…d endpoints

- Replaced the global section-level TODO comments in routes/aggregation.py with localized, specific TODO comments.
- Placed an explicit, actionable TODO comment directly above each of the four compatibility models (AggregationRequest, AggregationStatusRequest, etc.).
- Placed an explicit TODO comment directly above the /run and /status route decorators.
- This ensures technical debt is highly visible, actionable, and tied directly to the specific components slated for removal after consumer migration.
- Fixed a PEP 8 E701 violation in orchestrator.py by splitting a single-line 'if' statement into a standard multi-line block.
- Resolved a Ruff F401 unused import warning in app_test.py by removing 'import os'.
- Verified that all 30 tests continue to pass 100% after the style cleanup.
@SandeepTuniki SandeepTuniki marked this pull request as ready for review June 24, 2026 18:26
@SandeepTuniki SandeepTuniki requested a review from vish-cs June 25, 2026 08:06