16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1129,6 +1129,22 @@ private[spark] object Utils extends Logging {
s"${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs)} ms"
}

/**
 * Lists files recursively, returning both the files and the subdirectories under the given directory.
*/
def recursiveList(f: File): Array[File] = {
require(f.isDirectory)
val result = f.listFiles.toBuffer
val dirList = result.filter(_.isDirectory)
while (dirList.nonEmpty) {
val curDir = dirList.remove(0)
val files = curDir.listFiles()
result ++= files
dirList ++= files.filter(_.isDirectory)
}
result.toArray
}
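A minimal usage sketch of the new helper, assuming a call site inside the org.apache.spark package (Utils is private[spark]); the directory path is hypothetical:

import java.io.File

import org.apache.spark.util.Utils

// Hypothetical local checkpoint directory; recursiveList requires an existing directory.
val checkpointDir = new File("/tmp/spark-rocksdb-checkpoint")
if (checkpointDir.isDirectory) {
  // Returns every file and subdirectory under checkpointDir (the root itself is excluded).
  Utils.recursiveList(checkpointDir).foreach { f =>
    println(s"${f.getAbsolutePath} - ${f.length()} bytes")
  }
}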

/**
* Delete a file or directory and its contents recursively.
* Don't follow directories if they are symlinks.
@@ -17,18 +17,266 @@

package org.apache.spark.sql.execution.streaming.state

import java.io.File
import java.io.{File, FileInputStream, InputStream}
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Files
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.Seq

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.util.Utils

/**
* Class responsible for syncing RocksDB checkpoint files from local disk to DFS.
 * For each version, a checkpoint is saved in a specific directory structure that allows successive
 * versions to reuse the SST data files and archived log files. This allows each commit to be
 * incremental; only the new SST files and archived log files generated by RocksDB will be uploaded.
* The directory structures on local disk and in DFS are as follows.
*
* Local checkpoint dir structure
* ------------------------------
 * RocksDB generates a bunch of files in the local checkpoint directory. The most important among
 * them are the SST files; they are the actual log-structured data files. The rest of the files
 * contain the metadata necessary for RocksDB to read the SST files and start from the checkpoint.
 * Note that the SST files are hard links to files in RocksDB's working directory, and therefore
 * successive checkpoints can share some of the SST files. So these SST files have to be copied to
 * DFS into a shared directory so that different committed versions can reuse them.
*
* We consider both SST files and archived log files as immutable files which can be shared between
* different checkpoints.
*
* localCheckpointDir
* |
* +-- OPTIONS-000005
* +-- MANIFEST-000008
* +-- CURRENT
* +-- 00007.sst
* +-- 00011.sst
* +-- archive
* | +-- 00008.log
* | +-- 00013.log
* ...
*
*
* DFS directory structure after saving to DFS as version 10
* -----------------------------------------------------------
* The SST and archived log files are given unique file names and copied to the shared subdirectory.
* Every version maintains a mapping of local immutable file name to the unique file name in DFS.
* This mapping is saved in a JSON file (named `metadata`), which is zipped along with other
* checkpoint files into a single file `[version].zip`.
*
* dfsRootDir
* |
* +-- SSTs
* | +-- 00007-[uuid1].sst
* | +-- 00011-[uuid2].sst
* +-- logs
* | +-- 00008-[uuid3].log
* | +-- 00013-[uuid4].log
* +-- 10.zip
* | +-- metadata <--- contains mapping between 00007.sst and [uuid1].sst,
* and the mapping between 00008.log and [uuid3].log
* | +-- OPTIONS-000005
* | +-- MANIFEST-000008
* | +-- CURRENT
* | ...
* |
* +-- 9.zip
* +-- 8.zip
* ...
*
* Note the following.
 * - Each [version].zip is a complete description of all the data and metadata needed to recover
 *   a RocksDB instance at the corresponding version. The SST files and log files are not included
 *   in the zip files, as they can be shared across different versions. This is unlike the
 *   [version].delta files of HDFSBackedStateStore, where previous delta files need to be read
 *   to recover a version.
 * - This is safe with respect to speculatively executed tasks running concurrently in different executors
* as each task would upload a different copy of the generated immutable files and
* atomically update the [version].zip.
* - Immutable files are identified uniquely based on their file name and file size.
* - Immutable files can be reused only across adjacent checkpoints/versions.
* - This class is thread-safe. Specifically, it is safe to concurrently delete old files from a
* different thread than the task thread saving files.
Comment on lines +113 to +114

Member:

Can you talk more about where this claim (thread-safe) applies? Where do we delete old files?

Do you mean versionToRocksDBFiles? When we prepare files for a new version, is there another thread (the maintenance thread) deleting old files?

Member (Author):

Yes. It refers to deleting the files contained in the old versions. Here's the description of the deleteOldVersions method of RocksDBFileManager, which will be called in RocksDBStateStoreProvider.doMaintenance. As we did before, I'll also refer to this comment when the PR for the delete path is submitted.

   * Delete old versions by deleting the associated version and SST files.
   * At a high level, this method finds which versions to delete, and which SST files were
   * last used in those versions. It's safe to delete these SST files because an SST file can
   * be reused only in successive versions. Therefore, if an SST file F was last used in version
   * V, then it won't be used in version V+1 or later, and if version V can be deleted, then
   * F can safely be deleted as well.
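A minimal sketch of the "last used version" idea described in that excerpt; this is not the actual deleteOldVersions implementation (that lands in a later PR), and the inputs are hypothetical:

// Hypothetical inputs: the set of versions being deleted, and for each immutable file the
// highest version that references it. Since a file last used in version V cannot appear in
// V+1 or later, it is safe to delete it once V itself is being deleted.
def filesSafeToDelete(
    versionsToDelete: Set[Long],
    fileToLastUsedVersion: Map[String, Long]): Seq[String] = {
  fileToLastUsedVersion.collect {
    case (fileName, lastUsedVersion) if versionsToDelete.contains(lastUsedVersion) => fileName
  }.toSeq
}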

*
* @param dfsRootDir Directory where the [version].zip files will be stored
* @param localTempDir Local directory for temporary work
* @param hadoopConf Hadoop configuration for talking to DFS
* @param loggingId Id that will be prepended in logs for isolating concurrent RocksDBs
*/
class RocksDBFileManager(
dfsRootDir: String,
localTempDir: File,
hadoopConf: Configuration,
loggingId: String = "")
extends Logging {

import RocksDBImmutableFile._

private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
Contributor:

Just to confirm, could I safely assume versionToRocksDBFiles will be loaded when RocksDBFileManager is initialized with an existing checkpoint in a further PR?

Member (Author):

That's right. versionToRocksDBFiles is touched in the following three places:

  • saveCheckpointToDfs (this PR)
  • deleteOldVersions
  • loadCheckpointFromDfs

private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)

/** Save all the files in the given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
val metadataFile = localMetadataFile(checkpointDir)
metadata.writeToFile(metadataFile)
logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
Member (Author):

Per #32272 (comment), we only use prettyJson in the log.


if (version <= 1 && numKeys == 0) {
// If we're writing the initial version and there's no data, we have to explicitly initialize
// the root directory. Normally saveImmutableFilesToDfs will do this initialization, but
// when there's no data that method won't write any files, and zipToDfsFile uses the
// CheckpointFileManager.createAtomic API which doesn't auto-initialize parent directories.
val path = new Path(dfsRootDir)
if (!fm.exists(path)) fm.mkdirs(path)
}
zipToDfsFile(localOtherFiles :+ metadataFile, dfsBatchZipFile(version))
logInfo(s"Saved checkpoint file for version $version")
}
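As the class doc above describes, the `metadata` file written in this method records the mapping from local immutable file names to their unique DFS file names, serialized as JSON. A minimal, self-contained json4s sketch of that kind of mapping; the case class and field names are illustrative only, not the PR's RocksDBCheckpointMetadata:

object MetadataJsonSketch {
  import org.json4s.NoTypeHints
  import org.json4s.jackson.Serialization

  // Illustrative stand-ins for the real metadata classes defined later in this file.
  case class ImmutableFileRef(localFileName: String, dfsFileName: String, sizeBytes: Long)
  case class CheckpointMetadataSketch(sstFiles: Seq[ImmutableFileRef], numKeys: Long)

  implicit val format: org.json4s.Formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val metadata = CheckpointMetadataSketch(
      sstFiles = Seq(ImmutableFileRef("00007.sst", "00007-uuid1.sst", 1024L)),
      numKeys = 100L)
    // Pretty JSON like this is what ends up zipped into [version].zip as the `metadata` entry.
    println(Serialization.writePretty(metadata))
  }
}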

/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
val prevFilesToSizes = versionToRocksDBFiles.values.asScala.flatten.map { f =>
f.localFileName -> f
}.toMap

var bytesCopied = 0L
var filesCopied = 0L
var filesReused = 0L

val immutableFiles = localFiles.map { localFile =>
prevFilesToSizes
.get(localFile.getName)
.filter(_.isSameFile(localFile))
.map { reusable =>
filesReused += 1
reusable
}.getOrElse {
val localFileName = localFile.getName
val dfsFileName = newDFSFileName(localFileName)
val dfsFile = dfsFilePath(dfsFileName)
// Note: The implementation of copyFromLocalFile() closes the output stream when there is
// any exception while copying. So this may generate partial files on DFS. But that is
// okay because until the main [version].zip file is written, those partial files are
// not going to be used at all. Eventually these files should get cleared.
fs.copyFromLocalFile(
new Path(localFile.getAbsoluteFile.toURI), dfsFile)
val localFileSize = localFile.length()
logInfo(s"Copied $localFile to $dfsFile - $localFileSize bytes")
filesCopied += 1
bytesCopied += localFileSize

RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
s" DFS for version $version. $filesReused files reused without copying.")
versionToRocksDBFiles.put(version, immutableFiles)

immutableFiles
}
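A plausible sketch of the name-and-size check behind isSameFile() used above, matching the class doc's note that immutable files are identified by their file name and file size; RocksDBImmutableFile is defined elsewhere in this file, so the field names here are assumptions:

// Assumed shape of RocksDBImmutableFile, for illustration only.
case class ImmutableFileSketch(localFileName: String, dfsFileName: String, sizeBytes: Long) {
  // A previously uploaded file can be reused iff the local file has the same name and size.
  def isSameFile(otherFile: java.io.File): Boolean =
    otherFile.getName == localFileName && otherFile.length() == sizeBytes
}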

/**
* Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
* Any error while writing will ensure that the file is not written.
*/
private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
lazy val filesStr = s"$dfsZipFile\n\t${files.mkString("\n\t")}"
var in: InputStream = null
val out = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
var totalBytes = 0L
val zout = new ZipOutputStream(out)
try {
files.foreach { file =>
zout.putNextEntry(new ZipEntry(file.getName))
in = new FileInputStream(file)
val bytes = IOUtils.copy(in, zout)
in.close()
zout.closeEntry()
totalBytes += bytes
}
zout.close() // so that any error in closing also cancels the output stream
logInfo(s"Zipped $totalBytes bytes (before compression) to $filesStr")
Member:

The zipped file is dfsZipFile, right? Why filesStr here?

Member (Author):

filesStr contains the dfsZipFile name. The log format here is ${dfsZipFile} \n ${list of all the file names}.

} catch {
case e: Exception =>
// Cancel the actual output stream first, so that zout.close() does not write the file
out.cancel()
logError(s"Error zipping to $filesStr", e)
throw e
} finally {
// Close everything no matter what happened
IOUtils.closeQuietly(in)
IOUtils.closeQuietly(zout)
}
}

/** Log the files present in a directory. This is useful for debugging. */
private def logFilesInDir(dir: File, msg: String): Unit = {
lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f =>
s"${f.getAbsolutePath} - ${f.length()} bytes"
}
logInfo(s"$msg - ${files.length} files\n\t${files.mkString("\n\t")}")
}

private def newDFSFileName(localFileName: String): String = {
val baseName = FilenameUtils.getBaseName(localFileName)
val extension = FilenameUtils.getExtension(localFileName)
s"$baseName-${UUID.randomUUID}.$extension"
}

private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")

private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")

private def dfsFilePath(fileName: String): Path = {
if (isSstFile(fileName)) {
new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
} else if (isLogFile(fileName)) {
new Path(new Path(dfsRootDir, LOG_FILES_DFS_SUBDIR), fileName)
} else {
new Path(dfsRootDir, fileName)
}
}

/**
 * List all the RocksDB files that need to be synced or recovered.
*/
private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = {
val topLevelFiles = localDir.listFiles.filter(!_.isDirectory)
val archivedLogFiles =
Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles())
.getOrElse(Array[File]())
// To ignore .log.crc files
.filter(file => isLogFile(file.getName))
val (topLevelSstFiles, topLevelOtherFiles) = topLevelFiles.partition(f => isSstFile(f.getName))
(topLevelSstFiles ++ archivedLogFiles, topLevelOtherFiles)
}
}
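A minimal usage sketch of the class above; the paths, DFS URI, and logging id are hypothetical, and in practice the manager is driven by the RocksDB state store rather than called directly like this:

import java.io.File

import org.apache.hadoop.conf.Configuration

val fileManager = new RocksDBFileManager(
  dfsRootDir = "hdfs://namenode:8020/checkpoints/query1/state/0/0",  // hypothetical DFS location
  localTempDir = new File("/tmp/rocksdb-file-manager-tmp"),          // hypothetical temp dir
  hadoopConf = new Configuration(),
  loggingId = "StateStoreId(op=0,part=0)")

// Uploads any new SST/archived-log files to the shared DFS subdirectories and then
// atomically writes 10.zip containing the remaining checkpoint files plus `metadata`.
fileManager.saveCheckpointToDfs(
  checkpointDir = new File("/tmp/local-rocksdb-checkpoint-10"),      // hypothetical local checkpoint
  version = 10L,
  numKeys = 100L)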

/**
* Classes to represent metadata of checkpoints saved to DFS. Since this is converted to JSON, any
* changes to this MUST be backward-compatible.