diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 593b2ef7951fd..b4e24ab462d17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.{File, FileInputStream, InputStream}
+import java.io.{File, FileInputStream, FileNotFoundException, InputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
 import java.util.concurrent.ConcurrentHashMap
@@ -25,6 +25,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.{mutable, Map}
 import scala.math._
+import scala.util.control.NonFatal
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -976,11 +977,52 @@ class RocksDBFileManager(
    */
   private def unzipBatchZipFileFromDfs(
       version: Long, checkpointUniqueId: Option[String], localDir: File): Seq[File] = {
-    if (checkpointUniqueId.isDefined) {
-      Utils.unzipFilesFromInputStream(
-        fm.open(dfsBatchZipFile(version, checkpointUniqueId)), localDir)
-    } else {
-      Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir)
+    try {
+      if (checkpointUniqueId.isDefined) {
+        Utils.unzipFilesFromInputStream(
+          fm.open(dfsBatchZipFile(version, checkpointUniqueId)), localDir)
+      } else {
+        Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir)
+      }
+    } catch {
+      case e: FileNotFoundException =>
+        // The snapshot zip for this version is missing. The most common cause is that the
+        // asynchronous maintenance thread has not uploaded the snapshot yet (e.g. reading state
+        // with the snapshotStartBatchId option immediately after writing). Enrich the error with
+        // the snapshot/changelog files that ARE present in DFS, so the situation is diagnosable
+        // straight from the logs (in particular for intermittent failures in scheduled jobs).
+        val enriched = new FileNotFoundException(
+          s"${e.getMessage}\nFailed to load the snapshot file for version $version" +
+            checkpointUniqueId.map(id => s" (checkpointUniqueId=$id)").getOrElse("") +
+            s" from ${dfsBatchZipFile(version, checkpointUniqueId)}. " +
+            s"Files currently present in $dfsRootDir: ${listDfsFilesForDiagnostics()}")
+        // FileNotFoundException has no (message, cause) constructor; attach the original
+        // exception explicitly so its stack trace is not lost.
+        enriched.initCause(e)
+        throw enriched
+    }
+  }
+
+  /**
+   * Best-effort listing of the snapshot (.zip) and changelog (.changelog) files present in the
+   * DFS checkpoint root. Used only to enrich diagnostics when a snapshot load fails; never throws.
+   *
+   * @return the sorted snapshot and changelog file names, or a parenthesized placeholder when
+   *         the directory does not exist or cannot be listed
+   */
+  private def listDfsFilesForDiagnostics(): String = {
+    try {
+      val path = new Path(dfsRootDir)
+      if (!fm.exists(path)) {
+        "(directory does not exist)"
+      } else {
+        val names = fm.list(path).map(_.getPath.getName)
+        val snapshots = names.filter(_.endsWith(".zip")).sorted
+        val changelogs = names.filter(_.endsWith(".changelog")).sorted
+        s"snapshots=[${snapshots.mkString(", ")}], changelogs=[${changelogs.mkString(", ")}]"
+      }
+    } catch {
+      case NonFatal(t) => s"(unable to list files: $t)"
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index dc697f5b99dc5..2404bf113024b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -1826,6 +1826,31 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }
   }
 
+  test("RocksDBFileManager: missing snapshot during load reports the available versions") {
+    // Loading a snapshot version that has not been uploaded yet (e.g. the asynchronous
+    // maintenance thread has not finished uploading it when reading state with
+    // snapshotStartBatchId) should fail with a FileNotFoundException whose message lists the
+    // snapshot/changelog files that ARE present, so intermittent failures in scheduled jobs are
+    // diagnosable straight from the logs.
+    val hadoopConf = new Configuration()
+    val remoteDir = Utils.createTempDir().toString
+    val fileManager = new RocksDBFileManager(remoteDir, Utils.createTempDir(), hadoopConf)
+    val fileMapping = new RocksDBFileMapping()
+    // Upload only snapshot version 1, leaving version 2 absent.
+    saveCheckpointFiles(
+      fileManager, Seq("001.sst" -> 10, "002.sst" -> 20), version = 1, numKeys = 10, fileMapping)
+
+    val ex = intercept[FileNotFoundException] {
+      fileManager.loadCheckpointFromDfs(2, Utils.createTempDir(), fileMapping)
+    }
+    assert(ex.getMessage.contains("Failed to load the snapshot file for version 2"))
+    assert(ex.getMessage.contains("Files currently present"))
+    // The version-1 snapshot that does exist must be surfaced in the diagnostic.
+    assert(ex.getMessage.contains("snapshots=[1.zip]"))
+    // The original exception must be kept as the cause so its stack trace is not lost.
+    assert(ex.getCause.isInstanceOf[FileNotFoundException])
+  }
+
   testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
     val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
     val fileManager = new RocksDBFileManager(