diff --git a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst index b20a832576740..58a5073f189fe 100644 --- a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst +++ b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store-cleanup.rst @@ -48,7 +48,7 @@ Running cleanup The command is:: - airflow state-store cleanup-task-state-store + airflow state-store clean It reads ``[state_store] default_retention_days`` and ``[state_store] state_cleanup_batch_size`` from the ``airflow.cfg`` file, then deletes all eligible rows. @@ -56,7 +56,7 @@ It reads ``[state_store] default_retention_days`` and ``[state_store] state_clea Use ``--dry-run`` to preview what would be deleted without removing anything:: - airflow state-store cleanup-task-state-store --dry-run + airflow state-store clean --dry-run The output lists every row that would be deleted, grouped by dag, run, task, map index, and key. diff --git a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst index e32393464500b..5ae6c0b1b3331 100644 --- a/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst +++ b/airflow-core/docs/administration-and-deployment/task-and-asset-state-store.rst @@ -99,7 +99,7 @@ When this is set, ``TaskStateStoreAccessor.set()`` calls ``serialize_task_state_ Garbage collection semantics ----------------------------- -The cleanup task, also known as "garbage collection" is triggered using the Airflow CLI. The command to trigger the cleanup task is ``airflow state-store cleanup-task-state-store``. This process removes store rows according to the following rules: +The cleanup task, also known as "garbage collection" is triggered using the Airflow CLI. The command to trigger the cleanup task is ``airflow state-store clean``. This process removes store rows according to the following rules: **Time-based expiry (task state store only)** Rows whose ``expires_at < now()`` are deleted. ``expires_at`` is computed on the *worker* at write time, not by the server. @@ -115,7 +115,7 @@ The cleanup task, also known as "garbage collection" is triggered using the Airf .. important:: - Garbage collection only works for the ``MetastoreStateBackend``. Custom backends are explicitly skipped. + Garbage collection only works for the ``MetastoreBackend``. Custom backends are explicitly skipped. @@ -207,5 +207,5 @@ Example skeleton: return json.loads(s3_object["Body"].read().decode()) # Implement the remaining abstract methods as pass-throughs or delegating to the - # default MetastoreStateBackend for the DB side + # default MetastoreBackend for the DB side ... diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 44a9fd47c23b5..46a0ebfa391f0 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -1684,14 +1684,15 @@ class GroupCommand(NamedTuple): ) STATE_STORE_COMMANDS = ( ActionCommand( - name="cleanup-task-state-store", - help="Remove expired task state store rows (MetastoreBackend only)", + name="clean", + help="Remove expired task state store rows (metastore backend only)", description=( - "Reads [state_store] default_retention_days from config and deletes task_state_store rows " - "older than the configured threshold. Only applies when MetastoreBackend is configured; " - "custom backends are skipped. Use --dry-run to preview without deleting." + "Deletes task_state_store rows whose expires_at is in the past, honoring the state_store " + "settings default_retention_days and state_cleanup_batch_size. Currently supports the " + "default metastore backend only; custom (worker-side) backends are skipped. Use --dry-run " + "to preview deletions." ), - func=lazy_load_command("airflow.cli.commands.state_store_command.cleanup_task_state_store"), + func=lazy_load_command("airflow.cli.commands.state_store_command.clean_state_store"), args=(ARG_DB_DRY_RUN, ARG_VERBOSE), ), ) diff --git a/airflow-core/src/airflow/cli/commands/state_store_command.py b/airflow-core/src/airflow/cli/commands/state_store_command.py index 652c40d3445bc..c5139522d6135 100644 --- a/airflow-core/src/airflow/cli/commands/state_store_command.py +++ b/airflow-core/src/airflow/cli/commands/state_store_command.py @@ -26,12 +26,23 @@ # Other state operations (list, get, delete per key) will be added here in the future. -def cleanup_task_state_store(args) -> None: - """Remove expired task state store rows (MetastoreBackend only).""" +def clean_state_store(args) -> None: + """ + Remove expired task state store rows from the metastore backend. + + Deliberately restricted to ``MetastoreBackend`` for now. A custom backend is typically + worker-side (object storage, etc.); cleaning it correctly means deleting the metadata-DB + refs *and* the backend data in order, which has to run where the backend and its + dependencies live (the worker), not on the server-side CLI. Until that experience exists, + the command skips custom backends rather than half-cleaning them. + """ backend = get_state_backend() if not isinstance(backend, MetastoreBackend): - print("Custom backend configured — skipping cleanup (not supported).") + print( + f"Custom state store backend configured ({type(backend).__name__}); skipping. " + f"The clean command currently supports the metastore backend only." + ) return if args.dry_run: diff --git a/airflow-core/tests/unit/cli/commands/test_state_store_command.py b/airflow-core/tests/unit/cli/commands/test_state_store_command.py index 652e08fe11055..d777d41afd7e7 100644 --- a/airflow-core/tests/unit/cli/commands/test_state_store_command.py +++ b/airflow-core/tests/unit/cli/commands/test_state_store_command.py @@ -22,21 +22,26 @@ import pytest -from airflow.cli.commands.state_store_command import cleanup_task_state_store +from airflow._shared.state import BaseStoreBackend +from airflow.cli.commands.state_store_command import clean_state_store from airflow.state.metastore import MetastoreBackend pytestmark = pytest.mark.db_test -class TestStateStoreCleanupCommand: - def test_cleanup_calls_backend(self): +class TestStateStoreCleanCommand: + def test_clean_calls_backend(self): args = Namespace(dry_run=False, verbose=False) backend = MetastoreBackend() with ( - mock.patch("airflow.cli.commands.state_store_command.get_state_backend", return_value=backend), - patch.object(backend, "cleanup"), + mock.patch( + "airflow.cli.commands.state_store_command.get_state_backend", + return_value=backend, + autospec=True, + ), + patch.object(backend, "cleanup", autospec=True), ): - cleanup_task_state_store(args) + clean_state_store(args) backend.cleanup.assert_called_once_with() @@ -44,22 +49,29 @@ def test_dry_run_does_not_call_backend(self, capsys): args = Namespace(dry_run=True, verbose=False) backend = MetastoreBackend() with ( - mock.patch("airflow.cli.commands.state_store_command.get_state_backend", return_value=backend), - patch.object(backend, "_summary_dry_run", return_value={"expired": []}), + mock.patch( + "airflow.cli.commands.state_store_command.get_state_backend", + return_value=backend, + autospec=True, + ), + patch.object(backend, "_summary_dry_run", return_value={"expired": []}, autospec=True), ): - cleanup_task_state_store(args) + clean_state_store(args) captured = capsys.readouterr() assert "Nothing to delete" in captured.out def test_custom_backend_is_skipped(self, capsys): + # Custom (non-metastore) backends are intentionally skipped; cleanup() must not run. args = Namespace(dry_run=False, verbose=False) - custom_backend = MagicMock(spec=[]) + custom_backend = MagicMock(spec=BaseStoreBackend) with mock.patch( - "airflow.cli.commands.state_store_command.get_state_backend", return_value=custom_backend + "airflow.cli.commands.state_store_command.get_state_backend", + return_value=custom_backend, + autospec=True, ): - cleanup_task_state_store(args) + clean_state_store(args) captured = capsys.readouterr() - assert "Custom backend configured" in captured.out - assert not hasattr(custom_backend, "cleanup") or not custom_backend.cleanup.called + assert "skipping" in captured.out.lower() + custom_backend.cleanup.assert_not_called()