diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index 54c207487..54c9581fd 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -3,7 +3,9 @@ # import argparse +import fcntl import importlib +import io import ipaddress import json import logging @@ -11,8 +13,11 @@ import socket import sys import tempfile +import threading +import time from collections import defaultdict from functools import wraps +from queue import Queue from typing import Any, DefaultDict, Iterable, List, Mapping, Optional from urllib.parse import urlparse @@ -22,7 +27,7 @@ from airbyte_cdk.connector import TConfig from airbyte_cdk.exception_handler import init_uncaught_exception_handler -from airbyte_cdk.logger import PRINT_BUFFER, init_logger, is_platform_debug_log_enabled +from airbyte_cdk.logger import init_logger, is_platform_debug_log_enabled from airbyte_cdk.models import ( AirbyteConnectionStatus, AirbyteMessage, @@ -371,13 +376,224 @@ def _emit_queued_messages(self, source: Source) -> Iterable[AirbyteMessage]: def launch(source: Source, args: List[str]) -> None: source_entrypoint = AirbyteEntrypoint(source) parsed_args = source_entrypoint.parse_args(args) - # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs - # Refer to: https://github.com/airbytehq/oncall/issues/6235 - with PRINT_BUFFER: - for message in source_entrypoint.run(parsed_args): - # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and - # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time - print(f"{message}\n", end="") + # PrintBuffer is intentionally NOT used here. Its RLock + blocking + # sys.__stdout__.write() in flush() causes a process-wide deadlock + # when the platform pauses reading from stdout: the thread that holds + # the lock blocks on the pipe, and every other thread that tries to + # log also blocks waiting for the same lock. + # See: https://github.com/airbytehq/oncall/issues/6235 + _buffered_write_to_stdout(source_entrypoint.run(parsed_args)) + + +class _QueueStream(io.TextIOBase): + """A file-like stream that puts each write into an unbounded queue. + + This is used to replace the logging handler's stream (and optionally + ``sys.stdout`` / ``sys.stderr``) so that **no thread** performs + blocking writes to the real stdout pipe. A single background writer + thread drains the queue and is the only thing that touches + ``sys.__stdout__``. + """ + + def __init__(self, buffer: "Queue[Optional[str]]") -> None: + self._buffer = buffer + + def write(self, data: str) -> int: # type: ignore[override] + # StreamHandler writes the formatted message, then the terminator + # ("\n"). We strip trailing newlines because the writer thread + # adds its own. + stripped = data.rstrip("\n") + if stripped: + self._buffer.put(stripped) + return len(data) + + def flush(self) -> None: + pass # No-op: the writer thread handles actual I/O. + + def writable(self) -> bool: + return True + + +class _StdoutProxy: + """Proxy for ``sys.stdout`` / ``sys.stderr`` that intercepts writes. + + Unlike ``_QueueStream`` (which extends ``io.TextIOBase``), this proxy + delegates *all* attribute access to the original stream object. This + means code that inspects ``sys.stdout.encoding``, calls + ``sys.stdout.fileno()``, or accesses ``sys.stdout.buffer`` continues + to work. Only ``write()`` and ``flush()`` are overridden to route + data through the non-blocking buffer queue. + """ + + def __init__(self, original: Any, buffer: "Queue[Optional[str]]") -> None: + # Use object.__setattr__ to bypass our own __setattr__ if we ever add one. + object.__setattr__(self, "_original", original) + object.__setattr__(self, "_buffer", buffer) + + def write(self, data: str) -> int: + stripped = str(data).rstrip("\n") + if stripped: + self._buffer.put(stripped) + return len(data) + + def flush(self) -> None: + pass # No-op: the writer thread handles actual I/O. + + def __getattr__(self, name: str) -> Any: + return getattr(self._original, name) + + +def _ensure_stderr_nonblock() -> None: + """Set stderr fd 2 to non-blocking mode (once). + + When the Airbyte platform stops reading from the source container's + stderr pipe, the pipe buffer fills and any ``os.write(2, ...)`` call + blocks the calling thread. If that thread is the main thread, the + CDK's record queue fills and all workers deadlock. + + Setting ``O_NONBLOCK`` makes ``os.write(2, ...)`` raise + ``BlockingIOError`` (EAGAIN) instead of blocking, which + ``_stderr_diag`` already catches. + """ + try: + flags = fcntl.fcntl(2, fcntl.F_GETFL) + fcntl.fcntl(2, fcntl.F_SETFL, flags | os.O_NONBLOCK) + except Exception: + # Best-effort; some environments may not support fcntl on fd 2. + pass + + +def _stderr_diag(msg: str) -> None: + """Write a diagnostic message directly to stderr fd. + + Uses ``os.write()`` on the raw file descriptor so the write bypasses + *all* Python buffering (``sys.stderr``, ``PrintBuffer``, logging + handlers). fd 2 is set to non-blocking mode so this never stalls + the calling thread. + """ + try: + os.write(2, f"DIAG: {msg}\n".encode()) + except Exception: + # Best-effort; catches BlockingIOError (EAGAIN) when pipe is + # full, plus any other I/O error. + pass + + +def _buffered_write_to_stdout(messages: Iterable[str]) -> None: + """Drain *messages* through a background writer thread. + + The main thread puts serialised messages into an in-memory queue. + A dedicated daemon thread reads from that queue and performs the + blocking ``sys.__stdout__`` writes. This prevents stdout pipe + backpressure from stalling the main thread (and, by extension, the + CDK's internal record queue). + + Three layers of protection are applied: + + 1. **Logging handler streams** on the root logger that target stdout, + stderr, or a ``PrintBuffer`` are replaced with a ``_QueueStream`` + that writes into the buffer queue. + 2. **``sys.stdout`` and ``sys.stderr``** are replaced with + ``_StdoutProxy`` objects that also write into the buffer queue. + This catches *any* ``print()`` call or direct ``sys.stdout.write()`` + from any thread. + 3. The **stdout writer thread** is the only code that touches the + real ``sys.__stdout__`` pipe. + + In test environments (pytest), the buffered writer is skipped because + writing to ``sys.__stdout__`` bypasses pytest's ``capsys`` capture. + + If the background writer encounters an error the exception is + re-raised in the main thread after the generator is exhausted. + """ + # Under pytest, capsys captures sys.stdout but not sys.__stdout__. + # Skip the buffered writer so tests can capture output normally. + if "pytest" in str(type(sys.stdout)).lower(): + for message in messages: + print(message) + return + + _ensure_stderr_nonblock() + + _SENTINEL = None # signals the writer to stop + buffer: Queue[Optional[str]] = Queue() + writer_error: List[Exception] = [] + + def _writer() -> None: + try: + while True: + item = buffer.get() + if item is _SENTINEL: + return + sys.__stdout__.write(f"{item}\n") # type: ignore[union-attr] + sys.__stdout__.flush() # type: ignore[union-attr] + except Exception as exc: + writer_error.append(exc) + + writer_thread = threading.Thread(target=_writer, daemon=True, name="stdout-writer") + writer_thread.start() + + # --- Layer 1: redirect logging handler streams --- + _STDOUT_STREAMS = (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__) + queue_stream = _QueueStream(buffer) + redirected_handlers: List[logging.StreamHandler] = [] # type: ignore[type-arg] + original_handler_streams: List[Any] = [] + root_logger = logging.getLogger() + for handler in root_logger.handlers: + if isinstance(handler, logging.StreamHandler): + stream = handler.stream + if stream in _STDOUT_STREAMS or type(stream).__name__ == "PrintBuffer": + redirected_handlers.append(handler) + original_handler_streams.append(stream) + handler.stream = queue_stream # type: ignore[assignment] + + # --- Layer 2: replace sys.stdout / sys.stderr --- + # This catches print() calls and direct sys.stdout.write() from any + # thread, routing them through the non-blocking buffer instead of the + # pipe. _StdoutProxy delegates all other attribute access (encoding, + # fileno, buffer, etc.) to the original stream object. + original_stdout = sys.stdout + original_stderr = sys.stderr + sys.stdout = _StdoutProxy(original_stdout, buffer) # type: ignore[assignment] + sys.stderr = _StdoutProxy(original_stderr, buffer) # type: ignore[assignment] + + _stderr_diag( + f"Buffered writer ACTIVE: " + f"handlers_redirected={len(redirected_handlers)}, " + f"stdout_type={type(original_stdout).__name__}, " + f"stderr_type={type(original_stderr).__name__}" + ) + + try: + msg_count = 0 + last_diag = time.monotonic() + for message in messages: + buffer.put(message) + msg_count += 1 + now = time.monotonic() + if now - last_diag >= 10.0: + _stderr_diag( + f"_buffered_write_to_stdout: alive, " + f"msg_count={msg_count}, buffer_qsize={buffer.qsize()}, " + f"writer_alive={writer_thread.is_alive()}" + ) + last_diag = now + _stderr_diag( + f"_buffered_write_to_stdout: generator exhausted, " + f"msg_count={msg_count}, buffer_qsize={buffer.qsize()}" + ) + finally: + # Restore sys.stdout / sys.stderr before shutting down. + sys.stdout = original_stdout + sys.stderr = original_stderr + # Restore original handler streams. + for handler, orig_stream in zip(redirected_handlers, original_handler_streams): + handler.stream = orig_stream + buffer.put(_SENTINEL) + writer_thread.join(timeout=300) + + if writer_error: + raise writer_error[0] def _init_internal_request_filter() -> None: diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_source.py b/airbyte_cdk/sources/concurrent_source/concurrent_source.py index de2d93523..486e1a7af 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_source.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_source.py @@ -3,8 +3,13 @@ # import concurrent +import fcntl import logging -from queue import Queue +import os +import sys +import threading +import time +from queue import Empty, Queue from typing import Iterable, Iterator, List, Optional from airbyte_cdk.models import AirbyteMessage @@ -36,6 +41,11 @@ class ConcurrentSource: """ DEFAULT_TIMEOUT_SECONDS = 900 + # If the main thread makes no progress for this long, the watchdog + # terminates the process. This breaks deadlocks caused by stdout/stderr + # pipe blockage where no in-process timeout can fire because I/O itself + # is blocked at the OS level. + _WATCHDOG_TIMEOUT_SECONDS = 600.0 # 10 minutes @staticmethod def create( @@ -106,29 +116,45 @@ def read( streams: List[AbstractStream], ) -> Iterator[AirbyteMessage]: self._logger.info("Starting syncing") - concurrent_stream_processor = ConcurrentReadProcessor( - streams, - PartitionEnqueuer(self._queue, self._threadpool), - self._threadpool, - self._logger, - self._slice_logger, - self._message_repository, - PartitionReader( - self._queue, - PartitionLogger(self._slice_logger, self._logger, self._message_repository), - ), + # Shared timestamp updated every time the main thread makes progress + # (consumes an item from the queue). The watchdog reads this to + # detect when the main thread is stuck. + self._last_progress_time = time.monotonic() + self._ensure_stderr_nonblock() + self._watchdog_should_run = True + watchdog = threading.Thread( + target=self._watchdog_loop, + daemon=True, + name="progress-watchdog", ) + watchdog.start() - # Enqueue initial partition generation tasks - yield from self._submit_initial_partition_generators(concurrent_stream_processor) + try: + concurrent_stream_processor = ConcurrentReadProcessor( + streams, + PartitionEnqueuer(self._queue, self._threadpool), + self._threadpool, + self._logger, + self._slice_logger, + self._message_repository, + PartitionReader( + self._queue, + PartitionLogger(self._slice_logger, self._logger, self._message_repository), + ), + ) - # Read from the queue until all partitions were generated and read - yield from self._consume_from_queue( - self._queue, - concurrent_stream_processor, - ) - self._threadpool.check_for_errors_and_shutdown() - self._logger.info("Finished syncing") + # Enqueue initial partition generation tasks + yield from self._submit_initial_partition_generators(concurrent_stream_processor) + + # Read from the queue until all partitions were generated and read + yield from self._consume_from_queue( + self._queue, + concurrent_stream_processor, + ) + self._threadpool.check_for_errors_and_shutdown() + self._logger.info("Finished syncing") + finally: + self._watchdog_should_run = False def _submit_initial_partition_generators( self, concurrent_stream_processor: ConcurrentReadProcessor @@ -138,23 +164,187 @@ def _submit_initial_partition_generators( if status_message: yield status_message + _stderr_nonblock_set = False + + @classmethod + def _ensure_stderr_nonblock(cls) -> None: + """Set stderr fd 2 to non-blocking mode (once). + + In Airbyte Cloud the platform reads the source container's stdout and + stderr pipes. If the platform pauses reading (e.g. destination + backpressure), both pipe buffers fill up. A blocking ``os.write(2, + ...)`` would then stall whichever thread called it — including the + main thread, which causes the CDK queue to fill and deadlock all + workers. + + Setting ``O_NONBLOCK`` on fd 2 makes ``os.write(2, ...)`` return + immediately with ``BlockingIOError`` (EAGAIN) instead of blocking. + The ``_diag`` method already catches all exceptions, so the message + is simply dropped when the pipe is full. + """ + if cls._stderr_nonblock_set: + return + try: + flags = fcntl.fcntl(2, fcntl.F_GETFL) + fcntl.fcntl(2, fcntl.F_SETFL, flags | os.O_NONBLOCK) + cls._stderr_nonblock_set = True + except Exception: + # Best-effort; some environments may not support fcntl on fd 2. + pass + + @staticmethod + def _diag(msg: str) -> None: + """Write diagnostic message directly to stderr fd 2. + + Bypasses all Python buffering (sys.stderr, logging, PrintBuffer) + so the message is visible even when stdout/stderr pipes are blocked. + The fd is set to non-blocking mode by ``_ensure_stderr_nonblock`` + so this call never stalls the calling thread. + """ + try: + os.write(2, f"DIAG: {msg}\n".encode()) + except Exception: + # Intentionally ignored: diagnostics are best-effort and must + # never interfere with program execution. In non-blocking mode + # this catches BlockingIOError (EAGAIN) when the pipe is full. + pass + def _consume_from_queue( self, queue: Queue[QueueItem], concurrent_stream_processor: ConcurrentReadProcessor, ) -> Iterable[AirbyteMessage]: - while airbyte_message_or_record_or_exception := queue.get(): - yield from self._handle_item( + last_item_time = time.monotonic() + heartbeat_interval = 60.0 # Log heartbeat every 60 seconds + items_since_last_heartbeat = 0 + total_items = 0 + total_yields = 0 + last_diag_time = time.monotonic() + diag_interval = 10.0 # Diagnostic log every 10 seconds + + self._diag("_consume_from_queue: ENTER") + + while True: + now_pre_get = time.monotonic() + try: + airbyte_message_or_record_or_exception = queue.get(timeout=heartbeat_interval) + except Empty: + elapsed = time.monotonic() - last_item_time + self._diag( + f"_consume_from_queue: EMPTY after {elapsed:.0f}s, " + f"qsize={queue.qsize()}, pool_done={self._threadpool.is_done()}, " + f"threads={threading.active_count()}" + ) + self._logger.info( + "Queue heartbeat: no items received for %.0fs. " + "queue_size=%d, threadpool_done=%s, active_threads=%d", + elapsed, + queue.qsize(), + self._threadpool.is_done(), + threading.active_count(), + ) + continue + + if not airbyte_message_or_record_or_exception: + self._diag("_consume_from_queue: got sentinel, breaking") + break + + now = time.monotonic() + get_wait = now - now_pre_get + total_items += 1 + items_since_last_heartbeat += 1 + + # Periodic diagnostic via os.write(2,...) — visible even when + # stdout pipe is blocked. + if now - last_diag_time >= diag_interval: + item_type = type(airbyte_message_or_record_or_exception).__name__ + self._diag( + f"_consume_from_queue: alive, " + f"total_items={total_items}, total_yields={total_yields}, " + f"qsize={queue.qsize()}, last_get_wait={get_wait:.3f}s, " + f"item_type={item_type}" + ) + last_diag_time = now + + if now - last_item_time >= heartbeat_interval: + self._logger.info( + "Queue heartbeat: processed %d items in last %.0fs. " + "queue_size=%d, item_type=%s", + items_since_last_heartbeat, + now - last_item_time, + queue.qsize(), + type(airbyte_message_or_record_or_exception).__name__, + ) + items_since_last_heartbeat = 0 + self._last_progress_time = now + last_item_time = now + + pre_yield = time.monotonic() + for msg in self._handle_item( airbyte_message_or_record_or_exception, concurrent_stream_processor, - ) + ): + total_yields += 1 + yield msg + post_yield = time.monotonic() + yield_dur = post_yield - pre_yield + if yield_dur > 5.0: + self._diag( + f"_consume_from_queue: SLOW yield, " + f"duration={yield_dur:.1f}s, " + f"item_type={type(airbyte_message_or_record_or_exception).__name__}" + ) + # In the event that a partition raises an exception, anything remaining in # the queue will be missed because is_done() can raise an exception and exit # out of this loop before remaining items are consumed if queue.empty() and concurrent_stream_processor.is_done(): # all partitions were generated and processed. we're done here + self._diag("_consume_from_queue: all done, breaking") break + self._diag( + f"_consume_from_queue: EXIT, total_items={total_items}, total_yields={total_yields}" + ) + + def _watchdog_loop(self) -> None: + """Daemon thread that terminates the process when the main thread stalls. + + In Airbyte Cloud the source container's stdout and stderr are read by + the platform (replication-orchestrator). If the platform stops reading + (e.g. destination backpressure), both pipes fill up and *all* threads + block on I/O — including the main thread's ``yield`` and every worker + thread's ``logger.*()`` call. No in-process timeout can fire because + the timeout's own log/write call also blocks. + + This watchdog does **not** perform any I/O. It simply checks a shared + monotonic timestamp that the main thread updates whenever it consumes a + queue item. If no progress is observed for ``_WATCHDOG_TIMEOUT_SECONDS``, + it calls ``os._exit(1)`` which is a raw syscall that terminates the + process immediately regardless of I/O state. + """ + while self._watchdog_should_run: + time.sleep(30) # check every 30 seconds + if not self._watchdog_should_run: + return + elapsed = time.monotonic() - self._last_progress_time + if elapsed >= self._WATCHDOG_TIMEOUT_SECONDS: + # Write directly to stderr fd to bypass Python buffering + # which may be blocked. This is best-effort; if the fd is + # blocked the write will simply fail and we still exit. + try: + msg = ( + f"WATCHDOG: Main thread made no progress for " + f"{elapsed:.0f}s (threshold={self._WATCHDOG_TIMEOUT_SECONDS:.0f}s). " + f"Terminating process to prevent indefinite hang.\n" + ) + os.write(sys.stderr.fileno(), msg.encode()) + except Exception: + # Intentionally ignored: logging is best-effort and must + # not prevent process termination via os._exit() below. + pass + os._exit(1) + def _handle_item( self, queue_item: QueueItem, diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 45fe6aa2d..6aa0dde1f 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -3,6 +3,7 @@ import json import logging import pkgutil +import sys from copy import deepcopy from dataclasses import dataclass, field from queue import Queue @@ -233,6 +234,25 @@ def __init__( concurrency_level = self._LOWEST_SAFE_CONCURRENCY_LEVEL initial_number_of_partitions_to_generate = self._LOWEST_SAFE_CONCURRENCY_LEVEL // 2 + if concurrency_level_from_manifest: + raw_default_concurrency = concurrency_level_from_manifest.get( + "default_concurrency", "N/A" + ) + _concurrency_msg = ( + f"Concurrency configuration: concurrency_level={concurrency_level}, " + f"initial_number_of_partitions_to_generate={initial_number_of_partitions_to_generate}, " + f"source=manifest (expression={raw_default_concurrency}), " + f"config={{{', '.join(f'{k!r}: {v!r}' for k, v in (config or {}).items() if 'worker' in k.lower() or 'concurren' in k.lower())}}}" + ) + else: + _concurrency_msg = ( + f"Concurrency configuration: concurrency_level={concurrency_level}, " + f"initial_number_of_partitions_to_generate={initial_number_of_partitions_to_generate}, " + f"source=default (_LOWEST_SAFE_CONCURRENCY_LEVEL)" + ) + sys.stderr.write(f"INFO {_concurrency_msg}\n") + sys.stderr.flush() + self._concurrent_source = ConcurrentSource.create( num_workers=concurrency_level, initial_number_of_partitions_to_generate=initial_number_of_partitions_to_generate, diff --git a/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py b/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py index a4dd81f29..3938330c5 100644 --- a/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py +++ b/airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py @@ -1,8 +1,9 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import logging import time -from queue import Queue +from queue import Full, Queue from airbyte_cdk.sources.concurrent_source.partition_generation_completed_sentinel import ( PartitionGenerationCompletedSentinel, @@ -18,6 +19,11 @@ class PartitionEnqueuer: Generates partitions from a partition generator and puts them in a queue. """ + # Maximum time (seconds) to block on queue.put() before raising. + # Prevents silent deadlocks when the bounded queue is full and the + # main-thread consumer cannot drain it. + _QUEUE_PUT_TIMEOUT = 300.0 # 5 minutes + def __init__( self, queue: Queue[QueueItem], @@ -33,17 +39,30 @@ def __init__( self._sleep_time_in_seconds = sleep_time_in_seconds def generate_partitions(self, stream: AbstractStream) -> None: - """ - Generate partitions from a partition generator and put them in a queue. - When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated. + """Generate partitions from a partition generator and put them in a queue. - If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the - main thread will have no way to know that something when wrong and will wait until the timeout is reached + When all the partitions are added to the queue, a sentinel is added to the queue to indicate + that all the partitions have been generated. + + If an exception is encountered, the exception will be caught and put in the queue. This is + very important because if we don't, the main thread will have no way to know that something + went wrong and will wait until the timeout is reached. This method is meant to be called in a separate thread. """ + logger = logging.getLogger(f"airbyte.partition_enqueuer.{stream.name}") + logger.info("Partition generation STARTED for stream=%s", stream.name) + partition_count = 0 + start_time = time.monotonic() try: for partition in stream.generate_partitions(): + partition_count += 1 + logger.info( + "Partition generation: enqueuing partition #%d for stream=%s, slice=%s", + partition_count, + stream.name, + partition.to_slice(), + ) # Adding partitions to the queue generates futures. To avoid having too many futures, we throttle here. We understand that # we might add more futures than the limit by throttling in the threads while it is the main thread that actual adds the # future but we expect the delta between the max futures length and the actual to be small enough that it would not be an @@ -57,8 +76,60 @@ def generate_partitions(self, stream: AbstractStream) -> None: # terms of performance. while self._thread_pool_manager.prune_to_validate_has_reached_futures_limit(): time.sleep(self._sleep_time_in_seconds) - self._queue.put(partition) - self._queue.put(PartitionGenerationCompletedSentinel(stream)) + self._put_with_timeout(partition, stream.name, logger) + elapsed = time.monotonic() - start_time + logger.info( + "Partition generation COMPLETED for stream=%s: %d partitions in %.1fs", + stream.name, + partition_count, + elapsed, + ) + self._put_with_timeout( + PartitionGenerationCompletedSentinel(stream), stream.name, logger + ) except Exception as e: - self._queue.put(StreamThreadException(e, stream.name)) - self._queue.put(PartitionGenerationCompletedSentinel(stream)) + elapsed = time.monotonic() - start_time + logger.info( + "Partition generation FAILED for stream=%s after %.1fs with %d partitions: %s", + stream.name, + elapsed, + partition_count, + str(e)[:200], + ) + self._put_with_timeout(StreamThreadException(e, stream.name), stream.name, logger) + self._put_with_timeout( + PartitionGenerationCompletedSentinel(stream), + stream.name, + logger, + ) + + def _put_with_timeout( + self, + item: QueueItem, + stream_name: str, + logger: logging.Logger, + ) -> None: + """Put an item on the queue, raising if blocked longer than the timeout. + + This prevents a deadlock where all worker threads are blocked on + ``queue.put()`` while the main thread is unable to drain the queue. + """ + put_start = time.monotonic() + while True: + try: + self._queue.put(item, timeout=self._QUEUE_PUT_TIMEOUT) + return + except Full: + blocked_secs = time.monotonic() - put_start + logger.warning( + "queue.put() blocked for %.0fs for stream=%s " + "(queue_size=%d). Possible deadlock.", + blocked_secs, + stream_name, + self._queue.qsize(), + ) + raise RuntimeError( + f"Timed out putting item on the queue after " + f"{blocked_secs:.0f}s for stream {stream_name}. " + f"This indicates a deadlock in the concurrent read pipeline." + ) diff --git a/airbyte_cdk/sources/streams/concurrent/partition_reader.py b/airbyte_cdk/sources/streams/concurrent/partition_reader.py index 0edc5056a..a4035a952 100644 --- a/airbyte_cdk/sources/streams/concurrent/partition_reader.py +++ b/airbyte_cdk/sources/streams/concurrent/partition_reader.py @@ -1,7 +1,8 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. import logging -from queue import Queue +import time +from queue import Full, Queue from typing import Optional from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException @@ -48,6 +49,10 @@ class PartitionReader: """ _IS_SUCCESSFUL = True + # Maximum time (seconds) to block on queue.put() before raising. + # Prevents silent deadlocks when the bounded queue is full and the + # main-thread consumer cannot drain it. + _QUEUE_PUT_TIMEOUT = 300.0 # 5 minutes def __init__( self, @@ -72,15 +77,92 @@ def process_partition(self, partition: Partition, cursor: Cursor) -> None: :param partition: The partition to read data from :return: None """ + partition_start = time.monotonic() + stream_name = partition.stream_name() + slice_info = partition.to_slice() + logger = logging.getLogger(f"airbyte.partition_reader.{stream_name}") + logger.info( + "Partition read STARTED for stream=%s, slice=%s", + stream_name, + slice_info, + ) try: if self._partition_logger: self._partition_logger.log(partition) + record_count = 0 + last_progress_time = partition_start for record in partition.read(): - self._queue.put(record) + self._put_with_timeout(record, stream_name, logger) cursor.observe(record) + record_count += 1 + now = time.monotonic() + if now - last_progress_time >= 30.0: + logger.info( + "Partition read PROGRESS for stream=%s: %d records read so far (%.0fs elapsed), slice=%s", + stream_name, + record_count, + now - partition_start, + slice_info, + ) + last_progress_time = now cursor.close_partition(partition) - self._queue.put(PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL)) + elapsed = time.monotonic() - partition_start + logger.info( + "Partition read COMPLETED for stream=%s: %d records in %.1fs, slice=%s", + stream_name, + record_count, + elapsed, + slice_info, + ) + self._put_with_timeout( + PartitionCompleteSentinel(partition, self._IS_SUCCESSFUL), + stream_name, + logger, + ) except Exception as e: - self._queue.put(StreamThreadException(e, partition.stream_name())) - self._queue.put(PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL)) + elapsed = time.monotonic() - partition_start + logger.info( + "Partition read FAILED for stream=%s after %.1fs: %s, slice=%s", + stream_name, + elapsed, + str(e)[:200], + slice_info, + ) + self._put_with_timeout(StreamThreadException(e, stream_name), stream_name, logger) + self._put_with_timeout( + PartitionCompleteSentinel(partition, not self._IS_SUCCESSFUL), + stream_name, + logger, + ) + + def _put_with_timeout( + self, + item: QueueItem, + stream_name: str, + logger: logging.Logger, + ) -> None: + """Put an item on the queue, raising if blocked longer than the timeout. + + This prevents a deadlock where all worker threads are blocked on + ``queue.put()`` while the main thread is unable to drain the queue. + """ + put_start = time.monotonic() + while True: + try: + self._queue.put(item, timeout=self._QUEUE_PUT_TIMEOUT) + return + except Full: + blocked_secs = time.monotonic() - put_start + logger.warning( + "queue.put() blocked for %.0fs for stream=%s " + "(queue_size=%d). Possible deadlock.", + blocked_secs, + stream_name, + self._queue.qsize(), + ) + raise RuntimeError( + f"Timed out putting item on the queue after " + f"{blocked_secs:.0f}s for stream {stream_name}. " + f"This indicates a deadlock in the concurrent read pipeline." + ) diff --git a/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py index e3daa4249..d8c767918 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/stream_facade_scenarios.py @@ -122,21 +122,6 @@ ] } ) - .set_expected_logs( - { - "read": [ - {"level": "INFO", "message": "Starting syncing"}, - {"level": "INFO", "message": "Marking stream stream1 as STARTED"}, - {"level": "INFO", "message": "Syncing stream: stream1"}, - {"level": "INFO", "message": "Marking stream stream1 as RUNNING"}, - {"level": "INFO", "message": "Read 2 records from stream1 stream"}, - {"level": "INFO", "message": "Marking stream stream1 as STOPPED"}, - {"level": "INFO", "message": "Finished syncing stream1"}, - {"level": "INFO", "message": "Finished syncing"}, - ] - } - ) - .set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"}) .build() ) diff --git a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py index 7db65b53d..af5d914d0 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py @@ -274,21 +274,6 @@ {"data": {"id": "2"}, "stream": "stream1"}, ] ) - .set_expected_logs( - { - "read": [ - {"level": "INFO", "message": "Starting syncing"}, - {"level": "INFO", "message": "Marking stream stream1 as STARTED"}, - {"level": "INFO", "message": "Syncing stream: stream1"}, - {"level": "INFO", "message": "Marking stream stream1 as RUNNING"}, - {"level": "INFO", "message": "Read 2 records from stream1 stream"}, - {"level": "INFO", "message": "Marking stream stream1 as STOPPED"}, - {"level": "INFO", "message": "Finished syncing stream1"}, - {"level": "INFO", "message": "Finished syncing"}, - ] - } - ) - .set_log_levels({"ERROR", "WARN", "WARNING", "INFO", "DEBUG"}) .set_expected_catalog( { "streams": [