Skip to content

[SPARK-56661] Implementing UDFDispatcherManager for new UDF worker sessions#55712

Open
sven-weber-db wants to merge 1 commit intoapache:masterfrom
sven-weber-db:sven-weber_data/SPARK-56661-udf-changes
Open

[SPARK-56661] Implementing UDFDispatcherManager for new UDF worker sessions#55712
sven-weber-db wants to merge 1 commit intoapache:masterfrom
sven-weber-db:sven-weber_data/SPARK-56661-udf-changes

Conversation

@sven-weber-db
Copy link
Copy Markdown
Contributor

@sven-weber-db sven-weber-db commented May 6, 2026

What changes were proposed in this pull request?

This PR implements a UDFDispatcherManager class in the new /udf package that was initiated by SPIP SPARK-55278. The purpose of the new Manager class is to provide a single entry-point for Spark with which a UDF session to a external UDF worker can be created, based on a WorkerSpecification instance. This manager and entry-point will be used by follow-up PRs to implement new, language agnostic Catalyst nodes.

Why are the changes needed?

The UDFWorkerManager serves two main purposes:

  1. Provide a single, unified entry-point to Spark for UDF worker/session creation
  2. Implement the management of UDF WorkerDispachter classes - depending on the UDFWorkerSpecification they are created for. This is required because the newly proposed UDF framework from SPIP SPARK-55278, enables clients to specify different UDF dispatchers for their UDFs. This implies:

2.1. Multiple, different dispatchers can exist at the same time
-> The right one needs to be selected to create a UDF session
2.2. Dispatcher lifetime needs to be managed
-> Dispatchers and their resources need to be cleaned-up if they are no longer needed by clients

Does this PR introduce any user-facing change?

No - All changes are marked as Experimental and not yet consumed.

How was this patch tested?

New unit-tests where added for the changes in the UDFDispatcherManager and WorkerSession

Was this patch authored or co-authored using generative AI tooling?

Partially. However, the code was manually reviewed and adjusted.

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch 2 times, most recently from 83e1033 to 184fc44 Compare May 6, 2026 14:25
@sven-weber-db sven-weber-db changed the title [SPARK-56324] Implementing UDFWorkerManager for new UDF worker sessions [SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions May 6, 2026
@sven-weber-db sven-weber-db marked this pull request as ready for review May 6, 2026 14:43

// Must be called while holding `lock`.
private def handleSessionTermination(
workerSpec: UDFWorkerSpecification
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we shall also pass the session object here?

Copy link
Copy Markdown
Contributor Author

@sven-weber-db sven-weber-db May 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean move the activeSessions.remove(session) call into this function like so?

private def handleSessionTermination(
     session: WorkerSession,
     workerSpec: UDFWorkerSpecification
  ): Unit = {
    activeSessions.remove(session)
    
    val entry = dispatchers.get(workerSpec)
    // Note: entry == null is unexpected and should
    // throw here.
    entry.activeSessionCount -= 1
    if (entry.activeSessionCount == 0) {
      logger.info("All sessions closed for dispatcher " +
        s"${entry.dispatcher.dispatcherId}, removing from cache")
      dispatchers.remove(workerSpec)
      onAllDispatcherSessionsClosed(entry.dispatcher)
    }
  }

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch 2 times, most recently from c1e56ab to 7c77e8d Compare May 7, 2026 08:54
@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch from 7c77e8d to 1f11959 Compare May 7, 2026 11:57
@sven-weber-db sven-weber-db changed the title [SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions [SPARK-56661] Implementing UDFDispatcherManager for new UDF worker sessions May 7, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants