From e9f61f4440c2540fb49b5ac7c0dc333a8a73c911 Mon Sep 17 00:00:00 2001 From: "Jason(Zhe-You) Liu" <68415893+jason810496@users.noreply.github.com> Date: Thu, 25 Jun 2026 11:10:45 +0900 Subject: [PATCH 01/29] [v3-3-test] Correct the example config for the coordinators (#68929) (#68940) * Correct the example and docstring for the coordinators * Guard the config.yml coordinators example against drift (cherry picked from commit 4aafb95) --- .../src/airflow/config_templates/config.yml | 7 ++++ .../coordinators/executable/coordinator.py | 7 ++-- .../sdk/coordinators/java/coordinator.py | 9 +++--- .../execution_time/test_coordinator.py | 32 ++++++++++++++++++- 4 files changed, 45 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 29763f422feb7..39423755a6841 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2091,6 +2091,7 @@ sdk: "jdk-17": { "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", "kwargs": { + "jars_root": ["/opt/airflow/java-bundles"], "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"] }, @@ -2099,6 +2100,12 @@ sdk: "worker_container_repository": "apache/airflow", "worker_container_tag": "3.3.0" } + }, + "go-sdk": { + "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", + "kwargs": { + "executables_root": ["/opt/airflow/executable-bundles"] + } } } default: ~ diff --git a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py index ed11f97165d1a..2275628d0b1c8 100644 --- a/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py +++ b/task-sdk/src/airflow/sdk/coordinators/executable/coordinator.py @@ -370,12 +370,11 @@ class ExecutableCoordinator(SubprocessCoordinator): Configuration is taken from the ``[sdk] coordinators`` entry that constructs this instance:: - { - "name": "go", + "go": { "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", "kwargs": { - "executables_root": ["~/airflow/executable-bundles"], - }, + "executables_root": ["~/airflow/executable-bundles"] + } } :param executables_root: A list of directories scanned for executable diff --git a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py index cb74c64dfd48f..da35d0f37b84e 100644 --- a/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py +++ b/task-sdk/src/airflow/sdk/coordinators/java/coordinator.py @@ -174,14 +174,13 @@ class JavaCoordinator(SubprocessCoordinator): Configuration is taken from the ``[sdk] coordinators`` entry that constructs this instance:: - { - "name": "jdk-17", + "jdk-17": { "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", "kwargs": { - "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", - "jvm_args": ["-Xmx1024m"], "jars_root": ["~/airflow/jars"], - }, + "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", + "jvm_args": ["-Xmx1024m"] + } } :param java_executable: Path to the ``java`` command (defaults to diff --git a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py index 475b587bf7f1f..40db5378afccf 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_coordinator.py +++ b/task-sdk/tests/task_sdk/execution_time/test_coordinator.py @@ -22,7 +22,8 @@ import pytest -from airflow.sdk.configuration import conf +from airflow.sdk._shared.module_loading import import_string +from airflow.sdk.configuration import conf, retrieve_configuration_description from airflow.sdk.execution_time.coordinator import ( BaseCoordinator, CoordinatorManager, @@ -199,3 +200,32 @@ def test_extra_for_queue_does_not_instantiate_coordinator(self, sdk_config): "pod_template_file": "/opt/airflow/pod_templates/boom.yaml" } assert manager._created_coordinators == {} + + +class TestConfigYamlCoordinatorsExample: + """Guard the ``[sdk] coordinators`` example in ``config.yml`` against drift. + + Nothing else exercises the example, so a broken one (e.g. dropping the + required ``jars_root`` kwarg) can ship unnoticed. Loading it through + CoordinatorManager and constructing every entry keeps the example honest. + """ + + def test_every_example_coordinator_constructs(self, sdk_config): + description = retrieve_configuration_description() + coordinators_example = description["sdk"]["options"]["coordinators"]["example"] + specs = json.loads(coordinators_example) + assert specs, "config.yml [sdk] coordinators example must not be empty" + + # The example's own queue_to_coordinator illustrates different keys, so + # route every coordinator through a synthetic queue to construct each one. + queue_to_coordinator = {f"queue-{key}": key for key in specs} + sdk_config( + coordinators=coordinators_example, + queue_to_coordinator=json.dumps(queue_to_coordinator), + ) + manager = CoordinatorManager.from_config() + assert set(manager._coordinator_specs) == set(specs) + + for queue, key in queue_to_coordinator.items(): + coordinator = manager.for_queue(queue) + assert isinstance(coordinator, import_string(specs[key]["classpath"])) From cb174a6937aeff42e8bbb8d1a62089e60cfc7071 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 25 Jun 2026 11:08:14 +0200 Subject: [PATCH 02/29] [v3-3-test] Fix race condition on sort param when returning dagRuns (#68842) (#68948) (cherry picked from commit 31f7ba742092432bd5a686ca028f62d55c257184) Co-authored-by: Hasnain Raza <4294680+HasnainRaz@users.noreply.github.com> --- .../airflow/api_fastapi/common/parameters.py | 2 +- .../api_fastapi/common/test_parameters.py | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 17185fe20e251..935b547c1a7cf 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -718,7 +718,7 @@ def dynamic_depends(self, default: str | Sequence[str] | None = None) -> Callabl ) def inner(order_by: list[str] = _order_by_query) -> SortParam: - return self.set_value(order_by) + return SortParam(self.allowed_attrs, self.model, self.to_replace).set_value(order_by) return inner diff --git a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py index d02496ae87d30..4d2f473f4f830 100644 --- a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py +++ b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py @@ -165,6 +165,27 @@ def test_primary_key_is_not_duplicated_when_alias_maps_to_pk(self): resolved = param.get_resolved_columns() assert [name for name, _col, _desc in resolved] == ["import_error_id"] + def test_dynamic_depends_returns_independent_instances(self): + """Each call to the inner closure must produce a separate SortParam instance. + + Two concurrent requests with different order_by values must not share state — + a mutation on one must not affect the other. + """ + sort_param = SortParam(["id", "run_id", "logical_date"], DagRun, {"dag_run_id": "run_id"}) + inner = sort_param.dynamic_depends(default="id") + + instance_a = inner(order_by=["logical_date"]) + instance_b = inner(order_by=["run_id"]) + + assert instance_a is not instance_b + assert instance_a.value == ["logical_date"] + assert instance_b.value == ["run_id"] + # Resolving one must not affect the other. + cols_a = [name for name, _col, _desc in instance_a.get_resolved_columns()] + cols_b = [name for name, _col, _desc in instance_b.get_resolved_columns()] + assert cols_a[0] == "logical_date" + assert cols_b[0] == "run_id" + def _compile(statement): return str(statement.compile(compile_kwargs={"literal_binds": True})).lower() From eda27d177151ad3335cc1294330c23b3d48a5bc0 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 25 Jun 2026 17:46:08 +0100 Subject: [PATCH 03/29] [v3-3-test] Allow missing `api_auth.jwt_secret` for `InProcessExecutionAPI` (#68980) (#68982) PR #68840 fixed an issue where exceptions thrown by the FastAPI lifecycle hook were swallowed. In doing so, it exposed a pre-existing problem where the lifecycle hook couldn't run when the JWT secret was not provided. As the `InProcessExecutionAPI` overrides auth, it doesn't need a JWT secret, and we certainly don't want to start crashing processes that previously ran fine as a result of a missing secret that we don't need. This commit stubs out the registered `JWTValidator` in the FastAPI app created for the `InProcessExecutionAPI`. This is done in an isolated services registry to ensure we don't leak this into any real app instantiations. (cherry picked from commit e886dfd05eee4b8b38efdbd34f2fae5ae49ac6cd) Co-authored-by: Nick Stenning --- .../airflow/api_fastapi/execution_api/app.py | 19 +++++++++++++++---- .../api_fastapi/execution_api/test_app.py | 10 ++++++++++ task-sdk/dev/generate_task_sdk_models.py | 2 -- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py b/airflow-core/src/airflow/api_fastapi/execution_api/app.py index 449019db7998d..5a87cdb81e04c 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py @@ -99,8 +99,11 @@ async def lifespan(app: FastAPI, registry: svcs.Registry): app.state.svcs_registry = registry registry.register_factory(JWTGenerator, _jwt_generator) - # Create an app scoped validator, so that we don't have to fetch it every time - registry.register_value(JWTValidator, _jwt_validator(), ping=JWTValidator.status) + + # InProcessExecutionAPI stubs out JWTValidator: don't re-register in that case. + if JWTValidator not in registry: + # Create an app scoped validator, so that we don't have to fetch it every time + registry.register_value(JWTValidator, _jwt_validator(), ping=JWTValidator.status) yield @@ -282,7 +285,7 @@ def _inject_trace_context_dep(routes, mode: str) -> None: route.dependencies.append(dep) -def create_task_execution_api_app() -> FastAPI: +def create_task_execution_api_app(lifespan: svcs.fastapi.lifespan = lifespan) -> FastAPI: """Create FastAPI app for task execution API.""" from airflow.api_fastapi.execution_api.routes import execution_api_router from airflow.api_fastapi.execution_api.versions import bundle @@ -388,7 +391,15 @@ def app(self): from airflow.api_fastapi.execution_api.routes.xcoms import has_xcom_access from airflow.api_fastapi.execution_api.security import _jwt_bearer - self._app = create_task_execution_api_app() + # Give this app its own lifespan + services registry so that stubbing services + # (e.g. JWTValidator) doesn't affect the module-level ``lifespan.registry``. + registry = svcs.Registry() + private_lifespan = attrs.evolve(lifespan, registry=registry) + self._app = create_task_execution_api_app(lifespan=private_lifespan) + + # In-process callers don't need a real JWTValidator: auth is bypassed below via + # ``dependency_overrides``. + registry.register_value(JWTValidator, None) # Set up dag_bag in app state for dependency injection self._app.state.dag_bag = create_dag_bag() diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py index 32880773ade2e..d4b9ce5ac88d7 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/test_app.py @@ -22,6 +22,7 @@ from unittest import mock from uuid import UUID +import httpx import pytest from fastapi import Request from fastapi.params import Security as SecurityParam @@ -137,6 +138,15 @@ def test_routes_with_task_instance_id_param_enforce_ti_self(client): ) +@conf_vars({("api_auth", "jwt_secret"): None}) +def test_in_process_execution_api_runs_without_jwt_secret(): + """The in-process API must not require ``api_auth/jwt_secret`` to be configured.""" + api = InProcessExecutionAPI() + with httpx.Client(transport=api.transport) as client: + response = client.get("http://localhost/health") + assert response.status_code == 200 + + def test_in_process_execution_api_transport_lifecycle(): """The background loop + thread lifecycle is tied to the ``.transport``, not the factory instance. diff --git a/task-sdk/dev/generate_task_sdk_models.py b/task-sdk/dev/generate_task_sdk_models.py index 74a1cb7f756a8..bb19fffc4497c 100644 --- a/task-sdk/dev/generate_task_sdk_models.py +++ b/task-sdk/dev/generate_task_sdk_models.py @@ -33,8 +33,6 @@ from openapi_spec_validator import validate_spec os.environ["_AIRFLOW__AS_LIBRARY"] = "1" -# Set a placeholder secret to allow the in-process FastAPI app to run its lifecycle hooks. -os.environ.setdefault("AIRFLOW__API_AUTH__JWT_SECRET", "task-sdk-model-generation") AIRFLOW_ROOT_PATH = Path(__file__).parents[2].resolve() AIRFLOW_TASK_SDK_ROOT_PATH = AIRFLOW_ROOT_PATH / "task-sdk" From fce31dd2c952964a2303d2554d93ec28b92be033 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 26 Jun 2026 08:56:13 +0800 Subject: [PATCH 04/29] [v3-3-test] Rename PartitionAtRuntime as PartitionedAtRuntime for consistency (#68978) (#68987) Co-authored-by: Wei Lee --- .../docs/authoring-and-scheduling/assets.rst | 12 ++++----- .../example_dags/example_asset_partition.py | 12 ++++----- .../src/airflow/serialization/encoders.py | 8 +++--- airflow-core/src/airflow/timetables/base.py | 2 +- airflow-core/src/airflow/timetables/simple.py | 6 ++--- .../tests/unit/api/common/test_trigger_dag.py | 12 ++++----- .../core_api/routes/public/test_assets.py | 6 ++--- .../core_api/routes/public/test_dag_run.py | 26 +++++++++---------- .../tests/unit/assets/test_manager.py | 4 +-- .../unit/dag_processing/test_collection.py | 4 +-- .../tests/unit/jobs/test_scheduler_job.py | 6 ++--- airflow-core/tests/unit/models/test_dag.py | 6 ++--- .../tests/unit/models/test_taskinstance.py | 20 +++++++------- .../serialization/test_dag_serialization.py | 10 +++---- task-sdk/docs/api.rst | 2 +- task-sdk/src/airflow/sdk/__init__.py | 6 ++--- task-sdk/src/airflow/sdk/__init__.pyi | 4 +-- task-sdk/src/airflow/sdk/bases/timetable.py | 2 +- .../sdk/definitions/timetables/assets.py | 2 +- .../tests/task_sdk/definitions/test_dag.py | 17 +++++++++--- 20 files changed, 88 insertions(+), 79 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index d039fbfd1ffd3..a74664302da5f 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -781,7 +781,7 @@ semantics (the default). DAG, Asset, FixedKeyMapper, - PartitionAtRuntime, + PartitionedAtRuntime, PartitionedAssetTimetable, RollupMapper, SegmentWindow, @@ -792,7 +792,7 @@ semantics (the default). @asset( uri="file://incoming/player-stats/multi-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), ) def multi_region_player_stats(self, outlet_events): # Emit one event per region in a single run. @@ -836,17 +836,17 @@ Setting partition keys at runtime When the partition key is not known ahead of time (for example, a watermark discovered from the source data, a late-arriving file, or a backfill request), let the producing task decide it while it runs. Schedule the producer with -``PartitionAtRuntime()`` and record the key(s) on the emitted event with +``PartitionedAtRuntime()`` and record the key(s) on the emitted event with ``outlet_events[self].add_partitions(...)``: .. code-block:: python - from airflow.sdk import PartitionAtRuntime, asset + from airflow.sdk import PartitionedAtRuntime, asset @asset( uri="file://incoming/player-stats/live-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), ) def live_region_player_stats(self, outlet_events): # The key is only known once the task runs. @@ -862,7 +862,7 @@ keys collapse to a single event: @asset( uri="file://incoming/player-stats/multi-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), ) def multi_region_player_stats(self, outlet_events): outlet_events[self].add_partitions(["us", "eu", "apac"]) diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py index a7929ed92581f..f4171bb46f1e7 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_partition.py +++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py @@ -30,8 +30,8 @@ IdentityMapper, MinimumCount, MonthWindow, - PartitionAtRuntime, PartitionedAssetTimetable, + PartitionedAtRuntime, ProductMapper, RollupMapper, SegmentWindow, @@ -160,7 +160,7 @@ def check_partition_alignment(): with DAG( dag_id="ingest_regional_sales", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["example", "sales", "ingestion"], ): """Produce regional sales data with composite ``region|timestamp`` partition keys at runtime.""" @@ -207,7 +207,7 @@ def aggregate_sales(dag_run=None): with DAG( dag_id="ingest_region_stats", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["example", "player-stats", "regional"], ): """ @@ -244,14 +244,14 @@ def regional_stats_breakdown(): @asset( uri="file://incoming/player-stats/live-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["player-stats", "runtime"], ) def live_region_player_stats(self, outlet_events): """ Produce a single region partition whose key is decided at runtime. - This asset demonstrates PartitionAtRuntime, which records the partition key on the + This asset demonstrates PartitionedAtRuntime, which records the partition key on the emitted event with ``add_partitions`` while the task runs rather than from a timetable. """ outlet_events[self].add_partitions("us") @@ -281,7 +281,7 @@ def summarize_live_region(dag_run=None): @asset( uri="file://incoming/player-stats/multi-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["player-stats", "runtime"], ) def multi_region_player_stats(self, outlet_events): diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py index b31b7c828c68c..8ab533b888ae5 100644 --- a/airflow-core/src/airflow/serialization/encoders.py +++ b/airflow-core/src/airflow/serialization/encoders.py @@ -70,8 +70,8 @@ from airflow.sdk.definitions.asset import AssetRef from airflow.sdk.definitions.timetables.assets import ( AssetTriggeredTimetable, - PartitionAtRuntime, PartitionedAssetTimetable, + PartitionedAtRuntime, ) from airflow.sdk.definitions.timetables.simple import ContinuousTimetable, NullTimetable, OnceTimetable from airflow.sdk.definitions.timetables.trigger import CronPartitionTimetable @@ -329,7 +329,7 @@ class _Serializer: MultipleCronTriggerTimetable: "airflow.timetables.trigger.MultipleCronTriggerTimetable", NullTimetable: "airflow.timetables.simple.NullTimetable", OnceTimetable: "airflow.timetables.simple.OnceTimetable", - PartitionAtRuntime: "airflow.timetables.simple.PartitionAtRuntime", + PartitionedAtRuntime: "airflow.timetables.simple.PartitionedAtRuntime", PartitionedAssetTimetable: "airflow.timetables.simple.PartitionedAssetTimetable", } @@ -355,9 +355,9 @@ def serialize_timetable(self, timetable: BaseTimetable | CoreTimetable) -> dict[ @serialize_timetable.register(ContinuousTimetable) @serialize_timetable.register(NullTimetable) @serialize_timetable.register(OnceTimetable) - @serialize_timetable.register(PartitionAtRuntime) + @serialize_timetable.register(PartitionedAtRuntime) def _( - self, timetable: ContinuousTimetable | NullTimetable | OnceTimetable | PartitionAtRuntime + self, timetable: ContinuousTimetable | NullTimetable | OnceTimetable | PartitionedAtRuntime ) -> dict[str, Any]: return {} diff --git a/airflow-core/src/airflow/timetables/base.py b/airflow-core/src/airflow/timetables/base.py index 4bee4ab38a368..84d8aa2e46fee 100644 --- a/airflow-core/src/airflow/timetables/base.py +++ b/airflow-core/src/airflow/timetables/base.py @@ -240,7 +240,7 @@ class Timetable(Protocol): partitioned_at_runtime: bool = False """Whether this timetable defers partition selection to task runtime. - *True* for :class:`~airflow.timetables.simple.PartitionAtRuntime`; + *True* for :class:`~airflow.timetables.simple.PartitionedAtRuntime`; downstream code can branch on this flag instead of using ``isinstance``. """ diff --git a/airflow-core/src/airflow/timetables/simple.py b/airflow-core/src/airflow/timetables/simple.py index a8b499dbe50ba..87741a8eb0d7e 100644 --- a/airflow-core/src/airflow/timetables/simple.py +++ b/airflow-core/src/airflow/timetables/simple.py @@ -186,11 +186,11 @@ def next_dagrun_info( return DagRunInfo.interval(start, end) -class PartitionAtRuntime(NullTimetable): +class PartitionedAtRuntime(NullTimetable): """ Timetable that never schedules anything; partition keys are set at runtime. - This corresponds to ``schedule=PartitionAtRuntime()``. + This corresponds to ``schedule=PartitionedAtRuntime()``. A run's ``partition_key`` (run-level provenance) must be supplied at trigger time — for example via the REST API's ``partition_key`` field. Partition keys @@ -204,7 +204,7 @@ class PartitionAtRuntime(NullTimetable): @property def summary(self) -> str: - return "PartitionAtRuntime" + return "PartitionedAtRuntime" class AssetTriggeredTimetable(_TrivialTimetable): diff --git a/airflow-core/tests/unit/api/common/test_trigger_dag.py b/airflow-core/tests/unit/api/common/test_trigger_dag.py index 6fc48bd081411..e1837426bc188 100644 --- a/airflow-core/tests/unit/api/common/test_trigger_dag.py +++ b/airflow-core/tests/unit/api/common/test_trigger_dag.py @@ -25,7 +25,7 @@ from airflow.exceptions import InvalidPartitionKeyError from airflow.models import DagModel from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.timetables.simple import PartitionAtRuntime +from airflow.timetables.simple import PartitionedAtRuntime from airflow.timetables.trigger import CronPartitionTimetable from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -129,18 +129,18 @@ def test_trigger_dag_raises_invalid_partition_key_for_cron_partition_timetable(d ) -def test_trigger_dag_partition_at_runtime_leaves_partition_date_none(dag_maker, session): - """PartitionAtRuntime Dags accept arbitrary keys; partition_date stays None.""" +def test_trigger_dag_partitioned_at_runtime_leaves_partition_date_none(dag_maker, session): + """PartitionedAtRuntime Dags accept arbitrary keys; partition_date stays None.""" with dag_maker( session=session, - dag_id="TEST_PARTITION_AT_RUNTIME", - schedule=PartitionAtRuntime(), + dag_id="TEST_PARTITIONED_AT_RUNTIME", + schedule=PartitionedAtRuntime(), ): EmptyOperator(task_id="mytask") session.commit() dag_run = trigger_dag( - dag_id="TEST_PARTITION_AT_RUNTIME", + dag_id="TEST_PARTITIONED_AT_RUNTIME", triggered_by=DagRunTriggeredByType.REST_API, partition_key="arbitrary-runtime-key", session=session, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index b65036b72d9b3..33c43785a55a8 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -41,7 +41,7 @@ from airflow.models.serialized_dag import SerializedDagModel from airflow.models.trigger import Trigger from airflow.providers.standard.operators.empty import EmptyOperator -from airflow.timetables.simple import PartitionAtRuntime +from airflow.timetables.simple import PartitionedAtRuntime from airflow.utils.session import provide_session from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType @@ -1519,9 +1519,9 @@ def create_dags(self, setup, dag_maker, session): i: am.to_serialized() for i, am in enumerate(self.create_assets(session=session, num=3), start=1) } # DAG_ASSET1_ID is materialized with a partition_key in several tests below, so it must be a - # partitioned Dag. PartitionAtRuntime accepts runtime-discovered partition keys without + # partitioned Dag. PartitionedAtRuntime accepts runtime-discovered partition keys without # requiring a partitioned timetable. - with dag_maker(self.DAG_ASSET1_ID, schedule=PartitionAtRuntime(), session=session): + with dag_maker(self.DAG_ASSET1_ID, schedule=PartitionedAtRuntime(), session=session): EmptyOperator(task_id="task", outlets=assets[1]) with dag_maker(self.DAG_ASSET2_ID_A, schedule=None, session=session): EmptyOperator(task_id="task", outlets=assets[2]) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index 5b5a0293ffc4b..39a391f80f1b4 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -45,7 +45,7 @@ from airflow.sdk import Asset, Param, result, task from airflow.settings import _configure_async_session from airflow.timetables.interval import CronDataIntervalTimetable -from airflow.timetables.simple import PartitionAtRuntime, PartitionedAssetTimetable +from airflow.timetables.simple import PartitionedAssetTimetable, PartitionedAtRuntime from airflow.timetables.trigger import CronPartitionTimetable from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, State @@ -1619,8 +1619,8 @@ class TestGetDagRunAssetTriggerEvents: def test_should_respond_200(self, partition_key, test_client, dag_maker, session): asset1 = Asset(name="ds1", uri="file:///da1") - # Use PartitionAtRuntime for partitioned cases so the partition_key gate does not reject the key. - source_schedule = PartitionAtRuntime() if partition_key is not None else timedelta(days=1) + # Use PartitionedAtRuntime for partitioned cases so the partition_key gate does not reject the key. + source_schedule = PartitionedAtRuntime() if partition_key is not None else timedelta(days=1) with dag_maker( dag_id="source_dag", start_date=START_DATE1, schedule=source_schedule, session=session ): @@ -1639,7 +1639,7 @@ def test_should_respond_200(self, partition_key, test_client, dag_maker, session ) session.add(event) - trigger_schedule = PartitionAtRuntime() if partition_key is not None else timedelta(days=1) + trigger_schedule = PartitionedAtRuntime() if partition_key is not None else timedelta(days=1) with dag_maker( dag_id="TEST_DAG_ID", start_date=START_DATE1, schedule=trigger_schedule, session=session ): @@ -1650,7 +1650,7 @@ def test_should_respond_200(self, partition_key, test_client, dag_maker, session "partition_key": partition_key, } if partition_key is not None: - # PartitionAtRuntime is a null-timetable with no scheduled runs; supply logical_date=None + # PartitionedAtRuntime is a null-timetable with no scheduled runs; supply logical_date=None # explicitly so dag_maker does not try to infer it via next_dagrun_info (which returns None). create_dagrun_kwargs["logical_date"] = None dr = dag_maker.create_dagrun(**create_dagrun_kwargs) @@ -3503,18 +3503,18 @@ def test_should_respond_200_when_partition_key_given_for_partitioned_dag( assert response.status_code == 200 @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") - def test_should_respond_200_when_partition_key_given_for_partition_at_runtime_dag( + def test_should_respond_200_when_partition_key_given_for_partitioned_at_runtime_dag( self, dag_maker, test_client, session ): - """partition_key on a PartitionAtRuntime Dag must also be accepted (deferred validation). + """partition_key on a PartitionedAtRuntime Dag must also be accepted (deferred validation). partitioned_at_runtime=True means the Dag accepts runtime-discovered partition keys, so the REST layer must not reject it even though timetable.partitioned is False. """ - runtime_dag_id = "test_partition_at_runtime_dag_trigger" + runtime_dag_id = "test_partitioned_at_runtime_dag_trigger" with dag_maker( dag_id=runtime_dag_id, - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), start_date=START_DATE1, session=session, serialized=True, @@ -3638,14 +3638,14 @@ def test_trigger_partitioned_dag_invalid_key_returns_400(self, dag_maker, test_c assert response.status_code == 400 @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") - def test_trigger_partition_at_runtime_dag_leaves_partition_date_none( + def test_trigger_partitioned_at_runtime_dag_leaves_partition_date_none( self, dag_maker, test_client, session ): - """PartitionAtRuntime Dag with an arbitrary key must produce partition_date=None.""" - runtime_dag_id = "test_trigger_partition_at_runtime_none_date" + """PartitionedAtRuntime Dag with an arbitrary key must produce partition_date=None.""" + runtime_dag_id = "test_trigger_partitioned_at_runtime_none_date" with dag_maker( dag_id=runtime_dag_id, - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), start_date=START_DATE1, session=session, serialized=True, diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py index 54ddd66a2d496..b788b9ab28699 100644 --- a/airflow-core/tests/unit/assets/test_manager.py +++ b/airflow-core/tests/unit/assets/test_manager.py @@ -494,10 +494,10 @@ def test_queue_partitioned_dags_stamps_rollup_fingerprint(self, session, dag_mak # partition-at-runtime Dag so its run can carry a ``partition_key`` that # the emitted ``AssetEvent`` inherits. from airflow.models.taskinstance import TaskInstance - from airflow.sdk import PartitionAtRuntime + from airflow.sdk import PartitionedAtRuntime with dag_maker( - dag_id="stamp-producer", schedule=PartitionAtRuntime(), session=session + dag_id="stamp-producer", schedule=PartitionedAtRuntime(), session=session ) as producer_dag: from airflow.providers.standard.operators.empty import EmptyOperator diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py index fe15045598ff5..ee8d7fa0fcd22 100644 --- a/airflow-core/tests/unit/dag_processing/test_collection.py +++ b/airflow-core/tests/unit/dag_processing/test_collection.py @@ -64,7 +64,7 @@ from airflow.serialization.definitions.assets import SerializedAsset from airflow.serialization.encoders import encode_trigger, ensure_serialized_asset from airflow.serialization.serialized_objects import LazyDeserializedDAG -from airflow.timetables.simple import PartitionAtRuntime +from airflow.timetables.simple import PartitionedAtRuntime from airflow.triggers.base import BaseEventTrigger from airflow.utils.types import DagRunType @@ -136,7 +136,7 @@ def test_statement_latest_runs_loads_timetable_fields(dag_maker, session): @pytest.mark.db_test def test_statement_latest_runs_partitioned_sorted_by_partition_date(dag_maker, session): - with dag_maker("fake-dag", schedule=PartitionAtRuntime()): + with dag_maker("fake-dag", schedule=PartitionedAtRuntime()): pass dag_maker.sync_dagbag_to_db() for i, (run_id, partition_key, partition_date) in enumerate( diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 3b950f71d5b4c..e2d5977eb86f7 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -132,8 +132,8 @@ from airflow.serialization.serialized_objects import LazyDeserializedDAG from airflow.timetables.base import DagRunInfo, DataInterval, compute_rollup_fingerprint from airflow.timetables.simple import ( - PartitionAtRuntime, PartitionedAssetTimetable as CorePartitionedAssetTimetable, + PartitionedAtRuntime, ) from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.sqlalchemy import with_row_locks @@ -10255,7 +10255,7 @@ def _produce_and_register_asset_event( if expected_partition_key is None: expected_partition_key = partition_key - with dag_maker(dag_id=dag_id, schedule=PartitionAtRuntime(), session=session) as dag: + with dag_maker(dag_id=dag_id, schedule=PartitionedAtRuntime(), session=session) as dag: EmptyOperator(task_id="hi", outlets=[asset]) dr = dag_maker.create_dagrun( @@ -10707,7 +10707,7 @@ def test_consumer_dag_run_partition_date_is_none_when_task_key_diverges( with dag_maker( dag_id="asset-event-producer", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), session=session, ) as dag: EmptyOperator(task_id="hi", outlets=[asset_1]) diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index 090cd2d4756e0..ad4ed2c134d13 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -71,7 +71,7 @@ DAG, BaseOperator, CronPartitionTimetable, - PartitionAtRuntime, + PartitionedAtRuntime, TaskGroup, setup, task as task_decorator, @@ -1559,7 +1559,7 @@ def test_create_dagrun_int_partition_key_rejected_for_partitioned_dag(self, dag_ """DagRun-level type check rejects int partition_key even for partitioned Dags.""" with dag_maker( "test_create_dagrun_partitioned_int_key", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), ): ... with pytest.raises( @@ -1582,7 +1582,7 @@ def test_create_dagrun_int_partition_key_rejected_for_partitioned_dag(self, dag_ ("my-key", None, True), (None, None, False), ("my-key", CronPartitionTimetable("@daily", timezone="UTC"), False), - ("my-key", PartitionAtRuntime(), False), + ("my-key", PartitionedAtRuntime(), False), ], ) def test_serialized_dag_validate_partition_key( diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 97ec0242235b4..89ac6b12048e3 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -106,7 +106,7 @@ from airflow.ti_deps.deps.base_ti_dep import TIDepStatus from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep, _UpstreamTIStates -from airflow.timetables.simple import PartitionAtRuntime +from airflow.timetables.simple import PartitionedAtRuntime from airflow.utils.session import create_session, provide_session from airflow.utils.span_status import SpanStatus from airflow.utils.state import DagRunState, State, TaskInstanceState @@ -3568,7 +3568,7 @@ def test_register_asset_changes_in_db_no_outlets_is_a_noop(dag_maker, session): def test_when_dag_run_has_partition_then_asset_does(dag_maker, session): asset = Asset(name="hello") - with dag_maker(dag_id="asset_event_tester", schedule=PartitionAtRuntime()) as dag: + with dag_maker(dag_id="asset_event_tester", schedule=PartitionedAtRuntime()) as dag: EmptyOperator(task_id="hi", outlets=[asset]) dr = dag_maker.create_dagrun(partition_key="abc123", session=session) assert dr.partition_key == "abc123" @@ -3591,7 +3591,7 @@ def test_when_dag_run_has_partition_and_downstreams_listening_then_tables_popula session, ): asset = Asset(name="hello") - with dag_maker(dag_id="asset_event_tester", schedule=PartitionAtRuntime(), session=session) as dag: + with dag_maker(dag_id="asset_event_tester", schedule=PartitionedAtRuntime(), session=session) as dag: EmptyOperator(task_id="hi", outlets=[asset]) dag1_id = dag.dag_id dr = dag_maker.create_dagrun(partition_key="abc123", session=session) @@ -3657,7 +3657,7 @@ def test_runtime_partition_key_does_not_backfill_dag_run_when_none(dag_maker, se def test_runtime_partition_key_does_not_overwrite_scheduler_partition(dag_maker, session): """Task-emitted key lands on the AssetEvent but does NOT overwrite a scheduler-set DagRun.partition_key.""" asset = Asset(name="hello") - with dag_maker(dag_id="rt_pk_no_overwrite", schedule=PartitionAtRuntime()) as dag: + with dag_maker(dag_id="rt_pk_no_overwrite", schedule=PartitionedAtRuntime()) as dag: EmptyOperator(task_id="hi", outlets=[asset]) dr = dag_maker.create_dagrun(partition_key="scheduler-key", session=session) [ti] = dr.get_task_instances(session=session) @@ -3708,7 +3708,7 @@ def test_runtime_partition_key_inherits_dag_run_key_when_event_has_no_key(dag_ma can still be routed correctly. """ asset = Asset(name="hello") - with dag_maker(dag_id="rt_pk_none", schedule=PartitionAtRuntime()) as dag: + with dag_maker(dag_id="rt_pk_none", schedule=PartitionedAtRuntime()) as dag: EmptyOperator(task_id="hi", outlets=[asset]) dr = dag_maker.create_dagrun(partition_key="from-run", session=session) [ti] = dr.get_task_instances(session=session) @@ -3728,7 +3728,7 @@ def test_runtime_partition_key_inherits_dag_run_key_when_event_has_no_key(dag_ma def test_runtime_partition_key_mixed_events_for_same_asset(dag_maker, session): """One event with an explicit key + one without produce two AssetEvents (explicit key + inherited run key).""" asset = Asset(name="hello") - with dag_maker(dag_id="rt_pk_mixed", schedule=PartitionAtRuntime()) as dag: + with dag_maker(dag_id="rt_pk_mixed", schedule=PartitionedAtRuntime()) as dag: EmptyOperator(task_id="hi", outlets=[asset]) dr = dag_maker.create_dagrun(partition_key="from-run", session=session) [ti] = dr.get_task_instances(session=session) @@ -3773,17 +3773,17 @@ def test_runtime_partition_key_event_stays_none_when_no_key_and_no_run_key(dag_m assert event.partition_key is None -def test_runtime_partition_key_does_not_backfill_partition_at_runtime_run(dag_maker, session): - """Task-emitted key lands on AssetEvent but does NOT back-fill DagRun.partition_key on a PartitionAtRuntime run. +def test_runtime_partition_key_does_not_backfill_partitioned_at_runtime_run(dag_maker, session): + """Task-emitted key lands on AssetEvent but does NOT back-fill DagRun.partition_key on a PartitionedAtRuntime run. Provenance contract: DagRun.partition_key is set only at run-creation/trigger time. - A PartitionAtRuntime Dag triggered via REST without a partition_key starts with + A PartitionedAtRuntime Dag triggered via REST without a partition_key starts with partition_key=None. Even when a task discovers a partition at runtime and emits an outlet event carrying an explicit key, DagRun.partition_key must remain None — the key belongs to the AssetEvent, not to the run's provenance. """ asset = Asset(name="hello") - with dag_maker(dag_id="rt_pk_par_backfill", schedule=PartitionAtRuntime()) as dag: + with dag_maker(dag_id="rt_pk_par_backfill", schedule=PartitionedAtRuntime()) as dag: EmptyOperator(task_id="hi", outlets=[asset]) dr = dag_maker.create_dagrun(session=session) assert dr.partition_key is None diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py index 61a66c60fb647..07459a2011711 100644 --- a/airflow-core/tests/unit/serialization/test_dag_serialization.py +++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py @@ -93,7 +93,7 @@ validate_and_load_priority_weight_strategy, ) from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep -from airflow.timetables.simple import NullTimetable, OnceTimetable, PartitionAtRuntime +from airflow.timetables.simple import NullTimetable, OnceTimetable, PartitionedAtRuntime from airflow.timetables.trigger import CronPartitionTimetable from airflow.triggers.base import StartTriggerArgs from airflow.utils.types import DagRunType @@ -2778,11 +2778,11 @@ def test_create_dagrun_accepts_partition_key_for_partitioned_dag(self, dag_maker assert dr.partition_key == "2025-01-01T00:00:00" @pytest.mark.db_test - def test_create_dagrun_accepts_partition_key_for_partition_at_runtime_dag(self, dag_maker): - """create_dagrun does not raise when partition_key is passed to a PartitionAtRuntime Dag.""" + def test_create_dagrun_accepts_partition_key_for_partitioned_at_runtime_dag(self, dag_maker): + """create_dagrun does not raise when partition_key is passed to a PartitionedAtRuntime Dag.""" with dag_maker( - dag_id="test_partition_at_runtime", - schedule=PartitionAtRuntime(), + dag_id="test_partitioned_at_runtime", + schedule=PartitionedAtRuntime(), serialized=True, ): pass diff --git a/task-sdk/docs/api.rst b/task-sdk/docs/api.rst index d31695f486eae..984b0cd7261d2 100644 --- a/task-sdk/docs/api.rst +++ b/task-sdk/docs/api.rst @@ -214,7 +214,7 @@ Timetables .. autoapiclass:: airflow.sdk.MultipleCronTriggerTimetable -.. autoapiclass:: airflow.sdk.PartitionAtRuntime +.. autoapiclass:: airflow.sdk.PartitionedAtRuntime .. autoapiclass:: airflow.sdk.PartitionedAssetTimetable diff --git a/task-sdk/src/airflow/sdk/__init__.py b/task-sdk/src/airflow/sdk/__init__.py index 8e8953a8aca88..f3452389d17db 100644 --- a/task-sdk/src/airflow/sdk/__init__.py +++ b/task-sdk/src/airflow/sdk/__init__.py @@ -67,7 +67,7 @@ "ObjectStoragePath", "Param", "ParamsDict", - "PartitionAtRuntime", + "PartitionedAtRuntime", "PartitionedAssetTimetable", "PartitionMapper", "PokeReturnValue", @@ -199,8 +199,8 @@ from airflow.sdk.definitions.template import literal from airflow.sdk.definitions.timetables.assets import ( AssetOrTimeSchedule, - PartitionAtRuntime, PartitionedAssetTimetable, + PartitionedAtRuntime, ) from airflow.sdk.definitions.timetables.events import EventsTimetable from airflow.sdk.definitions.timetables.interval import ( @@ -269,7 +269,7 @@ "ObjectStoragePath": ".io.path", "Param": ".definitions.param", "ParamsDict": ".definitions.param", - "PartitionAtRuntime": ".definitions.timetables.assets", + "PartitionedAtRuntime": ".definitions.timetables.assets", "PartitionedAssetTimetable": ".definitions.timetables.assets", "PartitionMapper": ".definitions.partition_mappers.base", "PokeReturnValue": ".bases.sensor", diff --git a/task-sdk/src/airflow/sdk/__init__.pyi b/task-sdk/src/airflow/sdk/__init__.pyi index 273b0579f3ec3..d48bf614407f8 100644 --- a/task-sdk/src/airflow/sdk/__init__.pyi +++ b/task-sdk/src/airflow/sdk/__init__.pyi @@ -110,8 +110,8 @@ from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup from airflow.sdk.definitions.template import literal as literal from airflow.sdk.definitions.timetables.assets import ( AssetOrTimeSchedule, - PartitionAtRuntime, PartitionedAssetTimetable, + PartitionedAtRuntime, ) from airflow.sdk.definitions.timetables.events import EventsTimetable from airflow.sdk.definitions.timetables.interval import ( @@ -177,7 +177,7 @@ __all__ = [ "ObjectStoragePath", "Param", "PokeReturnValue", - "PartitionAtRuntime", + "PartitionedAtRuntime", "PartitionedAssetTimetable", "PartitionMapper", "ProductMapper", diff --git a/task-sdk/src/airflow/sdk/bases/timetable.py b/task-sdk/src/airflow/sdk/bases/timetable.py index bd37a6a0a7955..27769d6b31c1f 100644 --- a/task-sdk/src/airflow/sdk/bases/timetable.py +++ b/task-sdk/src/airflow/sdk/bases/timetable.py @@ -51,7 +51,7 @@ class BaseTimetable: """ Whether this timetable defers partition selection to task runtime. - *True* for :class:`~airflow.sdk.PartitionAtRuntime`; downstream code can + *True* for :class:`~airflow.sdk.PartitionedAtRuntime`; downstream code can branch on this flag instead of using ``isinstance``. """ diff --git a/task-sdk/src/airflow/sdk/definitions/timetables/assets.py b/task-sdk/src/airflow/sdk/definitions/timetables/assets.py index a0c6493692572..22208693588de 100644 --- a/task-sdk/src/airflow/sdk/definitions/timetables/assets.py +++ b/task-sdk/src/airflow/sdk/definitions/timetables/assets.py @@ -54,7 +54,7 @@ class PartitionedAssetTimetable(AssetTriggeredTimetable): default_partition_mapper: PartitionMapper = IdentityMapper() -class PartitionAtRuntime(BaseTimetable): +class PartitionedAtRuntime(BaseTimetable): """Marker timetable indicating that partition key(s) are determined at runtime.""" can_be_scheduled = False diff --git a/task-sdk/tests/task_sdk/definitions/test_dag.py b/task-sdk/tests/task_sdk/definitions/test_dag.py index 0e9f6c78b3801..b8f9d01b5b016 100644 --- a/task-sdk/tests/task_sdk/definitions/test_dag.py +++ b/task-sdk/tests/task_sdk/definitions/test_dag.py @@ -25,7 +25,16 @@ import pytest -from airflow.sdk import DAG, Context, Label, Param, PartitionAtRuntime, TaskGroup, dag as dag_decorator, task +from airflow.sdk import ( + DAG, + Context, + Label, + Param, + PartitionedAtRuntime, + TaskGroup, + dag as dag_decorator, + task, +) from airflow.sdk.bases.operator import BaseOperator from airflow.sdk.bases.timetable import BaseTimetable from airflow.sdk.definitions.param import DagParam, ParamsDict @@ -438,8 +447,8 @@ def test_continuous_schedule_linmits_max_active_runs(self): with pytest.raises(ValueError, match="ContinuousTimetable requires max_active_runs <= 1"): dag = DAG("continuous", start_date=DEFAULT_DATE, schedule="@continuous", max_active_runs=25) - def test_only_partition_at_runtime_has_partitioned_at_runtime_flag(self): - """Regression guard: across every BaseTimetable subclass, only PartitionAtRuntime sets partitioned_at_runtime=True.""" + def test_only_partitioned_at_runtime_has_partitioned_at_runtime_flag(self): + """Regression guard: across every BaseTimetable subclass, only PartitionedAtRuntime sets partitioned_at_runtime=True.""" def all_subclasses(cls): for sub in cls.__subclasses__(): @@ -447,7 +456,7 @@ def all_subclasses(cls): yield from all_subclasses(sub) flagged = {c for c in all_subclasses(BaseTimetable) if c.partitioned_at_runtime} - assert flagged == {PartitionAtRuntime} + assert flagged == {PartitionedAtRuntime} def test_dag_add_task_checks_trigger_rule(self): # A non fail stop dag should allow any trigger rule From 9856115f815cd3dfab2411a7036dedd5ec785b6d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 26 Jun 2026 08:56:52 +0800 Subject: [PATCH 05/29] [v3-3-test] Add partition_date to DagRun detail page (#68969) (#68977) Co-authored-by: Wei Lee --- .../src/airflow/ui/public/i18n/locales/en/common.json | 1 + .../src/airflow/ui/public/i18n/locales/zh-TW/common.json | 1 + airflow-core/src/airflow/ui/src/pages/Run/Details.tsx | 8 ++++++++ 3 files changed, 10 insertions(+) diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 3aaa5115bb2f5..7b5f37c9cfd3b 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -72,6 +72,7 @@ "expectedDuration": "Expected Duration", "lastSchedulingDecision": "Last Scheduling Decision", "mappedPartitionKey": "Mapped Partition key", + "partitionDate": "Partition Date", "partitionKey": "Partition key", "queuedAt": "Queued At", "runAfter": "Run After", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json index 90debb241c24b..a98d200635ce3 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json @@ -72,6 +72,7 @@ "expectedDuration": "預計時長", "lastSchedulingDecision": "最後排程決策", "mappedPartitionKey": "映射分區鍵", + "partitionDate": "資產分區日期", "partitionKey": "資產分區鍵", "queuedAt": "開始排隊時間", "runAfter": "最早可執行時間", diff --git a/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx b/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx index e233c528b54cc..e7ec9fa836437 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx @@ -145,6 +145,14 @@ export const Details = () => {