OpenLineage listener builds a new client (and leaks transport threads) per DAG-run event in scheduler pool workers #69284

Description

@gang-zh

Apache Airflow version

2.11.2 (bug verified present on main and in released providers)

What happened?

The OpenLineage listener submits bound adapter methods to its DAG-run event ProcessPoolExecutor:

Pickling a bound method serializes the adapter instance it is bound to, so the long-lived pool worker unpickles a fresh OpenLineageAdapter for every event. On emit, that fresh adapter builds a new OpenLineageClient, with a new transport set, for every DAG-run state change, because get_or_create_openlineage_client caches the client on self._client, an instance attribute that does not survive from one unpickled adapter to the next. close() is never called on those clients.
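The mechanism can be illustrated without Airflow or OpenLineage installed. The sketch below is a simplified model, not the provider's code: FakeAdapter, FakeClient and run_in_worker are hypothetical stand-ins, and the pickle round trip stands in for what ProcessPoolExecutor does when it ships a submitted callable to a worker.

```python
import pickle


class FakeClient:
    """Hypothetical stand-in for OpenLineageClient; counts constructions."""

    created = 0

    def __init__(self):
        FakeClient.created += 1


class FakeAdapter:
    """Hypothetical stand-in for OpenLineageAdapter with an instance-level cache."""

    def __init__(self):
        self._client = None

    def get_or_create_client(self):
        # Cached on the instance, so the cache dies with the instance.
        if self._client is None:
            self._client = FakeClient()
        return self._client

    def emit(self, event):
        self.get_or_create_client()
        return event


def run_in_worker(payload: bytes, event: str):
    """Mimic a pool worker: unpickle the submitted callable, then call it."""
    method = pickle.loads(payload)
    return method(event)


adapter = FakeAdapter()
for i in range(3):
    # Pickling the bound method also pickles `adapter` (its __self__).
    payload = pickle.dumps(adapter.emit)
    run_in_worker(payload, f"event-{i}")

print(FakeClient.created)  # 3: one client per event
print(adapter._client)     # None: the submitting side never sees a client
```

Every round trip yields a new adapter copy whose _client starts out as None, so the instance-level cache never gets a hit in the worker.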

For transports that start background worker threads, this leaks one thread per event inside the scheduler. The datadog transport (openlineage-python >= 1.37.0) unconditionally starts an AsyncHttpTransport worker thread in its constructor, and each such thread busy-polls its queue (await asyncio.sleep(0.01) when idle). Measured on openlineage-python 1.47.1, each leaked idle thread costs ~0.45% CPU:

clients created (= events)   threads   idle CPU%
0                            1         0.0
50                           51        29.5
150                          151       72.0
400                          401       167.6
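As a quick check on the per-thread figure, the measured rows can be divided out directly. This is only arithmetic on the numbers reported above; cost_per_thread is a hypothetical helper name, and the baseline of 0.0 is taken from the zero-client row.

```python
# (clients created, idle CPU %) rows copied from the measurements above
measurements = [(50, 29.5), (150, 72.0), (400, 167.6)]
BASELINE_CPU = 0.0  # idle CPU % with zero leaked transports


def cost_per_thread(clients: int, cpu_pct: float) -> float:
    """Average idle CPU % attributable to each leaked transport thread."""
    return (cpu_pct - BASELINE_CPU) / clients


for clients, cpu in measurements:
    print(f"{clients:>4} leaked threads: {cost_per_thread(clients, cpu):.2f}% CPU each")
```

The per-thread cost comes out between roughly 0.42% and 0.59% across the three rows, which brackets the ~0.45% estimate.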

Production impact (Airflow 2.11.2, Astronomer, KubernetesExecutor, ~50 hourly DAGs, composite transport with a datadog leg for Datadog Jobs Monitoring): scheduler CPU climbs steadily (non-sawtooth) from the moment the datadog leg is enabled until pinned at its limit within ~6 hours; scheduler heartbeat dips; liveness-probe restarts temporarily recover it. Disabling OpenLineage restores normal behavior immediately. Routing all events through the transport's async queue (ASYNC_TRANSPORT_RULES={"*": {"*": true}}) does not help, because the worker thread is created in the transport constructor regardless of routing.

What you think should happen instead?

Each pool worker should keep exactly one OpenLineageClient (one transport set, one set of transport threads) for its whole lifetime, regardless of how many events it emits.
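One possible shape for this, sketched under assumptions rather than taken from the provider: submit a module-level function instead of a bound method, and cache the client at process level. CountingClient, get_worker_client and emit_in_worker are hypothetical names used only for illustration.

```python
import functools


class CountingClient:
    """Hypothetical stand-in for OpenLineageClient; counts constructions."""

    created = 0

    def __init__(self):
        CountingClient.created += 1
        self.emitted = []

    def emit(self, event):
        self.emitted.append(event)


@functools.lru_cache(maxsize=1)
def get_worker_client() -> CountingClient:
    """Build the client on first use in this process; later calls reuse it."""
    return CountingClient()


def emit_in_worker(event: str) -> None:
    """Module-level function: pickled by reference, so no adapter travels with it."""
    get_worker_client().emit(event)


for i in range(5):
    emit_in_worker(f"event-{i}")

print(CountingClient.created)  # 1: a single client serves all five events
```

Here the cache lives in the worker process rather than on an object that is re-created per event, so the number of clients (and transport threads) is bounded by the number of pool workers instead of the number of events.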

How to reproduce

The script below instantiates DatadogTransport repeatedly; each instantiation corresponds to what one per-event client rebuild does in the pool worker:

import threading
import time

from openlineage.client.transport.datadog import DatadogConfig, DatadogTransport


def cpu_pct(interval: float = 3.0) -> float:
    t0, w0 = time.process_time(), time.perf_counter()
    time.sleep(interval)
    return 100 * (time.process_time() - t0) / (time.perf_counter() - w0)


transports = []
print(f"{'clients':>8} {'threads':>8} {'idle CPU%':>10}")
print(f"{0:>8} {threading.active_count():>8} {cpu_pct():>10.1f}")
for target in (50, 150, 400):
    while len(transports) < target:
        transports.append(DatadogTransport(DatadogConfig(apiKey="fake-key")))
    time.sleep(1.0)
    print(f"{len(transports):>8} {threading.active_count():>8} {cpu_pct():>10.1f}")

On a live scheduler, py-spy dump on the OL pool-worker process shows the accumulating transport worker threads.

Operating System

Debian (Astro Runtime 13.7.0 container); reproduced on macOS as well

Versions of Apache Airflow Providers

apache-airflow-providers-openlineage — verified in 2.14.0, 2.16.0 and current main (older versions likely affected as well); openlineage-python 1.47.1.

Deployment

Astronomer

Anything else?

The datadog transport makes this most visible (unconditional thread start, 100 Hz idle poll, no close), and a separate report is being filed with OpenLineage for that. The provider-side per-event client rebuild affects all transport types, though: even the plain http transport pays connection-pool/session rebuild per event.

Are you willing to submit PR?
