Enhanced run_immediately functionality #64162
Changes from all commits
88954be
95bce09
1e50f3b
50ec5cf
8754005
dcb04ca
521da9d
e5d70eb
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -151,6 +151,63 @@ must be a :class:`datetime.timedelta` or ``dateutil.relativedelta.relativedelta` | |
| def example_dag(): | ||
| pass | ||
|
|
||
| .. versionadded:: 3.0.0 | ||
| The ``run_immediately`` argument was introduced in Airflow 3. | ||
|
Member
This line is not needed. The |
||
|
|
||
| The optional ``run_immediately`` argument controls which cron point is scheduled when a Dag is first | ||
| enabled or re-enabled after a pause. It has no effect when ``catchup=True`` (in that case the | ||
| scheduler always continues from where it left off). | ||
|
|
||
|
manipatnam marked this conversation as resolved.
|
||
| * ``run_immediately=True`` *(default)* — schedule the **most recent past** cron point immediately. | ||
| * ``run_immediately=False`` — skip the past cron point and wait for the **next future** cron point. | ||
| * ``run_immediately=timedelta(...)`` — schedule the most recent past cron point only if it fired | ||
| within the given window; otherwise wait for the **next future** cron point. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
| from datetime import datetime, timedelta | ||
|
|
||
| from airflow.timetables.trigger import CronTriggerTimetable | ||
|
|
||
|
|
||
| @dag( | ||
| # Runs every 10 minutes. | ||
| # run_immediately=False: always skip the most recent past slot and wait | ||
| # for the next 10-minute boundary. | ||
| schedule=CronTriggerTimetable( | ||
| "*/10 * * * *", | ||
| timezone="UTC", | ||
| run_immediately=False, | ||
| ), | ||
| start_date=datetime(2024, 1, 1), | ||
| catchup=False, | ||
| ..., | ||
| ) | ||
| def example_dag(): | ||
| pass | ||
|
|
||
|
|
||
| @dag( | ||
| # Runs hourly. | ||
| # run_immediately=timedelta(minutes=10): run the most recent past slot | ||
| # only if it fired within the last 10 minutes; otherwise wait for next. | ||
| schedule=CronTriggerTimetable( | ||
| "0 * * * *", | ||
| timezone="UTC", | ||
| run_immediately=timedelta(minutes=10), | ||
| ), | ||
| start_date=datetime(2024, 1, 1), | ||
| catchup=False, | ||
| ..., | ||
| ) | ||
| def example_dag_with_buffer(): | ||
| pass | ||
|
|
||
| .. note:: | ||
|
|
||
| ``run_immediately`` is a parameter of ``CronTriggerTimetable``, **not** of the ``DAG`` | ||
| constructor. Passing it directly to ``DAG(run_immediately=...)`` has no effect. | ||
|
Member
It does have an effect; your dag file will error because the argument does not exist. This should say "Passing it directly … is an error." |
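The reviewer's point rests on ordinary Python behaviour: a callable that does not declare a keyword rejects it instead of ignoring it. The sketch below illustrates only that mechanism. `make_dag` is a hypothetical stand-in, not Airflow's `DAG`, and the exact error Airflow raises is not shown in this diff.

```python
def make_dag(*, dag_id: str, schedule: object = None) -> dict:
    """Hypothetical stand-in for a constructor that has no run_immediately parameter."""
    return {"dag_id": dag_id, "schedule": schedule}


try:
    # The keyword is not part of the signature, so Python rejects the call.
    make_dag(dag_id="example", run_immediately=False)
except TypeError as exc:
    error_message = str(exc)
else:
    error_message = ""

print(error_message)
```

Under this assumption, "is an error" describes the outcome more accurately than "has no effect".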
||
|
|
||
|
|
||
| .. _MultipleCronTriggerTimetable: | ||
|
|
||
|
|
@@ -169,7 +226,7 @@ This is similar to CronTriggerTimetable_ except it takes multiple cron expressio | |
| def example_dag(): | ||
| pass | ||
|
|
||
| The same optional ``interval`` argument as CronTriggerTimetable_ is also available. | ||
| The same optional ``interval`` and ``run_immediately`` arguments as CronTriggerTimetable_ are also available. | ||
|
|
||
| .. code-block:: python | ||
|
|
||
|
|
@@ -350,10 +407,11 @@ The following is another example showing the difference in the case of skipping | |
|
|
||
| Suppose there are two running Dags with a cron expression ``@daily`` or ``0 0 * * *`` that use the two different timetables. If you pause the Dags at 3PM on January 31st and re-enable them at 3PM on February 2nd, | ||
|
|
||
| - `CronTriggerTimetable`_ skips the Dag runs that were supposed to trigger on February 1st and 2nd. The next Dag run will be triggered at 12AM on February 3rd. | ||
| - `CronDataIntervalTimetable`_ skips the Dag runs that were supposed to trigger on February 1st only. A Dag run for February 2nd is immediately triggered after you re-enable the Dag. | ||
| - Both `CronTriggerTimetable`_ and `CronDataIntervalTimetable`_ skip the Dag run that was supposed to trigger on February 1st. A Dag run for February 2nd is immediately triggered after you re-enable the Dag. | ||
|
|
||
| The difference between the two timetables in this scenario is the ``run_id`` timestamp: for ``CronTriggerTimetable``, the ``run_id`` reflects midnight on February 2nd (the trigger time), while for ``CronDataIntervalTimetable``, the ``run_id`` reflects midnight on February 1st (the start of the data interval being processed). | ||
|
|
||
| In these examples, you see how a trigger timetable creates Dag runs more intuitively and similar to what | ||
| In the first example (enabling a new Dag), you see how a trigger timetable creates Dag runs more intuitively and similar to what | ||
| people expect a workflow to behave, while a data interval timetable is designed heavily around the data | ||
| interval it processes, and does not reflect a workflow's own properties. | ||
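The two timestamps in the pause and re-enable scenario can be traced with plain `datetime` arithmetic. This is a minimal sketch, not Airflow code: `most_recent_midnight` is a hypothetical helper standing in for alignment to a `0 0 * * *` cron point, the year 2024 is arbitrary, and the one-day data interval is an assumption that matches a daily schedule.

```python
from datetime import datetime, timedelta


def most_recent_midnight(moment: datetime) -> datetime:
    """Hypothetical stand-in for aligning to the latest ``0 0 * * *`` cron point."""
    return moment.replace(hour=0, minute=0, second=0, microsecond=0)


# The Dags are re-enabled at 3PM on February 2nd (year chosen arbitrarily).
re_enabled_at = datetime(2024, 2, 2, 15, 0)

# Trigger-style timestamp: the cron point that fired most recently.
trigger_time = most_recent_midnight(re_enabled_at)

# Data-interval-style timestamp: start of the daily interval that ends at that point.
data_interval_start = trigger_time - timedelta(days=1)

print(trigger_time.isoformat())
print(data_interval_start.isoformat())
```

The first value corresponds to midnight on February 2nd and the second to midnight on February 1st, which is the difference in ``run_id`` the paragraph describes.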
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -87,12 +87,15 @@ def next_dagrun_info( | |
| else: | ||
| next_start_time = self._align_to_next(restriction.earliest) | ||
| else: | ||
| start_time_candidates = [self._align_to_prev(coerce_datetime(utcnow()))] | ||
| if last_automated_data_interval is not None: | ||
| start_time_candidates.append(self._get_next(last_automated_data_interval.end)) | ||
| elif restriction.earliest is None: | ||
| # Run immediately has no effect if there is restriction on earliest | ||
| start_time_candidates.append(self._calc_first_run()) | ||
| # _calc_first_run respects run_immediately to decide between the | ||
| # most recent past cron point and the next future one. | ||
|
Comment on lines +91 to +92
Member
What is this comment trying to explain? |
||
| start_time_candidates = [ | ||
| self._calc_first_run(), | ||
| self._get_next(last_automated_data_interval.end), | ||
| ] | ||
| else: | ||
| start_time_candidates = [self._calc_first_run()] | ||
| if restriction.earliest is not None: | ||
| start_time_candidates.append(self._align_to_next(restriction.earliest)) | ||
| next_start_time = max(start_time_candidates) | ||
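One way to read the rewritten branch is that the value derived from `run_immediately` and the cron point following the last run both become candidates, and `max()` keeps the later one. The sketch below models only that selection. `choose_start` and its parameter names are hypothetical, the dates are made up, and the `restriction.earliest` candidate is left out because the scrape does not preserve its indentation.

```python
from __future__ import annotations

from datetime import datetime


def choose_start(first_run: datetime, next_after_last_run: datetime | None = None) -> datetime:
    """Return the latest candidate start time, mirroring the max() call in the diff."""
    candidates = [first_run]
    if next_after_last_run is not None:
        candidates.append(next_after_last_run)
    return max(candidates)


# Daily Dag, last run on January 31st, re-enabled on the afternoon of February 2nd.
next_after_last = datetime(2024, 2, 1)

# If the first-run calculation returns the most recent past point (February 2nd).
resumed_with_past_point = choose_start(datetime(2024, 2, 2), next_after_last)

# If the first-run calculation returns the next future point (February 3rd).
resumed_with_future_point = choose_start(datetime(2024, 2, 3), next_after_last)

# No pause: the last run was February 2nd, so the point after it is February 3rd.
# max() keeps the Dag from scheduling February 2nd a second time.
no_pause = choose_start(datetime(2024, 2, 2), datetime(2024, 2, 3))

print(resumed_with_past_point, resumed_with_future_point, no_pause)
```

In this reading, `max()` both skips the points missed during a pause and prevents a cron point that has already run from being scheduled again.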
|
|
@@ -165,19 +168,17 @@ class CronTriggerTimetable(CronMixin, _TriggerTimetable): | |
| :param timezone: Which timezone to use to interpret the cron string | ||
| :param interval: timedelta that defines the data interval start. Default 0. | ||
|
|
||
| *run_immediately* controls, if no *start_time* is given to the DAG, when | ||
| the first run of the DAG should be scheduled. It has no effect if there | ||
| already exist runs for this DAG. | ||
|
|
||
| * If *True*, always run immediately the most recent possible DAG run. | ||
| * If *False*, wait to run until the next scheduled time in the future. | ||
| * If passed a ``timedelta``, will run the most recent possible DAG run | ||
| if that run's ``data_interval_end`` is within timedelta of now. | ||
| * If *None*, the timedelta is calculated as 10% of the time between the | ||
| most recent past scheduled time and the next scheduled time. E.g. if | ||
| running every hour, this would run the previous time if less than 6 | ||
| minutes had past since the previous run time, otherwise it would wait | ||
| until the next hour. | ||
| *run_immediately* controls which cron point is scheduled when a Dag is | ||
| first enabled or re-enabled after a pause. It always takes effect | ||
| regardless of whether ``start_date`` is set, but has no effect when | ||
| ``catchup=True``. | ||
|
|
||
| .. versionadded:: 3.0.0 | ||
|
|
||
| * If *True* (default), always run the most recent past cron point immediately. | ||
| * If *False*, skip the past cron point and wait for the next future one. | ||
| * If passed a ``timedelta``, run the most recent past cron point only if it | ||
| is within that timedelta of now; otherwise wait for the next future one. | ||
| """ | ||
|
|
||
| def __init__( | ||
|
|
@@ -186,7 +187,7 @@ def __init__( | |
| *, | ||
| timezone: str | Timezone | FixedTimezone, | ||
| interval: datetime.timedelta | relativedelta = datetime.timedelta(), | ||
| run_immediately: bool | datetime.timedelta = False, | ||
| run_immediately: bool | datetime.timedelta = True, | ||
|
Comment on lines -189 to +190
Member
Changing the default is a breaking change that can’t be made until Airflow 4.0. We cannot accept this change (for about the next three years at least). |
||
| ) -> None: | ||
| super().__init__(cron, timezone) | ||
| self._interval = interval | ||
|
manipatnam marked this conversation as resolved.
|
||
|
|
@@ -200,7 +201,7 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable: | |
| data["expression"], | ||
| timezone=parse_timezone(data["timezone"]), | ||
| interval=decode_interval(data["interval"]), | ||
| run_immediately=decode_run_immediately(data.get("run_immediately", False)), | ||
| run_immediately=decode_run_immediately(data.get("run_immediately", True)), | ||
| ) | ||
|
|
||
| def serialize(self) -> dict[str, Any]: | ||
|
|
@@ -215,26 +216,21 @@ def serialize(self) -> dict[str, Any]: | |
|
|
||
| def _calc_first_run(self) -> DateTime: | ||
| """ | ||
| If no start_time is set, determine the start. | ||
| Determine which cron point to schedule next based on ``run_immediately``. | ||
|
|
||
| If True, always prefer past run, if False, never. If None, if within 10% of next run, | ||
| if timedelta, if within that timedelta from past run. | ||
| If *True*, always run the most recent past cron point. | ||
| If *False*, always wait for the next future cron point. | ||
| If a ``timedelta``, run the most recent past cron point only if it falls | ||
| within that window of now; otherwise wait for the next future cron point. | ||
| """ | ||
| now = coerce_datetime(utcnow()) | ||
| past_run_time = self._align_to_prev(now) | ||
| next_run_time = self._align_to_next(now) | ||
| if self._run_immediately is True: # Check for 'True' exactly because deltas also evaluate to true. | ||
| return past_run_time | ||
|
|
||
| gap_between_runs = next_run_time - past_run_time | ||
| gap_to_past = now - past_run_time | ||
| if isinstance(self._run_immediately, datetime.timedelta): | ||
| buffer_between_runs = self._run_immediately | ||
| else: | ||
| buffer_between_runs = max(gap_between_runs / 10, datetime.timedelta(minutes=5)) | ||
| if gap_to_past <= buffer_between_runs: | ||
| return past_run_time | ||
| return next_run_time | ||
| if now - past_run_time <= self._run_immediately: | ||
| return past_run_time | ||
| return self._align_to_next(now) | ||
|
|
||
|
|
||
| class MultipleCronTriggerTimetable(Timetable): | ||
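The three documented modes of `run_immediately` can be written as a standalone function over plain `datetime` values. This is a sketch under stated assumptions rather than the timetable's implementation: `pick_first_run` is a hypothetical name, and the past and next cron points are passed in instead of being computed from a cron expression. `False` is checked explicitly because comparing a standard-library `timedelta` with a `bool` raises `TypeError`.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def pick_first_run(
    now: datetime,
    past_run_time: datetime,
    next_run_time: datetime,
    run_immediately: bool | timedelta,
) -> datetime:
    """Choose between the most recent past cron point and the next future one."""
    # Identity checks are used because a non-zero timedelta is also truthy.
    if run_immediately is True:
        return past_run_time
    if run_immediately is False:
        return next_run_time
    # timedelta window: use the past point only if it fired recently enough.
    if now - past_run_time <= run_immediately:
        return past_run_time
    return next_run_time


# Hourly schedule, evaluated 7 minutes after the 10:00 cron point.
now = datetime(2024, 1, 1, 10, 7)
past = datetime(2024, 1, 1, 10, 0)
upcoming = datetime(2024, 1, 1, 11, 0)

always_past = pick_first_run(now, past, upcoming, True)
always_next = pick_first_run(now, past, upcoming, False)
inside_window = pick_first_run(now, past, upcoming, timedelta(minutes=10))
outside_window = pick_first_run(now, past, upcoming, timedelta(minutes=5))

print(always_past, always_next, inside_window, outside_window)
```

With a 10-minute window the 10:00 point is still eligible after 7 minutes. With a 5-minute window it is not, so the function falls through to 11:00.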
|
|
@@ -253,7 +249,7 @@ def __init__( | |
| *crons: str, | ||
| timezone: str | Timezone | FixedTimezone, | ||
| interval: datetime.timedelta | relativedelta = datetime.timedelta(), | ||
| run_immediately: bool | datetime.timedelta = False, | ||
| run_immediately: bool | datetime.timedelta = True, | ||
| ) -> None: | ||
| if not crons: | ||
| raise ValueError("cron expression required") | ||
|
|
@@ -373,19 +369,15 @@ class CronPartitionTimetable(CronTriggerTimetable): | |
| The partition key will be derived from the partition date. | ||
| :param key_format: How to translate the partition date into a string partition key. | ||
|
|
||
| *run_immediately* controls, if no *start_time* is given to the Dag, when | ||
| the first run of the Dag should be scheduled. It has no effect if there | ||
| already exist runs for this Dag. | ||
| *run_immediately* controls which cron point is scheduled when a Dag is | ||
| first enabled or re-enabled after a pause. It always takes effect | ||
| regardless of whether ``start_date`` is set, but has no effect when | ||
| ``catchup=True``. | ||
|
|
||
| * If *True*, always run immediately the most recent possible Dag run. | ||
| * If *False*, wait to run until the next scheduled time in the future. | ||
| * If passed a ``timedelta``, will run the most recent possible Dag run | ||
| if that run's ``data_interval_end`` is within timedelta of now. | ||
| * If *None*, the timedelta is calculated as 10% of the time between the | ||
| most recent past scheduled time and the next scheduled time. E.g. if | ||
| running every hour, this would run the previous time if less than 6 | ||
| minutes had past since the previous run time, otherwise it would wait | ||
| until the next hour. | ||
| * If *True* (default), always run the most recent past cron point immediately. | ||
| * If *False*, skip the past cron point and wait for the next future one. | ||
| * If passed a ``timedelta``, run the most recent past cron point only if it | ||
| is within that timedelta of now; otherwise wait for the next future one. | ||
|
|
||
| # todo: AIP-76 talk about how we can have auto-reprocessing of partitions | ||
| # todo: AIP-76 we could allow a tuple of integer + time-based | ||
|
|
@@ -399,7 +391,7 @@ def __init__( | |
| *, | ||
| timezone: str | Timezone | FixedTimezone, | ||
| run_offset: int | datetime.timedelta | relativedelta | None = None, | ||
| run_immediately: bool | datetime.timedelta = False, | ||
| run_immediately: bool | datetime.timedelta = True, | ||
| # todo: AIP-76 we can't infer partition date from this, so we need to store it separately. | ||
| key_format: str = r"%Y-%m-%dT%H:%M:%S", | ||
| ) -> None: | ||
|
manipatnam marked this conversation as resolved.
|
||
|
|
@@ -426,7 +418,7 @@ def deserialize(cls, data: dict[str, Any]) -> Timetable: | |
| cron=data["expression"], | ||
| timezone=parse_timezone(data["timezone"]), | ||
| run_offset=offset, | ||
| run_immediately=decode_run_immediately(data.get("run_immediately", False)), | ||
| run_immediately=decode_run_immediately(data.get("run_immediately", True)), | ||
| key_format=data["key_format"], | ||
| ) | ||
|
|
||
|
|
@@ -485,15 +477,15 @@ def next_dagrun_info_v2( | |
| else: | ||
| next_start_time = self._align_to_next(restriction.earliest) | ||
| else: | ||
| prev_candidate = self._align_to_prev(coerce_datetime(utcnow())) | ||
| start_time_candidates = [prev_candidate] | ||
| if last_dagrun_info is not None: | ||
| next_candidate = self._get_next(last_dagrun_info.run_after) | ||
| start_time_candidates.append(next_candidate) | ||
| elif restriction.earliest is None: | ||
| # Run immediately has no effect if there is restriction on earliest | ||
| first_run = self._calc_first_run() | ||
| start_time_candidates.append(first_run) | ||
| # _calc_first_run respects run_immediately to decide between the | ||
| # most recent past cron point and the next future one. | ||
| start_time_candidates = [ | ||
| self._calc_first_run(), | ||
| self._get_next(last_dagrun_info.run_after), | ||
| ] | ||
| else: | ||
|
Comment on lines +481 to +487
Member
Same as mentioned above; comment is confusing. |
||
| start_time_candidates = [self._calc_first_run()] | ||
| if restriction.earliest is not None: | ||
| earliest = self._align_to_next(restriction.earliest) | ||
| start_time_candidates.append(earliest) | ||
|
|
||