Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@

package org.apache.spark.sql.execution.streaming.state

import java.io.{File, FileInputStream, InputStream}
import java.io.{File, FileInputStream, FileNotFoundException, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.{mutable, Map}
import scala.math._
import scala.util.control.NonFatal

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
Expand Down Expand Up @@ -976,11 +977,45 @@ class RocksDBFileManager(
*/
// Unzips the batch zip file of the given `version` (resolved via `dfsBatchZipFile`) from DFS
// into `localDir` and returns the Seq[File] produced by the `Utils` unzip helper.
// NOTE(review): this span comes from a diff view whose +/- markers were stripped. The first
// if/else below looks like the pre-change body and the try/catch after it like the
// post-change body -- confirm against the real file before editing the code.
private def unzipBatchZipFileFromDfs(
version: Long, checkpointUniqueId: Option[String], localDir: File): Seq[File] = {
// Presumably the removed (pre-change) body: same two branches, without error enrichment.
if (checkpointUniqueId.isDefined) {
Utils.unzipFilesFromInputStream(
fm.open(dfsBatchZipFile(version, checkpointUniqueId)), localDir)
} else {
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir)
// Presumably the added (post-change) body starts here.
try {
// With a checkpoint unique id the zip is read as a stream opened through `fm`;
// otherwise it is read through `fs` directly.
if (checkpointUniqueId.isDefined) {
Utils.unzipFilesFromInputStream(
fm.open(dfsBatchZipFile(version, checkpointUniqueId)), localDir)
} else {
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version, checkpointUniqueId), localDir)
}
} catch {
case e: FileNotFoundException =>
// The snapshot zip for this version is missing. The most common cause is that the
// asynchronous maintenance thread has not uploaded the snapshot yet (e.g. reading state
// with the snapshotStartBatchId option immediately after writing). Enrich the error with
// the snapshot/changelog files that ARE present in DFS, so the situation is diagnosable
// straight from the logs (in particular for intermittent failures in scheduled jobs).
// The exception type stays FileNotFoundException; only the message is extended.
// NOTE(review): the new exception is built from a message only, so `e` is not attached
// as its cause and the original stack trace is dropped -- consider `initCause(e)`.
throw new FileNotFoundException(
s"${e.getMessage}\nFailed to load the snapshot file for version $version" +
checkpointUniqueId.map(id => s" (checkpointUniqueId=$id)").getOrElse("") +
s" from ${dfsBatchZipFile(version, checkpointUniqueId)}. " +
s"Files currently present in $dfsRootDir: ${listDfsFilesForDiagnostics()}")
}
}

/**
 * Best-effort listing of the snapshot (.zip) and changelog (.changelog) files present in the
 * DFS checkpoint root. Used only to enrich diagnostics when a snapshot load fails.
 *
 * File names are ordered by their leading numeric version (ties broken by name), so that
 * `2.zip` is reported before `10.zip`; names without a leading number are listed last.
 *
 * Non-fatal errors raised while listing are never propagated: they are rendered into the
 * returned string instead, so the original failure is not masked by a diagnostics failure.
 *
 * @return a single-line summary of the form `snapshots=[...], changelogs=[...]`, or a
 *         `<...>` placeholder when the root does not exist or the listing fails.
 */
private def listDfsFilesForDiagnostics(): String = {
  // Parses the leading run of ASCII digits of a file name as its version.
  def leadingVersion(name: String): Long = {
    val digits = name.takeWhile(c => c >= '0' && c <= '9')
    // More than 18 digits is not guaranteed to fit in a Long; treat it as "no version".
    if (digits.isEmpty || digits.length > 18) Long.MaxValue else digits.toLong
  }

  try {
    val path = new Path(dfsRootDir)
    if (!fm.exists(path)) {
      "<DFS checkpoint root does not exist>"
    } else {
      val names = fm.list(path).map(_.getPath.getName)
      // Plain string sorting would put "10.zip" before "2.zip"; sort by version first.
      def listing(suffix: String): String =
        names.filter(_.endsWith(suffix)).sortBy(n => (leadingVersion(n), n)).mkString(", ")
      s"snapshots=[${listing(".zip")}], changelogs=[${listing(".changelog")}]"
    }
  } catch {
    case NonFatal(t) =>
      // getMessage may be null, so always include the exception class for context.
      s"<failed to list DFS files for diagnostics: ${t.getClass.getName}: ${t.getMessage}>"
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
}
}

test("RocksDBFileManager: missing snapshot during load reports the available versions") {
  // Scenario: a snapshot version is requested before it exists in DFS (for instance the
  // asynchronous maintenance thread has not uploaded it yet when state is read with
  // snapshotStartBatchId). The load must fail with a FileNotFoundException whose message
  // names the snapshot/changelog files that do exist, so that intermittent failures in
  // scheduled jobs can be diagnosed from the logs alone.
  val conf = new Configuration()
  val dfsRoot = Utils.createTempDir().toString
  val manager = new RocksDBFileManager(dfsRoot, Utils.createTempDir(), conf)
  val mapping = new RocksDBFileMapping()

  // Only snapshot version 1 is uploaded; version 2 is deliberately left missing.
  saveCheckpointFiles(
    manager, Seq("001.sst" -> 10, "002.sst" -> 20), version = 1, numKeys = 10, mapping)

  val message = intercept[FileNotFoundException] {
    manager.loadCheckpointFromDfs(2, Utils.createTempDir(), mapping)
  }.getMessage

  // The last fragment checks that the existing version-1 snapshot is surfaced.
  Seq(
    "Failed to load the snapshot file for version 2",
    "Files currently present",
    "snapshots=[1.zip]"
  ).foreach { fragment =>
    assert(message.contains(fragment))
  }
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit (coverage gap): the new test only exercises the else-branch (`loadCheckpointFromDfs` with no `checkpointUniqueId`, i.e. the local `fs.open` path). The `if (checkpointUniqueId.isDefined)` branch, which goes through `fm.open` (V2/checksum), is not directly tested.

testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val fileManager = new RocksDBFileManager(
Expand Down