From f8868ece7a4d04b55ab7082d11f8c31720a034f8 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 24 Jun 2026 14:23:37 +0900 Subject: [PATCH] [SPARK-57655][CONNECT][PYTHON] Avoid re-entrant Spark Connect ML cache cleanup RPC hang ### What changes were proposed in this pull request? Add a same-thread re-entrancy guard around the best-effort ML-cache RPCs `SparkConnectClient._cleanup_ml_cache` / `_delete_ml_cache`. If one is already in flight on the current thread, the nested call is skipped and a WARNING is logged instead of issuing a second blocking RPC. ### Why are the changes needed? A rare CI hang (e.g. `pyspark.ml.tests.connect.test_parity_clustering` timing out at 450s) traces to a re-entrant ML-cache RPC: while a cleanup/delete RPC is blocked in gRPC with the GIL released, CPython runs a pending `RemoteModelRef` finalizer (`__del__` -> `del_remote_cache` -> `_delete_ml_cache`) on the same thread, issuing a second blocking RPC that deadlocks the channel until the process/test timeout. The nested call is redundant (the in-flight RPC is already releasing server state, which is also evicted on session end), so skipping it is safe. The WARNING turns a silent multi-minute hang into an observable, attributable signal if it recurs in scheduled jobs. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? The underlying hang is a rare, timing-dependent flake that cannot be reproduced on demand, so this patch cannot be proven to eliminate it. It is a no-regression safety net: in normal operation no ML-cache RPC is in flight on the current thread when another is issued. Verified on a fork by building Spark Connect and running `test_parity_clustering` 15x (each run actually executed, ~30s; not skipped): all 15 passed. ### Was this patch authored or co-authored using generative AI tooling? 
Yes, Generated-by: Claude Code Co-authored-by: Isaac --- python/pyspark/sql/connect/client/core.py | 44 +++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 6e0d4cbcf1ef7..db6067f25e096 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -703,6 +703,14 @@ class SparkConnectClient(object): Conceptually the remote spark session that communicates with the server """ + # Thread id currently executing a best-effort ML-cache RPC (clean_cache / delete), or None. + # Used to detect re-entrant ML-cache RPCs on the same thread: a CPython finalizer + # (RemoteModelRef.__del__ -> del_remote_cache -> _delete_ml_cache) can fire while the GIL is + # released inside a blocking ML-cache RPC, issuing a second blocking RPC on the same thread + # that deadlocks the gRPC channel and hangs until the test/process timeout. See the guards in + # _cleanup_ml_cache / _delete_ml_cache. + _ml_cache_rpc_thread: Optional[int] = None + def __init__( self, connection: Union[str, ChannelBuilder], @@ -2397,12 +2405,31 @@ def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> Li # try best to delete the cache try: if len(cache_ids) > 0: + # Re-entrancy guard: this is reachable from a RemoteModelRef finalizer + # (__del__ -> del_remote_cache), which CPython may run on this thread while the + # GIL is released inside another in-flight ML-cache RPC (e.g. _cleanup_ml_cache's + # blocking call). Issuing a second blocking RPC re-entrantly can deadlock the gRPC + # channel and hang until the test/process timeout. The nested delete is redundant + # (the in-flight cleanup/delete is already releasing server-side state, and the + # server evicts on session end), so skip it and log so a recurrence in scheduled + # jobs is visible instead of a silent multi-minute hang. 
+ if self._ml_cache_rpc_thread == threading.get_ident(): + logger.warning( + "Skipping re-entrant ML cache delete of %s object ref(s) while another " + "ML-cache RPC is in flight on this thread (avoids a re-entrant gRPC hang).", + len(cache_ids), + ) + return [] command = pb2.Command() command.ml_command.delete.obj_refs.extend( [pb2.ObjectRef(id=cache_id) for cache_id in cache_ids] ) command.ml_command.delete.evict_only = evict_only - _, properties, _ = self.execute_command(command) + self._ml_cache_rpc_thread = threading.get_ident() + try: + _, properties, _ = self.execute_command(command) + finally: + self._ml_cache_rpc_thread = None assert properties is not None @@ -2435,9 +2462,22 @@ def _on_exit(self) -> None: def _cleanup_ml_cache(self) -> None: try: + # See _delete_ml_cache for the re-entrancy rationale. If a finalizer-driven ML-cache + # RPC is already in flight on this thread, skip this nested cleanup rather than risk a + # re-entrant gRPC hang; the in-flight RPC plus server-side session eviction cover it. + if self._ml_cache_rpc_thread == threading.get_ident(): + logger.warning( + "Skipping re-entrant ML cache cleanup while another ML-cache RPC is in flight " + "on this thread (avoids a re-entrant gRPC hang)." + ) + return command = pb2.Command() command.ml_command.clean_cache.SetInParent() - self.execute_command(command) + self._ml_cache_rpc_thread = threading.get_ident() + try: + self.execute_command(command) + finally: + self._ml_cache_rpc_thread = None except Exception: pass