From 1a88d4cc555246e56f85edec1554ea1b814affc7 Mon Sep 17 00:00:00 2001 From: yxstev Date: Fri, 5 Jun 2026 16:09:14 +0800 Subject: [PATCH 1/9] [fix,refactor] Isolate `notify_data_update` ZMQ I/O into a dedicated background asyncio loop Root cause: `notify_data_update` ran on the caller's asyncio loop. Under heavy compute workloads (PyTorch, Ray), the loop would stall, causing `asyncio.wait_for` timers to fire before the controller ACK was ever read. Changes: - Spin up a dedicated daemon thread (`_notify_thread`) running an isolated `_notify_loop` per StorageManager instance, keeping notify I/O immune to caller-side event loop starvation - Pre-connect a single reusable async DEALER socket (`_notify_sock`) in `_init_notify_zmq`; bridge calls via `run_coroutine_threadsafe` + `wrap_future` so the caller-side `await` remains non-blocking - Add `_notify_lock` to serialize concurrent `notify_data_update` calls on the shared socket, preventing send/recv interleaving across coroutines - Guard `close()` with `hasattr(_notify_loop)` to avoid `AttributeError` in `__del__` when `__init__` fails before the loop is created Signed-off-by: yxstev --- transfer_queue/storage/managers/base.py | 206 ++++++++++++++---------- 1 file changed, 119 insertions(+), 87 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 7057cc1..354a6ac 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -16,6 +16,7 @@ import asyncio import itertools import os +import threading import time import weakref from abc import ABC, abstractmethod @@ -45,6 +46,14 @@ TQ_STORAGE_HANDSHAKE_MAX_RETRIES = int(os.environ.get("TQ_STORAGE_HANDSHAKE_MAX_RETRIES", 3)) TQ_DATA_UPDATE_RESPONSE_TIMEOUT = int(os.environ.get("TQ_DATA_UPDATE_RESPONSE_TIMEOUT", 30)) + +def _run_notify_loop(notify_loop: asyncio.AbstractEventLoop) -> None: + asyncio.set_event_loop(notify_loop) + try: + notify_loop.run_forever() + finally: + notify_loop.close() + LIMIT_THREADS_PER_MANAGER_IN_DRIVER = 8 LIMIT_THREADS_PER_MANAGER_IN_RAY_ACTOR = 4 @@ -61,41 +70,70 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): # Handshake socket is sync (used only during initialization) self.controller_handshake_socket: zmq.Socket | None = None - self.zmq_context: zmq.asyncio.Context | None = None self._connect_to_controller() + # Dedicated asyncio loop for ZMQ notify traffic, isolated from the caller's loop + self._notify_loop = asyncio.new_event_loop() + self._notify_thread = threading.Thread( + target=_run_notify_loop, + args=(self._notify_loop,), + daemon=True, + name=f"{self.storage_manager_id}-notify_data_status_loop", + ) + self._notify_thread.start() + + self._notify_zmq_ctx: zmq.asyncio.Context | None = None + self._notify_sock: zmq.asyncio.Socket | None = None + self._notify_lock = asyncio.Lock() + notify_sock_ready = threading.Event() + asyncio.run_coroutine_threadsafe(self._init_notify_zmq(notify_sock_ready), self._notify_loop) + notify_sock_ready.wait() + def _connect_to_controller(self) -> None: - """Initialize ZMQ sockets between storage unit and controller for handshake.""" + """Establish initial connection to the controller via a blocking handshake.""" if not isinstance(self.controller_info, ZMQServerInfo): raise ValueError(f"controller_info should be ZMQServerInfo, but got {type(self.controller_info)}") + sync_zmq_context = zmq.Context() try: - # Create a synchronous context for handshake (blocking operation) - sync_zmq_context = zmq.Context() - - # create zmq socket for handshake (sync, for initial connection) self.controller_handshake_socket = create_zmq_socket( ctx=sync_zmq_context, socket_type=zmq.DEALER, ip=self.controller_info.ip, identity=f"{self.storage_manager_id}-controller_handshake_socket-{uuid4().hex[:8]}".encode(), ) - - # do handshake with controller using sync socket self._do_handshake_with_controller() - - # close the sync handshake socket and context after handshake + except Exception as e: + logger.error(f"Failed to connect to controller: {e}") + raise + finally: if self.controller_handshake_socket and not self.controller_handshake_socket.closed: self.controller_handshake_socket.close(linger=0) self.controller_handshake_socket = None sync_zmq_context.term() - # create async context for data status update - self.zmq_context = zmq.asyncio.Context() - + async def _init_notify_zmq(self, notify_sock_ready: threading.Event) -> None: + """Create the async ZMQ DEALER socket used for data status notifications.""" + try: + self._notify_zmq_ctx = zmq.asyncio.Context() + identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() + self._notify_sock = create_zmq_socket( + ctx=self._notify_zmq_ctx, + socket_type=zmq.DEALER, + ip=self.controller_info.ip, + identity=identity, + ) + self._notify_sock.setsockopt(zmq.LINGER, 0) + self._notify_sock.connect(self.controller_info.to_addr("request_handle_socket")) + logger.debug( + f"[{self.storage_manager_id}]: Notify ZMQ socket connected " + f"to controller {self.controller_info.id}." + ) except Exception as e: - logger.error(f"Failed to connect to controller: {e}") + logger.critical(f"[{self.storage_manager_id}]: Failed to initialize notify ZMQ: {e}") raise + finally: + notify_sock_ready.set() def _do_handshake_with_controller(self) -> None: """Handshake with controller to establish connection with retransmission mechanism.""" @@ -205,54 +243,47 @@ async def notify_data_update( logger.warning(f"No controller connected for storage manager {self.storage_manager_id}") return - # create dynamic socket - identity = f"{self.storage_manager_id}-data_update-{uuid4().hex[:8]}".encode() - sock = create_zmq_socket(self.zmq_context, zmq.DEALER, self.controller_info.ip, identity) + normalized_field_schema = {} + for field_name, field in field_schema.items(): + field_copy = field.copy() + per_sample_shapes = field_copy.get("per_sample_shapes", None) + if isinstance(per_sample_shapes, list | tuple): + if len(per_sample_shapes) != len(global_indexes): + raise ValueError( + f"per_sample_shapes length ({len(per_sample_shapes)}) does not match " + f"number of global_indexes ({len(global_indexes)}) for field '{field_name}'; " + f"skipping per_sample_shapes normalization." + ) + field_copy["per_sample_shapes"] = { + global_indexes[i]: per_sample_shapes[i] for i in range(len(global_indexes)) + } + normalized_field_schema[field_name] = field_copy - try: - sock.connect(self.controller_info.to_addr("request_handle_socket")) - - normalized_field_schema = {} - for field_name, field in field_schema.items(): - # Work on a shallow copy to avoid mutating caller-provided schema - field_copy = field.copy() - per_sample_shapes = field_copy.get("per_sample_shapes", None) - if isinstance(per_sample_shapes, list | tuple): - if len(per_sample_shapes) != len(global_indexes): - raise ValueError( - f"per_sample_shapes length ({len(per_sample_shapes)}) does not match " - f"number of global_indexes ({len(global_indexes)}) for field '{field_name}'; " - f"skipping per_sample_shapes normalization." - ) - else: - field_copy["per_sample_shapes"] = { - global_indexes[i]: per_sample_shapes[i] for i in range(len(global_indexes)) - } - - normalized_field_schema[field_name] = field_copy - - # convert per_sample_shapes into dict - for field in field_schema.values(): - per_sample_shapes = field.get("per_sample_shapes", None) - if per_sample_shapes: - per_sample_shapes = {global_indexes[i]: per_sample_shapes[i] for i in range(len(global_indexes))} - field["per_sample_shapes"] = per_sample_shapes - - request_msg = ZMQMessage.create( - request_type=ZMQRequestType.NOTIFY_DATA_UPDATE, # type: ignore[arg-type] - sender_id=self.storage_manager_id, - body={ - "partition_id": partition_id, - "global_indexes": global_indexes, - "field_schema": normalized_field_schema, - "custom_backend_meta": custom_backend_meta, - }, - ).serialize() - - await sock.send_multipart(request_msg) + request_msg = ZMQMessage.create( + request_type=ZMQRequestType.NOTIFY_DATA_UPDATE, # type: ignore[arg-type] + sender_id=self.storage_manager_id, + body={ + "partition_id": partition_id, + "global_indexes": global_indexes, + "field_schema": normalized_field_schema, + "custom_backend_meta": custom_backend_meta, + }, + ).serialize() + + thread_future = asyncio.run_coroutine_threadsafe( + self._notify_and_wait(request_msg), + self._notify_loop, + ) + await asyncio.wrap_future(thread_future) + + async def _notify_and_wait(self, request_msg: list) -> None: + """Send a data status notification to the controller and block until ACK is received.""" + assert self._notify_sock is not None + + async with self._notify_lock: + await self._notify_sock.send_multipart(request_msg) logger.debug( - f"[{self.storage_manager_id}]: Send data status update request " - f"from storage manager id #{self.storage_manager_id} " + f"[{self.storage_manager_id}]: Sent data status update request " f"to controller id #{self.controller_info.id} successfully." ) @@ -262,7 +293,10 @@ async def notify_data_update( while not response_received and timeout > 0: try: poll_interval = min(TQ_STORAGE_POLLER_TIMEOUT, timeout) - messages = await asyncio.wait_for(sock.recv_multipart(copy=False), timeout=poll_interval) + messages = await asyncio.wait_for( + self._notify_sock.recv_multipart(copy=False), + timeout=poll_interval, + ) response_msg = ZMQMessage.deserialize(messages) if response_msg.request_type == ZMQRequestType.NOTIFY_DATA_UPDATE_ACK: # type: ignore[arg-type] @@ -271,32 +305,15 @@ async def notify_data_update( f"[{self.storage_manager_id}]: Get data status update ACK response " f"from controller id #{response_msg.sender_id} successfully." ) + break except asyncio.TimeoutError: timeout -= poll_interval - except Exception as e: - logger.warning(f"[{self.storage_manager_id}]: Error receiving response: {e}") - break if not response_received: - logger.error(f"[{self.storage_manager_id}]: Did not receive data status update ACK.") - - except Exception as e: - logger.error(f"[{self.storage_manager_id}]: Error during notify_data_update: {e}") - try: - error_msg = ZMQMessage.create( - request_type=ZMQRequestType.NOTIFY_DATA_UPDATE_ERROR, # type: ignore[arg-type] - sender_id=self.storage_manager_id, - body={"message": f"Failed to notify: {str(e)}"}, - ).serialize() - await sock.send_multipart(error_msg) - except Exception: - pass - finally: - try: - if not sock.closed: - sock.close(linger=-1) - except Exception: - pass + raise TimeoutError( + f"[{self.storage_manager_id}]: Timeout waiting for data status update ACK " + f"from controller after {TQ_DATA_UPDATE_RESPONSE_TIMEOUT}s." + ) @abstractmethod async def put_data( @@ -344,8 +361,7 @@ async def clear_data(self, metadata: BatchMeta) -> None: raise NotImplementedError("Subclasses must implement clear_data") def close(self) -> None: - """Close all ZMQ sockets and context to prevent resource leaks.""" - # Close handshake socket if it exists + """Close all ZMQ sockets/contexts and stop the notify loop.""" if self.controller_handshake_socket: try: if not self.controller_handshake_socket.closed: @@ -353,8 +369,24 @@ def close(self) -> None: except Exception as e: logger.error(f"[{self.storage_manager_id}]: Error closing controller_handshake_socket: {str(e)}") - if self.zmq_context: - self.zmq_context.term() + if not hasattr(self, "_notify_loop") or not self._notify_loop.is_running(): + return + + async def _cleanup() -> None: + if self._notify_sock and not self._notify_sock.closed: + self._notify_sock.close() + if self._notify_zmq_ctx: + self._notify_zmq_ctx.term() + + future = asyncio.run_coroutine_threadsafe(_cleanup(), self._notify_loop) + try: + future.result(timeout=5) + except Exception as e: + logger.error(f"[{self.storage_manager_id}]: Error closing notify ZMQ: {e}") + + self._notify_loop.call_soon_threadsafe(self._notify_loop.stop) + self._notify_thread.join(timeout=5) + logger.debug(f"[{self.storage_manager_id}]: Notify ZMQ thread shut down.") def __del__(self): """Destructor to ensure resources are cleaned up.""" From c0b481f76b0dc4c12a89ec0ea896ccf71ce8c4b7 Mon Sep 17 00:00:00 2001 From: yxstev Date: Fri, 5 Jun 2026 16:22:10 +0800 Subject: [PATCH 2/9] fix pre-commit Signed-off-by: yxstev --- transfer_queue/storage/managers/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 354a6ac..56de77f 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -54,6 +54,7 @@ def _run_notify_loop(notify_loop: asyncio.AbstractEventLoop) -> None: finally: notify_loop.close() + LIMIT_THREADS_PER_MANAGER_IN_DRIVER = 8 LIMIT_THREADS_PER_MANAGER_IN_RAY_ACTOR = 4 @@ -126,8 +127,7 @@ async def _init_notify_zmq(self, notify_sock_ready: threading.Event) -> None: self._notify_sock.setsockopt(zmq.LINGER, 0) self._notify_sock.connect(self.controller_info.to_addr("request_handle_socket")) logger.debug( - f"[{self.storage_manager_id}]: Notify ZMQ socket connected " - f"to controller {self.controller_info.id}." + f"[{self.storage_manager_id}]: Notify ZMQ socket connected to controller {self.controller_info.id}." ) except Exception as e: logger.critical(f"[{self.storage_manager_id}]: Failed to initialize notify ZMQ: {e}") From 43ace714d46ad193ba23ab8b73dd451cf06c3fa2 Mon Sep 17 00:00:00 2001 From: yxstev Date: Fri, 5 Jun 2026 16:43:02 +0800 Subject: [PATCH 3/9] avoid running _init_notify_zmq in coroutine Signed-off-by: yxstev --- transfer_queue/storage/managers/base.py | 41 ++++++++----------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 56de77f..c277912 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -74,6 +74,18 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): self._connect_to_controller() # Dedicated asyncio loop for ZMQ notify traffic, isolated from the caller's loop + self._notify_zmq_ctx = zmq.asyncio.Context() + identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() + self._notify_sock = create_zmq_socket( + ctx=self._notify_zmq_ctx, + socket_type=zmq.DEALER, + ip=self.controller_info.ip, + identity=identity, + ) + self._notify_sock.setsockopt(zmq.LINGER, 0) + self._notify_sock.connect(self.controller_info.to_addr("request_handle_socket")) + + self._notify_lock = asyncio.Lock() self._notify_loop = asyncio.new_event_loop() self._notify_thread = threading.Thread( target=_run_notify_loop, @@ -83,13 +95,6 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): ) self._notify_thread.start() - self._notify_zmq_ctx: zmq.asyncio.Context | None = None - self._notify_sock: zmq.asyncio.Socket | None = None - self._notify_lock = asyncio.Lock() - notify_sock_ready = threading.Event() - asyncio.run_coroutine_threadsafe(self._init_notify_zmq(notify_sock_ready), self._notify_loop) - notify_sock_ready.wait() - def _connect_to_controller(self) -> None: """Establish initial connection to the controller via a blocking handshake.""" if not isinstance(self.controller_info, ZMQServerInfo): @@ -113,28 +118,6 @@ def _connect_to_controller(self) -> None: self.controller_handshake_socket = None sync_zmq_context.term() - async def _init_notify_zmq(self, notify_sock_ready: threading.Event) -> None: - """Create the async ZMQ DEALER socket used for data status notifications.""" - try: - self._notify_zmq_ctx = zmq.asyncio.Context() - identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() - self._notify_sock = create_zmq_socket( - ctx=self._notify_zmq_ctx, - socket_type=zmq.DEALER, - ip=self.controller_info.ip, - identity=identity, - ) - self._notify_sock.setsockopt(zmq.LINGER, 0) - self._notify_sock.connect(self.controller_info.to_addr("request_handle_socket")) - logger.debug( - f"[{self.storage_manager_id}]: Notify ZMQ socket connected to controller {self.controller_info.id}." - ) - except Exception as e: - logger.critical(f"[{self.storage_manager_id}]: Failed to initialize notify ZMQ: {e}") - raise - finally: - notify_sock_ready.set() - def _do_handshake_with_controller(self) -> None: """Handshake with controller to establish connection with retransmission mechanism.""" is_connected: bool = False From 3e4b0a8d2fdd3c8d595d7e4efff244df76485672 Mon Sep 17 00:00:00 2001 From: yxstev Date: Fri, 5 Jun 2026 16:49:46 +0800 Subject: [PATCH 4/9] preserve the previous dynamic socket design Signed-off-by: yxstev --- transfer_queue/storage/managers/base.py | 39 +++++++------------------ 1 file changed, 11 insertions(+), 28 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index c277912..9580cd8 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -74,18 +74,6 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): self._connect_to_controller() # Dedicated asyncio loop for ZMQ notify traffic, isolated from the caller's loop - self._notify_zmq_ctx = zmq.asyncio.Context() - identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() - self._notify_sock = create_zmq_socket( - ctx=self._notify_zmq_ctx, - socket_type=zmq.DEALER, - ip=self.controller_info.ip, - identity=identity, - ) - self._notify_sock.setsockopt(zmq.LINGER, 0) - self._notify_sock.connect(self.controller_info.to_addr("request_handle_socket")) - - self._notify_lock = asyncio.Lock() self._notify_loop = asyncio.new_event_loop() self._notify_thread = threading.Thread( target=_run_notify_loop, @@ -261,10 +249,14 @@ async def notify_data_update( async def _notify_and_wait(self, request_msg: list) -> None: """Send a data status notification to the controller and block until ACK is received.""" - assert self._notify_sock is not None + zmq_ctx = zmq.asyncio.Context() + identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() + sock = create_zmq_socket(ctx=zmq_ctx, socket_type=zmq.DEALER, ip=self.controller_info.ip, identity=identity) + sock.setsockopt(zmq.LINGER, 0) + sock.connect(self.controller_info.to_addr("request_handle_socket")) - async with self._notify_lock: - await self._notify_sock.send_multipart(request_msg) + try: + await sock.send_multipart(request_msg) logger.debug( f"[{self.storage_manager_id}]: Sent data status update request " f"to controller id #{self.controller_info.id} successfully." @@ -277,7 +269,7 @@ async def _notify_and_wait(self, request_msg: list) -> None: try: poll_interval = min(TQ_STORAGE_POLLER_TIMEOUT, timeout) messages = await asyncio.wait_for( - self._notify_sock.recv_multipart(copy=False), + sock.recv_multipart(copy=False), timeout=poll_interval, ) response_msg = ZMQMessage.deserialize(messages) @@ -297,6 +289,9 @@ async def _notify_and_wait(self, request_msg: list) -> None: f"[{self.storage_manager_id}]: Timeout waiting for data status update ACK " f"from controller after {TQ_DATA_UPDATE_RESPONSE_TIMEOUT}s." ) + finally: + sock.close() + zmq_ctx.term() @abstractmethod async def put_data( @@ -355,18 +350,6 @@ def close(self) -> None: if not hasattr(self, "_notify_loop") or not self._notify_loop.is_running(): return - async def _cleanup() -> None: - if self._notify_sock and not self._notify_sock.closed: - self._notify_sock.close() - if self._notify_zmq_ctx: - self._notify_zmq_ctx.term() - - future = asyncio.run_coroutine_threadsafe(_cleanup(), self._notify_loop) - try: - future.result(timeout=5) - except Exception as e: - logger.error(f"[{self.storage_manager_id}]: Error closing notify ZMQ: {e}") - self._notify_loop.call_soon_threadsafe(self._notify_loop.stop) self._notify_thread.join(timeout=5) logger.debug(f"[{self.storage_manager_id}]: Notify ZMQ thread shut down.") From 3890ded456f09f14ec4dda40708e73ec04b674ab Mon Sep 17 00:00:00 2001 From: yxstev Date: Fri, 5 Jun 2026 17:44:34 +0800 Subject: [PATCH 5/9] fix comments Signed-off-by: yxstev --- transfer_queue/storage/managers/base.py | 37 +++++++++++++++++-------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 9580cd8..fcae7d6 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -67,6 +67,7 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): self.storage_manager_id = f"TQ_STORAGE_{uuid4().hex[:8]}" self.config = config self.controller_info = controller_info + self.zmq_ctx = zmq.asyncio.Context() # Handshake socket is sync (used only during initialization) self.controller_handshake_socket: zmq.Socket | None = None @@ -84,28 +85,35 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): self._notify_thread.start() def _connect_to_controller(self) -> None: - """Establish initial connection to the controller via a blocking handshake.""" + """Initialize ZMQ sockets between storage unit and controller for handshake.""" if not isinstance(self.controller_info, ZMQServerInfo): raise ValueError(f"controller_info should be ZMQServerInfo, but got {type(self.controller_info)}") - sync_zmq_context = zmq.Context() try: + # Create a synchronous context for handshake (blocking operation) + sync_zmq_context = zmq.Context() + + # create zmq socket for handshake (sync, for initial connection) self.controller_handshake_socket = create_zmq_socket( ctx=sync_zmq_context, socket_type=zmq.DEALER, ip=self.controller_info.ip, identity=f"{self.storage_manager_id}-controller_handshake_socket-{uuid4().hex[:8]}".encode(), ) + + # do handshake with controller using sync socket self._do_handshake_with_controller() - except Exception as e: - logger.error(f"Failed to connect to controller: {e}") - raise - finally: + + # close the sync handshake socket and context after handshake if self.controller_handshake_socket and not self.controller_handshake_socket.closed: self.controller_handshake_socket.close(linger=0) self.controller_handshake_socket = None sync_zmq_context.term() + except Exception as e: + logger.error(f"Failed to connect to controller: {e}") + raise + def _do_handshake_with_controller(self) -> None: """Handshake with controller to establish connection with retransmission mechanism.""" is_connected: bool = False @@ -222,8 +230,7 @@ async def notify_data_update( if len(per_sample_shapes) != len(global_indexes): raise ValueError( f"per_sample_shapes length ({len(per_sample_shapes)}) does not match " - f"number of global_indexes ({len(global_indexes)}) for field '{field_name}'; " - f"skipping per_sample_shapes normalization." + f"number of global_indexes ({len(global_indexes)}) for field '{field_name}'. " ) field_copy["per_sample_shapes"] = { global_indexes[i]: per_sample_shapes[i] for i in range(len(global_indexes)) @@ -249,9 +256,10 @@ async def notify_data_update( async def _notify_and_wait(self, request_msg: list) -> None: """Send a data status notification to the controller and block until ACK is received.""" - zmq_ctx = zmq.asyncio.Context() identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() - sock = create_zmq_socket(ctx=zmq_ctx, socket_type=zmq.DEALER, ip=self.controller_info.ip, identity=identity) + sock = create_zmq_socket( + ctx=self.zmq_ctx, socket_type=zmq.DEALER, ip=self.controller_info.ip, identity=identity + ) sock.setsockopt(zmq.LINGER, 0) sock.connect(self.controller_info.to_addr("request_handle_socket")) @@ -283,15 +291,17 @@ async def _notify_and_wait(self, request_msg: list) -> None: break except asyncio.TimeoutError: timeout -= poll_interval + except Exception as e: + logger.warning(f"[{self.storage_manager_id}]: Error receiving response: {e}") + break if not response_received: - raise TimeoutError( + logger.error( f"[{self.storage_manager_id}]: Timeout waiting for data status update ACK " f"from controller after {TQ_DATA_UPDATE_RESPONSE_TIMEOUT}s." ) finally: sock.close() - zmq_ctx.term() @abstractmethod async def put_data( @@ -352,6 +362,9 @@ def close(self) -> None: self._notify_loop.call_soon_threadsafe(self._notify_loop.stop) self._notify_thread.join(timeout=5) + + self.zmq_ctx.term() + logger.debug(f"[{self.storage_manager_id}]: Notify ZMQ thread shut down.") def __del__(self): From aa075100861c627df96a6f7477f258d4ad13d8b3 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Fri, 5 Jun 2026 18:06:59 +0800 Subject: [PATCH 6/9] fix Signed-off-by: 0oshowero0 --- transfer_queue/storage/managers/base.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index fcae7d6..4c53bb8 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -67,11 +67,11 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): self.storage_manager_id = f"TQ_STORAGE_{uuid4().hex[:8]}" self.config = config self.controller_info = controller_info - self.zmq_ctx = zmq.asyncio.Context() # Handshake socket is sync (used only during initialization) self.controller_handshake_socket: zmq.Socket | None = None + self.zmq_context: zmq.asyncio.Context | None = None self._connect_to_controller() # Dedicated asyncio loop for ZMQ notify traffic, isolated from the caller's loop @@ -110,6 +110,9 @@ def _connect_to_controller(self) -> None: self.controller_handshake_socket = None sync_zmq_context.term() + # create async context for data status update + self.zmq_context = zmq.asyncio.Context() + except Exception as e: logger.error(f"Failed to connect to controller: {e}") raise @@ -258,7 +261,7 @@ async def _notify_and_wait(self, request_msg: list) -> None: """Send a data status notification to the controller and block until ACK is received.""" identity = f"{self.storage_manager_id}-notify-{uuid4().hex[:8]}".encode() sock = create_zmq_socket( - ctx=self.zmq_ctx, socket_type=zmq.DEALER, ip=self.controller_info.ip, identity=identity + ctx=self.zmq_context, socket_type=zmq.DEALER, ip=self.controller_info.ip, identity=identity ) sock.setsockopt(zmq.LINGER, 0) sock.connect(self.controller_info.to_addr("request_handle_socket")) @@ -301,7 +304,11 @@ async def _notify_and_wait(self, request_msg: list) -> None: f"from controller after {TQ_DATA_UPDATE_RESPONSE_TIMEOUT}s." ) finally: - sock.close() + try: + if not sock.closed: + sock.close(linger=-1) + except Exception: + pass @abstractmethod async def put_data( @@ -363,7 +370,7 @@ def close(self) -> None: self._notify_loop.call_soon_threadsafe(self._notify_loop.stop) self._notify_thread.join(timeout=5) - self.zmq_ctx.term() + self.zmq_context.term() logger.debug(f"[{self.storage_manager_id}]: Notify ZMQ thread shut down.") From 4ff1496a1913eb7564af3fcb5a79090f5206d5c2 Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Fri, 5 Jun 2026 18:10:43 +0800 Subject: [PATCH 7/9] fix Signed-off-by: 0oshowero0 --- transfer_queue/storage/managers/base.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 4c53bb8..1c1d83a 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -71,7 +71,7 @@ def __init__(self, controller_info: ZMQServerInfo, config: DictConfig): # Handshake socket is sync (used only during initialization) self.controller_handshake_socket: zmq.Socket | None = None - self.zmq_context: zmq.asyncio.Context | None = None + self.zmq_context = zmq.asyncio.Context() self._connect_to_controller() # Dedicated asyncio loop for ZMQ notify traffic, isolated from the caller's loop @@ -110,9 +110,6 @@ def _connect_to_controller(self) -> None: self.controller_handshake_socket = None sync_zmq_context.term() - # create async context for data status update - self.zmq_context = zmq.asyncio.Context() - except Exception as e: logger.error(f"Failed to connect to controller: {e}") raise From 62ea06a950966ee18f2f1158f50e53aa383736cc Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Mon, 8 Jun 2026 09:46:03 +0800 Subject: [PATCH 8/9] fix close() Signed-off-by: 0oshowero0 --- transfer_queue/storage/managers/base.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 1c1d83a..146522a 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -354,6 +354,7 @@ async def clear_data(self, metadata: BatchMeta) -> None: def close(self) -> None: """Close all ZMQ sockets/contexts and stop the notify loop.""" + if self.controller_handshake_socket: try: if not self.controller_handshake_socket.closed: @@ -361,11 +362,9 @@ def close(self) -> None: except Exception as e: logger.error(f"[{self.storage_manager_id}]: Error closing controller_handshake_socket: {str(e)}") - if not hasattr(self, "_notify_loop") or not self._notify_loop.is_running(): - return - - self._notify_loop.call_soon_threadsafe(self._notify_loop.stop) - self._notify_thread.join(timeout=5) + if hasattr(self, "_notify_loop") and self._notify_loop.is_running(): + self._notify_loop.call_soon_threadsafe(self._notify_loop.stop) + self._notify_thread.join(timeout=5) self.zmq_context.term() From 0d989e27f4f5f63e9f766d74f8f71ce22e639def Mon Sep 17 00:00:00 2001 From: 0oshowero0 Date: Mon, 8 Jun 2026 09:56:23 +0800 Subject: [PATCH 9/9] fix Signed-off-by: 0oshowero0 --- transfer_queue/storage/managers/base.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/transfer_queue/storage/managers/base.py b/transfer_queue/storage/managers/base.py index 146522a..f4d545d 100644 --- a/transfer_queue/storage/managers/base.py +++ b/transfer_queue/storage/managers/base.py @@ -303,7 +303,7 @@ async def _notify_and_wait(self, request_msg: list) -> None: finally: try: if not sock.closed: - sock.close(linger=-1) + sock.close(linger=0) except Exception: pass @@ -364,11 +364,15 @@ def close(self) -> None: if hasattr(self, "_notify_loop") and self._notify_loop.is_running(): self._notify_loop.call_soon_threadsafe(self._notify_loop.stop) - self._notify_thread.join(timeout=5) - self.zmq_context.term() + if hasattr(self, "_notify_thread") and self._notify_thread is not None: + self._notify_thread.join(timeout=5.0) + if self._notify_thread.is_alive(): + logger.warning(f"[{self.storage_manager_id}]: Notify ZMQ thread did not stop within 5 second timeout.") + else: + logger.debug(f"[{self.storage_manager_id}]: Notify ZMQ thread shut down.") - logger.debug(f"[{self.storage_manager_id}]: Notify ZMQ thread shut down.") + self.zmq_context.term() def __del__(self): """Destructor to ensure resources are cleaned up."""