From 8c4fdb65c756a6af5e87425aaf18809b8a58b4cb Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Fri, 3 Jul 2026 12:59:32 +0530 Subject: [PATCH 1/2] Prevent malformed Elasticsearch log entries from crashing task log fetch A single stored log entry with a non-string `event` field (for example, a task that logs a list or dict as the sole message argument) currently crashes the entire task-log-fetch request with an unhandled pydantic.ValidationError, instead of degrading gracefully. _read() built StructuredLogMessage objects from stored Elasticsearch hits without catching validation failures. This catches ValidationError per hit and falls back to a stringified event, matching the existing fallback pattern in file_task_handler.py's _log_stream_to_parsed_log_stream. closes: #69304 --- .../elasticsearch/log/es_task_handler.py | 28 +++++++++++- .../elasticsearch/log/test_es_task_handler.py | 43 +++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py index 3e2fdcc366db9..53225acb3aa1d 100644 --- a/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py @@ -40,6 +40,7 @@ import pendulum from elasticsearch import helpers from elasticsearch.exceptions import NotFoundError +from pydantic import ValidationError import airflow.logging_config as alc from airflow.exceptions import AirflowProviderDeprecationWarning @@ -75,6 +76,8 @@ EsLogMsgType = list[tuple[str, str]] # type: ignore[assignment,misc] +logger = logging.getLogger(__name__) + LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""} # Elasticsearch hosted log type @@ -139,6 +142,29 @@ def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]: return fields +def _safe_build_structured_log_message(hit_dict: dict[str, Any]) -> StructuredLogMessage: + """ + Build a StructuredLogMessage from a stored Elasticsearch hit, tolerating malformed fields. + + A single malformed stored log entry (for example a non-string ``event`` produced by + logging a list or dict as the sole message argument) must not fail the entire + log-fetch request. Fall back to a stringified event, mirroring the fallback used for + unparsable raw log lines in ``_log_stream_to_parsed_log_stream``. + """ + fields = _build_log_fields(hit_dict) + try: + return StructuredLogMessage(**fields) + except ValidationError: + logger.warning( + "Failed to parse stored log entry into StructuredLogMessage; falling back to " + "stringified event. Offending fields: %s", + fields, + ) + return StructuredLogMessage( + event=str(fields.get("event", hit_dict)), timestamp=fields.get("timestamp") + ) + + VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters.keys()) # Remove `self` from the valid set of kwargs VALID_ES_CONFIG_KEYS.remove("self") @@ -461,7 +487,7 @@ def _read( # Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects message = header + [ - StructuredLogMessage(**_build_log_fields(hit.to_dict())) + _safe_build_structured_log_message(hit.to_dict()) for hits in logs_by_host.values() for hit in hits ] diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index 56c6021408917..fa6e5813ffc2e 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -43,6 +43,7 @@ _clean_date, _format_error_detail, _render_log_id, + _safe_build_structured_log_message, _strip_userinfo, get_es_kwargs_from_config, getattr_nested, @@ -380,6 +381,31 @@ def test_read_with_custom_offset_and_host_fields(self, ti): assert metadata["offset"] == "1" assert not metadata["end_of_log"] + @pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage fallback is Airflow 3+ only") + @pytest.mark.db_test + def test_read_with_malformed_event_falls_back_to_stringified_event(self, ti): + malformed_event = ["not", "a", "string"] + malformed_source = { + "message": self.test_message, + "event": malformed_event, + "log_id": self.LOG_ID, + "offset": 2, + } + response = _make_es_response(self.es_task_handler.io, self.base_log_source, malformed_source) + + with patch.object(self.es_task_handler.io, "_es_read", return_value=response): + with patch("airflow.providers.elasticsearch.log.es_task_handler.logger") as mock_logger: + logs, metadatas = self.es_task_handler.read(ti, 1) + + metadata = _assert_log_events( + logs, + metadatas, + expected_events=[self.test_message, str(malformed_event)], + expected_sources=["http://localhost:9200"], + ) + assert not metadata["end_of_log"] + mock_logger.warning.assert_called_once() + @pytest.mark.db_test def test_set_context(self, ti): self.es_task_handler.set_context(ti) @@ -934,3 +960,20 @@ def test_error_detail_dropped_when_empty(self): hit = {"event": "msg", "error_detail": []} result = _build_log_fields(hit) assert "error_detail" not in result + + +class TestSafeBuildStructuredLogMessage: + def test_string_event_returns_unchanged_and_does_not_warn(self): + hit = {"event": "hello", "level": "info"} + with patch("airflow.providers.elasticsearch.log.es_task_handler.logger") as mock_logger: + result = _safe_build_structured_log_message(hit) + assert result.event == "hello" + mock_logger.warning.assert_not_called() + + def test_non_string_event_falls_back_to_stringified_event(self): + hit = {"event": ["a", "b"], "timestamp": "2024-01-01T00:00:00Z"} + with patch("airflow.providers.elasticsearch.log.es_task_handler.logger") as mock_logger: + result = _safe_build_structured_log_message(hit) + assert result.event == str(["a", "b"]) + assert result.timestamp is not None + mock_logger.warning.assert_called_once() From 2a2777037de1c580886dc95338f712f3332cff38 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Fri, 3 Jul 2026 22:41:32 +0530 Subject: [PATCH 2/2] Fix Elasticsearch compat tests on Airflow 2.11 The new StructuredLogMessage fallback tests exercise an Airflow 3-only type, but the provider compatibility matrix still runs this provider against Airflow 2.11.1. Guarding those tests the same way as the runtime path keeps the compatibility job focused on behavior that actually exists in that release line. --- .../tests/unit/elasticsearch/log/test_es_task_handler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py index fa6e5813ffc2e..44629a86fb7fd 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py @@ -962,6 +962,7 @@ def test_error_detail_dropped_when_empty(self): assert "error_detail" not in result +@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage fallback is Airflow 3+ only") class TestSafeBuildStructuredLogMessage: def test_string_event_returns_unchanged_and_does_not_warn(self): hit = {"event": "hello", "level": "info"}