diff --git a/airflow-core/src/airflow/partition_mappers/temporal.py b/airflow-core/src/airflow/partition_mappers/temporal.py index 1938a18f84da8..c16801c97b132 100644 --- a/airflow-core/src/airflow/partition_mappers/temporal.py +++ b/airflow-core/src/airflow/partition_mappers/temporal.py @@ -469,9 +469,11 @@ class FanOutMapper(PartitionMapper): is N→1 (downstream waits until all members arrive), fan-out is 1→N (one upstream event creates one downstream Dag run per member). - For forward fan-out (emit the trailing period ending at the upstream key, - instead of the period it represents), pass ``direction=Window.Direction.FORWARD`` - to the window: + ``window``'s direction controls which period each upstream key fans out to. + The default, ``Window.Direction.FORWARD``, yields the period *starting at* + the upstream key (the period it represents); ``Window.Direction.BACKWARD`` + yields the trailing period *ending at* the upstream key. Pass + ``direction=Window.Direction.BACKWARD`` to the window for the latter: .. code-block:: python @@ -482,8 +484,8 @@ class FanOutMapper(PartitionMapper): FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow()) # Weekly upstream → the 7 days ending at the upstream Monday (trailing period) - forward_window = WeekWindow(direction=Window.Direction.FORWARD) - FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=forward_window) + backward_window = WeekWindow(direction=Window.Direction.BACKWARD) + FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=backward_window) """ # Keep ``FanOutMapper.default_downstream_mapper_by_window_name`` in sync with diff --git a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py index 0cf0a705d5cdd..ccb1497f0325b 100644 --- a/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py +++ b/task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py @@ -137,20 +137,23 @@ class (e.g. ``WeekWindow`` → ``StartOfDayMapper``). waits for all members), fan-out is 1→N (one upstream event creates many downstream Dag runs). - For forward fan-out (emit the *next* period's members instead of the current - one), pass ``direction=Window.Direction.FORWARD`` to the window: + ``window``'s direction controls which period each upstream key fans out to. + The default, ``Window.Direction.FORWARD``, yields the period *starting at* + the upstream key (the period it represents); ``Window.Direction.BACKWARD`` + yields the trailing period *ending at* the upstream key. Pass + ``direction=Window.Direction.BACKWARD`` to the window for the latter: .. code-block:: python from airflow.sdk import WeekWindow, Window from airflow.sdk.definitions.partition_mappers.temporal import FanOutMapper, StartOfWeekMapper - # Weekly upstream → 7 daily downstream Dag runs (current week) + # Weekly upstream → 7 daily downstream Dag runs (the 7 days the upstream Monday represents) FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=WeekWindow()) - # Weekly upstream → 7 daily keys for the *following* week - forward_window = WeekWindow(direction=Window.Direction.FORWARD) - FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=forward_window) + # Weekly upstream → the 7 days ending at the upstream Monday (trailing period) + backward_window = WeekWindow(direction=Window.Direction.BACKWARD) + FanOutMapper(upstream_mapper=StartOfWeekMapper(), window=backward_window) """ # Keep ``FanOutMapper.default_downstream_mapper_by_window_name`` in sync with