diff --git a/airbyte_cdk/metrics/__init__.py b/airbyte_cdk/metrics/__init__.py index 37b834604..f8f0cb03a 100644 --- a/airbyte_cdk/metrics/__init__.py +++ b/airbyte_cdk/metrics/__init__.py @@ -14,7 +14,7 @@ import time from typing import Any, Optional -from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info +from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info, get_python_heap_bytes logger = logging.getLogger(__name__) @@ -22,6 +22,7 @@ METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes" METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes" METRIC_MEMORY_USAGE_PERCENT = "cdk.memory.usage_percent" +METRIC_MEMORY_PYTHON_HEAP_BYTES = "cdk.memory.python_heap_bytes" # Default emission interval in seconds DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0 @@ -139,6 +140,7 @@ def emit_memory_metrics(self) -> None: - cdk.memory.usage_bytes: Current container memory usage - cdk.memory.limit_bytes: Container memory limit (if known) - cdk.memory.usage_percent: Usage/limit ratio (if limit is known) + - cdk.memory.python_heap_bytes: Python heap via tracemalloc (only if CDK_TRACEMALLOC_ENABLED is set) """ if not self.enabled: return @@ -154,6 +156,10 @@ def emit_memory_metrics(self) -> None: if info.usage_percent is not None: self.gauge(METRIC_MEMORY_USAGE_PERCENT, info.usage_percent) + python_heap = get_python_heap_bytes() + if python_heap is not None: + self.gauge(METRIC_MEMORY_PYTHON_HEAP_BYTES, float(python_heap)) + except Exception: # Never let metric collection failures affect the sync logger.debug("Failed to collect memory metrics", exc_info=True) diff --git a/airbyte_cdk/metrics/memory.py b/airbyte_cdk/metrics/memory.py index 087551992..f2c00ab10 100644 --- a/airbyte_cdk/metrics/memory.py +++ b/airbyte_cdk/metrics/memory.py @@ -10,14 +10,21 @@ """ import logging +import os import resource import sys +import tracemalloc from dataclasses import dataclass from pathlib import Path from typing import Optional logger = logging.getLogger(__name__) +# Environment variable to opt in to tracemalloc-based Python heap metrics. +# tracemalloc.start() hooks into CPython's allocator and has ~10-30% runtime overhead, +# so it must not be enabled by default in production. +ENV_CDK_TRACEMALLOC_ENABLED = "CDK_TRACEMALLOC_ENABLED" + # cgroup v2 file paths (standard in modern K8s pods) CGROUP_V2_MEMORY_CURRENT = Path("/sys/fs/cgroup/memory.current") CGROUP_V2_MEMORY_MAX = Path("/sys/fs/cgroup/memory.max") @@ -153,3 +160,33 @@ def get_memory_info() -> MemoryInfo: # Fallback to rusage return _read_rusage_memory() + + +def _is_tracemalloc_enabled() -> bool: + """Return True if the CDK_TRACEMALLOC_ENABLED env var is set to a truthy value.""" + return os.environ.get(ENV_CDK_TRACEMALLOC_ENABLED, "").lower() in ("1", "true", "yes") + + +def get_python_heap_bytes() -> Optional[int]: + """Return Python heap size in bytes via tracemalloc, or None if not enabled. + + tracemalloc hooks into CPython's allocator and has ~10-30% runtime overhead. + It is only activated when the ``CDK_TRACEMALLOC_ENABLED`` env var is set to a + truthy value (``1``, ``true``, or ``yes``). + """ + if not _is_tracemalloc_enabled(): + return None + + if not tracemalloc.is_tracing(): + try: + tracemalloc.start() + logger.info( + "tracemalloc started (CDK_TRACEMALLOC_ENABLED is set). " + "Expect ~10-30%% runtime overhead." + ) + except RuntimeError: + logger.debug("tracemalloc failed to start", exc_info=True) + return None + + current, _ = tracemalloc.get_traced_memory() + return current diff --git a/unit_tests/metrics/test_memory.py b/unit_tests/metrics/test_memory.py index 99c288378..ae30ff45e 100644 --- a/unit_tests/metrics/test_memory.py +++ b/unit_tests/metrics/test_memory.py @@ -4,17 +4,20 @@ """Tests for airbyte_cdk.metrics.memory module.""" +import tracemalloc from pathlib import Path from unittest.mock import patch import pytest from airbyte_cdk.metrics.memory import ( + ENV_CDK_TRACEMALLOC_ENABLED, MemoryInfo, _read_cgroup_v1_memory, _read_cgroup_v2_memory, _read_rusage_memory, get_memory_info, + get_python_heap_bytes, ) @@ -176,3 +179,66 @@ def test_falls_back_to_rusage(self) -> None: assert info.usage_bytes > 0 assert info.limit_bytes is None + + +class TestGetPythonHeapBytes: + """Tests for the opt-in tracemalloc-based heap metric.""" + + def test_returns_none_when_env_var_not_set(self) -> None: + with patch.dict("os.environ", {}, clear=True): + result = get_python_heap_bytes() + assert result is None + + def test_returns_none_when_env_var_is_empty(self) -> None: + with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: ""}): + result = get_python_heap_bytes() + assert result is None + + def test_returns_none_when_env_var_is_false(self) -> None: + with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "false"}): + result = get_python_heap_bytes() + assert result is None + + def test_returns_bytes_when_enabled_with_true(self) -> None: + was_tracing = tracemalloc.is_tracing() + try: + with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "true"}): + result = get_python_heap_bytes() + assert result is not None + assert isinstance(result, int) + assert result >= 0 + finally: + if not was_tracing: + tracemalloc.stop() + + def test_returns_bytes_when_enabled_with_1(self) -> None: + was_tracing = tracemalloc.is_tracing() + try: + with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "1"}): + result = get_python_heap_bytes() + assert result is not None + assert isinstance(result, int) + finally: + if not was_tracing: + tracemalloc.stop() + + def test_returns_bytes_when_enabled_with_yes(self) -> None: + was_tracing = tracemalloc.is_tracing() + try: + with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "yes"}): + result = get_python_heap_bytes() + assert result is not None + assert isinstance(result, int) + finally: + if not was_tracing: + tracemalloc.stop() + + def test_case_insensitive(self) -> None: + was_tracing = tracemalloc.is_tracing() + try: + with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "TRUE"}): + result = get_python_heap_bytes() + assert result is not None + finally: + if not was_tracing: + tracemalloc.stop() diff --git a/unit_tests/metrics/test_metrics_client.py b/unit_tests/metrics/test_metrics_client.py index bb6e2f1da..23313544e 100644 --- a/unit_tests/metrics/test_metrics_client.py +++ b/unit_tests/metrics/test_metrics_client.py @@ -150,6 +150,32 @@ def test_skips_limit_when_unknown(self) -> None: assert "cdk.memory.limit_bytes" not in metric_names assert "cdk.memory.usage_percent" not in metric_names + def test_emits_python_heap_when_tracemalloc_enabled(self) -> None: + client, mock_instance = _make_enabled_client() + + mock_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=200_000_000) + with ( + patch("airbyte_cdk.metrics.get_memory_info", return_value=mock_info), + patch("airbyte_cdk.metrics.get_python_heap_bytes", return_value=5_000_000), + ): + client.emit_memory_metrics() + + gauge_calls = {call[0][0]: call[0][1] for call in mock_instance.gauge.call_args_list} + assert gauge_calls["cdk.memory.python_heap_bytes"] == 5_000_000.0 + + def test_skips_python_heap_when_tracemalloc_disabled(self) -> None: + client, mock_instance = _make_enabled_client() + + mock_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=200_000_000) + with ( + patch("airbyte_cdk.metrics.get_memory_info", return_value=mock_info), + patch("airbyte_cdk.metrics.get_python_heap_bytes", return_value=None), + ): + client.emit_memory_metrics() + + metric_names = [call[0][0] for call in mock_instance.gauge.call_args_list] + assert "cdk.memory.python_heap_bytes" not in metric_names + def test_noop_when_disabled(self) -> None: client = MetricsClient() # Should not raise