Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,9 @@ public boolean contains(final UTF8String substring) {
* Returns the byte at (byte) position `byteIndex`. If byte index is invalid, returns 0.
*/
public byte getByte(int byteIndex) {
if (byteIndex < 0 || byteIndex >= numBytes) {
return 0;
}
return Platform.getByte(base, offset + byteIndex);
}

Expand Down
44 changes: 42 additions & 2 deletions python/pyspark/sql/connect/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,14 @@ class SparkConnectClient(object):
Conceptually the remote spark session that communicates with the server
"""

# Thread id currently executing a best-effort ML-cache RPC (clean_cache / delete), or None.
# Used to detect re-entrant ML-cache RPCs on the same thread: a CPython finalizer
# (RemoteModelRef.__del__ -> del_remote_cache -> _delete_ml_cache) can fire while the GIL is
# released inside a blocking ML-cache RPC, issuing a second blocking RPC on the same thread
# that deadlocks the gRPC channel and hangs until the test/process timeout. See the guards in
# _cleanup_ml_cache / _delete_ml_cache.
_ml_cache_rpc_thread: Optional[int] = None

def __init__(
self,
connection: Union[str, ChannelBuilder],
Expand Down Expand Up @@ -2397,12 +2405,31 @@ def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> Li
# try best to delete the cache
try:
if len(cache_ids) > 0:
# Re-entrancy guard: this is reachable from a RemoteModelRef finalizer
# (__del__ -> del_remote_cache), which CPython may run on this thread while the
# GIL is released inside another in-flight ML-cache RPC (e.g. _cleanup_ml_cache's
# blocking call). Issuing a second blocking RPC re-entrantly can deadlock the gRPC
# channel and hang until the test/process timeout. The nested delete is redundant
# (the in-flight cleanup/delete is already releasing server-side state, and the
# server evicts on session end), so skip it and log so a recurrence in scheduled
# jobs is visible instead of a silent multi-minute hang.
if self._ml_cache_rpc_thread == threading.get_ident():
logger.warning(
"Skipping re-entrant ML cache delete of %s object ref(s) while another "
"ML-cache RPC is in flight on this thread (avoids a re-entrant gRPC hang).",
len(cache_ids),
)
return []
command = pb2.Command()
command.ml_command.delete.obj_refs.extend(
[pb2.ObjectRef(id=cache_id) for cache_id in cache_ids]
)
command.ml_command.delete.evict_only = evict_only
_, properties, _ = self.execute_command(command)
self._ml_cache_rpc_thread = threading.get_ident()
try:
_, properties, _ = self.execute_command(command)
finally:
self._ml_cache_rpc_thread = None

assert properties is not None

Expand Down Expand Up @@ -2435,9 +2462,22 @@ def _on_exit(self) -> None:

def _cleanup_ml_cache(self) -> None:
try:
# See _delete_ml_cache for the re-entrancy rationale. If a finalizer-driven ML-cache
# RPC is already in flight on this thread, skip this nested cleanup rather than risk a
# re-entrant gRPC hang; the in-flight RPC plus server-side session eviction cover it.
if self._ml_cache_rpc_thread == threading.get_ident():
logger.warning(
"Skipping re-entrant ML cache cleanup while another ML-cache RPC is in flight "
"on this thread (avoids a re-entrant gRPC hang)."
)
return
command = pb2.Command()
command.ml_command.clean_cache.SetInParent()
self.execute_command(command)
self._ml_cache_rpc_thread = threading.get_ident()
try:
self.execute_command(command)
finally:
self._ml_cache_rpc_thread = None
except Exception:
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,20 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
val session = spark
import session.implicits._
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
// Run the long-running query through a single call site, and warm it up once before any
// interrupt. Otherwise the very first execution has to ship/fetch the map closure and its
// TypeTag artifact classes to the executor on demand; if an interrupt lands during that
// first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead of
// OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN).
def runMapQuery(sleepMs: Long): Unit = {
spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect()
}
runMapQuery(0)
val q1 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
runMapQuery(30000)
}
val q2 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
runMapQuery(30000)
}
var q1Interrupted = false
var q2Interrupted = false
Expand Down Expand Up @@ -88,6 +97,16 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
@volatile var finished = false
val interrupted = mutable.ListBuffer[String]()

// Run the long-running query through a single call site, and warm it up once before the
// background interruptor starts. Otherwise the first execution has to ship/fetch the map
// closure and its TypeTag artifact classes to the executor on demand; if an interrupt lands
// during that first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead
// of OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN).
def runMapQuery(sleepMs: Long): Unit = {
spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect()
}
runMapQuery(0)

val interruptor = Future {
eventually(timeout(20.seconds), interval(1.seconds)) {
val ids = spark.interruptAll()
Expand All @@ -96,15 +115,22 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
}
finished
}
val e1 = intercept[SparkException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1")
val e2 = intercept[SparkException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
try {
val e1 = intercept[SparkException] {
runMapQuery(30.seconds.toMillis)
}
assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1")
val e2 = intercept[SparkException] {
runMapQuery(30.seconds.toMillis)
}
assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2")
} finally {
// Always release the background interruptor. If an assertion above fails, this prevents the
// interruptor Future from continuing to call interruptAll() and canceling operations of the
// subsequent tests in this suite (which previously caused cascading OPERATION_CANCELED
// failures across the whole suite).
finished = true
}
assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2")
finished = true
assert(awaitResult(interruptor, 10.seconds))
assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ class PythonPipelineSuite
}

test("flow progress events have correct python source code location") {
// `standalone_flow1` writes to its own dedicated streaming table `st2` (rather than appending
// to `table1`, which already has its own implicit flow). Two flows writing concurrently to the
// same file-based streaming table share a single `_spark_metadata` log and race on the batch 0
// commit (`ManifestFileCommitProtocol`: "Race while writing batch 0"), which made this test
// flaky. Keeping each flow on a separate destination removes the race without changing what the
// test verifies (source-code-location propagation for an append flow).
val unresolvedGraph = buildGraph(pythonText = """
|@dp.table(
| comment = 'my table'
Expand All @@ -197,10 +203,12 @@ class PythonPipelineSuite
| return df.select("age")
|
|@dp.append_flow(
| target = 'table1'
| target = 'st2'
|)
|def standalone_flow1():
| return spark.readStream.table('mv2')
|
|dp.create_streaming_table('st2')
|""".stripMargin)

val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph, storageRoot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,18 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
if (fileLog.add(batchId, fileStatuses)) {
logInfo(log"Committed batch ${MDC(BATCH_ID, batchId)}")
} else {
throw new IllegalStateException(s"Race while writing batch $batchId")
// Reaching here means `fileLog.add` found this batchId already committed to the sink
// metadata log at `path`. This is almost always two concurrent streaming queries writing
// to the same output path: they share a single `_spark_metadata` log and cannot coexist.
// Log the path + batchId at ERROR so a recurrence in scheduled jobs is diagnosable from the
// logs alone, without re-reproducing the race.
logError(log"Race while writing batch ${MDC(BATCH_ID, batchId)} to the file sink metadata " +
log"log at ${MDC(PATH, path)}: another writer already committed this batch. This usually " +
log"means multiple concurrent streaming queries are writing to the same output path.")
throw new IllegalStateException(
s"Race while writing batch $batchId to the file sink metadata log at '$path'. Another " +
"writer already committed this batch, which usually means multiple concurrent streaming " +
"queries are writing to the same output path (they share one _spark_metadata log).")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,3 +450,19 @@ select date_format(timestamp_ntz'2023-08-18 09:13:14.123456', 'yyyy-MM-dd HH:mm:
struct<date_format(TIMESTAMP_NTZ '2023-08-18 09:13:14.123456', yyyy-MM-dd HH:mm:ss.SSSSSS):string,date_format(TIMESTAMP_NTZ '2023-08-18 09:13:14.123456', yyyy-MM-dd HH:mm:ss.SSSSSS):string,date_format(TIMESTAMP_NTZ '2023-08-18 09:13:14.123456', yyyy-MM-dd HH:mm:ss.SSSSSS):string>
-- !query output
2023-08-18 09:13:14.123456 2023-08-18 09:13:14.123456 2023-08-18 09:13:14.123456


-- !query
select to_char(TIME'12:13:14', 'HH:mm:ss'), to_varchar(TIME'12:13:14', 'HH:mm:ss')
-- !query schema
struct<date_format(TIME '12:13:14', HH:mm:ss):string,date_format(TIME '12:13:14', HH:mm:ss):string>
-- !query output
12:13:14 12:13:14


-- !query
select to_char(TIME'23:59:59.123456', 'HH:mm:ss.SSSSSS'), to_varchar(TIME'23:59:59.123456', 'HH:mm:ss.SSSSSS')
-- !query schema
struct<date_format(TIME '23:59:59.123456', HH:mm:ss.SSSSSS):string,date_format(TIME '23:59:59.123456', HH:mm:ss.SSSSSS):string>
-- !query output
23:59:59.123456 23:59:59.123456
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.File
import java.time.Duration

import org.apache.hadoop.conf.Configuration
import org.scalatest.time.SpanSugar._

import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.{Encoders, Row}
Expand Down Expand Up @@ -1050,20 +1051,47 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
.transformWithState(new AggregationStatefulProcessor(),
TimeMode.None(),
OutputMode.Append())

// Block until the async maintenance thread has written the RocksDB snapshot `.zip` for the
// given state-store `version` (matching both `<v>.zip` and checkpoint-v2 `<v>_<uid>.zip`)
// on each of `partitions`. Used right after `version` is committed (while it is the current
// version), since maintenance only ever snapshots the current version. On timeout it prints
// the actual directory contents so a recurrence in scheduled jobs is diagnosable.
def waitForStateSnapshot(version: Long, partitions: Seq[Int]): StreamAction = Execute { _ =>
val opStateDir = new File(tmpDir, "state/0")
eventually(timeout(60.seconds), interval(100.milliseconds)) {
partitions.foreach { partition =>
val partitionDir = new File(opStateDir, partition.toString)
val files = Option(partitionDir.listFiles())
.map(_.map(_.getName).sorted.toSeq).getOrElse(Seq.empty)
val snapshotUploaded = files.exists(_.matches(s"$version(_.*)?\\.zip"))
assert(snapshotUploaded,
s"Snapshot (version $version) for partition $partition was not uploaded in time. " +
s"Contents of $partitionDir: ${files.mkString("[", ", ", "]")}")
}
}
}

testStream(query)(
StartStream(checkpointLocation = tmpDir.getCanonicalPath),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
AddData(inputData, (5, 1L), (6, 2L), (7, 3L), (8, 4L)),
ProcessAllAvailable(),
// State version 2 is now the current version. The snapshotStartBatchId=1 reader below
// needs the version-2 snapshot, and the asynchronous maintenance thread only creates a
// snapshot for the *current* version. So block here, while version 2 is still current,
// until that snapshot has actually been written - this deterministically forces it to
// exist. (A fixed sleep at the *end* is flaky: once later batches advance the current
// version, maintenance never goes back to snapshot version 2, so it may never appear and
// the reader fails with FileNotFoundException on `2.zip`.)
waitForStateSnapshot(version = 2, partitions = Seq(1, 4)),
AddData(inputData, (9, 1L), (10, 2L), (11, 3L), (12, 4L)),
ProcessAllAvailable(),
AddData(inputData, (13, 1L), (14, 2L), (15, 3L), (16, 4L)),
ProcessAllAvailable(),
AddData(inputData, (17, 1L), (18, 2L), (19, 3L), (20, 4L)),
ProcessAllAvailable(),
// Ensure that we get a chance to upload created snapshots
Execute { _ => Thread.sleep(5000) },
StopStream
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,16 @@ class MetricsFailureInjectionSuite
// OSS Spark cannot roll back a partially-finished result stage, so the job aborts. With
// the default RESULT_STAGE_DELAY=0 the result stage is corrupted before any task
// dispatches and the rollback path does not abort.
//
// We group by the high-cardinality `id` column (not `low_cardinality_col`) so that every
// one of the 20 reducer partitions reads data from the corrupted mapper 0. Otherwise only
// the handful of reducer partitions that happen to hold mapper-0's few low-cardinality keys
// would observe the FetchFailed, and the abort would only fire when one of those specific
// partitions happened to be scheduled after the (asynchronous) corruption -- a scheduling
// race that made this test flaky under Maven. With `id`, every partition depends on mapper
// 0, so once RESULT_STAGE_DELAY=1 has corrupted it (after the first result task), local[2]
// dispatches the remaining result tasks afterwards and at least one is guaranteed to hit
// the corrupted mapper, deterministically triggering the indeterminate-stage abort.
withTable("test_table") {
setUpTestTable("test_table")
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "20") {
Expand All @@ -580,7 +590,7 @@ class MetricsFailureInjectionSuite
config.Tests.INJECT_SHUFFLE_FETCH_FAILURES_RESULT_STAGE_DELAY.key -> "1",
config.Tests.INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE.key -> "true") {
val df = spark.read.table("test_table")
.groupBy("low_cardinality_col")
.groupBy("id")
.count()
val ex = intercept[SparkException] {
df.collect()
Expand Down