diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 6e0d4cbcf1ef7..db6067f25e096 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -703,6 +703,14 @@ class SparkConnectClient(object): Conceptually the remote spark session that communicates with the server """ + # Thread id currently executing a best-effort ML-cache RPC (clean_cache / delete), or None. + # Used to detect re-entrant ML-cache RPCs on the same thread: a CPython finalizer + # (RemoteModelRef.__del__ -> del_remote_cache -> _delete_ml_cache) can fire while the GIL is + # released inside a blocking ML-cache RPC, issuing a second blocking RPC on the same thread + # that deadlocks the gRPC channel and hangs until the test/process timeout. See the guards in + # _cleanup_ml_cache / _delete_ml_cache. + _ml_cache_rpc_thread: Optional[int] = None + def __init__( self, connection: Union[str, ChannelBuilder], @@ -2397,12 +2405,31 @@ def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> Li # try best to delete the cache try: if len(cache_ids) > 0: + # Re-entrancy guard: this is reachable from a RemoteModelRef finalizer + # (__del__ -> del_remote_cache), which CPython may run on this thread while the + # GIL is released inside another in-flight ML-cache RPC (e.g. _cleanup_ml_cache's + # blocking call). Issuing a second blocking RPC re-entrantly can deadlock the gRPC + # channel and hang until the test/process timeout. The nested delete is redundant + # (the in-flight cleanup/delete is already releasing server-side state, and the + # server evicts on session end), so skip it and log so a recurrence in scheduled + # jobs is visible instead of a silent multi-minute hang. + if self._ml_cache_rpc_thread == threading.get_ident(): + logger.warning( + "Skipping re-entrant ML cache delete of %s object ref(s) while another " + "ML-cache RPC is in flight on this thread (avoids a re-entrant gRPC hang).", + len(cache_ids), + ) + return [] command = pb2.Command() command.ml_command.delete.obj_refs.extend( [pb2.ObjectRef(id=cache_id) for cache_id in cache_ids] ) command.ml_command.delete.evict_only = evict_only - _, properties, _ = self.execute_command(command) + self._ml_cache_rpc_thread = threading.get_ident() + try: + _, properties, _ = self.execute_command(command) + finally: + self._ml_cache_rpc_thread = None assert properties is not None @@ -2435,9 +2462,22 @@ def _on_exit(self) -> None: def _cleanup_ml_cache(self) -> None: try: + # See _delete_ml_cache for the re-entrancy rationale. If a finalizer-driven ML-cache + # RPC is already in flight on this thread, skip this nested cleanup rather than risk a + # re-entrant gRPC hang; the in-flight RPC plus server-side session eviction cover it. + if self._ml_cache_rpc_thread == threading.get_ident(): + logger.warning( + "Skipping re-entrant ML cache cleanup while another ML-cache RPC is in flight " + "on this thread (avoids a re-entrant gRPC hang)." + ) + return command = pb2.Command() command.ml_command.clean_cache.SetInParent() - self.execute_command(command) + self._ml_cache_rpc_thread = threading.get_ident() + try: + self.execute_command(command) + finally: + self._ml_cache_rpc_thread = None except Exception: pass