37 changes: 35 additions & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -31,11 +31,11 @@ import java.security.SecureRandom
import java.util.{Locale, Properties, Random, UUID}
import java.util.concurrent._
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.util.zip.GZIPInputStream
import java.util.zip.{GZIPInputStream, ZipInputStream}

import scala.annotation.tailrec
import scala.collection.{mutable, Map, Seq}
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
@@ -49,6 +49,7 @@ import com.google.common.collect.Interners
import com.google.common.io.{ByteStreams, Files => GFiles}
import com.google.common.net.InetAddresses
import org.apache.commons.codec.binary.Hex
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -3121,6 +3122,38 @@ private[spark] object Utils extends Logging {
"If %s is not used, set spark.security.credentials.%s.enabled to false."
message.format(serviceName, e, serviceName, serviceName)
}

/**
* Decompress a zip file into a local dir. File names are read from the zip file. Note, we skip
* addressing the directory here. Also, we rely on the caller side to address any exceptions.
*/
def unzipFilesFromFile(fs: FileSystem, dfsZipFile: Path, localDir: File): Seq[File] = {
val files = new mutable.ArrayBuffer[File]()
val in = new ZipInputStream(fs.open(dfsZipFile))
var out: OutputStream = null
try {
var entry = in.getNextEntry()
while (entry != null) {
if (!entry.isDirectory) {
Member:
We don't process directories. Could you also mention that in the method doc?

Contributor:
It'd be ideal to make it clear in the method name, like unzipFilesFromFile. (Ideally I'd like this to also extract directories, but let's postpone that until necessary.)

In general we expect unzipping to extract directories as well. That said, we need to make the behavior very clear to the caller side. I agree this should be mentioned in the javadoc, but the method name should also make the actual behavior intuitive.

Member Author:
Makes sense; the method name has been changed and a comment added.

val fileName = localDir.toPath.resolve(entry.getName).getFileName.toString
val outFile = new File(localDir, fileName)
files += outFile
out = new FileOutputStream(outFile)
IOUtils.copy(in, out)
out.close()
in.closeEntry()
}
entry = in.getNextEntry()
}
in.close() // so that any error in closing does not get ignored
logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
} finally {
// Close everything no matter what happened
IOUtils.closeQuietly(in)
IOUtils.closeQuietly(out)
}
Comment on lines +3150 to +3154

Member:
Hmm, are we sure we don't need to handle any errors during unzipping?

Contributor:
This looks safe; if there's an exception we may see some files being extracted and one of the output files may be broken, but callers will catch the exception and treat the output directory as unhealthy. If necessary, let's document this in the javadoc as well.

Member Author:
Yes, we rely on the caller side to handle any exceptions. Javadoc added as well.

files
}
}
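For illustration (not part of this diff): the thread above leaves error handling to the caller. A minimal caller-side sketch, assuming code inside the org.apache.spark package (Utils is private[spark]) and a hypothetical zip path, could look like:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.util.Utils

// Hypothetical path and handling: on any failure, discard the possibly
// partially extracted directory and rethrow, as suggested in the review above.
val dfsZipFile = new Path("hdfs:///tmp/state-checkpoints/1.zip")
val localDir = Utils.createTempDir()
val fs = dfsZipFile.getFileSystem(new Configuration())
try {
  val extracted = Utils.unzipFilesFromFile(fs, dfsZipFile, localDir)
  println(s"Extracted ${extracted.size} files into $localDir")
} catch {
  case e: Exception =>
    Utils.deleteRecursively(localDir) // output dir may contain broken files
    throw e
}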

private[util] object CallerContext extends Logging {
RocksDBFileManager.scala
@@ -32,7 +32,7 @@ import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.commons.io.{FilenameUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{Path, PathFilter}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

@@ -130,6 +130,9 @@ class RocksDBFileManager(
private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
private val onlyZipFiles = new PathFilter {
override def accept(path: Path): Boolean = path.toString.endsWith(".zip")
}

/** Save all the files in given local checkpoint directory as a committed version in DFS */
def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
@@ -153,6 +156,49 @@
logInfo(s"Saved checkpoint file for version $version")
}

/**
* Load all necessary files for specific checkpoint version from DFS to given local directory.
* If version is 0, then it will delete all files in the directory. For other versions, it
* ensures that only the exact files generated during checkpointing will be present in the
* local directory.
*/
def loadCheckpointFromDfs(version: Long, localDir: File): RocksDBCheckpointMetadata = {
logInfo(s"Loading checkpoint files for version $version")
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
// Delete all non-immutable files in local dir, and unzip new ones from DFS commit file
Contributor:
What if we just removed all the files in localDir? I'd just like to know the reason we don't clear the directory but only remove specific files. Would we need to leverage some remaining files?

Member Author:
Yes. The consideration here is mainly for immutable files like SST/log files: we can avoid I/O for the immutable files shared among different versions.

listRocksDBFiles(localDir)._2.foreach(_.delete())
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localDir)

// Copy the necessary immutable files
val metadataFile = localMetadataFile(localDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
versionToRocksDBFiles.put(version, metadata.immutableFiles)
metadataFile.delete()
metadata
}
logFilesInDir(localDir, s"Loaded checkpoint files for version $version")
metadata
}
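For illustration (not part of this diff), a caller would use it roughly as follows; the version number and directory here are hypothetical:

// Load checkpoint version 5 into a fresh local working directory.
val workingDir = Utils.createTempDir()
val metadata = fileManager.loadCheckpointFromDfs(version = 5L, localDir = workingDir)
println(s"Loaded ${metadata.numKeys} keys and ${metadata.immutableFiles.size} immutable files")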

/** Get the latest version available in the DFS directory. If no data present, it returns 0. */
def getLatestVersion(): Long = {
Member:
Is this only used by tests?

Member Author:
Yes.

val path = new Path(dfsRootDir)
if (fm.exists(path)) {
fm.list(path, onlyZipFiles)
.map(_.getPath.getName.stripSuffix(".zip"))
.map(_.toLong)
.foldLeft(0L)(math.max)
} else {
0
}
}
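getLatestVersion assumes each committed version is stored as a single <version>.zip file directly under dfsRootDir — that is what the .zip filter and the stripSuffix/toLong parsing above rely on. A sketch of the matching path helper referenced elsewhere in this diff (the actual definition is outside the visible hunks) would be:

// Assumed naming convention: one <version>.zip commit file per version under dfsRootDir.
private def dfsBatchZipFile(version: Long): Path = new Path(s"$dfsRootDir/$version.zip")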

/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
@@ -200,6 +246,56 @@ class RocksDBFileManager(
immutableFiles
}

/**
* Copy files from DFS directory to a local directory. It will figure out which
* existing files are needed, and accordingly, unnecessary SST files are deleted while
* necessary and non-existing files are copied from DFS.
*/
private def loadImmutableFilesFromDfs(
immutableFiles: Seq[RocksDBImmutableFile], localDir: File): Unit = {
val requiredFileNameToFileDetails = immutableFiles.map(f => f.localFileName -> f).toMap
// Delete unnecessary local immutable files
listRocksDBFiles(localDir)._1
.foreach { existingFile =>
val isSameFile =
requiredFileNameToFileDetails.get(existingFile.getName).exists(_.isSameFile(existingFile))
if (!isSameFile) {
existingFile.delete()
logInfo(s"Deleted local file $existingFile")
}
Comment on lines +260 to +265

Member:
When is it possible to have an existing file in the local dir that has the same file name but is not the same file as in DFS?

Member:
Or is it just a safer guard?

Member Author:
A safer guard that checks both the file name and the file size.

}

var filesCopied = 0L
var bytesCopied = 0L
var filesReused = 0L
immutableFiles.foreach { file =>
val localFileName = file.localFileName
val localFile = localFilePath(localDir, localFileName)
if (!localFile.exists) {
val dfsFile = dfsFilePath(file.dfsFileName)
// Note: The implementation of copyToLocalFile() closes the output stream when there is
// any exception while copying. So this may generate partial files on DFS. But that is
// okay because until the main [version].zip file is written, those partial files are
// not going to be used at all. Eventually these files should get cleared.
fs.copyToLocalFile(dfsFile, new Path(localFile.getAbsoluteFile.toURI))
val localFileSize = localFile.length()
val expectedSize = file.sizeBytes
if (localFileSize != expectedSize) {
throw new IllegalStateException(
s"Copied $dfsFile to $localFile," +
s" expected $expectedSize bytes, found $localFileSize bytes ")
}
filesCopied += 1
bytesCopied += localFileSize
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
} else {
filesReused += 1
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from DFS to local with " +
s"$filesReused files reused.")
}
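The isSameFile guard discussed in the thread above is defined outside the visible diff. A minimal sketch consistent with that discussion — the field names are taken from how RocksDBImmutableFile is used in this file, while the class shape itself is an assumption — might be:

// Sketch: a local file is treated as the same immutable file only when both its
// name and its on-disk length match what the checkpoint metadata recorded.
case class RocksDBImmutableFile(localFileName: String, dfsFileName: String, sizeBytes: Long) {
  def isSameFile(otherFile: java.io.File): Boolean = {
    otherFile.getName == localFileName && otherFile.length() == sizeBytes
  }
}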

/**
* Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
* Any error while writing will ensure that the file is not written.
@@ -262,6 +358,14 @@ class RocksDBFileManager(
}
}
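The body of the zip-writing method described by the doc comment above is collapsed in this diff. A rough sketch of the behavior it describes — only base file names embedded, and the DFS file aborted on any write error — is given below; the name zipToDfsFile and the use of fm.createAtomic are assumptions, not taken from the visible hunks:

import java.util.zip.{ZipEntry, ZipOutputStream}

// Sketch only. createAtomic returns a cancellable stream, so a failed write
// never leaves a visible partial zip on DFS.
private def zipToDfsFile(files: Seq[File], dfsZipFile: Path): Unit = {
  val dfsOut = fm.createAtomic(dfsZipFile, overwriteIfPossible = true)
  val zout = new ZipOutputStream(dfsOut)
  try {
    files.foreach { file =>
      zout.putNextEntry(new ZipEntry(file.getName)) // only the base file name is embedded
      java.nio.file.Files.copy(file.toPath, zout)
      zout.closeEntry()
    }
    zout.close()
  } catch {
    case e: Throwable =>
      dfsOut.cancel() // abort the atomic write so the broken zip is never committed
      throw e
  }
}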

private def localFilePath(localDir: File, fileName: String): File = {
if (isLogFile(fileName)) {
new File(new File(localDir, LOG_FILES_LOCAL_SUBDIR), fileName)
} else {
new File(localDir, fileName)
}
}

/**
* List all the RocksDB files that need be synced or recovered.
*/
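The implementation of listRocksDBFiles is also collapsed. Based on how it is used above (._1 for the immutable SST and archived log files, ._2 for everything else), a sketch could look like the following; isSstFile is a hypothetical helper (e.g. a ".sst" suffix check), while isLogFile and LOG_FILES_LOCAL_SUBDIR appear elsewhere in this diff:

// Sketch: partition a local checkpoint directory into RocksDB immutable files
// (top-level SST files plus archived log files) and all other files.
private def listRocksDBFiles(localDir: File): (Seq[File], Seq[File]) = {
  val topLevelFiles = Option(localDir.listFiles).getOrElse(Array.empty[File]).filter(!_.isDirectory)
  val archivedLogFiles = Option(new File(localDir, LOG_FILES_LOCAL_SUBDIR).listFiles())
    .getOrElse(Array.empty[File])
    .filter(f => isLogFile(f.getName))
  val (sstFiles, otherFiles) = topLevelFiles.partition(f => isSstFile(f.getName))
  ((sstFiles ++ archivedLogFiles).toSeq, otherFiles.toSeq)
}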
RocksDBSuite.scala
@@ -36,13 +36,22 @@ class RocksDBSuite extends SparkFunSuite {
test("RocksDBFileManager: upload only new immutable files") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
val verificationDir = Utils.createTempDir().getAbsolutePath // local dir to load checkpoints
val fileManager = new RocksDBFileManager(
dfsRootDir, Utils.createTempDir(), new Configuration)
val sstDir = s"$dfsRootDir/SSTs"
def numRemoteSSTFiles: Int = listFiles(sstDir).length
val logDir = s"$dfsRootDir/logs"
def numRemoteLogFiles: Int = listFiles(logDir).length

// Verify behavior before any saved checkpoints
assert(fileManager.getLatestVersion() === 0)

// Try to load incorrect versions
intercept[FileNotFoundException] {
fileManager.loadCheckpointFromDfs(1, Utils.createTempDir())
}

// Save a version of checkpoint files
val cpFiles1 = Seq(
"sst-file1.sst" -> 10,
@@ -53,10 +62,24 @@
"archive/00002.log" -> 2000
)
saveCheckpointFiles(fileManager, cpFiles1, version = 1, numKeys = 101)
assert(fileManager.getLatestVersion() === 1)
assert(numRemoteSSTFiles == 2) // 2 sst files copied
assert(numRemoteLogFiles == 2) // 2 log files copied

// Save SAME version again with different checkpoint files and verify
// Load back the checkpoint files into another local dir with existing files and verify
generateFiles(verificationDir, Seq(
"sst-file1.sst" -> 11, // files with same name but different sizes, should get overwritten
"other-file1" -> 101,
"archive/00001.log" -> 1001,
"random-sst-file.sst" -> 100, // unnecessary files, should get deleted
"random-other-file" -> 9,
"00005.log" -> 101,
"archive/00007.log" -> 101
))
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1, 101)

// Save SAME version again with different checkpoint files and load back again to verify
// whether files were overwritten.
val cpFiles1_ = Seq(
"sst-file1.sst" -> 10, // same SST file as before, should not get copied
"sst-file2.sst" -> 25, // new SST file with same name as before, but different length
@@ -71,6 +94,7 @@
saveCheckpointFiles(fileManager, cpFiles1_, version = 1, numKeys = 1001)
assert(numRemoteSSTFiles === 4, "shouldn't copy same files again") // 2 old + 2 new SST files
assert(numRemoteLogFiles === 4, "shouldn't copy same files again") // 2 old + 2 new log files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)

// Save another version and verify
val cpFiles2 = Seq(
@@ -81,6 +105,19 @@
saveCheckpointFiles(fileManager, cpFiles2, version = 2, numKeys = 1501)
assert(numRemoteSSTFiles === 5) // 1 new file over earlier 4 files
assert(numRemoteLogFiles === 5) // 1 new file over earlier 4 files
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 2, cpFiles2, 1501)

// Loading an older version should work
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1, cpFiles1_, 1001)

// Loading incorrect version should fail
intercept[FileNotFoundException] {
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 3, Nil, 1001)
}

// Loading 0 should delete all files
require(verificationDir.list().length > 0)
loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 0, Nil, 0)
}
}

@@ -147,6 +184,20 @@ class RocksDBSuite extends SparkFunSuite {
fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
}

def loadAndVerifyCheckpointFiles(
fileManager: RocksDBFileManager,
verificationDir: String,
version: Int,
expectedFiles: Seq[(String, Int)],
expectedNumKeys: Int): Unit = {
val metadata = fileManager.loadCheckpointFromDfs(version, verificationDir)
val filesAndLengths =
listFiles(verificationDir).map(f => f.getName -> f.length).toSet ++
listFiles(verificationDir + "/archive").map(f => s"archive/${f.getName}" -> f.length()).toSet
assert(filesAndLengths === expectedFiles.toSet)
assert(metadata.numKeys === expectedNumKeys)
}
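The generateFiles helper used earlier in this suite is outside the visible hunks. A sketch consistent with its call sites (relative file name paired with a byte length) might be:

// Sketch: create each file (and any parent directory such as "archive/") with
// exactly the requested number of bytes, so name -> length pairs can be verified.
def generateFiles(dir: String, fileToLengths: Seq[(String, Int)]): Unit = {
  fileToLengths.foreach { case (fileName, length) =>
    val file = new File(dir, fileName)
    file.getParentFile.mkdirs()
    java.nio.file.Files.write(file.toPath, Array.fill[Byte](length)('a'.toByte))
  }
}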

implicit def toFile(path: String): File = new File(path)

def listFiles(file: File): Seq[File] = {