
[SPARK-56324] Introducing message-based communication to Spark -> PySpark communication channel#55716

Open
sven-weber-db wants to merge 1 commit into apache:master from sven-weber-db:sven-weber_data/spark-56324

Conversation

@sven-weber-db (Contributor) commented May 6, 2026

What changes were proposed in this pull request?

This is the second in a series of PRs that introduce message-based communication to PySpark UDFs. This initiative is part of SPIP SPARK-55278, which proposes language-agnostic UDFs. This PR builds on top of the changes from PR #55515.

The goal of introducing message-based communication to PySpark is to:

  1. Make the communication between Spark <-> PySpark more structured.
  2. Enable new communication protocols (e.g., gRPC) transparently.

The overall goal is to introduce a second communication channel while keeping the existing channel intact. Specifically, we want to introduce gRPC in addition to Unix Domain Sockets (UDS). The existing UDS channel will not be changed, and its characteristics, including performance, will remain untouched.

This PR specifically proposes the following changes:

  1. PythonRunner.scala - Add a new message header and a length field to the initialization data/message sent from Spark to PySpark. This change is required to distinguish the initial message from later messages, and it constitutes the only required change in the Spark -> PySpark wire protocol.
  2. Add new abstractions to read Spark -> PySpark messages from the existing socket channel, including the new init message.
  3. Change worker.py to use the new socket message reader to process the UDF request.

With these changes, a new message reader can be implemented and transparently used for other transport channels (e.g., gRPC).
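As a rough illustration of the framing described above, a length-prefixed message can be written and read as sketched below. The header constant, field widths, and byte order here are assumptions for illustration, not the PR's actual wire format:

```python
import struct
from io import BytesIO

# Hypothetical header value; the real constant and field layout in the
# PR's PythonRunner.scala / worker.py changes may differ.
INIT_MESSAGE_HEADER = 0

def write_framed_message(out, header: int, payload: bytes) -> None:
    # 4-byte header followed by a 4-byte big-endian length, then the payload.
    out.write(struct.pack(">ii", header, len(payload)))
    out.write(payload)

def read_framed_message(stream) -> "tuple[int, bytes]":
    # The explicit length field is what lets a reader distinguish the
    # initial message from later messages and dispatch on the header.
    header, length = struct.unpack(">ii", stream.read(8))
    return header, stream.read(length)

buf = BytesIO()
write_framed_message(buf, INIT_MESSAGE_HEADER, b"init-data")
buf.seek(0)
header, payload = read_framed_message(buf)
```

Because the framing is self-describing, the same reader logic can sit behind any byte-stream transport, which is the property the PR relies on.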

Why are the changes needed?

The changes introduced here make PySpark transport-layer agnostic for the Spark -> PySpark channel. This is required for PySpark to support the new, language-agnostic UDF protocol proposed in SPIP SPARK-55278. Follow-up PRs will address the PySpark -> Spark communication in a similar manner.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing test suites:

PySpark

pyspark.tests.test_worker
pyspark.sql.tests.test_udf
pyspark.sql.tests.test_udtf
pyspark.sql.tests.pandas.test_pandas_udf_scalar
pyspark.sql.tests.arrow.test_arrow_udf_scalar
pyspark.sql.tests.arrow.test_arrow_udf
pyspark.sql.tests.arrow.test_arrow_grouped_map
pyspark.sql.tests.arrow.test_arrow_cogrouped_map
pyspark.tests.test_taskcontext
pyspark.sql.tests.test_python_datasource

Spark

org.apache.spark.sql.execution.python.PythonUDFSuite
org.apache.spark.sql.execution.python.PythonUDTFSuite
org.apache.spark.sql.execution.python.ArrowColumnarPythonUDFSuite
org.apache.spark.sql.execution.python.BatchEvalPythonExecSuite
org.apache.spark.sql.execution.python.PythonDataSourceSuite
org.apache.spark.sql.execution.python.PythonWorkerLogsSuite

Was this patch authored or co-authored using generative AI tooling?

No

@sven-weber-db force-pushed the sven-weber_data/spark-56324 branch from c0943bb to d22ee1a (May 7, 2026 12:22)
      return None
  s = stream.read(length)
- return s.decode("utf-8") if self.use_unicode else s
+ return codecs.decode(s, "utf-8") if self.use_unicode else s
@sven-weber-db (Contributor, Author) commented May 7, 2026


This change is required because we now use the ZeroCopyByteStream class to read the initial message. ZeroCopyByteStream returns memoryview objects instead of bytes or bytearray, and .decode is only implemented for bytes and bytearray.

According to the Python documentation for bytes.decode and codecs.decode, the latter may behave differently in case of errors.

bytes.decode states that

errors controls how decoding errors are handled. If 'strict' (the default), a UnicodeError exception is raised.

codecs.decode states that

The default error handler is 'strict' meaning that decoding errors raise ValueError (or a more codec specific subclass, such as UnicodeDecodeError)

As decoding errors are unexpected and the specific exception type that is thrown should not matter, I believe this change is safe. However, if there are concerns, we can change this implementation to the following:

return bytes(s).decode("utf-8") if self.use_unicode else s

This alternative implementation incurs a memory copy, copying the memoryview contents into a bytes object before decoding. Given that this call is only invoked when deserializing the initial message, the copy is probably acceptable as a one-time cost.
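The distinction discussed above can be checked directly: memoryview has no .decode method, while codecs.decode and the copying bytes(...) round-trip both accept it. A minimal sketch, independent of the PR's ZeroCopyByteStream:

```python
import codecs

mv = memoryview(b"hello")

# memoryview does not implement .decode, unlike bytes/bytearray.
assert not hasattr(mv, "decode")

# codecs.decode accepts any bytes-like object, including memoryview,
# so it works without materializing an intermediate bytes object.
assert codecs.decode(mv, "utf-8") == "hello"

# The copying alternative mentioned above: build bytes first, then decode.
assert bytes(mv).decode("utf-8") == "hello"
```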

Comment thread python/pyspark/worker.py

  @with_faulthandler
- def main(infile, outfile):
+ def invoke_udf(message_receiver: SparkMessageReceiver, outfile: BinaryIO):
Contributor Author


Happy to rename this function if another name is preferred.
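For context on how a function like this decouples from the transport, here is a minimal sketch of the receiver abstraction. SparkMessageReceiver is the name visible in the diff above, but its interface and the socket-backed implementation shown here are assumptions:

```python
import struct
from abc import ABC, abstractmethod
from io import BytesIO

class SparkMessageReceiver(ABC):
    """Transport-agnostic source of Spark -> PySpark messages (interface assumed)."""

    @abstractmethod
    def receive(self) -> bytes:
        """Return the next message payload, regardless of transport."""

class SocketMessageReceiver(SparkMessageReceiver):
    # Hypothetical socket-backed reader; a gRPC-backed receiver could
    # implement the same interface without touching the worker logic.
    def __init__(self, stream):
        self._stream = stream

    def receive(self) -> bytes:
        # Assumed framing: 4-byte header, 4-byte big-endian length, payload.
        _header, length = struct.unpack(">ii", self._stream.read(8))
        return self._stream.read(length)

# Simulate a framed message arriving over the socket channel.
stream = BytesIO(struct.pack(">ii", 0, 3) + b"udf")
payload = SocketMessageReceiver(stream).receive()
```

Passing the receiver into the worker entry point, rather than a raw file handle, is what lets new transports be swapped in transparently.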

@sven-weber-db marked this pull request as ready for review (May 7, 2026 12:45)
@sven-weber-db force-pushed the sven-weber_data/spark-56324 branch from d22ee1a to 5fa4213 (May 7, 2026 17:09)