From af41d59f47372f1e35904211bd7135fc99e31cdc Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 24 Jun 2026 13:24:23 +0900 Subject: [PATCH] [SPARK-57658][SS] Report available snapshot versions when a snapshot load fails When loading state from a snapshot (e.g. reading with the snapshotStartBatchId option) the snapshot zip for the requested version can be missing, most commonly because the asynchronous maintenance thread has not uploaded it yet. Today this surfaces only as: CANNOT_LOAD_STATE_STORE.UNCATEGORIZED Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist which gives no clue whether the snapshot was never created or just not uploaded in time. This has caused hard-to-diagnose intermittent failures in scheduled CI (snapshotStartBatchId transformWithState tests). Enrich the FileNotFoundException thrown from RocksDBFileManager with the snapshot (.zip) and changelog (.changelog) files that ARE present in the DFS checkpoint root, so any future occurrence is self-diagnosing from the logs. The listing is best-effort and never throws. A unit test in RocksDBSuite validates the message. Co-authored-by: Isaac --- .../streaming/state/RocksDBFileManager.scala | 47 ++++++++++++++++--- .../streaming/state/RocksDBSuite.scala | 23 +++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 593b2ef7951fd..b4e24ab462d17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.streaming.state -import java.io.{File, FileInputStream, InputStream} +import java.io.{File, FileInputStream, FileNotFoundException, InputStream} import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Files import java.util.concurrent.ConcurrentHashMap @@ -25,6 +25,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.{mutable, Map} import scala.math._ +import scala.util.control.NonFatal import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} @@ -976,11 +977,45 @@ class RocksDBFileManager( */ private def unzipBatchZipFileFromDfs( version: Long, checkpointUniqueId: Option[String], localDir: File): Seq[File] = { - if (checkpointUniqueId.isDefined) { - Utils.unzipFilesFromInputStream( - fm.open(dfsBatchZipFile(version, checkpointUniqueId)), localDir) - } else { - Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir) + try { + if (checkpointUniqueId.isDefined) { + Utils.unzipFilesFromInputStream( + fm.open(dfsBatchZipFile(version, checkpointUniqueId)), localDir) + } else { + Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir) + } + } catch { + case e: FileNotFoundException => + // The snapshot zip for this version is missing. The most common cause is that the + // asynchronous maintenance thread has not uploaded the snapshot yet (e.g. reading state + // with the snapshotStartBatchId option immediately after writing). Enrich the error with + // the snapshot/changelog files that ARE present in DFS, so the situation is diagnosable + // straight from the logs (in particular for intermittent failures in scheduled jobs). + throw new FileNotFoundException( + s"${e.getMessage}\nFailed to load the snapshot file for version $version" + + checkpointUniqueId.map(id => s" (checkpointUniqueId=$id)").getOrElse("") + + s" from ${dfsBatchZipFile(version, checkpointUniqueId)}. " + + s"Files currently present in $dfsRootDir: ${listDfsFilesForDiagnostics()}") + } + } + + /** + * Best-effort listing of the snapshot (.zip) and changelog (.changelog) files present in the + * DFS checkpoint root. Used only to enrich diagnostics when a snapshot load fails; never throws. + */ + private def listDfsFilesForDiagnostics(): String = { + try { + val path = new Path(dfsRootDir) + if (!fm.exists(path)) { + "" + } else { + val names = fm.list(path).map(_.getPath.getName) + val snapshots = names.filter(_.endsWith(".zip")).sorted + val changelogs = names.filter(_.endsWith(".changelog")).sorted + s"snapshots=[${snapshots.mkString(", ")}], changelogs=[${changelogs.mkString(", ")}]" + } + } catch { + case NonFatal(t) => s"" } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index dc697f5b99dc5..2404bf113024b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -1826,6 +1826,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession } } + test("RocksDBFileManager: missing snapshot during load reports the available versions") { + // Loading a snapshot version that has not been uploaded yet (e.g. the asynchronous + // maintenance thread has not finished uploading it when reading state with + // snapshotStartBatchId) should fail with a FileNotFoundException whose message lists the + // snapshot/changelog files that ARE present, so intermittent failures in scheduled jobs are + // diagnosable straight from the logs. + val hadoopConf = new Configuration() + val remoteDir = Utils.createTempDir().toString + val fileManager = new RocksDBFileManager(remoteDir, Utils.createTempDir(), hadoopConf) + val fileMapping = new RocksDBFileMapping() + // Upload only snapshot version 1, leaving version 2 absent. + saveCheckpointFiles( + fileManager, Seq("001.sst" -> 10, "002.sst" -> 20), version = 1, numKeys = 10, fileMapping) + + val ex = intercept[FileNotFoundException] { + fileManager.loadCheckpointFromDfs(2, Utils.createTempDir(), fileMapping) + } + assert(ex.getMessage.contains("Failed to load the snapshot file for version 2")) + assert(ex.getMessage.contains("Files currently present")) + // The version-1 snapshot that does exist must be surfaced in the diagnostic. + assert(ex.getMessage.contains("snapshots=[1.zip]")) + } + testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") { val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") val fileManager = new RocksDBFileManager(