From 74374fdac8262fcbe529ff1451836ed0bef3f61a Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Fri, 3 Jul 2026 10:58:01 +0100 Subject: [PATCH] [v3-3-test] Rename `state-store cleanup-task-state-store` command to `state-store clean` (#69316) The command added in 3.3.0rc1 repeated "state store" in both the group and the subcommand. Rename it to `airflow state-store clean`, mirroring `airflow db clean`. The old name only shipped in 3.3.0rc1, so renaming before GA keeps a single stable name for 3.3.0. Also make the help text and docs accurate about the current behavior: the command intentionally supports the metastore backend only and skips custom (worker-side) backends, and the docstring now explains why (correct cleanup of a worker-side backend needs to delete the DB refs and the backend data in order, which has to run where the backend's dependencies live, not on the server-side CLI). Fix a wrong class name (`MetastoreStateBackend` -> `MetastoreBackend`) in the docs. (cherry picked from commit 71be2b47b339bf7c76b3cb0b84f50261f3d830e2) Co-authored-by: Kaxil Naik --- .../task-and-asset-state-store-cleanup.rst | 4 +- .../task-and-asset-state-store.rst | 6 +-- airflow-core/src/airflow/cli/cli_config.py | 13 +++--- .../cli/commands/state_store_command.py | 17 ++++++-- .../cli/commands/test_state_store_command.py | 40 ++++++++++++------- 5 files changed, 52 insertions(+), 28 deletions(-) diff --git a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst index b20a832576740..58a5073f189fe 100644 --- a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst +++ b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst @@ -48,7 +48,7 @@ Running cleanup The command is:: - airflow state-store cleanup-task-state-store + airflow state-store clean It reads ``[state_store] default_retention_days`` and 
``[state_store] state_cleanup_batch_size`` from the ``airflow.cfg`` file, then deletes all eligible rows. @@ -56,7 +56,7 @@ It reads ``[state_store] default_retention_days`` and ``[state_store] state_clea Use ``--dry-run`` to preview what would be deleted without removing anything:: - airflow state-store cleanup-task-state-store --dry-run + airflow state-store clean --dry-run The output lists every row that would be deleted, grouped by dag, run, task, map index, and key. diff --git a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst index e32393464500b..5ae6c0b1b3331 100644 --- a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst +++ b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst @@ -99,7 +99,7 @@ When this is set, ``TaskStateStoreAccessor.set()`` calls ``serialize_task_state_ Garbage collection semantics ----------------------------- -The cleanup task, also known as "garbage collection" is triggered using the Airflow CLI. The command to trigger the cleanup task is ``airflow state-store cleanup-task-state-store``. This process removes store rows according to the following rules: +The cleanup task, also known as "garbage collection", is triggered using the Airflow CLI. The command to trigger the cleanup task is ``airflow state-store clean``. This process removes store rows according to the following rules: **Time-based expiry (task state store only)** Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. @@ -115,7 +115,7 @@ The cleanup task, also known as "garbage collection" is triggered using the Airf .. important:: - Garbage collection only works for the ``MetastoreStateBackend``. Custom backends are explicitly skipped. + Garbage collection only works for the ``MetastoreBackend``. Custom backends are explicitly skipped. 
@@ -207,5 +207,5 @@ Example skeleton: return json.loads(s3_object["Body"].read().decode()) # Implement the remaining abstract methods as pass-throughs or delegating to the - # default MetastoreStateBackend for the DB side + # default MetastoreBackend for the DB side ... diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 44a9fd47c23b5..46a0ebfa391f0 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -1684,14 +1684,15 @@ class GroupCommand(NamedTuple): ) STATE_STORE_COMMANDS = ( ActionCommand( - name="cleanup-task-state-store", - help="Remove expired task state store rows (MetastoreBackend only)", + name="clean", + help="Remove expired task state store rows (metastore backend only)", description=( - "Reads [state_store] default_retention_days from config and deletes task_state_store rows " - "older than the configured threshold. Only applies when MetastoreBackend is configured; " - "custom backends are skipped. Use --dry-run to preview without deleting." + "Deletes task_state_store rows whose expires_at is in the past, honoring the state_store " + "settings default_retention_days and state_cleanup_batch_size. Currently supports the " + "default metastore backend only; custom (worker-side) backends are skipped. Use --dry-run " + "to preview deletions." 
), - func=lazy_load_command("airflow.cli.commands.state_store_command.cleanup_task_state_store"), + func=lazy_load_command("airflow.cli.commands.state_store_command.clean_state_store"), args=(ARG_DB_DRY_RUN, ARG_VERBOSE), ), ) diff --git a/airflow-core/src/airflow/cli/commands/state_store_command.py b/airflow-core/src/airflow/cli/commands/state_store_command.py index 652c40d3445bc..c5139522d6135 100644 --- a/airflow-core/src/airflow/cli/commands/state_store_command.py +++ b/airflow-core/src/airflow/cli/commands/state_store_command.py @@ -26,12 +26,23 @@ # Other state operations (list, get, delete per key) will be added here in the future. -def cleanup_task_state_store(args) -> None: - """Remove expired task state store rows (MetastoreBackend only).""" +def clean_state_store(args) -> None: + """ + Remove expired task state store rows from the metastore backend. + + Deliberately restricted to ``MetastoreBackend`` for now. A custom backend is typically + worker-side (object storage, etc.); cleaning it correctly means deleting the metadata-DB + refs *and* the backend data in order, which has to run where the backend and its + dependencies live (the worker), not on the server-side CLI. Until that experience exists, + the command skips custom backends rather than half-cleaning them. + """ backend = get_state_backend() if not isinstance(backend, MetastoreBackend): - print("Custom backend configured — skipping cleanup (not supported).") + print( + f"Custom state store backend configured ({type(backend).__name__}); skipping. " + f"The clean command currently supports the metastore backend only." 
+ ) return if args.dry_run: diff --git a/airflow-core/tests/unit/cli/commands/test_state_store_command.py b/airflow-core/tests/unit/cli/commands/test_state_store_command.py index 652e08fe11055..d777d41afd7e7 100644 --- a/airflow-core/tests/unit/cli/commands/test_state_store_command.py +++ b/airflow-core/tests/unit/cli/commands/test_state_store_command.py @@ -22,21 +22,26 @@ import pytest -from airflow.cli.commands.state_store_command import cleanup_task_state_store +from airflow._shared.state import BaseStoreBackend +from airflow.cli.commands.state_store_command import clean_state_store from airflow.state.metastore import MetastoreBackend pytestmark = pytest.mark.db_test -class TestStateStoreCleanupCommand: - def test_cleanup_calls_backend(self): +class TestStateStoreCleanCommand: + def test_clean_calls_backend(self): args = Namespace(dry_run=False, verbose=False) backend = MetastoreBackend() with ( - mock.patch("airflow.cli.commands.state_store_command.get_state_backend", return_value=backend), - patch.object(backend, "cleanup"), + mock.patch( + "airflow.cli.commands.state_store_command.get_state_backend", + return_value=backend, + autospec=True, + ), + patch.object(backend, "cleanup", autospec=True), ): - cleanup_task_state_store(args) + clean_state_store(args) backend.cleanup.assert_called_once_with() @@ -44,22 +49,29 @@ def test_dry_run_does_not_call_backend(self, capsys): args = Namespace(dry_run=True, verbose=False) backend = MetastoreBackend() with ( - mock.patch("airflow.cli.commands.state_store_command.get_state_backend", return_value=backend), - patch.object(backend, "_summary_dry_run", return_value={"expired": []}), + mock.patch( + "airflow.cli.commands.state_store_command.get_state_backend", + return_value=backend, + autospec=True, + ), + patch.object(backend, "_summary_dry_run", return_value={"expired": []}, autospec=True), ): - cleanup_task_state_store(args) + clean_state_store(args) captured = capsys.readouterr() assert "Nothing to delete" in 
captured.out def test_custom_backend_is_skipped(self, capsys): + # Custom (non-metastore) backends are intentionally skipped; cleanup() must not run. args = Namespace(dry_run=False, verbose=False) - custom_backend = MagicMock(spec=[]) + custom_backend = MagicMock(spec=BaseStoreBackend) with mock.patch( - "airflow.cli.commands.state_store_command.get_state_backend", return_value=custom_backend + "airflow.cli.commands.state_store_command.get_state_backend", + return_value=custom_backend, + autospec=True, ): - cleanup_task_state_store(args) + clean_state_store(args) captured = capsys.readouterr() - assert "Custom backend configured" in captured.out - assert not hasattr(custom_backend, "cleanup") or not custom_backend.cleanup.called + assert "skipping" in captured.out.lower() + custom_backend.cleanup.assert_not_called()