feat: add diagnostic logging, deadlock prevention, and stdout buffering for concurrent reads (#925)
devin-ai-integration[bot] wants to merge 24 commits into `main`.
Testing this CDK version: you can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1772184954-log-concurrency-level#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1772184954-log-concurrency-level
```
PyTest Results (Full): 3 921 tests, 3 909 ✅, 11m 14s ⏱️. Results for commit e4ce817.
- Add heartbeat logging to `_consume_from_queue` (logs every 60s when no items are received)
- Add STARTED/PROGRESS/COMPLETED/FAILED logging to `PartitionReader.process_partition`
- Add STARTED/COMPLETED/FAILED logging to `PartitionEnqueuer.generate_partitions`
- Use `queue.get(timeout=60)` instead of a blocking `queue.get()` to enable the heartbeat
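The heartbeat pattern described above can be sketched roughly like this. The names `consume_from_queue` and `HEARTBEAT_INTERVAL` are illustrative, not the CDK's actual identifiers; the point is the bounded `get()` that lets the loop log while idle.

```python
import logging
import queue
import time

logger = logging.getLogger("airbyte")

HEARTBEAT_INTERVAL = 60  # seconds; matches the 60s timeout in the commit message


def consume_from_queue(q: queue.Queue, sentinel: object):
    """Drain the shared queue, emitting a heartbeat log while idle."""
    last_item_at = time.monotonic()
    while True:
        try:
            # A bounded get() instead of a blocking one, so the loop can
            # wake up and log even when no items are arriving.
            item = q.get(timeout=HEARTBEAT_INTERVAL)
        except queue.Empty:
            logger.info(
                "HEARTBEAT: no items for %.0fs (qsize=%d)",
                time.monotonic() - last_item_at,
                q.qsize(),
            )
            continue
        last_item_at = time.monotonic()
        if item is sentinel:
            return
        yield item
```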
When all worker threads block on `queue.put()` and the main thread cannot drain the bounded queue (maxsize=10,000), the sync hangs silently for hours. This adds a 5-minute timeout to `queue.put()` calls in both `PartitionReader` and `PartitionEnqueuer`. If the timeout fires, a `RuntimeError` is raised with a clear deadlock message instead of hanging indefinitely.
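A minimal sketch of this timeout wrapper, assuming the shape implied by the PR (which names `_put_with_timeout()` and `_QUEUE_PUT_TIMEOUT = 300`); the function name and error message here are illustrative:

```python
import queue

QUEUE_PUT_TIMEOUT = 300  # 5 minutes, per the commit message


def put_with_timeout(q: queue.Queue, item, timeout: float = QUEUE_PUT_TIMEOUT) -> None:
    """queue.put() that fails loudly instead of hanging forever when full."""
    try:
        q.put(item, timeout=timeout)
    except queue.Full:
        raise RuntimeError(
            f"Possible deadlock: queue (maxsize={q.maxsize}) was still full after "
            f"{timeout}s; the main thread may have stopped draining it."
        ) from None
```

The `from None` suppresses the uninteresting `queue.Full` chain so the deadlock message is what surfaces in logs.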
When the platform stops reading from the source container's stdout/stderr pipes (e.g. destination backpressure), all threads block on I/O and no in-process timeout can fire. The watchdog monitors main-thread progress and calls `os._exit(1)` after 10 minutes of no activity. Also fixes exception handlers in `PartitionReader` and `PartitionEnqueuer` that used a blocking `queue.put()` without a timeout, which would deadlock when the queue is full.
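The watchdog idea can be sketched as follows. Class and method names are illustrative; the PR describes the mechanism as a daemon thread comparing a shared monotonic timestamp against a 10-minute deadline and calling `os._exit(1)`.

```python
import os
import threading
import time

WATCHDOG_TIMEOUT = 600  # 10 minutes of no progress, per the commit message


class Watchdog:
    """Force-terminates the process when the main thread stops making progress.

    The main loop calls touch() after each queue item it consumes. A daemon
    thread polls the shared monotonic timestamp; past the deadline it calls
    os._exit(1), a raw exit that even blocked I/O cannot prevent.
    """

    def __init__(self, timeout: float = WATCHDOG_TIMEOUT) -> None:
        self._timeout = timeout
        self._last_progress = time.monotonic()

    def touch(self) -> None:
        """Record that the main thread just made progress."""
        self._last_progress = time.monotonic()

    def start(self) -> None:
        threading.Thread(target=self._run, name="watchdog", daemon=True).start()

    def _run(self) -> None:
        while True:
            time.sleep(min(self._timeout / 10, 30.0))
            if time.monotonic() - self._last_progress > self._timeout:
                # A raw fd-2 write bypasses Python buffering and logging locks.
                os.write(2, b"WATCHDOG: no main-thread progress; terminating\n")
                os._exit(1)
```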
`PrintBuffer.flush()` writes directly to `sys.__stdout__` while holding an `RLock`. When the platform pauses reading stdout, `flush()` blocks on the pipe and the lock is never released. Every other thread that calls `logger.info()` then blocks waiting for the same `RLock`, causing a process-wide deadlock. Removing the `with PRINT_BUFFER:` wrapper means `print()` and logging go directly to stdout/stderr without the shared lock, making the stdout writer thread the only bottleneck on the pipe.
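The "writer thread is the only bottleneck" design can be sketched like this, assuming the 50,000-item buffer the PR describes. The function name and the `out` parameter (added here so the sketch is testable) are illustrative:

```python
import queue
import sys
import threading

BUFFER_MAXSIZE = 50_000  # size of the in-memory message buffer, per the PR


def start_stdout_writer(buffer: queue.Queue, sentinel: object, out=None) -> threading.Thread:
    """Start the dedicated writer thread: the only code touching the real stdout.

    Producers put serialized messages into `buffer`. If the platform stops
    reading the pipe, only this thread blocks; workers keep running until
    the buffer itself fills up.
    """
    target = out if out is not None else sys.__stdout__

    def _drain() -> None:
        while True:
            message = buffer.get()
            if message is sentinel:
                return
            print(message, file=target, flush=True)

    thread = threading.Thread(target=_drain, name="stdout-writer", daemon=True)
    thread.start()
    return thread
```

The sentinel object lets the main thread request a clean shutdown of the writer after all messages are enqueued.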
The logging handler's stream is `PrintBuffer`, which writes to `sys.__stdout__` while holding an `RLock`. When the platform pauses reading from the stdout pipe, `PrintBuffer.flush()` blocks on the pipe while holding the lock, so any thread that tries to log (including the main thread's heartbeat) blocks waiting for the `RLock`. This commit redirects all logging handler streams to a `_QueueStream` that puts messages into the same unbounded buffer used by the stdout writer thread. The writer thread is now the only thing that touches `sys.__stdout__`, so no other thread can block on the pipe.
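A rough sketch of that redirection, assuming the `_QueueStream` shape the commit describes (the names `QueueStream` and `redirect_handler_streams` are illustrative). Logging handlers only require `write()` and `flush()` of their stream, so a queue-backed object is enough:

```python
import logging
import queue


class QueueStream:
    """File-like object that enqueues log output instead of writing the pipe.

    Attached as each logging handler's stream so no logging call can block
    on stdout; a dedicated writer thread drains the queue elsewhere.
    """

    def __init__(self, buffer: queue.Queue) -> None:
        self._buffer = buffer

    def write(self, message: str) -> int:
        if message:
            self._buffer.put(message)  # unbounded queue: never blocks
        return len(message)

    def flush(self) -> None:
        pass  # the writer thread owns flushing of the real stream


def redirect_handler_streams(buffer: queue.Queue) -> int:
    """Point every root StreamHandler at the buffer; returns how many."""
    redirected = 0
    for handler in logging.getLogger().handlers:
        if isinstance(handler, logging.StreamHandler):
            handler.setStream(QueueStream(buffer))
            redirected += 1
    return redirected
```

`StreamHandler.setStream()` (Python 3.7+) swaps the target stream in place, so existing handler configuration (formatters, levels) is untouched.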
The previous fix only redirected logging handler streams to the non-blocking buffer queue. This was insufficient because other code paths (`print()` calls, direct `sys.stdout.write()`) still wrote to the real pipe and could block when the platform paused reading. This commit adds two additional layers of protection:

1. `_StdoutProxy`: a proxy object that intercepts `write()` and `flush()` calls, routing them through the unbounded buffer queue. All other attribute access (`encoding`, `fileno`, `buffer`, etc.) is delegated to the original stream object.
2. `sys.stdout` and `sys.stderr` are replaced with `_StdoutProxy` instances inside `_buffered_write_to_stdout()`. This ensures that any write from any thread goes through the non-blocking buffer.
3. Diagnostic logging via `os.write(2, ...)` confirms the buffered writer is active and reports how many handlers were redirected and the original stdout/stderr types.

The three layers (handler redirection, stdout/stderr replacement, and the writer thread) together ensure no thread ever blocks on the stdout pipe except the dedicated writer thread.
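The proxy technique in layer 1 can be sketched as below (class name mirrors the commit's `_StdoutProxy`, but this is a simplified stand-in, not the actual implementation). The key trick is that `__getattr__` is only consulted for attributes the proxy does not define, so `write`/`flush` are intercepted while everything else passes through:

```python
import queue
import sys


class StdoutProxy:
    """Intercepts write()/flush() and routes them into the buffer queue.

    Every other attribute lookup (encoding, fileno, buffer, isatty, ...)
    falls through to the wrapped real stream, so callers that introspect
    sys.stdout keep working.
    """

    def __init__(self, wrapped, buffer: queue.Queue) -> None:
        self._wrapped = wrapped
        self._buffer = buffer

    def write(self, message: str) -> int:
        if message:
            self._buffer.put(message)
        return len(message)

    def flush(self) -> None:
        pass  # real flushing happens on the dedicated writer thread

    def __getattr__(self, name):
        # Only called for attributes not defined on the proxy itself.
        return getattr(self._wrapped, name)
```

It would be installed with something like `sys.stdout = StdoutProxy(sys.stdout, buffer)`, after which even bare `print()` calls land in the buffer.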
Adds periodic diagnostic messages written directly to stderr (fd 2, bypassing all Python buffering) at key points:

1. `_consume_from_queue`: every 10s, logs `total_items`, `total_yields`, `qsize`, and `last_get_wait`; also logs SLOW yields (>5s).
2. `_buffered_write_to_stdout`: every 10s, logs `msg_count`, `buffer_qsize`, and writer-thread liveness.

These diagnostics will be visible in platform logs even when the stdout pipe is blocked, allowing us to pinpoint exactly where the main thread gets stuck.
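Writing straight to fd 2 is the one channel that cannot be jammed by the stdout machinery. A minimal helper in that spirit (the name `write_diag` and the message format are illustrative):

```python
import os
import time


def write_diag(message: str) -> None:
    """Emit a diagnostic line straight to stderr (fd 2).

    os.write() bypasses every layer of Python buffering and takes no locks,
    so it keeps working while the logging machinery or stdout pipe is stuck.
    """
    line = f"[DIAG {time.monotonic():.1f}] {message}\n"
    os.write(2, line.encode("utf-8", errors="replace"))
```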
## Summary

Adds diagnostic logging, deadlock prevention, and a stdout writer thread to the CDK's concurrent read pipeline. This PR addresses a confirmed deadlock in Airbyte Cloud where syncs hang silently for 24+ hours when stdout pipe backpressure stalls the main thread, causing all worker threads to block on `queue.put()` against the bounded shared queue (maxsize=10,000).

## Changes
- **Stdout writer thread** (`entrypoint.py`): Replaces the synchronous `print()` loop in `launch()` with a dedicated background writer thread. The main thread puts serialised messages into an in-memory queue (maxsize=50,000) and the writer thread performs the blocking `print()` calls. This decouples the generator from OS-level stdout pipe backpressure, so the CDK's internal record queue keeps draining even when the platform is slow to read.
- **Watchdog thread** (`concurrent_source.py`): A daemon thread monitors main-thread progress via a shared monotonic timestamp. If no queue item is consumed for 10 minutes, it calls `os._exit(1)`, a raw syscall that terminates the process regardless of I/O state. This is a last-resort safety net for cases where both stdout and stderr are blocked.
- **Concurrency configuration logging** (`concurrent_declarative_source.py`): Logs the resolved concurrency level, partition count, and source (manifest vs. default) at startup via stderr.
- **Queue heartbeat logging** (`concurrent_source.py`): Replaces the indefinite `queue.get()` with a 60-second timeout. Logs a heartbeat when no items arrive (idle) or periodically while processing (active), including queue size and thread status.
- **Partition reader diagnostic logging** (`partition_reader.py`): Logs STARTED / PROGRESS (every 30s) / COMPLETED / FAILED for each partition read, with record counts, timing, and slice info.
- **Partition enqueuer diagnostic logging** (`partition_enqueuer.py`): Logs STARTED / enqueuing / COMPLETED / FAILED for partition generation, with counts and timing.
- **Deadlock prevention** (`partition_reader.py`, `partition_enqueuer.py`): Adds a 5-minute timeout (`_QUEUE_PUT_TIMEOUT = 300`) to all `queue.put()` calls, including exception handlers. If a worker thread is blocked for 5 minutes trying to put an item on the full queue, it raises a `RuntimeError` with a clear deadlock message instead of hanging indefinitely.

## Deadlock root cause (context)
In the Google Ads connector on Airbyte Cloud, multiple large responses (7–17 MB) complete simultaneously. Each partition reader thread calls `queue.put(record)` for thousands of records. The bounded queue fills to 10,000, and the main thread gets stuck at `yield` because `print()` blocks on the full stdout pipe (64 KB buffer). With the main thread unable to drain the queue, all 8 worker threads also block: a complete deadlock with no logs for 24 hours.

## Updates since last revision
- Added a stdout writer thread in `launch()` to decouple the main thread from stdout pipe backpressure. The main thread now puts messages into a 50,000-item buffer; a background thread handles the blocking I/O.
- Added a watchdog to `ConcurrentSource.read()` to force-terminate the process after 10 minutes of no main-thread progress (a safety net for when stdout and stderr are both blocked).
- `PartitionReader` and `PartitionEnqueuer` now use `_put_with_timeout()` instead of raw `queue.put()`, preventing deadlock when the queue is full.
- Removed `set_expected_logs` from test scenarios to fix test ordering issues caused by the new diagnostic logging.

## Review & Testing Checklist for Human
- If the writer thread dies (e.g. `print()` raises an exception), the main thread continues putting messages into the buffer until it fills (50,000 items), then blocks on `buffer.put()` indefinitely. The watchdog would eventually catch this, but it's not clean. Consider whether the writer should signal the main thread on error.
- `writer_thread.join(timeout=300)` means that if stdout is truly blocked, the join times out after 5 minutes and the main thread exits normally without re-raising `writer_error[0]` (which may not be populated yet). Verify this is acceptable behavior.
- `os._exit(1)` behavior: this is a hard kill that bypasses all cleanup (no `finally` blocks, no flush). Verify this is acceptable for Cloud deployments, where the platform handles retries.

## Notes