[SPARK-56324] Introducing message-based communication to Spark -> PySpark communication channel#55716
Conversation
Force-pushed from c0943bb to d22ee1a.
```diff
         return None
     s = stream.read(length)
-    return s.decode("utf-8") if self.use_unicode else s
+    return codecs.decode(s, "utf-8") if self.use_unicode else s
```
This change is required because we now use the ZeroCopyByteStream class to read the initial message. ZeroCopyByteStream returns memoryview objects instead of bytes or bytearray, and s.decode is only implemented for s ∈ {bytes, bytearray}.
According to the Python documentation, bytes.decode and the latter codecs.decode implementation may behave differently in case of errors.
`bytes.decode` states that:
> errors controls how decoding errors are handled. If 'strict' (the default), a UnicodeError exception is raised.

`codecs.decode` states that:
> The default error handler is 'strict' meaning that decoding errors raise ValueError (or a more codec specific subclass, such as UnicodeDecodeError)
As decoding errors are unexpected, and the specific exception type that is thrown should not matter, I believe this change is safe. However, if there are concerns, we can change the implementation to the following:
```python
return bytes(s).decode("utf-8") if self.use_unicode else s
```
This alternative implementation will invoke a memory copy to copy the memoryview contents into a bytes object before decoding. Given that this call is only invoked on deserialization of the initial message, this memory copy is probably acceptable as it is a one-time cost.
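To illustrate the point above, here is a minimal standalone sketch (not the PR's actual reader classes) showing that a `memoryview` has no `.decode` method, while both `codecs.decode` and the copying `bytes(...)` alternative handle it:

```python
import codecs

# A zero-copy reader hands back a memoryview rather than bytes/bytearray.
s = memoryview(b"hello")

# memoryview does not implement .decode, unlike bytes and bytearray.
assert not hasattr(s, "decode")

# codecs.decode accepts any bytes-like object, including memoryview.
assert codecs.decode(s, "utf-8") == "hello"

# Alternative: copy the memoryview into a bytes object first, then decode.
assert bytes(s).decode("utf-8") == "hello"
```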
```diff
 @with_faulthandler
-def main(infile, outfile):
+def invoke_udf(message_receiver: SparkMessageReceiver, outfile: BinaryIO):
```
Happy to rename this function if another name is preferred.
Force-pushed from d22ee1a to 5fa4213.
What changes were proposed in this pull request?
This is the second in a series of PRs that introduce message-based communication to PySpark UDFs. This initiative is part of SPIP SPARK-55278, which proposes language-agnostic UDFs. This PR builds on top of the changes from PR #55515.
The overall goal of introducing message-based communication to PySpark is to add a second communication channel while keeping the existing channel intact. Specifically, we want to introduce gRPC in addition to Unix Domain Sockets (UDS). The existing UDS channel will not be changed, and its characteristics, including performance, will remain untouched.
This PR specifically proposes the following changes:

- `PythonRunner.scala`: Add a new message header and a length field to the initialization data/message sent from Spark to PySpark. This change is required to distinguish the initial message from other, later messages, and constitutes the only required change in the Spark -> PySpark wire protocol.
- `worker.py`: Use the new socket message reader to process the UDF request.

With these changes, a new message reader can be implemented and transparently used for other transport channels (e.g. gRPC).
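The header-plus-length framing can be sketched as follows. This is an illustrative sketch only, not the PR's actual wire format (the real header value and layout live in `PythonRunner.scala` and `worker.py`): a hypothetical 1-byte message type plus a 4-byte big-endian length prefix frames each message, which lets a reader distinguish the initial message from later ones regardless of the underlying transport:

```python
import io
import struct

# Hypothetical message type for the initial Spark -> PySpark message.
INITIAL_MESSAGE = 0x01

def write_message(out, msg_type: int, payload: bytes) -> None:
    # Header: 1-byte message type followed by a 4-byte big-endian length.
    out.write(struct.pack(">BI", msg_type, len(payload)))
    out.write(payload)

def read_message(stream) -> tuple[int, bytes]:
    # Read the fixed 5-byte header, then exactly `length` payload bytes.
    msg_type, length = struct.unpack(">BI", stream.read(5))
    return msg_type, stream.read(length)

# Round-trip over an in-memory stream standing in for the socket.
buf = io.BytesIO()
write_message(buf, INITIAL_MESSAGE, b"udf-init-data")
buf.seek(0)
msg_type, payload = read_message(buf)
assert (msg_type, payload) == (INITIAL_MESSAGE, b"udf-init-data")
```

Because the reader only depends on a stream interface, the same framing logic works whether the bytes arrive over a Unix Domain Socket or another channel.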
Why are the changes needed?
The changes introduced here make the PySpark transport layer agnostic for the Spark -> PySpark channel. This is required for PySpark to support the new, language-agnostic UDF protocol proposed in SPIP SPARK-55278. Follow-up PRs will address the PySpark -> Spark communication in a similar manner.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing test suites:
PySpark
Spark
Was this patch authored or co-authored using generative AI tooling?
No