Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import pendulum
from elasticsearch import helpers
from elasticsearch.exceptions import NotFoundError
from pydantic import ValidationError

import airflow.logging_config as alc
from airflow.exceptions import AirflowProviderDeprecationWarning
Expand Down Expand Up @@ -75,6 +76,8 @@
EsLogMsgType = list[tuple[str, str]] # type: ignore[assignment,misc]


logger = logging.getLogger(__name__)

LOG_LINE_DEFAULTS = {"exc_text": "", "stack_info": ""}
# Elasticsearch hosted log type

Expand Down Expand Up @@ -139,6 +142,29 @@ def _build_log_fields(hit_dict: dict[str, Any]) -> dict[str, Any]:
return fields


def _safe_build_structured_log_message(hit_dict: dict[str, Any]) -> StructuredLogMessage:
    """
    Build a StructuredLogMessage from a stored Elasticsearch hit, tolerating malformed fields.

    A single malformed stored log entry (for example a non-string ``event`` produced by
    logging a list or dict as the sole message argument) must not fail the entire
    log-fetch request. Fall back to a stringified event, mirroring the fallback used for
    unparsable raw log lines in ``_log_stream_to_parsed_log_stream``.

    The fallback is attempted in two steps so that it cannot itself abort the request:
    first with the stored timestamp, and, if that is rejected as well, without it.

    :param hit_dict: The Elasticsearch hit converted to a plain dict.
    :return: The message built from the hit's fields, or a reduced message carrying only
        the stringified event (plus the timestamp when it validates) if the fields are
        rejected.
    """
    fields = _build_log_fields(hit_dict)
    try:
        return StructuredLogMessage(**fields)
    except ValidationError:
        logger.warning(
            "Failed to parse stored log entry into StructuredLogMessage; falling back to "
            "stringified event. Offending fields: %s",
            fields,
        )

    # ``event`` may be absent altogether; stringify the whole hit in that case so the
    # entry still shows up in the rendered log.
    fallback_event = str(fields.get("event", hit_dict))
    try:
        return StructuredLogMessage(event=fallback_event, timestamp=fields.get("timestamp"))
    except ValidationError:
        # The timestamp was (also) malformed. Re-using it would raise again and fail the
        # whole fetch, which is exactly what this helper exists to prevent, so drop it.
        return StructuredLogMessage(event=fallback_event, timestamp=None)


# Parameter names accepted by the Elasticsearch client constructor.
VALID_ES_CONFIG_KEYS = set(inspect.signature(elasticsearch.Elasticsearch.__init__).parameters)
# `self` appears in the signature but is not a kwarg, so take it out of the valid set.
VALID_ES_CONFIG_KEYS.remove("self")
Expand Down Expand Up @@ -461,7 +487,7 @@ def _read(

# Flatten all hits, filter to only desired fields, and construct StructuredLogMessage objects
message = header + [
StructuredLogMessage(**_build_log_fields(hit.to_dict()))
_safe_build_structured_log_message(hit.to_dict())
for hits in logs_by_host.values()
for hit in hits
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
_clean_date,
_format_error_detail,
_render_log_id,
_safe_build_structured_log_message,
_strip_userinfo,
get_es_kwargs_from_config,
getattr_nested,
Expand Down Expand Up @@ -380,6 +381,31 @@ def test_read_with_custom_offset_and_host_fields(self, ti):
assert metadata["offset"] == "1"
assert not metadata["end_of_log"]

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage fallback is Airflow 3+ only")
@pytest.mark.db_test
def test_read_with_malformed_event_falls_back_to_stringified_event(self, ti):
    """Reading a hit whose ``event`` is a list yields its ``str()`` form and one warning."""
    bad_event = ["not", "a", "string"]
    bad_source = {
        "message": self.test_message,
        "event": bad_event,
        "log_id": self.LOG_ID,
        "offset": 2,
    }
    es_response = _make_es_response(self.es_task_handler.io, self.base_log_source, bad_source)

    # Build both patchers up front so they can be entered in a single ``with`` statement.
    es_read_patch = patch.object(self.es_task_handler.io, "_es_read", return_value=es_response)
    logger_patch = patch("airflow.providers.elasticsearch.log.es_task_handler.logger")

    with es_read_patch, logger_patch as mock_logger:
        logs, metadatas = self.es_task_handler.read(ti, 1)

    returned_metadata = _assert_log_events(
        logs,
        metadatas,
        expected_events=[self.test_message, str(bad_event)],
        expected_sources=["http://localhost:9200"],
    )
    assert not returned_metadata["end_of_log"]
    mock_logger.warning.assert_called_once()

@pytest.mark.db_test
def test_set_context(self, ti):
self.es_task_handler.set_context(ti)
Expand Down Expand Up @@ -934,3 +960,21 @@ def test_error_detail_dropped_when_empty(self):
hit = {"event": "msg", "error_detail": []}
result = _build_log_fields(hit)
assert "error_detail" not in result


@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="StructuredLogMessage fallback is Airflow 3+ only")
class TestSafeBuildStructuredLogMessage:
    """Unit tests for ``_safe_build_structured_log_message`` with the module logger mocked."""

    def test_string_event_returns_unchanged_and_does_not_warn(self):
        """A string ``event`` is kept as-is and no warning is logged."""
        well_formed_hit = {"event": "hello", "level": "info"}

        with patch("airflow.providers.elasticsearch.log.es_task_handler.logger") as mock_logger:
            message = _safe_build_structured_log_message(well_formed_hit)

        assert message.event == "hello"
        mock_logger.warning.assert_not_called()

    def test_non_string_event_falls_back_to_stringified_event(self):
        """A list ``event`` comes back stringified, keeps its timestamp, and warns once."""
        list_event = ["a", "b"]
        malformed_hit = {"event": list_event, "timestamp": "2024-01-01T00:00:00Z"}

        with patch("airflow.providers.elasticsearch.log.es_task_handler.logger") as mock_logger:
            message = _safe_build_structured_log_message(malformed_hit)

        assert message.event == str(list_event)
        assert message.timestamp is not None
        mock_logger.warning.assert_called_once()
Loading