Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ def terminate_orphan_processes(self, present: set[DagFileInfo]):
),
)
processor.kill(signal.SIGKILL)
processor.logger_filehandle.close()
processor.close()
self._file_stats.pop(file, None)

@provide_session
Expand Down Expand Up @@ -1312,7 +1312,7 @@ def _collect_results(self):

for file in finished:
processor = self._processors.pop(file)
processor.logger_filehandle.close()
processor.close()

def _get_log_dir(self) -> str:
return os.path.join(self.base_log_dir, timezone.utcnow().strftime("%Y-%m-%d"))
Expand Down Expand Up @@ -1618,7 +1618,7 @@ def _kill_timed_out_processors(self):
# Clean up `self._processors` after iterating over it
for proc in processors_to_remove:
processor = self._processors.pop(proc)
processor.logger_filehandle.close()
processor.close()

def _add_files_to_queue(
self,
Expand Down
13 changes: 12 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
from airflow.utils.dag_version_inflation_checker import check_dag_file_stability
from airflow.utils.file import iter_airflow_imports
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
Expand Down Expand Up @@ -550,7 +551,7 @@ def in_process_api_server() -> InProcessExecutionAPI:


@attrs.define(kw_only=True)
class DagFileProcessorProcess(WatchedSubprocess):
class DagFileProcessorProcess(WatchedSubprocess, LoggingMixin):
"""
Parses dags with Task SDK API.

Expand Down Expand Up @@ -730,3 +731,13 @@ def is_ready(self) -> bool:

def wait(self) -> int:
raise NotImplementedError(f"Don't call wait on {type(self).__name__} objects")

def close(self):
try:
self.logger_filehandle.close()
except OSError:
self.log.warning(
"Failed to close log file handle for %s",
self.dag_file_rel_path,
exc_info=True,
)
53 changes: 53 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,21 @@ def test_terminate_orphan_processes_kills_processor_when_file_is_truly_absent(se
)
processor.kill.assert_called_once_with(signal.SIGKILL)

def test_terminate_orphan_processes_tolerates_stale_file_handle_on_close(self):
"""A stale NFS file handle on close (e.g. OpenShift) must not crash the manager."""
manager = DagFileProcessorManager(max_runs=1)
versioned_file = _get_versioned_file_info("callbacks.py")
processor = MagicMock()
processor.logger_filehandle.close.side_effect = OSError(116, "Stale file handle")

manager._processors[versioned_file] = processor

with mock.patch("airflow.dag_processing.manager.stats.decr"):
manager.terminate_orphan_processes(present=set())

assert manager._processors == {}
processor.kill.assert_called_once_with(signal.SIGKILL)

def test_remove_orphaned_file_stats_keeps_versioned_callback_stats_when_unversioned_file_is_present(self):
manager = DagFileProcessorManager(max_runs=1)
versioned_file = _get_versioned_file_info("callbacks.py")
Expand Down Expand Up @@ -1184,6 +1199,26 @@ def test_kill_timed_out_processors_kill(self):
assert len(manager._processors) == 0
processor.logger_filehandle.close.assert_called()

def test_kill_timed_out_processors_tolerates_stale_file_handle_on_close(self):
"""A stale NFS file handle on close (e.g. OpenShift) must not crash the manager."""
manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
start_time = time.monotonic() - manager.processor_timeout - 1
processor, _ = self.mock_processor(start_time=start_time)
processor.logger_filehandle.close.side_effect = OSError(116, "Stale file handle")
manager._processors = {
DagFileInfo(
bundle_name="testing", rel_path=Path("abc.py"), bundle_path=TEST_DAGS_FOLDER
): processor
}
with (
mock.patch.object(type(processor), "kill"),
mock.patch("airflow.dag_processing.manager.stats.decr"),
mock.patch("airflow.dag_processing.manager.stats.incr"),
):
manager._kill_timed_out_processors()

assert len(manager._processors) == 0

def test_kill_timed_out_processors_no_kill(self):
manager = DagFileProcessorManager(
max_runs=1,
Expand Down Expand Up @@ -1338,6 +1373,24 @@ def test_collect_results_processes_remaining_files_when_one_persist_fails(self,
assert manager._file_stats[file_b].run_count == 2
assert len(manager._processors) == 0

def test_collect_results_tolerates_stale_file_handle_on_close(self):
"""A stale NFS file handle on close (e.g. OpenShift) must not crash the manager."""
manager = DagFileProcessorManager(max_runs=1)
file = DagFileInfo(bundle_name="testing", rel_path=Path("a.py"), bundle_path=TEST_DAGS_FOLDER)
manager._file_stats[file] = DagFileStat()
manager._bundle_versions["testing"] = "v1"

proc, _ = self.mock_processor(start_time=time.monotonic() - 1)
proc.had_callbacks = False
proc.parsing_result = DagFileParsingResult(fileloc="a.py", serialized_dags=[])
proc.logger_filehandle.close.side_effect = OSError(116, "Stale file handle")
manager._processors = {file: proc}

with mock.patch.object(manager, "persist_parsing_result"):
manager._collect_results()

assert len(manager._processors) == 0

@pytest.mark.usefixtures("testing_dag_bundle")
@pytest.mark.parametrize(
("callbacks", "path", "expected_body"),
Expand Down
Loading