diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 886555108a2b..a082442efe15 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1129,6 +1129,22 @@ private[spark] object Utils extends Logging {
     s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms"
   }
 
+  /**
+   * Lists files recursively.
+   */
+  def recursiveList(f: File): Array[File] = {
+    require(f.isDirectory)
+    val result = f.listFiles.toBuffer
+    val dirList = result.filter(_.isDirectory)
+    while (dirList.nonEmpty) {
+      val curDir = dirList.remove(0)
+      val files = curDir.listFiles()
+      result ++= files
+      dirList ++= files.filter(_.isDirectory)
+    }
+    result.toArray
+  }
+
   /**
    * Delete a file or directory and its contents recursively.
    * Don't follow directories if they are symlinks.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index ac7aa955a74a..43e0ec480461 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -17,18 +17,266 @@
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.File
+import java.io.{File, FileInputStream, InputStream}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.nio.file.Files
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.JavaConverters._
 import scala.collection.Seq
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
+import org.apache.commons.io.{FilenameUtils, IOUtils}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.util.Utils
+
+/**
+ * Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
+ * For each version, the checkpoint is saved in a specific directory structure that allows
+ * successive versions to reuse SST data files and archived log files. This allows each commit
+ * to be incremental; only the new SST files and archived log files generated by RocksDB will be
+ * uploaded. The directory structures on local disk and in DFS are as follows.
+ *
+ * Local checkpoint dir structure
+ * ------------------------------
+ * RocksDB generates a bunch of files in the local checkpoint directory. The most important among
+ * them are the SST files; they are the actual log-structured data files. The rest of the files
+ * contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
+ * Note that the SST files are hard links to files in RocksDB's working directory, and therefore
+ * successive checkpoints can share some of the SST files. So these SST files have to be copied
+ * to DFS in a shared directory such that different committed versions can share them.
+ *
+ * We consider both SST files and archived log files as immutable files which can be shared
+ * between different checkpoints.
+ *
+ *    localCheckpointDir
+ *       |
+ *       +-- OPTIONS-000005
+ *       +-- MANIFEST-000008
+ *       +-- CURRENT
+ *       +-- 00007.sst
+ *       +-- 00011.sst
+ *       +-- archive
+ *       |     +-- 00008.log
+ *       |     +-- 00013.log
+ *       ...
+ *
+ *
+ * DFS directory structure after saving to DFS as version 10
+ * -----------------------------------------------------------
+ * The SST and archived log files are given unique file names and copied to the shared
+ * subdirectories. Every version maintains a mapping of local immutable file name to the unique
+ * file name in DFS. This mapping is saved in a JSON file (named `metadata`), which is zipped
+ * along with other checkpoint files into a single file `[version].zip`.
+ *
+ *    dfsRootDir
+ *       |
+ *       +-- SSTs
+ *       |     +-- 00007-[uuid1].sst
+ *       |     +-- 00011-[uuid2].sst
+ *       +-- logs
+ *       |     +-- 00008-[uuid3].log
+ *       |     +-- 00013-[uuid4].log
+ *       +-- 10.zip
+ *       |     +-- metadata  <--- contains mapping between 00007.sst and [uuid1].sst,
+ *       |                        and the mapping between 00008.log and [uuid3].log
+ *       |     +-- OPTIONS-000005
+ *       |     +-- MANIFEST-000008
+ *       |     +-- CURRENT
+ *       |     ...
+ *       |
+ *       +-- 9.zip
+ *       +-- 8.zip
+ *       ...
+ *
+ * Note the following.
+ * - Each [version].zip is a complete description of all the data and metadata needed to recover
+ *   a RocksDB instance at the corresponding version. The SST files and log files are not
+ *   included in the zip files; they can be shared across different versions. This is unlike the
+ *   [version].delta files of HDFSBackedStateStore where previous delta files need to be read
+ *   to be recovered.
+ * - This is safe with respect to speculatively executed tasks running concurrently in different
+ *   executors, as each task would upload a different copy of the generated immutable files and
+ *   atomically update the [version].zip.
+ * - Immutable files are identified uniquely based on their file name and file size.
+ * - Immutable files can be reused only across adjacent checkpoints/versions.
+ * - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
+ *   different thread than the task thread saving files.
+ *
+ * @param dfsRootDir Directory where the [version].zip files will be stored
+ * @param localTempDir Local directory for temporary work
+ * @param hadoopConf Hadoop configuration for talking to DFS
+ * @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
+ */
+class RocksDBFileManager(
+    dfsRootDir: String,
+    localTempDir: File,
+    hadoopConf: Configuration,
+    loggingId: String = "")
+  extends Logging {
+
+  import RocksDBImmutableFile._
+
+  private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+  private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
+  private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
+
+  /** Save all the files in the given local checkpoint directory as a committed version in DFS */
+  def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+    logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
+    val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
+    val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+    val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
+    val metadataFile = localMetadataFile(checkpointDir)
+    metadata.writeToFile(metadataFile)
+    logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
+
+    if (version <= 1 && numKeys == 0) {
+      // If we're writing the initial version and there's no data, we have to explicitly
+      // initialize the root directory. Normally saveImmutableFilesToDfs will do this
+      // initialization, but when there's no data that method won't write any files, and
+      // zipToDfsFile uses the CheckpointFileManager.createAtomic API which doesn't
+      // auto-initialize parent directories.
+      val path = new Path(dfsRootDir)
+      if (!fm.exists(path)) fm.mkdirs(path)
+    }
+    zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
+    logInfo(s"Saved checkpoint file for version $version")
+  }
+
+  /** Save immutable files to the DFS directory */
+  private def saveImmutableFilesToDfs(
+      version: Long,
+      localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+    // Get the immutable files used in previous versions, as some of those uploaded files can be
+    // reused for this version
+    logInfo(s"Saving RocksDB files to DFS for $version")
+    val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
+      f.localFileName -> f
+    }.toMap
+
+    var bytesCopied = 0L
+    var filesCopied = 0L
+    var filesReused = 0L
+
+    val immutableFiles = localFiles.map { localFile =>
+      prevFilesToSizes
+        .get(localFile.getName)
+        .filter(_.isSameFile(localFile))
+        .map { reusable =>
+          filesReused += 1
+          reusable
+        }.getOrElse {
+          val localFileName = localFile.getName
+          val dfsFileName = newDFSFileName(localFileName)
+          val dfsFile = dfsFilePath(dfsFileName)
+          // Note: The implementation of copyFromLocalFile() closes the output stream when there
+          // is any exception while copying. So this may generate partial files on DFS. But that
+          // is okay because until the main [version].zip file is written, those partial files
+          // are not going to be used at all. Eventually these files should get cleared.
+          fs.copyFromLocalFile(
+            new Path(localFile.getAbsoluteFile.toURI), dfsFile)
+          val localFileSize = localFile.length()
+          logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
+          filesCopied += 1
+          bytesCopied += localFileSize
+
+          RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
+        }
+    }
+    logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
+      s" DFS for version $version. $filesReused files reused without copying.")
+    versionToRocksDBFiles.put(version, immutableFiles)
+
+    immutableFiles
+  }
+
+  /**
+   * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
+   * Any error while writing will ensure that the file is not written.
+   */
+  private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
+    lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}"
+    var in: InputStream = null
+    val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
+    var totalBytes = 0L
+    val zout = new ZipOutputStream(out)
+    try {
+      files.foreach { file =>
+        zout.putNextEntry(new ZipEntry(file.getName))
+        in = new FileInputStream(file)
+        val bytes = IOUtils.copy(in, zout)
+        in.close()
+        zout.closeEntry()
+        totalBytes += bytes
+      }
+      zout.close() // so that any error in closing also cancels the output stream
+      logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
+    } catch {
+      case e: Exception =>
+        // Cancel the actual output stream first, so that zout.close() does not write the file
+        out.cancel()
+        logError(s"Error zipping to $filesStr", e)
+        throw e
+    } finally {
+      // Close everything no matter what happened
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(zout)
+    }
+  }
+
+  /** Log the files present in a directory. This is useful for debugging. */
+  private def logFilesInDir(dir: File, msg: String): Unit = {
+    lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
+      s"${f.getAbsolutePath} - ${f.length()} bytes"
+    }
+    logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}")
+  }
+
+  private def newDFSFileName(localFileName: String): String = {
+    val baseName = FilenameUtils.getBaseName(localFileName)
+    val extension = FilenameUtils.getExtension(localFileName)
+    s"$baseName-${UUID.randomUUID}.$extension"
+  }
+
+  private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")
+
+  private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")
+
+  private def dfsFilePath(fileName: String): Path = {
+    if (isSstFile(fileName)) {
+      new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
+    } else if (isLogFile(fileName)) {
+      new Path(new Path(dfsRootDir, LOG_FILES_DFS_SUBDIR), fileName)
+    } else {
+      new Path(dfsRootDir, fileName)
+    }
+  }
+
+  /**
+   * List all the RocksDB files that need to be synced or recovered.
+   */
+  private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = {
+    val topLevelFiles = localDir.listFiles.filter(!_.isDirectory)
+    val archivedLogFiles =
+      Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles())
+        .getOrElse(Array[File]())
+        // To ignore .log.crc files
+        .filter(file => isLogFile(file.getName))
+    val (topLevelSstFiles, topLevelOtherFiles) = topLevelFiles.partition(f => isSstFile(f.getName))
+    (topLevelSstFiles ++ archivedLogFiles, topLevelOtherFiles)
+  }
+}
+
 /**
  * Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
  * changes to this MUST be backward-compatible.
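
To make the intended call pattern easier to follow alongside the diff, here is a small standalone sketch of how a caller might drive the RocksDBFileManager API added above. The object name, paths, version, and key count are illustrative only and not part of this patch.

// Illustrative sketch only (not part of the patch): exercises RocksDBFileManager.saveCheckpointToDfs.
import java.io.File

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.streaming.state.RocksDBFileManager
import org.apache.spark.util.Utils

object RocksDBFileManagerSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical DFS root for one state store partition; real values come from the state store provider.
    val dfsRootDir = "/tmp/state/operator-0/partition-0"
    val fileManager = new RocksDBFileManager(dfsRootDir, Utils.createTempDir(), new Configuration)

    // Directory produced by RocksDB's checkpoint API on local disk (hypothetical path).
    val localCheckpointDir = new File("/tmp/rocksdb-local-checkpoint-1")

    // Copies new SST/archived-log files into <dfsRootDir>/SSTs and <dfsRootDir>/logs, then
    // atomically writes <dfsRootDir>/1.zip containing `metadata` and the remaining small files.
    fileManager.saveCheckpointToDfs(localCheckpointDir, version = 1L, numKeys = 100L)
  }
}

On a later commit, a second call with version = 2 would upload only those SST and archived log files whose name and size are not already tracked in versionToRocksDBFiles, which is exactly what the suite below verifies.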
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index b66c1e83f2ff..6a422f781607 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -20,12 +20,87 @@ package org.apache.spark.sql.execution.streaming.state
 import java.io._
 import java.nio.charset.Charset
 
+import scala.language.implicitConversions
+
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark._
+import org.apache.spark.sql.catalyst.util.quietly
+import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 class RocksDBSuite extends SparkFunSuite {
 
+  test("RocksDBFileManager: upload only new immutable files") {
+    withTempDir { dir =>
+      val dfsRootDir = dir.getAbsolutePath
+      val fileManager = new RocksDBFileManager(
+        dfsRootDir, Utils.createTempDir(), new Configuration)
+      val sstDir = s"$dfsRootDir/SSTs"
+      def numRemoteSSTFiles: Int = listFiles(sstDir).length
+      val logDir = s"$dfsRootDir/logs"
+      def numRemoteLogFiles: Int = listFiles(logDir).length
+
+      // Save a version of checkpoint files
+      val cpFiles1 = Seq(
+        "sst-file1.sst" -> 10,
+        "sst-file2.sst" -> 20,
+        "other-file1" -> 100,
+        "other-file2" -> 200,
+        "archive/00001.log" -> 1000,
+        "archive/00002.log" -> 2000
+      )
+      saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
+      assert(numRemoteSSTFiles == 2) // 2 sst files copied
+      assert(numRemoteLogFiles == 2) // 2 log files copied
+
+      // Save SAME version again with different checkpoint files and verify
+      val cpFiles1_ = Seq(
+        "sst-file1.sst" -> 10, // same SST file as before, should not get copied
+        "sst-file2.sst" -> 25, // new SST file with same name as before, but different length
+        "sst-file3.sst" -> 30, // new SST file
+        "other-file1" -> 100, // same non-SST file as before, should not get copied
+        "other-file2" -> 210, // new non-SST file with same name as before, but different length
+        "other-file3" -> 300, // new non-SST file
+        "archive/00001.log" -> 1000, // same log file as before, should not get copied
+        "archive/00002.log" -> 2500, // new log file with same name as before, but different length
+        "archive/00003.log" -> 3000 // new log file
+      )
+      saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
+      assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
+      assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
+
+      // Save another version and verify
+      val cpFiles2 = Seq(
+        "sst-file4.sst" -> 40,
+        "other-file4" -> 400,
+        "archive/00004.log" -> 4000
+      )
+      saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
+      assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
+      assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+    }
+  }
+
+  test("RocksDBFileManager: error writing [version].zip cancels the output stream") {
+    quietly {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(
+        SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+        classOf[CreateAtomicTestManager].getName)
+      val dfsRootDir = Utils.createTempDir().getAbsolutePath
+      val fileManager = new RocksDBFileManager(dfsRootDir, Utils.createTempDir(), hadoopConf)
+      val cpFiles = Seq("sst-file1.sst" -> 10, "sst-file2.sst" -> 20, "other-file1" -> 100)
+      CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      intercept[IOException] {
+        saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = 101)
+      }
+      assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
+    }
+  }
+
   test("checkpoint metadata serde roundtrip") {
     def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
       assert(metadata.json == json)
@@ -54,4 +129,30 @@ class RocksDBSuite extends SparkFunSuite {
       """{"sstFiles":[{"localFileName":"00001.sst","dfsSstFileName":"00001-uuid.sst","sizeBytes":12345678901234}],"logFiles":[{"localFileName":"00001.log","dfsLogFileName":"00001-uuid.log","sizeBytes":12345678901234}],"numKeys":12345678901234}""")
     // scalastyle:on line.size.limit
   }
+
+  def generateFiles(dir: String, fileToLengths: Seq[(String, Int)]): Unit = {
+    fileToLengths.foreach { case (fileName, length) =>
+      val file = new File(dir, fileName)
+      FileUtils.write(file, "a" * length)
+    }
+  }
+
+  def saveCheckpointFiles(
+      fileManager: RocksDBFileManager,
+      fileToLengths: Seq[(String, Int)],
+      version: Int,
+      numKeys: Int): Unit = {
+    val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints
+    generateFiles(checkpointDir, fileToLengths)
+    fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
+  }
+
+  implicit def toFile(path: String): File = new File(path)
+
+  def listFiles(file: File): Seq[File] = {
+    if (!file.exists()) return Seq.empty
+    file.listFiles.filter(file => !file.getName.endsWith("crc") && !file.isDirectory)
+  }
+
+  def listFiles(file: String): Seq[File] = listFiles(new File(file))
 }
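
As a complement to the tests above, the standalone sketch below shows one way to inspect a committed [version].zip and confirm that only bare file names are embedded in it, matching zipToDfsFile's use of `new ZipEntry(file.getName)`. The object name and the local path are hypothetical; this helper is not part of the patch.

// Illustrative sketch only (not part of the patch): lists the entries of a saved [version].zip.
import java.util.zip.ZipFile

import scala.collection.JavaConverters._

object InspectVersionZipSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local copy of <dfsRootDir>/1.zip fetched from the checkpoint location.
    val zip = new ZipFile("/tmp/state/operator-0/partition-0/1.zip")
    try {
      // Entries carry only bare file names (e.g. `metadata`, `CURRENT`, `MANIFEST-000008`);
      // SST and archived log files are not in the zip at all, since they live in SSTs/ and logs/.
      zip.entries().asScala.foreach { entry =>
        println(s"${entry.getName} (${entry.getSize} bytes)")
      }
    } finally {
      zip.close()
    }
  }
}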