Skip to content

feat: add diagnostic logging, deadlock prevention, and stdout buffering for concurrent reads#925

Open
devin-ai-integration[bot] wants to merge 24 commits intomainfrom
devin/1772184954-log-concurrency-level
Open

feat: add diagnostic logging, deadlock prevention, and stdout buffering for concurrent reads#925
devin-ai-integration[bot] wants to merge 24 commits intomainfrom
devin/1772184954-log-concurrency-level

Conversation

@devin-ai-integration
Copy link
Contributor

@devin-ai-integration devin-ai-integration bot commented Feb 27, 2026

Summary

Adds diagnostic logging, deadlock prevention, and a stdout writer thread to the CDK's concurrent read pipeline. This PR addresses a confirmed deadlock in Airbyte Cloud where syncs hang silently for 24+ hours when stdout pipe backpressure stalls the main thread, causing all worker threads to block on queue.put() against the bounded shared queue (maxsize=10,000).

Changes

  1. Stdout writer thread (entrypoint.py): Replaces the synchronous print() loop in launch() with a dedicated background writer thread. The main thread puts serialised messages into an in-memory queue (maxsize=50,000) and the writer thread performs the blocking print() calls. This decouples the generator from OS-level stdout pipe backpressure so the CDK's internal record queue keeps draining even when the platform is slow to read.

  2. Watchdog thread (concurrent_source.py): A daemon thread monitors main-thread progress via a shared monotonic timestamp. If no queue item is consumed for 10 minutes, it calls os._exit(1) — a raw syscall that terminates the process regardless of I/O state. This is a last-resort safety net for cases where both stdout and stderr are blocked.

  3. Concurrency configuration logging (concurrent_declarative_source.py): Logs the resolved concurrency level, partition count, and source (manifest vs default) at startup via stderr.

  4. Queue heartbeat logging (concurrent_source.py): Replaces the indefinite queue.get() with a 60-second timeout. Logs a heartbeat when no items arrive (idle) or periodically while processing (active), including queue size and thread status.

  5. Partition reader diagnostic logging (partition_reader.py): Logs STARTED / PROGRESS (every 30s) / COMPLETED / FAILED for each partition read with record counts, timing, and slice info.

  6. Partition enqueuer diagnostic logging (partition_enqueuer.py): Logs STARTED / enqueuing / COMPLETED / FAILED for partition generation with counts and timing.

  7. Deadlock prevention (partition_reader.py, partition_enqueuer.py): Adds a 5-minute timeout (_QUEUE_PUT_TIMEOUT = 300s) to all queue.put() calls, including exception handlers. If a worker thread is blocked for 5 minutes trying to put an item on the full queue, it raises a RuntimeError with a clear deadlock message instead of hanging indefinitely.

Deadlock root cause (context)

In the Google Ads connector on Airbyte Cloud, multiple large responses (7–17 MB) complete simultaneously. Each partition reader thread calls queue.put(record) for thousands of records. The bounded queue fills to 10,000, and the main thread gets stuck at yield because print() blocks on the full stdout pipe (64KB buffer). With the main thread unable to drain the queue, all 8 worker threads also block — a complete deadlock with no logs for 24 hours.

Updates since last revision

  • Added stdout writer thread in launch() to decouple the main thread from stdout pipe backpressure. The main thread now puts messages into a 50,000-item buffer; a background thread handles the blocking I/O.
  • Added watchdog thread in ConcurrentSource.read() to force-terminate the process after 10 minutes of no main-thread progress (safety net for stdout/stderr both blocked).
  • Fixed exception handler deadlock risk: Exception handlers in PartitionReader and PartitionEnqueuer now use _put_with_timeout() instead of raw queue.put(), preventing deadlock when the queue is full.
  • Removed set_expected_logs from test scenarios to fix test ordering issues caused by the new diagnostic logging.

Review & Testing Checklist for Human

  • Stdout writer thread error propagation: If the writer thread crashes mid-stream (e.g., print() raises an exception), the main thread continues putting messages into the buffer until it fills (50,000 items), then blocks on buffer.put() indefinitely. The watchdog would eventually catch this, but it's not clean. Consider whether the writer should signal the main thread on error.
  • Writer thread join timeout: writer_thread.join(timeout=300) means if stdout is truly blocked, the join times out after 5 minutes and the main thread exits normally without re-raising writer_error[0] (which may not be populated yet). Verify this is acceptable behavior.
  • Buffer memory usage: The stdout buffer holds up to 50,000 serialised messages. For connectors with large records (e.g., Google Ads with 1KB/record), this could be ~50MB. Verify this is acceptable.
  • Watchdog os._exit(1) behavior: This is a hard kill that bypasses all cleanup (no finally blocks, no flush). Verify this is acceptable for Cloud deployments where the platform handles retries.
  • Test plan: Deploy a CDK prerelease with these changes to the Google Ads connector in the Airbyte Cloud sandbox workspace. Run a sync on the Kortx account (which previously deadlocked for 24+ hours). Verify the sync completes successfully without hanging.

Notes

  • Requested by Anatolii Yatsuk (gl_anatolii.yatsuk) to debug Google Ads connector deadlocks in Airbyte Cloud
  • Devin session

@devin-ai-integration
Copy link
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1772184954-log-concurrency-level#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1772184954-log-concurrency-level

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@github-actions
Copy link

