diff --git a/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala b/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala index ca82381eec9e3..9d4475c8f042f 100644 --- a/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala +++ b/sql/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala @@ -26,6 +26,7 @@ import ammonite.compiler.iface.CodeWrapper import ammonite.interp.{Interpreter, Watchable} import ammonite.main.Defaults import ammonite.repl.Repl +import ammonite.runtime.Storage import ammonite.util.{Bind, Imports, Name, PredefInfo, Ref, Res, Util} import ammonite.util.Util.newLine @@ -100,11 +101,43 @@ Spark session available as 'spark'. | |spark.registerClassFinder(new AmmoniteClassFinder(repl.sess)) |""".stripMargin + val main = newAmmoniteMain( + welcomeBannerText = splash.format(spark_version, spark.version), + predefCode = predefCode, + inputStream = inputStream, + outputStream = outputStream, + errorStream = errorStream) + + if (semaphore.nonEmpty) { + // Used for testing. + main.run(sparkBind, new Bind[Semaphore]("semaphore", semaphore.get)) + } else { + main.run(sparkBind) + } + } + + /** + * Construct the [[ammonite.Main]] used by the Spark Connect REPL. + * + * The default [[ammonite.runtime.Storage.Folder]] backend persists Ammonite's compile cache + * under `~/.ammonite`. When the Connect REPL is started a second time from the same working + * directory, the cached `CodePredef` class is reloaded but its reference to the per-session + * `ArgsPredef` helper is stale, which causes a `NullPointerException` during predef + * initialization (SPARK-56448). Using [[ammonite.runtime.Storage.InMemory]] keeps the compile + * cache scoped to a single REPL session and avoids the stale-cache failure on restart. + */ + private[application] def newAmmoniteMain( + welcomeBannerText: String, + predefCode: String, + inputStream: InputStream, + outputStream: OutputStream, + errorStream: OutputStream): ammonite.Main = { // Please note that we make ammonite generate classes instead of objects. // Classes tend to have superior serialization behavior when using UDFs. - val main = new ammonite.Main( - welcomeBanner = Option(splash.format(spark_version, spark.version)), + new ammonite.Main( + welcomeBanner = Option(welcomeBannerText), predefCode = predefCode, + storageBackend = new Storage.InMemory(), replCodeWrapper = ExtendedCodeClassWrapper, scriptCodeWrapper = ExtendedCodeClassWrapper, inputStream = inputStream, @@ -160,13 +193,6 @@ Spark session available as 'spark'. } } } - - if (semaphore.nonEmpty) { - // Used for testing. - main.run(sparkBind, new Bind[Semaphore]("semaphore", semaphore.get)) - } else { - main.run(sparkBind) - } } } diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ConnectReplSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ConnectReplSuite.scala new file mode 100644 index 0000000000000..907a53e4a7379 --- /dev/null +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ConnectReplSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.application + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream} + +import ammonite.util.Bind + +import org.apache.spark.sql.connect.test.ConnectFunSuite + +class ConnectReplSuite extends ConnectFunSuite { + + test("SPARK-56448: restarting the Connect REPL does not throw NPE") { + // Reproduces the scenario from SPARK-56448: prior to the fix, starting the Connect + // REPL a second time from the same working directory caused Ammonite to reload a + // cached `CodePredef` whose reference to the per-session `ArgsPredef` helper was + // stale, producing a `NullPointerException` during predef initialization. The fix + // wires `Storage.InMemory` in `ConnectRepl.newAmmoniteMain` so nothing is persisted + // across sessions. This test exercises `newAmmoniteMain` twice with the same predef + // and bind setup and asserts that neither start throws. + def startRepl(): Unit = { + val in = new ByteArrayInputStream(Array.emptyByteArray) + val out = new ByteArrayOutputStream() + val err = new ByteArrayOutputStream() + val main = ConnectRepl.newAmmoniteMain( + welcomeBannerText = "", + predefCode = "val x = 1", + inputStream = in, + outputStream = out, + errorStream = err) + // A `Bind` is required to exercise the `ArgsPredef` → `CodePredef` wiring that is + // the root of the NPE in the unfixed code path. + main.run(new Bind("testVal", "hello")) + } + + // First start: compiles the predefs. + startRepl() + + // Second start: without the fix this threw `NullPointerException` during predef + // initialization. With the fix it completes normally. + startRepl() + } +}