From 4d5b888d8b861f9e888191d76c9002cb67c160aa Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Fri, 28 May 2021 11:06:08 +0800 Subject: [PATCH 1/4] implementation for RocksDBFileManager - save checkpoint to DFS --- .../scala/org/apache/spark/util/Utils.scala | 9 + .../streaming/state/RocksDBFileManager.scala | 254 +++++++++++++++++- .../streaming/state/RocksDBSuite.scala | 101 +++++++ 3 files changed, 363 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 886555108a2b..7654eb1ac422 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1129,6 +1129,15 @@ private[spark] object Utils extends Logging { s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms" } + /** + * Lists files recursively. + */ + def recursiveList(f: File): Array[File] = { + require(f.isDirectory) + val current = f.listFiles + current ++ current.filter(_.isDirectory).flatMap(recursiveList) + } + /** * Delete a file or directory and its contents recursively. * Don't follow directories if they are symlinks. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index ac7aa955a74a..55878f939a7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -17,18 +17,270 @@ package org.apache.spark.sql.execution.streaming.state -import java.io.File +import java.io.{File, FileInputStream, InputStream} import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Files +import java.util.UUID +import java.util.concurrent.ConcurrentHashMap +import java.util.zip.{ZipEntry, ZipOutputStream} +import scala.collection.JavaConverters._ import scala.collection.Seq import com.fasterxml.jackson.annotation.JsonInclude.Include import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper} +import org.apache.commons.io.{FilenameUtils, IOUtils} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path import org.json4s.NoTypeHints import org.json4s.jackson.Serialization +import org.apache.spark.internal.Logging +import org.apache.spark.sql.execution.streaming.CheckpointFileManager +import org.apache.spark.util.Utils + +/** + * Class responsible for syncing RocksDB checkpoint files from local disk to DFS. + * Every version checkpoint is saved in specific directory structure that allows successive + * versions to reuse to SST data files and archived log files. This allows each commit to be + * incremental, only new SST files and archived log files generated by RocksDB will be uploaded. + * The directory structures on local disk and in DFS are as follows. + * + * Local checkpoint dir structure + * ------------------------------ + * RocksDB generates a bunch of files in the local checkpoint directory. The most important among + * them are the SST files; they are the actual log structured data files. Rest of the files contain + * the metadata necessary for RocksDB to read the SST files and start from the checkpoint. 
+ * Note that the SST files are hard links to files in the RocksDB's working directory, and therefore + * successive checkpoints can share some of the SST files. So these SST files have to be copied to + * DFS in shared directory such that different committed versions can save them. + * + * We consider both SST files and archived log files as immutable files which can be shared between + * different checkpoints. + * + * localCheckpointDir + * | + * +-- OPTIONS-000005 + * +-- MANIFEST-000008 + * +-- CURRENT + * +-- 00007.sst + * +-- 00011.sst + * +-- archive + * | +-- 00008.log + * | +-- 00013.log + * ... + * + * + * DFS directory structure after saving to DFS as version 10 + * ----------------------------------------------------------- + * The SST and archived log files are given unique file names and copied to the shared subdirectory. + * Every version maintains a mapping of local immutable file name to the unique file name in DFS. + * This mapping is saved in a JSON file (named `metadata`), which is zipped along with other + * checkpoint files into a single file `[version].zip`. + * + * dfsRootDir + * | + * +-- SSTs + * | +-- 00007-[uuid1].sst + * | +-- 00011-[uuid2].sst + * +-- logs + * | +-- 00008-[uuid3].log + * | +-- 00013-[uuid4].log + * +-- 10.zip + * | +-- metadata <--- contains mapping between 00007.sst and [uuid1].sst, + * and the mapping between 00008.log and [uuid3].log + * | +-- OPTIONS-000005 + * | +-- MANIFEST-000008 + * | +-- CURRENT + * | ... + * | + * +-- 9.zip + * +-- 8.zip + * ... + * + * Note the following. + * - Each [version].zip is a complete description of all the data and metadata needed to recover + * a RocksDB instance at the corresponding version. This is unlike the [version].delta files of + * HDFSBackedStateStore where previous delta files needs to be read to be recovered. + * - This is safe wrt speculatively executed tasks running concurrently in different executors + * as each task would upload a different copy of the generated immutable files and + * atomically update the [version].zip. + * - Immutable files are identified uniquely based on their file name and file size. + * - Immutable files can be reused only across adjacent checkpoints/versions. + * - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a + * different thread than the task thread saving files. 
+ * + * @param dfsRootDir Directory where the [version].zip files will be stored + * @param localTempDir Local directory for temporary work + * @param hadoopConf Hadoop configuration for talking to DFS + * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs + */ +class RocksDBFileManager( + dfsRootDir: String, + localTempDir: File, + hadoopConf: Configuration, + loggingId: String = "") + extends Logging { + + import RocksDBImmutableFile._ + + private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]] + private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf) + private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf) + + /** Save all the files in given local checkpoint directory as a committed version in DFS */ + def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = { + logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version") + val allCheckpointFiles = listRocksDBFiles(checkpointDir) + val (localImmutableFiles, localOtherFiles) = allCheckpointFiles.partition(isImmutableFile) + val rocksDBFiles = saveImmutableFilesToDfs(version, checkpointDir, localImmutableFiles) + val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys) + rememberImmutableFiles(version, rocksDBFiles) + val metadataFile = localMetadataFile(checkpointDir) + metadata.writeToFile(metadataFile) + logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}") + + if (version <= 0 && numKeys == 0) { + // If we're writing the initial version and there's no data, we have to explicitly initialize + // the root directory. Normally saveImmutableFilesToDfs will do this initialization, but + // when there's no data that method won't write any files, and zipToDfsFile uses the + // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories. + fm.mkdirs(new Path(dfsRootDir)) + } + zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version)) + logInfo(s"Saved checkpoint file for version $version") + } + + /** Save immutable files to DFS directory */ + private def saveImmutableFilesToDfs( + version: Long, + localDir: File, + localFiles: Seq[File]): Seq[RocksDBImmutableFile] = { + // Get the immutable files used in previous versions, as some of those uploaded files can be + // reused for this version + logInfo(s"Saving RocksDB files to DFS for $version") + val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f => + f.localFileName -> f + }.toMap + + var bytesCopied = 0L + var filesCopied = 0L + var filesReused = 0L + + val immutableFiles = localFiles.map { localFile => + prevFilesToSizes + .get(localFile.getName) + .filter(_.isSameFile(localFile)) + .map { reusable => + filesReused += 1 + reusable + }.getOrElse { + val localFileName = localFile.getName + val dfsFileName = newDFSFileName(localFileName) + val dfsFile = dfsFilePath(dfsFileName) + // Note: The implementation of copyFromLocalFile() closes the output stream when there is + // any exception while copying. So this may generate partial files on DFS. But that is + // okay because until the main [version].zip file is written, those partial files are + // not going to be used at all. Eventually these files should get cleared. 
+ fs.copyFromLocalFile( + new Path(localFile.getAbsoluteFile.toURI), dfsFile) + val localFileSize = localFile.length() + logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes") + filesCopied += 1 + bytesCopied += localFileSize + + RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize) + } + } + logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" + + s" DFS for version $version.") + + immutableFiles + } + + /** + * Compress files to a single zip file in DFS. Only the file names are embedded in the zip. + * Any error while writing will ensure that the file is not written. + */ + private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = { + lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}" + var in: InputStream = null + val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true) + var totalBytes = 0L + val zout = new ZipOutputStream(out) + try { + files.foreach { file => + zout.putNextEntry(new ZipEntry(file.getName)) + in = new FileInputStream(file) + val bytes = IOUtils.copy(in, zout) + in.close() + zout.closeEntry() + totalBytes += bytes + } + zout.close() // so that any error in closing also cancels the output stream + logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr") + } catch { + case e: Exception => + // Cancel the actual output stream first, so that zout.close() does not write the file + out.cancel() + logError(s"Error zipping to $filesStr", e) + throw e + } finally { + // Close everything no matter what happened + IOUtils.closeQuietly(in) + IOUtils.closeQuietly(zout) + } + } + + private def rememberImmutableFiles( + version: Long, rocksDBFiles: Seq[RocksDBImmutableFile]): Unit = { + versionToRocksDBFiles.put(version, rocksDBFiles) + } + + /** Log the files present in a directory. This is useful for debugging. */ + private def logFilesInDir(dir: File, msg: String): Unit = { + lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f => + s"${f.getAbsolutePath} - ${f.length()} bytes" + } + logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}") + } + + private def newDFSFileName(localFileName: String): String = { + val baseName = FilenameUtils.getBaseName(localFileName) + val extension = FilenameUtils.getExtension(localFileName) + s"$baseName-${UUID.randomUUID}.$extension" + } + + private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip") + + private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata") + + private def dfsFilePath(fileName: String): Path = { + if (isSstFile(fileName)) { + new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName) + } else if (isLogFile(fileName)) { + new Path(new Path(dfsRootDir, LOG_FILES_DFS_SUBDIR), fileName) + } else { + new Path(dfsRootDir, fileName) + } + } + + /** + * List all the RocksDB files that need be synced or recovered. + */ + private def listRocksDBFiles(localDir: File): Seq[File] = { + val topLevelFiles = localDir.listFiles.filter(!_.isDirectory) + val archivedLogFiles = + Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles()) + .getOrElse(Array[File]()) + // To ignore .log.crc files + .filter(file => isLogFile(file.getName)) + + topLevelFiles ++ archivedLogFiles + } +} + /** * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any * changes to this MUST be backward-compatible. 
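
Note (illustration, not part of the patch): the reuse-or-upload decision and the DFS naming scheme described in the class comment above can be summarized in a small self-contained Scala sketch. An immutable file from an earlier version is reused only when both its local name and its size match, and a newly uploaded file gets a UUID-suffixed name under the shared SSTs/ or logs/ subdirectory. The names ImmutableFileRecord, shouldReuse, newDfsName and dfsSubdir below are illustrative and do not appear in the change.

    import java.util.UUID
    import org.apache.commons.io.FilenameUtils

    // Simplified model of the per-version bookkeeping kept by RocksDBFileManager.
    case class ImmutableFileRecord(localFileName: String, dfsFileName: String, sizeBytes: Long)

    object CheckpointNamingSketch {

      // A previously uploaded file can be reused only if both its local name and its size
      // match the file in the new checkpoint (name alone is not enough, because RocksDB
      // can reuse a file number for different content after a restart).
      def shouldReuse(prev: Option[ImmutableFileRecord], localName: String, localSize: Long): Boolean =
        prev.exists(f => f.localFileName == localName && f.sizeBytes == localSize)

      // New uploads get a unique DFS name, e.g. 00011.sst -> 00011-<uuid>.sst, so that
      // speculative tasks saving the same version never clobber each other's copies.
      def newDfsName(localName: String): String = {
        val base = FilenameUtils.getBaseName(localName)
        val ext = FilenameUtils.getExtension(localName)
        s"$base-${UUID.randomUUID}.$ext"
      }

      // SST files and archived log files live in shared subdirectories; everything else
      // (OPTIONS, MANIFEST, CURRENT, metadata) is zipped into <version>.zip instead.
      def dfsSubdir(fileName: String): String =
        if (fileName.endsWith(".sst")) "SSTs"
        else if (fileName.endsWith(".log")) "logs"
        else "."

      def main(args: Array[String]): Unit = {
        val previous = Map("00007.sst" -> ImmutableFileRecord("00007.sst", "00007-uuid1.sst", 1024L))
        assert(shouldReuse(previous.get("00007.sst"), "00007.sst", 1024L))  // same name and size: reuse
        assert(!shouldReuse(previous.get("00007.sst"), "00007.sst", 2048L)) // same name, new size: re-upload
        println(s"${dfsSubdir("00011.sst")}/${newDfsName("00011.sst")}")    // e.g. SSTs/00011-<uuid>.sst
      }
    }

Reuse is keyed on file name plus size rather than a content hash because SST files and archived log files are treated as immutable once written, which keeps the check cheap while still catching the case where RocksDB reuses a file number for different content.
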
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index b66c1e83f2ff..50f02f7a8806 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -20,12 +20,86 @@ package org.apache.spark.sql.execution.streaming.state import java.io._ import java.nio.charset.Charset +import scala.language.implicitConversions + import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration import org.apache.spark._ +import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Utils class RocksDBSuite extends SparkFunSuite { + test("RocksDBFileManager: upload only new immutable files") { + val dfsRootDir = Utils.createTempDir().getAbsolutePath + val fileManager = new RocksDBFileManager( + dfsRootDir, Utils.createTempDir(), new Configuration) + val sstDir = s"$dfsRootDir/SSTs" + def numRemoteSSTFiles: Int = listFiles(sstDir).length + val logDir = s"$dfsRootDir/logs" + def numRemoteLogFiles: Int = listFiles(logDir).length + + // Save a version of checkpoint files + val cpFiles1 = Seq( + "sst-file1.sst" -> 10, + "sst-file2.sst" -> 20, + "other-file1" -> 100, + "other-file2" -> 200, + "archive/00001.log" -> 1000, + "archive/00002.log" -> 2000 + ) + saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101) + assert(numRemoteSSTFiles == 2) // 2 sst files copied + assert(numRemoteLogFiles == 2) // 2 log files copied + + // Save SAME version again with different checkpoint files and verify + val cpFiles1_ = Seq( + "sst-file1.sst" -> 10, // same SST file as before, should not get copied + "sst-file2.sst" -> 25, // new SST file with same name as before, but different length + "sst-file3.sst" -> 30, // new SST file + "other-file1" -> 100, // same non-SST file as before, should not get copied + "other-file2" -> 210, // new non-SST file with same name as before, but different length + "other-file3" -> 300, // new non-SST file + "archive/00001.log" -> 1000, // same log file as before, should not get copied + "archive/00002.log" -> 2500, // new log file with same name as before, but different length + "archive/00003.log" -> 3000 // new log file + ) + saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001) + assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files + assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files + + // Save another version and verify + val cpFiles2 = Seq( + "sst-file4.sst" -> 40, + "other-file4" -> 400, + "archive/00004.log" -> 4000 + ) + saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501) + assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files + assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files + } + + test("RocksDBFileManager: error writing [version].zip cancels the output stream") { + quietly { + val hadoopConf = new Configuration() + hadoopConf.set( + SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, + classOf[CreateAtomicTestManager].getName) + val dfsRootDir = Utils.createTempDir().getAbsolutePath + val fileManager = new RocksDBFileManager(dfsRootDir, Utils.createTempDir(), hadoopConf) + val 
cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20, "other-file1" -> 100) + CreateAtomicTestManager.shouldFailInCreateAtomic = true + CreateAtomicTestManager.cancelCalledInCreateAtomic = false + intercept[IOException] { + saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = 101) + } + assert(CreateAtomicTestManager.cancelCalledInCreateAtomic) + } + } + test("checkpoint metadata serde roundtrip") { def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = { assert(metadata.json == json) @@ -54,4 +128,31 @@ class RocksDBSuite extends SparkFunSuite { """{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"logFiles":[{"localFileName":"00001.log","dfsLogFileName":"00001-uuid.log","sizeBytes":12345678901234}],"numKeys":12345678901234}""") // scalastyle:on line.size.limit } + + + def generateFiles(dir: String, fileToLengths: Seq[(String, Int)]): Unit = { + fileToLengths.foreach { case (fileName, length) => + val file = new File(dir, fileName) + FileUtils.write(file, "a" * length) + } + } + + def saveCheckpointFiles( + fileManager: RocksDBFileManager, + fileToLengths: Seq[(String, Int)], + version: Int, + numKeys: Int): Unit = { + val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints + generateFiles(checkpointDir, fileToLengths) + fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys) + } + + implicit def toFile(path: String): File = new File(path) + + def listFiles(file: File): Seq[File] = { + if (!file.exists()) return Seq.empty + file.listFiles.filter(file => !file.getName.endsWith("crc") && !file.isDirectory) + } + + def listFiles(file: String): Seq[File] = listFiles(new File(file)) } From aae64153b7dadde11a411143c40d9292a0f5571a Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Wed, 2 Jun 2021 19:02:07 +0800 Subject: [PATCH 2/4] address comments --- .../streaming/state/RocksDBFileManager.scala | 73 +++++++++---------- .../streaming/state/RocksDBSuite.scala | 1 - 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 55878f939a7c..7158c42daa09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -42,7 +42,7 @@ import org.apache.spark.util.Utils /** * Class responsible for syncing RocksDB checkpoint files from local disk to DFS. - * Every version checkpoint is saved in specific directory structure that allows successive + * For each version, checkpoint is saved in specific directory structure that allows successive * versions to reuse to SST data files and archived log files. This allows each commit to be * incremental, only new SST files and archived log files generated by RocksDB will be uploaded. * The directory structures on local disk and in DFS are as follows. @@ -101,8 +101,10 @@ import org.apache.spark.util.Utils * * Note the following. * - Each [version].zip is a complete description of all the data and metadata needed to recover - * a RocksDB instance at the corresponding version. This is unlike the [version].delta files of - * HDFSBackedStateStore where previous delta files needs to be read to be recovered. + * a RocksDB instance at the corresponding version. 
The SST files and log files are not included + * in the zip files, they can be shared cross different versions. This is unlike the + * [version].delta files of HDFSBackedStateStore where previous delta files needs to be read + * to be recovered. * - This is safe wrt speculatively executed tasks running concurrently in different executors * as each task would upload a different copy of the generated immutable files and * atomically update the [version].zip. @@ -117,10 +119,10 @@ import org.apache.spark.util.Utils * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs */ class RocksDBFileManager( - dfsRootDir: String, - localTempDir: File, - hadoopConf: Configuration, - loggingId: String = "") + dfsRootDir: String, + localTempDir: File, + hadoopConf: Configuration, + loggingId: String = "") extends Logging { import RocksDBImmutableFile._ @@ -132,11 +134,9 @@ class RocksDBFileManager( /** Save all the files in given local checkpoint directory as a committed version in DFS */ def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = { logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version") - val allCheckpointFiles = listRocksDBFiles(checkpointDir) - val (localImmutableFiles, localOtherFiles) = allCheckpointFiles.partition(isImmutableFile) - val rocksDBFiles = saveImmutableFilesToDfs(version, checkpointDir, localImmutableFiles) + val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir) + val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles) val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys) - rememberImmutableFiles(version, rocksDBFiles) val metadataFile = localMetadataFile(checkpointDir) metadata.writeToFile(metadataFile) logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}") @@ -154,9 +154,8 @@ class RocksDBFileManager( /** Save immutable files to DFS directory */ private def saveImmutableFilesToDfs( - version: Long, - localDir: File, - localFiles: Seq[File]): Seq[RocksDBImmutableFile] = { + version: Long, + localFiles: Seq[File]): Seq[RocksDBImmutableFile] = { // Get the immutable files used in previous versions, as some of those uploaded files can be // reused for this version logInfo(s"Saving RocksDB files to DFS for $version") @@ -176,25 +175,26 @@ class RocksDBFileManager( filesReused += 1 reusable }.getOrElse { - val localFileName = localFile.getName - val dfsFileName = newDFSFileName(localFileName) - val dfsFile = dfsFilePath(dfsFileName) - // Note: The implementation of copyFromLocalFile() closes the output stream when there is - // any exception while copying. So this may generate partial files on DFS. But that is - // okay because until the main [version].zip file is written, those partial files are - // not going to be used at all. Eventually these files should get cleared. - fs.copyFromLocalFile( - new Path(localFile.getAbsoluteFile.toURI), dfsFile) - val localFileSize = localFile.length() - logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes") - filesCopied += 1 - bytesCopied += localFileSize - - RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize) - } + val localFileName = localFile.getName + val dfsFileName = newDFSFileName(localFileName) + val dfsFile = dfsFilePath(dfsFileName) + // Note: The implementation of copyFromLocalFile() closes the output stream when there is + // any exception while copying. So this may generate partial files on DFS. 
But that is + // okay because until the main [version].zip file is written, those partial files are + // not going to be used at all. Eventually these files should get cleared. + fs.copyFromLocalFile( + new Path(localFile.getAbsoluteFile.toURI), dfsFile) + val localFileSize = localFile.length() + logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes") + filesCopied += 1 + bytesCopied += localFileSize + + RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize) + } } logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" + - s" DFS for version $version.") + s" DFS for version $version. $filesReused files reused without copying.") + versionToRocksDBFiles.put(version, immutableFiles) immutableFiles } @@ -233,11 +233,6 @@ class RocksDBFileManager( } } - private def rememberImmutableFiles( - version: Long, rocksDBFiles: Seq[RocksDBImmutableFile]): Unit = { - versionToRocksDBFiles.put(version, rocksDBFiles) - } - /** Log the files present in a directory. This is useful for debugging. */ private def logFilesInDir(dir: File, msg: String): Unit = { lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f => @@ -269,15 +264,15 @@ class RocksDBFileManager( /** * List all the RocksDB files that need be synced or recovered. */ - private def listRocksDBFiles(localDir: File): Seq[File] = { + private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = { val topLevelFiles = localDir.listFiles.filter(!_.isDirectory) val archivedLogFiles = Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles()) .getOrElse(Array[File]()) // To ignore .log.crc files .filter(file => isLogFile(file.getName)) - - topLevelFiles ++ archivedLogFiles + val (topLevelSstFiles, topLevelOtherFiles) = topLevelFiles.partition(f => isSstFile(f.getName)) + (topLevelSstFiles ++ archivedLogFiles, topLevelOtherFiles) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 50f02f7a8806..1680d721ec65 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -129,7 +129,6 @@ class RocksDBSuite extends SparkFunSuite { // scalastyle:on line.size.limit } - def generateFiles(dir: String, fileToLengths: Seq[(String, Int)]): Unit = { fileToLengths.foreach { case (fileName, length) => val file = new File(dir, fileName) From 72962ce878f564b8e1f4bacf8ba7938d9439ebbb Mon Sep 17 00:00:00 2001 From: Yuanjian Li Date: Thu, 3 Jun 2021 18:29:57 +0800 Subject: [PATCH 3/4] address comment --- .../scala/org/apache/spark/util/Utils.scala | 11 ++- .../streaming/state/RocksDBFileManager.scala | 3 +- .../streaming/state/RocksDBSuite.scala | 94 ++++++++++--------- 3 files changed, 59 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7654eb1ac422..a082442efe15 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1134,8 +1134,15 @@ private[spark] object Utils extends Logging { */ def recursiveList(f: File): Array[File] = { require(f.isDirectory) - val current = f.listFiles - current ++ current.filter(_.isDirectory).flatMap(recursiveList) + val result = f.listFiles.toBuffer + val dirList = result.filter(_.isDirectory) + 
while (dirList.nonEmpty) { + val curDir = dirList.remove(0) + val files = curDir.listFiles() + result ++= files + dirList ++= files.filter(_.isDirectory) + } + result.toArray } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 7158c42daa09..8526755bfe4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -146,7 +146,8 @@ class RocksDBFileManager( // the root directory. Normally saveImmutableFilesToDfs will do this initialization, but // when there's no data that method won't write any files, and zipToDfsFile uses the // CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories. - fm.mkdirs(new Path(dfsRootDir)) + val path = new Path(dfsRootDir) + if (!fm.exists(path)) fm.mkdirs(path) } zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version)) logInfo(s"Saved checkpoint file for version $version") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 1680d721ec65..989f75580b42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -34,52 +34,54 @@ import org.apache.spark.util.Utils class RocksDBSuite extends SparkFunSuite { test("RocksDBFileManager: upload only new immutable files") { - val dfsRootDir = Utils.createTempDir().getAbsolutePath - val fileManager = new RocksDBFileManager( - dfsRootDir, Utils.createTempDir(), new Configuration) - val sstDir = s"$dfsRootDir/SSTs" - def numRemoteSSTFiles: Int = listFiles(sstDir).length - val logDir = s"$dfsRootDir/logs" - def numRemoteLogFiles: Int = listFiles(logDir).length - - // Save a version of checkpoint files - val cpFiles1 = Seq( - "sst-file1.sst" -> 10, - "sst-file2.sst" -> 20, - "other-file1" -> 100, - "other-file2" -> 200, - "archive/00001.log" -> 1000, - "archive/00002.log" -> 2000 - ) - saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101) - assert(numRemoteSSTFiles == 2) // 2 sst files copied - assert(numRemoteLogFiles == 2) // 2 log files copied - - // Save SAME version again with different checkpoint files and verify - val cpFiles1_ = Seq( - "sst-file1.sst" -> 10, // same SST file as before, should not get copied - "sst-file2.sst" -> 25, // new SST file with same name as before, but different length - "sst-file3.sst" -> 30, // new SST file - "other-file1" -> 100, // same non-SST file as before, should not get copied - "other-file2" -> 210, // new non-SST file with same name as before, but different length - "other-file3" -> 300, // new non-SST file - "archive/00001.log" -> 1000, // same log file as before, should not get copied - "archive/00002.log" -> 2500, // new log file with same name as before, but different length - "archive/00003.log" -> 3000 // new log file - ) - saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001) - assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files - assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files - - // Save another version and 
verify
+      val cpFiles2 = Seq(
+        "sst-file4.sst" -> 40,
+        "other-file4" -> 400,
+        "archive/00004.log" -> 4000
+      )
+      saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
+      assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+      assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+    }
   }
 
   test("RocksDBFileManager: error writing [version].zip cancels the output stream") {

From c05b140f73f855a148ccedaf753841fb94a4ab1d Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Fri, 4 Jun 2021 23:55:23 +0800
Subject: [PATCH 4/4] address comments

---
 .../sql/execution/streaming/state/RocksDBFileManager.scala | 2 +-
 .../spark/sql/execution/streaming/state/RocksDBSuite.scala | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 8526755bfe4c..43e0ec480461 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -141,7 +141,7 @@ class RocksDBFileManager(
     metadata.writeToFile(metadataFile)
     logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
 
-    if (version <= 0 && numKeys == 0) {
+    if (version <= 1 && numKeys == 0) {
       // If we're writing the initial version and there's no data, we have to explicitly initialize
       // the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
       // when there's no data that method won't write any files, and zipToDfsFile uses the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 989f75580b42..6a422f781607 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -94,7 +94,6 @@ class RocksDBSuite extends SparkFunSuite {
     val fileManager = new RocksDBFileManager(dfsRootDir, Utils.createTempDir(), hadoopConf)
     val cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20, "other-file1" -> 100)
     CreateAtomicTestManager.shouldFailInCreateAtomic = true
-    CreateAtomicTestManager.cancelCalledInCreateAtomic = false
     intercept[IOException] {
       saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = 101)
     }
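
Note (illustration, not part of the patch): the following sketch shows how the save path exercised by the tests above would be driven end to end, using local temp directories in place of a real DFS location. The object name and directory handling are assumptions for the example; only RocksDBFileManager.saveCheckpointToDfs comes from the change. Saving an empty initial checkpoint goes through the root-directory initialization branch guarded by version <= 1 && numKeys == 0.

    package org.apache.spark.sql.execution.streaming.state

    import java.io.File
    import java.nio.file.Files

    import org.apache.hadoop.conf.Configuration

    // Hypothetical driver, not part of the patch or its tests.
    object RocksDBFileManagerUsageSketch {
      def main(args: Array[String]): Unit = {
        // Local directories stand in for a real DFS location in this sketch.
        val dfsRootDir = Files.createTempDirectory("dfs-root").toFile.getAbsolutePath
        val localTempDir = Files.createTempDirectory("local-temp").toFile
        val fileManager = new RocksDBFileManager(dfsRootDir, localTempDir, new Configuration)

        // An empty local checkpoint: no SST files, no archived logs, zero keys.
        val emptyCheckpointDir = Files.createTempDirectory("checkpoint-v1").toFile

        // With no immutable files to upload, saveCheckpointToDfs must create the DFS root
        // itself before writing 1.zip (the `version <= 1 && numKeys == 0` branch).
        fileManager.saveCheckpointToDfs(emptyCheckpointDir, version = 1, numKeys = 0)

        assert(new File(dfsRootDir, "1.zip").exists(), "expected the zipped checkpoint for version 1")
      }
    }

The guard was relaxed from version <= 0 to version <= 1 in PATCH 4/4, presumably because the first committed version of a streaming state store is 1, so the earlier check would not have covered an initial empty commit.
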