-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Fix dagrun starvation #64109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Fix dagrun starvation #64109
Changes from all commits
f922f2b
85e4c6a
dd6b74d
ecdb722
8356843
fe5462d
1c0c28f
c47e216
9c28d0d
9992724
1923f9f
c78e0a8
512bcf8
a257185
ee4c094
049f50e
aeeff8b
70110f4
6027d87
6efbfba
36bab96
be3863b
c27c071
c432be9
ea29eb6
a3bdc84
e2e6a1f
77ba9b2
d68d3a1
168e86b
f1ce411
8bbc5b3
b9a3644
86b6d99
9ed23af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -735,16 +735,33 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da | |
| .subquery() | ||
| ) | ||
|
|
||
| query = ( | ||
| select(cls) | ||
| .where(cls.state == DagRunState.QUEUED) | ||
| available_dagruns_rn = ( | ||
| select( | ||
| DagRun.dag_id, | ||
| DagRun.id, | ||
| running_drs.c.num_running, | ||
| func.row_number() | ||
| .over( | ||
| partition_by=[DagRun.dag_id, DagRun.backfill_id], | ||
| order_by=[ | ||
| nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session), | ||
| nulls_first( | ||
| cast("ColumnElement[Any]", cls.last_scheduling_decision), session=session | ||
| ), | ||
| nulls_first(running_drs.c.num_running, session=session), | ||
| DagRun.run_after, | ||
| ], | ||
| ) | ||
| .label("rn"), | ||
| ) | ||
| .where(DagRun.state == DagRunState.QUEUED) | ||
|
Nataneljpwd marked this conversation as resolved.
|
||
| .join( | ||
| DagModel, | ||
| running_drs, | ||
| and_( | ||
| DagModel.dag_id == cls.dag_id, | ||
| DagModel.is_paused == false(), | ||
| DagModel.is_stale == false(), | ||
| running_drs.c.dag_id == DagRun.dag_id, | ||
| running_drs.c.backfill_id == DagRun.backfill_id, | ||
| ), | ||
| isouter=True, | ||
| ) | ||
| .join( | ||
| BackfillDagRun, | ||
|
|
@@ -754,37 +771,39 @@ def get_queued_dag_runs_to_set_running(cls, session: Session) -> ScalarResult[Da | |
| ), | ||
| isouter=True, | ||
| ) | ||
| .join(Backfill, isouter=True) | ||
| .subquery() | ||
| ) | ||
|
|
||
| query = ( | ||
| select(cls) | ||
|
Nataneljpwd marked this conversation as resolved.
|
||
| .join( | ||
| running_drs, | ||
| available_dagruns_rn, | ||
| and_( | ||
| running_drs.c.dag_id == DagRun.dag_id, | ||
| coalesce(running_drs.c.backfill_id, text("-1")) | ||
| == coalesce(DagRun.backfill_id, text("-1")), | ||
| available_dagruns_rn.c.id == DagRun.id, | ||
| available_dagruns_rn.c.dag_id == DagRun.dag_id, | ||
| ), | ||
| ) | ||
| .join( | ||
| DagModel, | ||
| and_( | ||
| DagModel.dag_id == cls.dag_id, | ||
| DagModel.is_paused == false(), | ||
| DagModel.is_stale == false(), | ||
| ), | ||
| isouter=True, | ||
| ) | ||
| .join(Backfill, isouter=True) | ||
| .where( | ||
| # there are two levels of checks for num_running | ||
| # the one done in this query verifies that the dag is not maxed out | ||
| # it could return many more dag runs than runnable if there is even | ||
| # capacity for 1. this could be improved. | ||
| coalesce(running_drs.c.num_running, text("0")) | ||
| < coalesce(Backfill.max_active_runs, DagModel.max_active_runs), | ||
| # this check returns strictly only the amount of dagruns | ||
| # which can run according to the max active runs limit | ||
| available_dagruns_rn.c.rn | ||
| <= coalesce( | ||
| Backfill.max_active_runs, | ||
| DagModel.max_active_runs, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had this before yet I was told to remove it as it should never be null
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think theoratically it can’t be null but in practice it’s unclear whether an installation upgraded all the way from very old Airflow would have a null value in the column. It would be safer to consider null IMO. |
||
| ) | ||
|
Nataneljpwd marked this conversation as resolved.
|
||
| - coalesce(available_dagruns_rn.c.num_running, 0), | ||
| # don't set paused dag runs as running | ||
| not_(coalesce(cast("ColumnElement[bool]", Backfill.is_paused), False)), | ||
| ) | ||
| .order_by( | ||
| # ordering by backfill sort ordinal first ensures that backfill dag runs | ||
| # have lower priority than all other dag run types (since sort_ordinal >= 1). | ||
| # additionally, sorting by sort_ordinal ensures that the backfill | ||
| # dag runs are created in the right order when that matters. | ||
| # todo: AIP-78 use row_number to avoid starvation; limit the number of returned runs per-dag | ||
| nulls_first(cast("ColumnElement[Any]", BackfillDagRun.sort_ordinal), session=session), | ||
| nulls_first(cast("ColumnElement[Any]", cls.last_scheduling_decision), session=session), | ||
| nulls_first(running_drs.c.num_running, session=session), # many running -> lower priority | ||
| cls.run_after, | ||
| ) | ||
| .limit(cls.DEFAULT_DAGRUNS_TO_EXAMINE) | ||
| ) | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2632,7 +2632,10 @@ def test_find_executable_task_instances_max_active_tis_per_dagrun_deferred(self, | |
| dag_id = "SchedulerJobTest.test_max_active_tis_per_dagrun_deferred" | ||
| with dag_maker(dag_id=dag_id, max_active_tasks=16, session=session): | ||
| task_a = EmptyOperator.partial(task_id="task_a", max_active_tis_per_dagrun=1).expand_kwargs( | ||
| [{"inputs": 1}, {"inputs": 2}] | ||
| [ | ||
| {"inputs": 1}, | ||
| {"inputs": 2}, | ||
| ] | ||
| ) | ||
| EmptyOperator(task_id="task_b") | ||
|
|
||
|
|
@@ -4076,6 +4079,94 @@ def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, ses | |
| dag_runs = DagRun.find(dag_id=dag.dag_id, session=session) | ||
| assert len(dag_runs) == 2 | ||
|
|
||
| def test_runs_are_not_starved_by_max_active_runs_limit(self, dag_maker, session): | ||
| """ | ||
| Test that dagruns are not starved by max_active_runs | ||
| """ | ||
| scheduler_job = Job() | ||
| self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) | ||
|
|
||
| dag_ids = ["dag1", "dag2", "dag3"] | ||
|
|
||
| max_active_runs = 3 | ||
|
|
||
| for dag_id in dag_ids: | ||
| with dag_maker( | ||
| dag_id=dag_id, | ||
| max_active_runs=max_active_runs, | ||
| session=session, | ||
| catchup=True, | ||
| schedule=timedelta(seconds=60), | ||
| start_date=DEFAULT_DATE, | ||
| ): | ||
| # Need to use something that doesn't immediately get marked as success by the scheduler | ||
| BashOperator(task_id="task", bash_command="true") | ||
|
|
||
| dag_run = dag_maker.create_dagrun( | ||
| state=State.QUEUED, session=session, run_type=DagRunType.SCHEDULED | ||
| ) | ||
|
|
||
| for _ in range(50): | ||
| # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs | ||
| dag_run = dag_maker.create_dagrun_after( | ||
| dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED | ||
| ) | ||
|
|
||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| running_dagrun_count = session.scalar( | ||
| select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING) | ||
| ) | ||
|
|
||
| assert running_dagrun_count == max_active_runs * len(dag_ids) | ||
|
|
||
| def test_no_more_dagruns_are_set_to_running_when_max_active_runs_exceeded(self, dag_maker, session): | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not convinced this test really adequately covers the case you claim it does. With max_active_runs=1 and 6 runs already RUNNING, the query's new rn <= max_active_runs - num_running filter never gets a chance to matter — the candidates that reach _start_queued_dagruns are rejected by the existing Python guard at scheduler_job_runner.py:2345 (active_runs >= dag_run.max_active_runs), which this PR doesn't touch. So running_pre == running_post holds with or without your query change; i suspect this test would pass on main. To actually cover the fix, you need a case where the query's per-DAG cap is what limits candidates: e.g. one DAG at its max_active_runs with non-backfill runs already running plus a large queued backlog, alongside a second DAG with free capacity, then assert the second DAG isn't starved out of the 20-candidate budget
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hello Ash, I am not sure I understood, as I have a test for it on line 3810, where I have 3 dag id's, i create 50 of each instance and I set the max_active_runs to 3, when I query, I use batches of 20 (the default), which does cover the changes I have made in the PR talking about correct me if I am wrong, but I think that is what you were talking about, in 1 query in the past it would only get 3 form dag1, for the next iteration get nothing, for the next 3 from dag2 and so on
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am also pretty sure that now it is safe to remove the check in the scheduler job runner, as it is done in the query instead, and there is no way for a race condition as the rows are locked, though I left it as per Kaxil's request |
||
| """ | ||
| Test that dagruns are not moved to running if there are more than the max_active_runs running dagruns | ||
| """ | ||
| scheduler_job = Job() | ||
| self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) | ||
|
|
||
| max_active_runs = 1 | ||
| with dag_maker( | ||
| dag_id="test_dag", | ||
| max_active_runs=max_active_runs, | ||
| session=session, | ||
| catchup=True, | ||
| schedule=timedelta(seconds=60), | ||
| start_date=DEFAULT_DATE, | ||
| ): | ||
| # Need to use something that doesn't immediately get marked as success by the scheduler | ||
| BashOperator(task_id="task", bash_command="true") | ||
|
|
||
| dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED) | ||
|
|
||
| for _ in range(5): | ||
| # create a bunch of dagruns in running state, to exceed max_active_runs | ||
| dag_run = dag_maker.create_dagrun_after( | ||
| dag_run, run_type=DagRunType.SCHEDULED, state=State.RUNNING | ||
| ) | ||
|
|
||
| running_dagruns_pre = session.scalar( | ||
| select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING) | ||
| ) | ||
|
|
||
| for _ in range(5): | ||
| # create a bunch of dagruns in queued state, to make sure they are filtered by max_active_runs | ||
| dag_run = dag_maker.create_dagrun_after( | ||
| dag_run, run_type=DagRunType.SCHEDULED, state=State.QUEUED | ||
| ) | ||
|
|
||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| running_dagruns_post = session.scalar( | ||
| select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING) | ||
| ) | ||
|
|
||
| assert running_dagruns_pre == running_dagruns_post | ||
|
|
||
| def test_dagrun_timeout_verify_max_active_runs(self, dag_maker, session): | ||
| """ | ||
| Test if a dagrun will not be scheduled if max_dag_runs | ||
|
|
@@ -6802,41 +6893,55 @@ def _running_counts(): | |
| EmptyOperator(task_id="mytask") | ||
|
|
||
| dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED) | ||
| for _ in range(9): | ||
| for _ in range(29): | ||
| dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED) | ||
|
|
||
| # initial state -- nothing is running | ||
| assert dag1_non_b_running == 0 | ||
| assert dag1_b_running == 0 | ||
| assert total_running == 0 | ||
| assert session.scalar(select(func.count(DagRun.id))) == 46 | ||
| assert session.scalar(select(func.count(DagRun.id))) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| # now let's run it once | ||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| # after running the scheduler one time, observe that only one dag run is started | ||
| # this is because there are 30 runs for dag 1 so neither the backfills nor | ||
| # any runs for dag2 get started | ||
| # and 3 backfill dagruns are started | ||
| # this is because there are 30 queued dagruns, many of which get filtered because their DAGs | ||
| # have already reached max_active_runs | ||
| # and so due to the default dagruns-to-examine limit, we look at the first 20 dagruns that CAN be run | ||
| # according to the max_active_runs parameter, meaning 3 backfill runs will start, 1 non-backfill, | ||
| # and all runnable dagruns for dag2 | ||
| assert DagRun.DEFAULT_DAGRUNS_TO_EXAMINE == 20 | ||
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 0 | ||
| assert total_running == 1 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert dag1_b_running == 3 | ||
| assert total_running == 20 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
| # now we finish all lower priority scheduled runs, and observe new higher priority tasks are started | ||
| session.execute( | ||
| update(DagRun) | ||
| .where(DagRun.dag_id == "test_dag2", DagRun.state == DagRunState.RUNNING) | ||
| .values(state=DagRunState.SUCCESS) | ||
| ) | ||
| session.commit() | ||
|
Nataneljpwd marked this conversation as resolved.
|
||
| session.flush() | ||
|
|
||
| # we run scheduler again and observe that now all the runs are created | ||
| # other than the finished runs of the backfill | ||
| # this must be because sorting is working | ||
| # new tasks from test dag 2 should run, and so they are scheduled | ||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
| assert total_running == 14 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert total_running == 18 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| # run it a 3rd time and nothing changes | ||
|
|
@@ -6846,8 +6951,8 @@ def _running_counts(): | |
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
| assert total_running == 14 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert total_running == 18 | ||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 66 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| def test_backfill_runs_are_started_with_lower_priority_catchup_false(self, dag_maker, session): | ||
|
|
@@ -7067,25 +7172,11 @@ def _running_counts(): | |
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
|
|
||
| # this should be 14 but it is not. why? | ||
| # answer: because dag2 got starved out by dag1 | ||
| # if we run the scheduler again, dag2 should get queued | ||
| assert total_running == 4 | ||
| assert total_running == 14 | ||
|
|
||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
| # run scheduler a second time | ||
| self.job_runner._start_queued_dagruns(session) | ||
| session.flush() | ||
|
|
||
| dag1_non_b_running, dag1_b_running, total_running = _running_counts() | ||
| assert dag1_non_b_running == 1 | ||
| assert dag1_b_running == 3 | ||
|
|
||
| # on the second try, dag 2's 10 runs now start running | ||
| assert total_running == 14 | ||
|
|
||
| assert session.scalar(select(func.count()).select_from(DagRun)) == 46 | ||
| assert session.scalar(select(func.count()).where(DagRun.dag_id == dag1_dag_id)) == 36 | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.