Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 42 additions & 2 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,14 @@ class SparkConnectClient(object):
Conceptually the remote spark session that communicates with the server
"""

# Thread id currently executing a best-effort ML-cache RPC (clean_cache / delete), or None.
# Used to detect re-entrant ML-cache RPCs on the same thread: a CPython finalizer
# (RemoteModelRef.__del__ -> del_remote_cache -> _delete_ml_cache) can fire while the GIL is
# released inside a blocking ML-cache RPC, issuing a second blocking RPC on the same thread
# that deadlocks the gRPC channel and hangs until the test/process timeout. See the guards in
# _cleanup_ml_cache / _delete_ml_cache.
_ml_cache_rpc_thread: Optional[int] = None

def __init__(
self,
connection: Union[str, ChannelBuilder],
Expand Down Expand Up @@ -2397,12 +2405,31 @@ def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> Li
# try best to delete the cache
try:
if len(cache_ids) > 0:
# Re-entrancy guard: this is reachable from a RemoteModelRef finalizer
# (__del__ -> del_remote_cache), which CPython may run on this thread while the
# GIL is released inside another in-flight ML-cache RPC (e.g. _cleanup_ml_cache's
# blocking call). Issuing a second blocking RPC re-entrantly can deadlock the gRPC
# channel and hang until the test/process timeout. The nested delete is redundant
# (the in-flight cleanup/delete is already releasing server-side state, and the
# server evicts on session end), so skip it and log so a recurrence in scheduled
# jobs is visible instead of a silent multi-minute hang.
if self._ml_cache_rpc_thread == threading.get_ident():
logger.warning(
"Skipping re-entrant ML cache delete of %s object ref(s) while another "
"ML-cache RPC is in flight on this thread (avoids a re-entrant gRPC hang).",
len(cache_ids),
)
return []
command = pb2.Command()
command.ml_command.delete.obj_refs.extend(
[pb2.ObjectRef(id=cache_id) for cache_id in cache_ids]
)
command.ml_command.delete.evict_only = evict_only
_, properties, _ = self.execute_command(command)
self._ml_cache_rpc_thread = threading.get_ident()
try:
_, properties, _ = self.execute_command(command)
finally:
self._ml_cache_rpc_thread = None

assert properties is not None

Expand Down Expand Up @@ -2435,9 +2462,22 @@ def _on_exit(self) -> None:

def _cleanup_ml_cache(self) -> None:
try:
# See _delete_ml_cache for the re-entrancy rationale. If a finalizer-driven ML-cache
# RPC is already in flight on this thread, skip this nested cleanup rather than risk a
# re-entrant gRPC hang; the in-flight RPC plus server-side session eviction cover it.
if self._ml_cache_rpc_thread == threading.get_ident():
logger.warning(
"Skipping re-entrant ML cache cleanup while another ML-cache RPC is in flight "
"on this thread (avoids a re-entrant gRPC hang)."
)
return
command = pb2.Command()
command.ml_command.clean_cache.SetInParent()
self.execute_command(command)
self._ml_cache_rpc_thread = threading.get_ident()
try:
self.execute_command(command)
finally:
self._ml_cache_rpc_thread = None
except Exception:
pass

Expand Down