feat: add stderr heartbeat to diagnose stdout pipe blocking#953
Draft
devin-ai-integration[bot] wants to merge 12 commits intomainfrom
Draft
feat: add stderr heartbeat to diagnose stdout pipe blocking#953devin-ai-integration[bot] wants to merge 12 commits intomainfrom
devin-ai-integration[bot] wants to merge 12 commits intomainfrom
Conversation
…sure When the Airbyte platform pauses reading from the source container's stdout pipe, the main thread's print() call blocks in an OS-level write() syscall. This stalls the record queue consumer, filling the bounded queue and blocking all worker threads — a complete deadlock. This change replaces blocking print() with non-blocking os.write() using select() to wait for the pipe to become writable. The main thread stays in a Python-level retry loop instead of getting stuck in a kernel syscall. When the platform resumes reading, select() returns, the write completes, and the pipeline resumes automatically. Key properties: - Memory stays bounded (queue maxsize=10,000 unchanged) - No deadlock (main thread never stuck in blocking syscall) - Automatic recovery when platform resumes reading - 600s watchdog raises RuntimeError if pipe stays blocked Co-Authored-By: unknown <>
Contributor
Author
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773412868-nonblocking-stdout#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773412868-nonblocking-stdoutPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
1 task
…nd log restore failures Co-Authored-By: unknown <>
….stdout Co-Authored-By: unknown <>
…UFFER) Co-Authored-By: unknown <>
…INT_BUFFER wrapper Co-Authored-By: unknown <>
…global BlockingIOError Setting os.set_blocking(fd, False) is a process-wide change that causes BlockingIOError in other threads (logging via print_buffer, worker threads). Instead, use a dedicated stdout-writer thread that does blocking os.write() calls. If the pipe is full, only the writer thread stalls - the main thread continues draining the record queue. Co-Authored-By: unknown <>
…ise KeyboardInterrupt/SystemExit Co-Authored-By: unknown <>
…ite timing Logs when os.write() blocks for >5s (indicates platform paused reading), and logs every 30s when write_queue is full. This will help validate whether the platform ever resumes reading from the pipe after a pause. Co-Authored-By: unknown <>
…ding from stdout pipe Co-Authored-By: unknown <>
…/queue/watchdog) Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
feat: add stderr heartbeat to diagnose stdout pipe blocking
Summary
Adds a lightweight background heartbeat thread to
launch()that writes periodic diagnostic status to stderr (fd 2) every 30 seconds. This is a diagnostic-only change — the actual stdout write path (print()) is unchanged.Purpose: In Airbyte Cloud, the orchestrator controls reading from the source container's stdout pipe. When it pauses reading,
print()blocks in the kernel. We need to prove whether the platform ever resumes reading or permanently stops. Since stderr is collected independently by the Kubernetes container runtime, heartbeat lines will appear in logs even when stdout is blocked.Heartbeat output:
What changed:
stdout-heartbeat) that writes to fd 2 every 30sprint()loop withprint_blocked/messages_written/bytes_writtentrackingtry/finallyto signal the heartbeat thread to stop on exitprint()withPRINT_BUFFERUpdates since last revision
The PR was simplified significantly. The previous implementation included a dedicated writer thread, bounded write queue,
O_NONBLOCK+select(), watchdog timeout, and capsys detection. All of that has been removed. Only the stderr heartbeat diagnostic remains, as the previous approach caused OOM kills in Cloud testing.Review & Testing Checklist for Human
print_blocked,messages_written, andbytes_writtenare read by the heartbeat thread without locks. This is safe under CPython's GIL for simple type reads/writes, but worth confirming no subtle issues arise.STDOUT_HEARTBEATlines appear in stderr logs, and thatprint_blocked=YESwith increasingblocked_sinceis visible when the sync hangs.Notes
Requested by @AnatoliiYatsuk.
Related issues:
Devin session