diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index c4b51d9d60aa..974b6a6682fb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -31,11 +31,11 @@ import java.security.SecureRandom
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.NANOSECONDS
-import java.util.zip.GZIPInputStream
+import java.util.zip.{GZIPInputStream, ZipInputStream}
 
 import scala.annotation.tailrec
+import scala.collection.{mutable, Map, Seq}
 import scala.collection.JavaConverters._
-import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
@@ -49,6 +49,7 @@ import com.google.common.collect.Interners
 import com.google.common.io.{ByteStreams, Files => GFiles}
 import com.google.common.net.InetAddresses
 import org.apache.commons.codec.binary.Hex
+import org.apache.commons.io.IOUtils
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -3121,6 +3122,38 @@ private[spark] object Utils extends Logging {
       "If %s is not used, set spark.security.credentials.%s.enabled to false."
     message.format(serviceName, e, serviceName, serviceName)
   }
+
+  /**
+   * Decompress a zip file into a local dir. File names are read from the zip file. Note that
+   * directory entries in the zip are skipped, and callers are expected to handle any exceptions.
+   */
+  def unzipFilesFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
+    val files = new mutable.ArrayBuffer[File]()
+    val in = new ZipInputStream(fs.open(dfsZipFile))
+    var out: OutputStream = null
+    try {
+      var entry = in.getNextEntry()
+      while (entry != null) {
+        if (!entry.isDirectory) {
+          val fileName = localDir.toPath.resolve(entry.getName).getFileName.toString
+          val outFile = new File(localDir, fileName)
+          files += outFile
+          out = new FileOutputStream(outFile)
+          IOUtils.copy(in, out)
+          out.close()
+          in.closeEntry()
+        }
+        entry = in.getNextEntry()
+      }
+      in.close() // close explicitly so that any error in closing is not silently ignored
+      logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
+    } finally {
+      // Close everything no matter what happened
+      IOUtils.closeQuietly(in)
+      IOUtils.closeQuietly(out)
+    }
+    files
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 43e0ec480461..0dd30d2be78c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -32,7 +32,7 @@ import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
 import org.apache.commons.io.{FilenameUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, PathFilter}
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
@@ -130,6 +130,9 @@ class RocksDBFileManager(
   private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
   private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
   private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
+  private val onlyZipFiles = new PathFilter {
+    override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
+  }
 
   /** Save all the files in given local checkpoint directory as a committed version in DFS */
   def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
@@ -153,6 +156,49 @@ class RocksDBFileManager(
     logInfo(s"Saved checkpoint file for version $version")
   }
 
+  /**
+   * Load all necessary files for the given checkpoint version from DFS into the given local
+   * directory. If the version is 0, it deletes all files in the directory. For other versions,
+   * it ensures that only the exact files generated during checkpointing are present in the
+   * local directory.
+   */
+  def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
+    logInfo(s"Loading checkpoint files for version $version")
+    val metadata = if (version == 0) {
+      if (localDir.exists) Utils.deleteRecursively(localDir)
+      localDir.mkdirs()
+      RocksDBCheckpointMetadata(Seq.empty, 0)
+    } else {
+      // Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
+      listRocksDBFiles(localDir)._2.foreach(_.delete())
+      Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localDir)
+
+      // Copy the necessary immutable files
+      val metadataFile = localMetadataFile(localDir)
+      val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
+      logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
+      loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
+      versionToRocksDBFiles.put(version, metadata.immutableFiles)
+      metadataFile.delete()
+      metadata
+    }
+    logFilesInDir(localDir, s"Loaded checkpoint files for version $version")
+    metadata
+  }
+
+  /** Get the latest version available in the DFS directory. Returns 0 if no data is present. */
+  def getLatestVersion(): Long = {
+    val path = new Path(dfsRootDir)
+    if (fm.exists(path)) {
+      fm.list(path, onlyZipFiles)
+        .map(_.getPath.getName.stripSuffix(".zip"))
+        .map(_.toLong)
+        .foldLeft(0L)(math.max)
+    } else {
+      0
+    }
+  }
+
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
@@ -200,6 +246,56 @@ class RocksDBFileManager(
     immutableFiles
   }
 
+  /**
+   * Copy files from the DFS directory to a local directory. It figures out which existing local
+   * files can be reused: unnecessary SST files are deleted, while required files that are not
+   * already present locally are copied from DFS.
+   */
+  private def loadImmutableFilesFromDfs(
+      immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
+    val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
+    // Delete unnecessary local immutable files
+    listRocksDBFiles(localDir)._1
+      .foreach { existingFile =>
+        val isSameFile =
+          requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
+        if (!isSameFile) {
+          existingFile.delete()
+          logInfo(s"Deleted local file $existingFile")
+        }
+      }
+
+    var filesCopied = 0L
+    var bytesCopied = 0L
+    var filesReused = 0L
+    immutableFiles.foreach { file =>
+      val localFileName = file.localFileName
+      val localFile = localFilePath(localDir, localFileName)
+      if (!localFile.exists) {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        // Note: The implementation of copyToLocalFile() closes the output stream when there is
+        // any exception while copying, so an interrupted copy may leave a partial file in the
+        // local directory. That is okay because a leftover partial file fails the isSameFile
+        // check above on the next load attempt, so it gets deleted and copied over again.
+        fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
+        val localFileSize = localFile.length()
+        val expectedSize = file.sizeBytes
+        if (localFileSize != expectedSize) {
+          throw new IllegalStateException(
+            s"Copied $dfsFile to $localFile," +
+              s" expected $expectedSize bytes, found $localFileSize bytes")
+        }
+        filesCopied += 1
+        bytesCopied += localFileSize
+        logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
+      } else {
+        filesReused += 1
+      }
+    }
+    logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
+      s"$filesReused files reused.")
+  }
+
   /**
    * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
    * Any error while writing will ensure that the file is not written.
@@ -262,6 +358,14 @@ class RocksDBFileManager(
     }
   }
 
+  private def localFilePath(localDir: File, fileName: String): File = {
+    if (isLogFile(fileName)) {
+      new File(new File(localDir, LOG_FILES_LOCAL_SUBDIR), fileName)
+    } else {
+      new File(localDir, fileName)
+    }
+  }
+
   /**
    * List all the RocksDB files that need be synced or recovered.
    */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 6a422f781607..c75eed2ae10f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -36,6 +36,7 @@ class RocksDBSuite extends SparkFunSuite {
   test("RocksDBFileManager: upload only new immutable files") {
     withTempDir { dir =>
       val dfsRootDir = dir.getAbsolutePath
+      val verificationDir = Utils.createTempDir().getAbsolutePath // local dir to load checkpoints
       val fileManager = new RocksDBFileManager(
         dfsRootDir, Utils.createTempDir(), new Configuration)
       val sstDir = s"$dfsRootDir/SSTs"
@@ -43,6 +44,14 @@ class RocksDBSuite extends SparkFunSuite {
       val logDir = s"$dfsRootDir/logs"
       def numRemoteLogFiles: Int = listFiles(logDir).length
 
+      // Verify behavior before any saved checkpoints
+      assert(fileManager.getLatestVersion() === 0)
+
+      // Try to load incorrect versions
+      intercept[FileNotFoundException] {
+        fileManager.loadCheckpointFromDfs(1, Utils.createTempDir())
+      }
+
       // Save a version of checkpoint files
       val cpFiles1 = Seq(
         "sst-file1.sst" -> 10,
@@ -53,10 +62,24 @@ class RocksDBSuite extends SparkFunSuite {
         "archive/00002.log" -> 2000
       )
       saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
+      assert(fileManager.getLatestVersion() === 1)
       assert(numRemoteSSTFiles == 2) // 2 sst files copied
       assert(numRemoteLogFiles == 2) // 2 log files copied
 
-      // Save SAME version again with different checkpoint files and verify
+      // Load back the checkpoint files into another local dir with existing files and verify
+      generateFiles(verificationDir, Seq(
+        "sst-file1.sst" -> 11, // files with same name but different sizes, should get overwritten
+        "other-file1" -> 101,
+        "archive/00001.log" -> 1001,
+        "random-sst-file.sst" -> 100, // unnecessary files, should get deleted
+        "random-other-file" -> 9,
+        "00005.log" -> 101,
+        "archive/00007.log" -> 101
+      ))
+      loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1, 101)
+
+      // Save SAME version again with different checkpoint files and load back again to verify
+      // whether files were overwritten.
       val cpFiles1_ = Seq(
         "sst-file1.sst" -> 10, // same SST file as before, should not get copied
         "sst-file2.sst" -> 25, // new SST file with same name as before, but different length
@@ -71,6 +94,7 @@ class RocksDBSuite extends SparkFunSuite {
       saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
       assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
       assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
+      loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)
 
       // Save another version and verify
       val cpFiles2 = Seq(
@@ -81,6 +105,19 @@ class RocksDBSuite extends SparkFunSuite {
       saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
       assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
       assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
+      loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)
+
+      // Loading an older version should work
+      loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)
+
+      // Loading incorrect version should fail
+      intercept[FileNotFoundException] {
+        loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 3, Nil, 1001)
+      }
+
+      // Loading 0 should delete all files
+      require(verificationDir.list().length > 0)
+      loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 0, Nil, 0)
     }
   }
 
@@ -147,6 +184,20 @@ class RocksDBSuite extends SparkFunSuite {
     fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
   }
 
+  def loadAndVerifyCheckpointFiles(
+      fileManager: RocksDBFileManager,
+      verificationDir: String,
+      version: Int,
+      expectedFiles: Seq[(String, Int)],
+      expectedNumKeys: Int): Unit = {
+    val metadata = fileManager.loadCheckpointFromDfs(version, verificationDir)
+    val filesAndLengths =
+      listFiles(verificationDir).map(f => f.getName -> f.length).toSet ++
+      listFiles(verificationDir + "/archive").map(f => s"archive/${f.getName}" -> f.length()).toSet
+    assert(filesAndLengths === expectedFiles.toSet)
+    assert(metadata.numKeys === expectedNumKeys)
+  }
+
   implicit def toFile(path: String): File = new File(path)
 
   def listFiles(file: File): Seq[File] = {
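Not part of the patch: a minimal usage sketch of how the new getLatestVersion() and loadCheckpointFromDfs() compose with the existing save path, in the spirit of the test above. It assumes the snippet is compiled inside the Spark source tree (RocksDBFileManager and Utils are internal APIs), and the DFS root and local directory names are hypothetical placeholders.

import java.io.File

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.streaming.state.RocksDBFileManager
import org.apache.spark.util.Utils

object RocksDBRestoreSketch {
  /** Restore the latest committed checkpoint from dfsRootDir into workingDir. */
  def restoreLatest(dfsRootDir: String, workingDir: File): Long = {
    // Same constructor arguments as used in RocksDBSuite above.
    val fileManager = new RocksDBFileManager(dfsRootDir, Utils.createTempDir(), new Configuration)

    // Newest committed [version].zip in dfsRootDir, or 0 if nothing has been committed yet.
    val latestVersion = fileManager.getLatestVersion()

    // Materialize exactly that version's files into workingDir; version 0 just clears the dir.
    val metadata = fileManager.loadCheckpointFromDfs(latestVersion, workingDir)
    println(s"Restored version $latestVersion with ${metadata.numKeys} keys")
    latestVersion
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical locations, for illustration only.
    restoreLatest("hdfs:///tmp/spark-state/rocksdb", new File("/tmp/rocksdb-working-dir"))
  }
}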