From 9950310a80cd415c6bd752b5a963cbc3f167f63c Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 3 Jul 2026 15:06:45 +0900 Subject: [PATCH 1/3] Add streaming task log support to BaseExecutor and FileTaskHandler Reading a running task's log through an executor materializes the whole log in the API server before the bounded LogStreamAccumulator can bound memory, so large logs spike the API server heap. This adds an interface executors can implement to stream log lines lazily instead. --- .../src/airflow/executors/base_executor.py | 15 ++++ .../airflow/utils/log/file_task_handler.py | 70 ++++++++++++----- .../unit/executors/test_base_executor.py | 12 +++ .../unit/utils/log/test_file_task_handler.py | 75 +++++++++++++++++++ 4 files changed, 152 insertions(+), 20 deletions(-) diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index ef5772331108f..2d45dea4c9321 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -71,6 +71,7 @@ def get_execution_api_server_url(conf_source: AirflowConfigParser | ExecutorConf from sqlalchemy.orm import Session + from airflow._shared.logging.remote import StreamingLogResponse from airflow.api_fastapi.auth.tokens import JWTGenerator from airflow.callbacks.base_callback_sink import BaseCallbackSink from airflow.callbacks.callback_requests import CallbackRequest @@ -174,6 +175,7 @@ class BaseExecutor(LoggingMixin): # The connection-test supervisor uses ``signal.SIGALRM`` (via ``TimeoutPosix``) # to bound hook execution. Executors that opt in must run on POSIX systems. supports_connection_test: bool = False + supports_streaming_logs: bool = False sentry_integration: str = "" is_local: bool = False @@ -559,6 +561,19 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li """ return [], [] + def get_streaming_task_log(self, ti: TaskInstance, try_number: int) -> StreamingLogResponse: + """ + Return a streaming response for task logs. + + Executors that implement this method must also set the ``supports_streaming_logs`` class + attribute to ``True``. + + :param ti: A TaskInstance object + :param try_number: current try_number to read log from + :return: StreamingLogResponse + """ + raise NotImplementedError + def end(self) -> None: # pragma: no cover """Wait synchronously for the previously submitted job to complete.""" raise NotImplementedError diff --git a/airflow-core/src/airflow/utils/log/file_task_handler.py b/airflow-core/src/airflow/utils/log/file_task_handler.py index 9eec24b18134e..4ef1099f417e8 100644 --- a/airflow-core/src/airflow/utils/log/file_task_handler.py +++ b/airflow-core/src/airflow/utils/log/file_task_handler.py @@ -30,7 +30,7 @@ from itertools import chain, islice from pathlib import Path from types import GeneratorType -from typing import IO, TYPE_CHECKING, TypedDict, cast +from typing import IO, TYPE_CHECKING, Literal, TypedDict, cast, overload from urllib.parse import urljoin import pendulum @@ -557,27 +557,52 @@ def _render_filename( ) raise RuntimeError(f"Unable to render log filename for {ti}. This should never happen") - def _get_executor_get_task_log( - self, ti: TaskInstance | TaskInstanceHistory - ) -> Callable[[TaskInstance | TaskInstanceHistory, int], tuple[list[str], list[str]]]: + @overload + def _get_executor_log_callable( + self, ti: TaskInstance | TaskInstanceHistory, *, streaming: Literal[True] + ) -> Callable[[TaskInstance | TaskInstanceHistory, int], StreamingLogResponse] | None: ... + + @overload + def _get_executor_log_callable( + self, ti: TaskInstance | TaskInstanceHistory, *, streaming: Literal[False] = ... + ) -> Callable[[TaskInstance | TaskInstanceHistory, int], tuple[list[str], list[str]]]: ... + + def _get_executor_log_callable( + self, ti: TaskInstance | TaskInstanceHistory, *, streaming: bool = False + ) -> ( + Callable[[TaskInstance | TaskInstanceHistory, int], StreamingLogResponse] + | None + | Callable[[TaskInstance | TaskInstanceHistory, int], tuple[list[str], list[str]]] + ): """ - Get the get_task_log method from executor of current task instance. + Get the get_task_log or get_streaming_task_log method from executor of current task instance. Since there might be multiple executors, so we need to get the executor of current task instance instead of getting from default executor. :param ti: task instance object - :return: get_task_log method of the executor + :param streaming: if True, get the get_streaming_task_log method, otherwise get the get_task_log method + :return: get_task_log or get_streaming_task_log method of the executor """ executor_name = ti.executor or self.DEFAULT_EXECUTOR_KEY executor = self.executor_instances.get(executor_name) - if executor is not None: - return executor.get_task_log + if executor is None: + if executor_name == self.DEFAULT_EXECUTOR_KEY: + executor = ExecutorLoader.get_default_executor() + else: + executor = ExecutorLoader.load_executor(executor_name) + self.executor_instances[executor_name] = executor + + if streaming: + # The `supports_streaming_logs` class attribute and `get_streaming_task_log` method was added in Airflow 3.2.0. + # And some of the provider executors or custom executors haven't supported `get_streaming_task_log` yet. + # For backward compatibility with earlier versions, we need to check for their existence. + if hasattr(executor, "get_streaming_task_log") and getattr( + executor, "supports_streaming_logs", False + ): + return executor.get_streaming_task_log + return None - if executor_name == self.DEFAULT_EXECUTOR_KEY: - self.executor_instances[executor_name] = ExecutorLoader.get_default_executor() - else: - self.executor_instances[executor_name] = ExecutorLoader.load_executor(executor_name) - return self.executor_instances[executor_name].get_task_log + return executor.get_task_log def _read( self, @@ -632,23 +657,28 @@ def _read( raise TypeError("Logs should be either a list of strings or a generator of log lines.") # Extend LogSourceInfo source_list.extend(sources) - has_k8s_exec_pod = False + + has_executor_log = False if ti.state == TaskInstanceState.RUNNING: - executor_get_task_log = self._get_executor_get_task_log(ti) - response = executor_get_task_log(ti, try_number) - if response: - sources, logs = response + # check for streaming logs first + if executor_streaming_get_task_log := self._get_executor_log_callable(ti, streaming=True): + sources, executor_logs = executor_streaming_get_task_log(ti, try_number) + else: # fallback to non-streaming logs if streaming not supported + executor_get_task_log = self._get_executor_log_callable(ti) + sources, logs = executor_get_task_log(ti, try_number) # make the logs stream-like compatible executor_logs = [_get_compatible_log_stream(logs)] + if sources: source_list.extend(sources) - has_k8s_exec_pod = True + has_executor_log = True + if not (remote_logs and ti.state not in State.unfinished): # when finished, if we have remote logs, no need to check local worker_log_full_path = Path(self.local_base, worker_log_rel_path) sources, local_logs = self._read_from_local(worker_log_full_path) source_list.extend(sources) - if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_k8s_exec_pod: + if ti.state in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED) and not has_executor_log: sources, served_logs = self._read_from_logs_server(ti, worker_log_rel_path) source_list.extend(sources) elif (ti.state not in State.unfinished or ti.state in _STATES_WITH_COMPLETED_ATTEMPT) and not ( diff --git a/airflow-core/tests/unit/executors/test_base_executor.py b/airflow-core/tests/unit/executors/test_base_executor.py index cb646f7ce8d55..78d5e078ba0f0 100644 --- a/airflow-core/tests/unit/executors/test_base_executor.py +++ b/airflow-core/tests/unit/executors/test_base_executor.py @@ -67,6 +67,10 @@ def test_supports_multi_team_default_value(): assert not BaseExecutor.supports_multi_team +def test_supports_streaming_logs_default_value(): + assert not BaseExecutor.supports_streaming_logs + + def test_invalid_slotspool(): with pytest.raises(ValueError, match="parallelism is set to 0 or lower"): BaseExecutor(0) @@ -78,6 +82,14 @@ def test_get_task_log(): assert executor.get_task_log(ti=ti, try_number=1) == ([], []) +def test_get_streaming_task_log_not_implemented(): + executor = BaseExecutor() + ti = TaskInstance(task=SerializedBaseOperator(task_id="dummy"), dag_version_id=mock.MagicMock(spec=UUID)) + + with pytest.raises(NotImplementedError): + executor.get_streaming_task_log(ti=ti, try_number=1) + + def test_serve_logs_default_value(): assert not BaseExecutor.serve_logs diff --git a/airflow-core/tests/unit/utils/log/test_file_task_handler.py b/airflow-core/tests/unit/utils/log/test_file_task_handler.py index ceecffafa3265..c46f5af9d2476 100644 --- a/airflow-core/tests/unit/utils/log/test_file_task_handler.py +++ b/airflow-core/tests/unit/utils/log/test_file_task_handler.py @@ -21,6 +21,9 @@ from unittest.mock import MagicMock, patch from airflow.utils.log.file_task_handler import FileTaskHandler +from airflow.utils.state import TaskInstanceState + +from tests_common.test_utils.file_task_handler import convert_list_to_stream, extract_events class TestFileTaskHandlerLogServer: @@ -220,3 +223,75 @@ def test_handles_base_log_folder_that_is_itself_a_symlink(self, tmp_path): assert len(sources) == 1 assert "through-symlink content" in self._drain(streams[0]) + + +class TestFileTaskHandlerExecutorLogs: + """Tests for executor log retrieval selection.""" + + @staticmethod + def _running_ti(executor_name: str) -> MagicMock: + ti = MagicMock() + ti.executor = executor_name + ti.state = TaskInstanceState.RUNNING + ti.try_number = 1 + return ti + + def test_running_task_prefers_streaming_executor_logs(self): + """Use executor streaming logs when the executor advertises streaming support.""" + handler = FileTaskHandler(base_log_folder="") + executor = MagicMock() + executor.supports_streaming_logs = True + executor.get_streaming_task_log.return_value = ( + ["streaming source"], + [convert_list_to_stream(["streaming log"])], + ) + executor.get_task_log.return_value = (["legacy source"], ["legacy log"]) + handler.executor_instances = {"StreamingExecutor": executor} + ti = self._running_ti("StreamingExecutor") + + with ( + patch.object(handler, "_render_filename", return_value="dag/run/task/1.log"), + patch.object(handler, "_read_remote_logs", side_effect=NotImplementedError), + patch.object(handler, "_read_from_local", return_value=([], [])), + patch.object(handler, "_read_from_logs_server", return_value=([], [])) as read_from_logs_server, + ): + logs, metadata = handler._read(ti=ti, try_number=1) + + executor.get_streaming_task_log.assert_called_once_with(ti, 1) + executor.get_task_log.assert_not_called() + read_from_logs_server.assert_not_called() + assert extract_events(logs, skip_source_info=False) == [ + "::group::Log message source details", + "streaming source", + "::endgroup::", + "streaming log", + ] + assert metadata == {"end_of_log": False, "log_pos": 1} + + def test_running_task_falls_back_to_legacy_executor_logs(self): + """Use legacy executor logs when streaming support is not advertised.""" + handler = FileTaskHandler(base_log_folder="") + executor = MagicMock() + executor.supports_streaming_logs = False + executor.get_task_log.return_value = (["legacy source"], ["legacy log"]) + handler.executor_instances = {"LegacyExecutor": executor} + ti = self._running_ti("LegacyExecutor") + + with ( + patch.object(handler, "_render_filename", return_value="dag/run/task/1.log"), + patch.object(handler, "_read_remote_logs", side_effect=NotImplementedError), + patch.object(handler, "_read_from_local", return_value=([], [])), + patch.object(handler, "_read_from_logs_server", return_value=([], [])) as read_from_logs_server, + ): + logs, metadata = handler._read(ti=ti, try_number=1) + + executor.get_streaming_task_log.assert_not_called() + executor.get_task_log.assert_called_once_with(ti, 1) + read_from_logs_server.assert_not_called() + assert extract_events(logs, skip_source_info=False) == [ + "::group::Log message source details", + "legacy source", + "::endgroup::", + "legacy log", + ] + assert metadata == {"end_of_log": False, "log_pos": 1} From 5521de3873f96ce9a7c9f95c2f7ac7599e13955b Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 3 Jul 2026 15:16:41 +0900 Subject: [PATCH 2/3] Add streaming task log support to KubernetesExecutor Fetching a running task's pod log materialized every line in the API server before serving it. Streaming the lines lazily through the new BaseExecutor.get_streaming_task_log interface lets the bounded log accumulator cap resident memory while serving large logs. --- .../tests/unit/utils/test_log_handlers.py | 7 ++- .../executors/celery_kubernetes_executor.py | 8 +++ .../test_celery_kubernetes_executor.py | 24 ++++++++ .../executors/kubernetes_executor.py | 34 +++++++++-- .../executors/local_kubernetes_executor.py | 8 +++ .../executors/test_kubernetes_executor.py | 58 +++++++++++++++++-- .../test_local_kubernetes_executor.py | 24 ++++++++ .../log_handlers/test_log_handlers.py | 12 ++-- 8 files changed, 155 insertions(+), 20 deletions(-) diff --git a/airflow-core/tests/unit/utils/test_log_handlers.py b/airflow-core/tests/unit/utils/test_log_handlers.py index 24a2e367a3854..8b86264eb5577 100644 --- a/airflow-core/tests/unit/utils/test_log_handlers.py +++ b/airflow-core/tests/unit/utils/test_log_handlers.py @@ -291,8 +291,9 @@ def test_file_task_handler_with_multiple_executors( else: path_to_executor_class = executors_mapping.get(executor_name) - with patch(f"{path_to_executor_class}.get_task_log", return_value=([], [])) as mock_get_task_log: - mock_get_task_log.return_value = ([], []) + with patch( + f"{path_to_executor_class}.get_streaming_task_log", return_value=([], []) + ) as mock_get_streaming_task_log: ti = create_task_instance( dag_id="dag_for_testing_multiple_executors", task_id="task_for_testing_multiple_executors", @@ -326,7 +327,7 @@ def test_file_task_handler_with_multiple_executors( assert hasattr(file_handler, "read") file_handler.read(ti) os.remove(log_filename) - mock_get_task_log.assert_called_once() + mock_get_streaming_task_log.assert_called_once() if executor_name is None: mock_get_default_executor.assert_called_once() diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index f66c153a6d7a8..d726f895f8440 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -30,6 +30,7 @@ from airflow.utils.providers_configuration_loader import providers_configuration_loaded if TYPE_CHECKING: + from airflow._shared.logging.remote import StreamingLogResponse from airflow.callbacks.base_callback_sink import BaseCallbackSink from airflow.callbacks.callback_requests import CallbackRequest from airflow.cli.cli_config import GroupCommand @@ -55,6 +56,7 @@ class CeleryKubernetesExecutor(BaseExecutor): """ supports_ad_hoc_ti_run: bool = True + supports_streaming_logs: bool = True # TODO: Remove this flag once providers depend on Airflow 3.0 supports_pickling: bool = True supports_sentry: bool = False @@ -206,6 +208,12 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li return self.kubernetes_executor.get_task_log(ti=ti, try_number=try_number) return [], [] + def get_streaming_task_log(self, ti: TaskInstance, try_number: int) -> StreamingLogResponse: + """Fetch streaming task log from Kubernetes executor.""" + if ti.queue == self.kubernetes_executor.kubernetes_queue: + return self.kubernetes_executor.get_streaming_task_log(ti=ti, try_number=try_number) + return [], [] + def has_task(self, task_instance: TaskInstance) -> bool: """ Check if a task is either queued or running in either celery or kubernetes executor. diff --git a/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py index 000dcdf5ba659..51baff081976a 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py @@ -49,6 +49,9 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert not CeleryKubernetesExecutor.serve_logs + def test_supports_streaming_logs(self): + assert CeleryKubernetesExecutor.supports_streaming_logs + def test_cli_commands_vended(self): assert CeleryKubernetesExecutor.get_cli_commands() @@ -198,6 +201,27 @@ def test_log_is_fetched_from_k8s_executor_only_for_k8s_queue(self): k8s_executor_mock.get_task_log.assert_not_called() assert log == ([], []) + def test_streaming_log_is_fetched_from_k8s_executor_only_for_k8s_queue(self): + celery_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + cke = CeleryKubernetesExecutor(celery_executor_mock, k8s_executor_mock) + simple_task_instance = mock.MagicMock() + simple_task_instance.queue = KUBERNETES_QUEUE + + cke.get_streaming_task_log(ti=simple_task_instance, try_number=1) + + k8s_executor_mock.get_streaming_task_log.assert_called_once_with( + ti=simple_task_instance, try_number=1 + ) + + k8s_executor_mock.reset_mock() + simple_task_instance.queue = "test-queue" + + log = cke.get_streaming_task_log(ti=simple_task_instance, try_number=1) + + k8s_executor_mock.get_streaming_task_log.assert_not_called() + assert log == ([], []) + def test_get_event_buffer(self): celery_executor_mock = mock.MagicMock() k8s_executor_mock = mock.MagicMock() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 46137915afbbe..665a586036ca8 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -30,8 +30,10 @@ import multiprocessing import time from collections import Counter, defaultdict +from collections.abc import Iterable from contextlib import suppress from datetime import datetime, timedelta +from itertools import chain from queue import Empty, Queue from typing import TYPE_CHECKING, Any @@ -66,6 +68,7 @@ from kubernetes.client import models as k8s from sqlalchemy.orm import Session + from airflow._shared.logging.remote import RawLogStream, StreamingLogResponse from airflow.cli.cli_config import GroupCommand from airflow.executors import workloads from airflow.models.taskinstance import TaskInstance @@ -81,6 +84,7 @@ class KubernetesExecutor(BaseExecutor): RUNNING_POD_LOG_LINES = 100 supports_ad_hoc_ti_run: bool = True supports_multi_team: bool = True + supports_streaming_logs: bool = True if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS: # In the v3 path, we store workloads, not commands as strings. @@ -611,8 +615,23 @@ def _get_pod_namespace(self, ti: TaskInstance): return namespace or self.conf.get("kubernetes_executor", "namespace") def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: - messages = [] - log = [] + messages: list[str] = [] + log: list[str] = [] + try: + messages, log_streams = self.get_streaming_task_log(ti, try_number) + log = ["\n".join(stream) for stream in log_streams] + except Exception as e: + messages.append(f"Reading from k8s pod logs failed: {e}") + return messages, log or [""] + + def get_streaming_task_log(self, ti: TaskInstance, try_number: int) -> StreamingLogResponse: + messages: list[str] = [] + log_streams: list[RawLogStream] = [] + + def create_log_stream(logs: Iterable[bytes]) -> RawLogStream: + for line in logs: + yield remove_escape_codes(line.decode()) + try: from airflow.providers.cncf.kubernetes.kube_client import get_kube_client from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator @@ -645,13 +664,16 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li tail_lines=self.RUNNING_POD_LOG_LINES, _preload_content=False, ) - for line in res: - log.append(remove_escape_codes(line.decode())) - if log: + + log_iter = iter(res) + first_line = next(log_iter, None) + if first_line is not None: + log_streams.append(create_log_stream(chain([first_line], log_iter))) messages.append("Found logs through kube API") except Exception as e: messages.append(f"Reading from k8s pod logs failed: {e}") - return messages, ["\n".join(log)] + + return messages, log_streams def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: with Stats.timer( diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 274ba81170471..496ca41d08d1a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -28,6 +28,7 @@ from airflow.providers.common.compat.sdk import conf if TYPE_CHECKING: + from airflow._shared.logging.remote import StreamingLogResponse from airflow.callbacks.base_callback_sink import BaseCallbackSink from airflow.callbacks.callback_requests import CallbackRequest from airflow.cli.cli_config import GroupCommand @@ -53,6 +54,7 @@ class LocalKubernetesExecutor(BaseExecutor): """ supports_ad_hoc_ti_run: bool = True + supports_streaming_logs: bool = True # TODO: Remove this attribute once providers rely on Airflow >=3.0.0 supports_pickling: bool = False supports_sentry: bool = False @@ -201,6 +203,12 @@ def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], li return self.kubernetes_executor.get_task_log(ti=ti, try_number=try_number) return [], [] + def get_streaming_task_log(self, ti: TaskInstance, try_number: int) -> StreamingLogResponse: + """Fetch streaming task log from kubernetes executor.""" + if ti.queue == self.kubernetes_executor.kubernetes_queue: + return self.kubernetes_executor.get_streaming_task_log(ti=ti, try_number=try_number) + return [], [] + def has_task(self, task_instance: TaskInstance) -> bool: """ Check if a task is either queued or running in either local or kubernetes executor. diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 285b263b064e1..d08bf0a13ea23 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -2323,9 +2323,8 @@ def test_kube_config_get_namespace_list( assert executor.kube_config.multi_namespace_mode_namespace_list == expected_value_in_kube_config - @pytest.mark.db_test @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") - def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operator): + def test_get_streaming_task_log(self, mock_get_kube_client): """fetch task log from pod""" mock_kube_client = mock_get_kube_client.return_value @@ -2333,22 +2332,68 @@ def test_get_task_log(self, mock_get_kube_client, create_task_instance_of_operat mock_pod = mock.Mock() mock_pod.metadata.name = "x" mock_kube_client.list_namespaced_pod.return_value.items = [mock_pod] - ti = create_task_instance_of_operator(EmptyOperator, dag_id="test_k8s_log_dag", task_id="test_task") + ti = mock.MagicMock( + dag_id="test_k8s_log_dag", + task_id="test_task", + map_index=-1, + run_id="test_run", + queued_by_job_id=None, + hostname="", + executor_config={}, + ) executor = KubernetesExecutor() - messages, logs = executor.get_task_log(ti=ti, try_number=1) + messages, log_streams = executor.get_streaming_task_log(ti=ti, try_number=1) mock_kube_client.read_namespaced_pod_log.assert_called_once() assert messages == [ "Attempting to fetch logs from pod through kube API", "Found logs through kube API", ] - assert logs[0] == "a_\nb_\nc_" + assert list(log_streams[0]) == ["a_", "b_", "c_"] + + mock_kube_client.reset_mock() + mock_kube_client.read_namespaced_pod_log.side_effect = Exception("error_fetching_pod_log") + + messages, log_streams = executor.get_streaming_task_log(ti=ti, try_number=1) + assert log_streams == [] + assert messages == [ + "Attempting to fetch logs from pod through kube API", + "Reading from k8s pod logs failed: error_fetching_pod_log", + ] + + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_get_task_log(self, mock_get_kube_client): + """Fetch legacy task log response from pod.""" + mock_kube_client = mock_get_kube_client.return_value + mock_kube_client.read_namespaced_pod_log.return_value = [b"a_", b"b_", b"c_"] + mock_pod = mock.Mock() + mock_pod.metadata.name = "x" + mock_kube_client.list_namespaced_pod.return_value.items = [mock_pod] + ti = mock.MagicMock( + dag_id="test_k8s_log_dag", + task_id="test_task", + map_index=-1, + run_id="test_run", + queued_by_job_id=None, + hostname="", + executor_config={}, + ) + + executor = KubernetesExecutor() + messages, logs = executor.get_task_log(ti=ti, try_number=1) + + assert messages == [ + "Attempting to fetch logs from pod through kube API", + "Found logs through kube API", + ] + assert logs == ["a_\nb_\nc_"] mock_kube_client.reset_mock() mock_kube_client.read_namespaced_pod_log.side_effect = Exception("error_fetching_pod_log") messages, logs = executor.get_task_log(ti=ti, try_number=1) + assert logs == [""] assert messages == [ "Attempting to fetch logs from pod through kube API", @@ -2422,6 +2467,9 @@ def test_sentry_integration(self): def test_supports_sentry(self): assert not KubernetesExecutor.supports_sentry + def test_supports_streaming_logs(self): + assert KubernetesExecutor.supports_streaming_logs is True + def test_cli_commands_vended(self): assert KubernetesExecutor.get_cli_commands() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py index 69f291c574f9e..98d873374fdd3 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py @@ -48,6 +48,9 @@ def test_is_production_default_value(self): def test_serve_logs_default_value(self): assert LocalKubernetesExecutor.serve_logs + def test_supports_streaming_logs(self): + assert LocalKubernetesExecutor.supports_streaming_logs + def test_cli_commands_vended(self): assert LocalKubernetesExecutor.get_cli_commands() @@ -112,6 +115,27 @@ def test_log_is_fetched_from_k8s_executor_only_for_k8s_queue(self): assert logs == [] assert messages == [] + def test_streaming_log_is_fetched_from_k8s_executor_only_for_k8s_queue(self): + local_executor_mock = mock.MagicMock() + k8s_executor_mock = mock.MagicMock() + local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) + simple_task_instance = mock.MagicMock() + simple_task_instance.queue = conf.get("local_kubernetes_executor", "kubernetes_queue") + + local_k8s_exec.get_streaming_task_log(ti=simple_task_instance, try_number=3) + + k8s_executor_mock.get_streaming_task_log.assert_called_once_with( + ti=simple_task_instance, try_number=3 + ) + + k8s_executor_mock.reset_mock() + simple_task_instance.queue = "test-queue" + messages, logs = local_k8s_exec.get_streaming_task_log(ti=simple_task_instance, try_number=3) + + k8s_executor_mock.get_streaming_task_log.assert_not_called() + assert logs == [] + assert messages == [] + def test_send_callback(self): local_executor_mock = mock.MagicMock() k8s_executor_mock = mock.MagicMock() diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py index 1a95dd4349b32..3702d645b1171 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/log_handlers/test_log_handlers.py @@ -74,13 +74,13 @@ def teardown_method(self): self.clean_up() @mock.patch( - "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_task_log" + "airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor.get_streaming_task_log" ) @pytest.mark.parametrize("state", [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS]) @pytest.mark.usefixtures("clean_executor_loader") - def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instance, state): - """Test for k8s executor, the log is read from get_task_log method""" - mock_k8s_get_task_log.return_value = ([], []) + def test__read_for_k8s_executor(self, mock_k8s_get_streaming_task_log, create_task_instance, state): + """Test for k8s executor, the log is read from get_streaming_task_log method.""" + mock_k8s_get_streaming_task_log.return_value = ([], []) executor_name = "KubernetesExecutor" ti = create_task_instance( dag_id="dag_for_testing_k8s_executor_log_read", @@ -96,9 +96,9 @@ def test__read_for_k8s_executor(self, mock_k8s_get_task_log, create_task_instanc fth = FileTaskHandler("") fth._read(ti=ti, try_number=2) if state == TaskInstanceState.RUNNING: - mock_k8s_get_task_log.assert_called_once_with(ti, 2) + mock_k8s_get_streaming_task_log.assert_called_once_with(ti, 2) else: - mock_k8s_get_task_log.assert_not_called() + mock_k8s_get_streaming_task_log.assert_not_called() @pytest.mark.parametrize( ("pod_override", "namespace_to_call"), From f8ab3d9ec0d970defea17dbe11492132e29b98ae Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 3 Jul 2026 22:29:20 +0900 Subject: [PATCH 3/3] Fix CI error --- docs/spelling_wordlist.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index eb25e39ebefe9..10e5b6eabd684 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1,5 +1,3 @@ -aarch -abc AbstractFileSystem AbstractToolset accessor @@ -1608,6 +1606,7 @@ StoredInfoType storedInfoType str Streamable +StreamingLogResponse strftime Stringified stringified