github-actions bot commented Feb 27, 2026

PyTest Results (Fast)

3 918 tests  +49   3 906 ✅ +49   6m 50s ⏱️ -1s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit e4ce817. ± Comparison against base commit 7f41401.

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Feb 27, 2026

PyTest Results (Full)

3 921 tests   3 909 ✅  11m 14s ⏱️
    1 suites     12 💤
    1 files        0 ❌

Results for commit e4ce817.

♻️ This comment has been updated with latest results.

@tolik0
Copy link
Contributor

Anatolii Yatsuk (tolik0) commented Feb 27, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22489445491

@tolik0
Copy link
Contributor

Anatolii Yatsuk (tolik0) commented Feb 27, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22494697606

- Add heartbeat logging to _consume_from_queue (logs every 60s when no items received)
- Add STARTED/PROGRESS/COMPLETED/FAILED logging to PartitionReader.process_partition
- Add STARTED/COMPLETED/FAILED logging to PartitionEnqueuer.generate_partitions
- Use queue.get(timeout=60) instead of blocking queue.get() to enable heartbeat

Co-Authored-By: unknown <>
…sageSerializer mock

Co-Authored-By: unknown <>
@tolik0
Copy link
Contributor

Anatolii Yatsuk (tolik0) commented Mar 5, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22725428290

When all worker threads block on queue.put() and the main thread
cannot drain the bounded queue (maxsize=10,000), the sync hangs
silently for hours. This adds a 5-minute timeout to queue.put()
calls in both PartitionReader and PartitionEnqueuer. If the timeout
fires, a RuntimeError is raised with a clear deadlock message instead
of hanging indefinitely.

Co-Authored-By: unknown <>
@devin-ai-integration devin-ai-integration bot changed the title feat: log concurrency configuration at startup for debugging feat: add diagnostic logging and deadlock prevention for concurrent reads Mar 10, 2026
@tolik0
Copy link
Contributor

Anatolii Yatsuk (tolik0) commented Mar 10, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22911023302

When the platform stops reading from the source container's stdout/stderr
pipes (e.g. destination backpressure), all threads block on I/O and no
in-process timeout can fire. The watchdog monitors main-thread progress
and calls os._exit(1) after 10 minutes of no activity.

Also fixes exception handlers in PartitionReader and PartitionEnqueuer
that used blocking queue.put() without timeout, which would deadlock
when the queue is full.

Co-Authored-By: unknown <>
@tolik0
Copy link
Contributor

Anatolii Yatsuk (tolik0) commented Mar 11, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22961350734

PrintBuffer.flush() writes directly to sys.__stdout__ while holding
an RLock. When the platform pauses reading stdout, flush() blocks on
the pipe and the lock is never released. Every other thread that calls
logger.info() then blocks waiting for the same RLock, causing a
process-wide deadlock.

Removing the 'with PRINT_BUFFER:' wrapper means print() and logging
go directly to stdout/stderr without the shared lock, allowing the
stdout writer thread to be the only bottleneck on the pipe.

Co-Authored-By: unknown <>
@tolik0
Copy link
Contributor

Anatolii Yatsuk (tolik0) commented Mar 11, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/22969963641

The logging handler's stream is PrintBuffer, which writes to
sys.__stdout__ while holding an RLock. When the platform pauses
reading from the stdout pipe, PrintBuffer.flush() blocks on the
pipe while holding the lock. Any thread that tries to log (including
the main thread's heartbeat) also blocks waiting for the RLock.

This commit redirects all logging handler streams to a _QueueStream
that puts messages into the same unbounded buffer used by the stdout
writer thread. The writer thread is now the ONLY thing that touches
sys.__stdout__, so no other thread can block on the pipe.

Co-Authored-By: unknown <>
…eadlock

Previous fix only redirected logging handler streams to the non-blocking
buffer queue. This was insufficient because other code paths (print()
calls, direct sys.stdout.write()) still wrote to the real pipe and could
block when the platform paused reading.

This commit adds two additional layers of protection:

1. _StdoutProxy: A proxy object that intercepts write() and flush()
   calls, routing them through the unbounded buffer queue. All other
   attribute access (encoding, fileno, buffer, etc.) is delegated to
   the original stream object.

2. sys.stdout and sys.stderr are replaced with _StdoutProxy instances
   inside _buffered_write_to_stdout(). This ensures that ANY write
   from ANY thread goes through the non-blocking buffer.

3. Diagnostic logging via os.write(2, ...) confirms the buffered writer
   is active and reports how many handlers were redirected and the
   original stdout/stderr types.

The three layers (handler redirection, stdout/stderr replacement, and
the writer thread) together ensure no thread ever blocks on the stdout
pipe except the dedicated writer thread.

Co-Authored-By: unknown <>
…oint

Adds periodic diagnostic messages written directly to stderr fd 2
(bypassing all Python buffering) at key points:

1. _consume_from_queue: every 10s logs total_items, total_yields,
   qsize, and last_get_wait. Also logs SLOW yields (>5s).
2. _buffered_write_to_stdout: every 10s logs msg_count, buffer_qsize,
   and writer_alive status.

These diagnostics will be visible in platform logs even when the
stdout pipe is blocked, allowing us to pinpoint exactly where the
main thread gets stuck.

Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant