
feat: make concurrent CDK record queue unlimited to prevent deadlock#951

Draft
devin-ai-integration[bot] wants to merge 3 commits into main from devin/1773333604-increase-queue-size

Conversation


@devin-ai-integration devin-ai-integration bot commented Mar 12, 2026

feat: make concurrent CDK record queue unlimited to prevent deadlock

Summary

Makes the internal record queue unlimited (maxsize=0) across the concurrent CDK to prevent a deadlock observed in Airbyte Cloud.

Context: In Cloud, when the platform pauses reading from the source container's stdout pipe (e.g., due to destination backpressure), the main thread blocks on print(), stops consuming from the internal queue, and the queue fills to capacity. Once full, all worker threads block on queue.put(), causing a complete deadlock. A previous attempt with maxsize=50_000 was still insufficient — the queue filled before the platform resumed reading. Setting maxsize=0 (unlimited) prevents workers from ever blocking on queue.put().
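The failure mode can be sketched with Python's standard `queue` module. This is an illustrative toy, not the CDK's actual code; `put_nowait` stands in for the blocking `q.put(record)` so the demo can observe the queue-full condition without actually hanging.

```python
import queue

def enqueue_until_full(q: queue.Queue, n_records: int) -> int:
    """Simulate a partition-reader thread: enqueue records until the
    queue is full. Returns how many records were enqueued."""
    for i in range(n_records):
        try:
            # The real worker calls q.put(record), which blocks forever
            # once a bounded queue fills while the consumer is stalled;
            # put_nowait lets us observe that condition without hanging.
            q.put_nowait(f"record-{i}")
        except queue.Full:
            return i
    return n_records

# Bounded queue with a stalled consumer: the producer stops at capacity.
bounded = queue.Queue(maxsize=100)
assert enqueue_until_full(bounded, 10_000) == 100

# maxsize=0 means unlimited: put() never blocks, so workers cannot
# deadlock even while the main thread is stuck on a paused stdout pipe.
unbounded = queue.Queue(maxsize=0)
assert enqueue_until_full(unbounded, 10_000) == 10_000
```

The trade-off is the one flagged in the review checklist: an unbounded queue removes the deadlock at the cost of potentially unbounded memory growth.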

Changes (2 files):

  • concurrent_source.py: Queue(maxsize=10_000) → Queue(maxsize=0)
  • concurrent_declarative_source.py: Queue(maxsize=10_000) → Queue(maxsize=0)

Note: DEFAULT_MAX_QUEUE_SIZE in thread_pool_manager.py is intentionally left at 10_000. It controls max_concurrent_tasks (the futures/concurrency limit), which is a separate concern from the queue capacity. An earlier revision mistakenly set it to 0, which broke ThreadPoolManager entirely — prune_to_validate_has_reached_futures_limit() would always return True (since len(futures) >= 0 is always true), preventing any work from being submitted.
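Why max_concurrent_tasks=0 is pathological can be seen in a simplified version of that check. The function below is an illustrative reduction of the guard described above, not the real ThreadPoolManager code.

```python
def has_reached_futures_limit(futures: list, max_concurrent_tasks: int) -> bool:
    # Simplified ThreadPoolManager-style guard: refuse to submit new
    # tasks once the number of in-flight futures reaches the limit.
    return len(futures) >= max_concurrent_tasks

# With the default limit of 10_000, an empty futures list allows work.
assert has_reached_futures_limit([], 10_000) is False

# With a limit of 0, len(futures) >= 0 is vacuously true: the guard
# reports "limit reached" forever and no task is ever submitted.
assert has_reached_futures_limit([], 0) is True
```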

Updates since last revision

  • Reverted the thread_pool_manager.py change from the previous revision. DEFAULT_MAX_QUEUE_SIZE stays at its original value of 10_000 to preserve the max_concurrent_tasks default. Only the Queue(maxsize=...) calls in the two source files are changed to 0 (unlimited). This cleanly separates queue capacity from the concurrency/futures limit.

Review & Testing Checklist for Human

  • Memory impact across all connectors — an unlimited queue means potentially unbounded memory growth if workers produce records faster than the main thread consumes them. In the worst case (platform stops reading stdout indefinitely), memory could grow until the container OOMs. Confirm this risk is acceptable vs. the deadlock risk.
  • Confirm this is intended as a temporary workaround — the root cause is that the platform pauses reading from stdout. A permanent fix should address the pipe backpressure issue (e.g., the stdout writer thread approach in PR #925, "feat: add diagnostic logging, deadlock prevention, and stdout buffering for concurrent reads") rather than relying on unlimited memory buffering.
  • Verify DEFAULT_MAX_PARTITIONS_NUMBER in ConcurrentPerPartitionCursor — its docstring states it "needs to be higher than ThreadPoolManager.DEFAULT_MAX_QUEUE_SIZE". Since DEFAULT_MAX_QUEUE_SIZE remains 10_000 (unchanged), this invariant is still satisfied, but worth confirming during review.

Notes

Requested by @AnatoliiYatsuk.

Devin session

@devin-ai-integration
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1773333604-increase-queue-size#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1773333604-increase-queue-size

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerates git-committed build artifacts, such as the Pydantic models generated from the manifest JSON schema in YAML
  • /poe <command> - Runs any poe command in the CDK environment

@github-actions

github-actions bot commented Mar 12, 2026

PyTest Results (Fast)

3 934 tests  +3 344   3 922 ✅ +3 344   7m 21s ⏱️ + 4m 13s
    1 suites ±    0      12 💤 +    1 
    1 files   ±    0       0 ❌  -     1 

Results for commit cb93ee4. ± Comparison against base commit fd21b86.

♻️ This comment has been updated with latest results.

@github-actions

github-actions bot commented Mar 12, 2026

PyTest Results (Full)

3 937 tests  +16   3 925 ✅ +16   10m 44s ⏱️ -33s
    1 suites ± 0      12 💤 ± 0 
    1 files   ± 0       0 ❌ ± 0 

Results for commit cb93ee4. ± Comparison against base commit fd21b86.

♻️ This comment has been updated with latest results.

@devin-ai-integration devin-ai-integration bot changed the title from "feat: increase DEFAULT_MAX_QUEUE_SIZE from 10,000 to 50,000" to "feat: make concurrent CDK record queue unlimited to prevent deadlock" on Mar 12, 2026
…s default

Setting DEFAULT_MAX_QUEUE_SIZE=0 broke ThreadPoolManager because it is
also used as the default for max_concurrent_tasks. With max_concurrent_tasks=0,
prune_to_validate_has_reached_futures_limit() always returns True (len >= 0),
causing infinite warning spam and preventing any records from being processed.

Fix: keep DEFAULT_MAX_QUEUE_SIZE=10_000 (used for futures limit) and only
change the Queue(maxsize=0) in concurrent_source.py and
concurrent_declarative_source.py (unlimited queue to prevent deadlock).

Co-Authored-By: unknown <>
