@@ -253,6 +253,13 @@ class RocksDB(
logInfo(s"Rolled back to $loadedVersion")
}

def cleanup(): Unit = {
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(conf.minVersionsToRetain)
}
logInfo(s"Cleaned old data, time taken: $cleanupTime ms")
}

/** Release all resources */
def close(): Unit = {
try {
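For context, here is a minimal usage sketch of the new cleanup() hook (illustrative only, not part of this patch; the helper name, the caller, and the cleanup cadence are assumptions):

// Illustrative sketch (not part of this diff): pair commits with periodic
// cleanup so old version zip files and SST files in DFS do not accumulate.
def commitWithMaintenance(db: RocksDB, batchId: Long, cleanupInterval: Int = 10): Unit = {
  db.commit()
  if (batchId % cleanupInterval == 0) {
    db.cleanup() // deletes versions beyond conf.minVersionsToRetain
  }
}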
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -198,6 +199,99 @@ class RocksDBFileManager(
}
}

/**
* Delete old versions by deleting the associated version and SST files.
* At a high level, this method finds which versions to delete and which SST files were
* last used in those versions. It's safe to delete these SST files because an SST file can
* be reused only in successive versions. Therefore, if an SST file F was last used in version
* V, then it won't be used in version V+1 or later, and if version V can be deleted, then
* F can safely be deleted as well.
*
* To find old files, it does the following.
* - List all the existing [version].zip files
* - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
* - Accordingly decide which versions should be deleted.
* - Resolve the SST files of all the existing versions, if not already resolved.
* - Find the latest version in which each SST file was used.
* - Delete the files that were last used in the to-be-deleted versions as we will not
* need those files any more.
*
* Note that it only deletes files that it knows are safe to delete.
* It may not delete the following files.
* - Partially written SST files
* - SST files that were used in a version, but that version got overwritten with a different
* set of SST files.
*/
def deleteOldVersions(numVersionsToRetain: Int): Unit = {
val path = new Path(dfsRootDir)

// All versions present in DFS, sorted
val sortedVersions = fm.list(path, onlyZipFiles)
.map(_.getPath.getName.stripSuffix(".zip"))
.map(_.toLong)
.sorted

// Return if no versions generated yet
if (sortedVersions.isEmpty) return

// Find the versions to delete
val maxVersionPresent = sortedVersions.last
val minVersionPresent = sortedVersions.head
val minVersionToRetain =
math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]

// Return if no version to delete
if (versionsToDelete.isEmpty) return

logInfo(
s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
s"cleaning up all versions older than $minVersionToRetain to retain last " +
s"$numVersionsToRetain versions")

// Resolve RocksDB files for all the versions and find the max version each file is used
val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
sortedVersions.foreach { version =>
val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
val newResolvedFiles = getImmutableFilesFromVersionZip(version)
versionToRocksDBFiles.put(version, newResolvedFiles)
newResolvedFiles
}
files.foreach(f => fileToMaxUsedVersion(f) = version)
}

// Best effort attempt to delete SST files that were last used in to-be-deleted versions
val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
var failedToDelete = 0
filesToDelete.foreach { case (file, maxUsedVersion) =>
try {
val dfsFile = dfsFilePath(file.dfsFileName)
fm.delete(dfsFile)
logDebug(s"Deleted file $file that was last used in version $maxUsedVersion")
} catch {
case e: Exception =>
failedToDelete += 1
logWarning(s"Error deleting file $file, last used in version $maxUsedVersion", e)
}
}

// Delete the version files and forget about them
versionsToDelete.foreach { version =>
val versionFile = dfsBatchZipFile(version)
try {
fm.delete(versionFile)
versionToRocksDBFiles.remove(version)
logDebug(s"Deleted version $version")
} catch {
case e: Exception =>
logWarning(s"Error deleting version file $versionFile for version $version", e)
}
}
logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete" +
s"$failedToDelete files) not used in versions >= $minVersionToRetain")
}
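As a standalone illustration of the retention arithmetic above (versionsToDelete below is a hypothetical helper, not part of this patch):

// Illustrative sketch (not part of this diff): given the sorted versions present in DFS
// and the number of versions to retain, compute the versions that may be deleted.
// Example: with versions 1..50 and numVersionsToRetain = 10,
// minVersionToRetain = max(1, 50 - 10 + 1) = 41, so versions 1..40 are deleted.
def versionsToDelete(sortedVersions: Seq[Long], numVersionsToRetain: Int): Set[Long] = {
  if (sortedVersions.isEmpty) return Set.empty
  val minVersionToRetain =
    math.max(sortedVersions.head, sortedVersions.last - numVersionsToRetain + 1)
  sortedVersions.takeWhile(_ < minVersionToRetain).toSet
}

assert(versionsToDelete(1L to 50L, 10) == (1L to 40L).toSet)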

/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
@@ -295,6 +389,16 @@ class RocksDBFileManager(
s"$filesReused files reused.")
}

/** Get the SST files required for a version from the version zip file in DFS */
private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
Utils.deleteRecursively(localTempDir)
localTempDir.mkdirs()
Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localTempDir)
val metadataFile = localMetadataFile(localTempDir)
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
metadata.immutableFiles
}

/**
* Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
* Any error while writing will ensure that the file is not written.
@@ -347,6 +451,8 @@ class RocksDBFileManager(

private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")

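// Append the logging ID to the logger name so this instance's log lines are identifiable.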
override protected def logName: String = s"${super.logName} $loggingId"

private def dfsFilePath(fileName: String): Path = {
if (isSstFile(fileName)) {
new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
@@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

class RocksDBSuite extends SparkFunSuite {

@@ -102,6 +102,72 @@ class RocksDBSuite extends SparkFunSuite {
}
}

test("RocksDB: cleanup old files") {
val remoteDir = Utils.createTempDir().toString
val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10)

def versionsPresent: Seq[Long] = {
new File(remoteDir).listFiles.filter(_.getName.endsWith(".zip"))
.map(_.getName.stripSuffix(".zip"))
.map(_.toLong)
.sorted
}

withDB(remoteDir, conf = conf) { db =>
// Generate versions without cleaning up
for (version <- 1 to 50) {
db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
db.commit()
}

// Clean up and verify version files and SST files were deleted
require(versionsPresent === (1L to 50L))
val sstDir = new File(remoteDir, "SSTs")
val numSstFiles = listFiles(sstDir).length
db.cleanup()
assert(versionsPresent === (41L to 50L))
assert(listFiles(sstDir).length < numSstFiles)

// Verify data in retained versions.
versionsPresent.foreach { version =>
db.load(version)
val data = db.iterator().map(toStr).toSet
assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet)
}
}
}

test("RocksDB: handle commit failures and aborts") {
val hadoopConf = new Configuration()
hadoopConf.set(
SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
classOf[CreateAtomicTestManager].getName)
val remoteDir = Utils.createTempDir().getAbsolutePath
val conf = RocksDBConf().copy(compactOnCommit = true)
withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
// Disable failure of output stream and generate versions
CreateAtomicTestManager.shouldFailInCreateAtomic = false
for (version <- 1 to 10) {
db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
db.commit()
}
val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet

// Fail commit for next version and verify that reloading resets the files
CreateAtomicTestManager.shouldFailInCreateAtomic = true
db.put("11", "11")
intercept[IOException] { quietly { db.commit() } }
assert(db.load(10).iterator().map(toStr).toSet === version10Data)
CreateAtomicTestManager.shouldFailInCreateAtomic = false

// Abort commit for next version and verify that reloading resets the files
db.load(10)
db.put("11", "11")
db.rollback()
assert(db.load(10).iterator().map(toStr).toSet === version10Data)
}
}

test("RocksDBFileManager: upload only new immutable files") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
}
}

test("disallow concurrent updates to the same RocksDB instance") {
quietly {
withDB(
Utils.createTempDir().toString,
conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
// DB has been loaded, so the current thread has already acquired the lock on the RocksDB instance

db.load(0) // Current thread should be able to load again

// Another thread should not be able to load while current thread is using it
val ex = intercept[IllegalStateException] {
ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
}
// Assert that the error message contains the stack trace
assert(ex.getMessage.contains("Thread holding the lock has trace:"))
assert(ex.getMessage.contains("runInNewThread"))

// Commit should release the instance allowing other threads to load new version
db.commit()
ThreadUtils.runInNewThread("concurrent-test-thread-2") {
db.load(1)
db.commit()
}

// Another thread should not be able to load while current thread is using it
db.load(2)
intercept[IllegalStateException] {
ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
}

// Rollback should release the instance allowing other threads to load new version
db.rollback()
ThreadUtils.runInNewThread("concurrent-test-thread-3") {
db.load(1)
db.commit()
}
}
}
}
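The locking behavior this test exercises could look roughly like the following self-contained sketch (hypothetical class and field names; the actual RocksDB.load/commit implementation is not part of this diff):

// Hypothetical sketch (not part of this diff): one thread holds the instance lock from
// load() until commit() or rollback(); another thread that fails to acquire it within the
// timeout gets an IllegalStateException carrying the holder's stack trace, which is what
// the assertions in the test above check for.
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

class InstanceLock(lockAcquireTimeoutMs: Long) {
  private val lock = new ReentrantLock() // reentrant, so the holding thread may load again
  @volatile private var holderTrace: String = ""

  def acquire(): Unit = {
    if (!lock.tryLock(lockAcquireTimeoutMs, TimeUnit.MILLISECONDS)) {
      throw new IllegalStateException(
        s"Cannot acquire lock. Thread holding the lock has trace:\n$holderTrace")
    }
    holderTrace = Thread.currentThread().getStackTrace.mkString("\n")
  }

  def release(): Unit = lock.unlock()
}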

test("ensure concurrent access lock is released after Spark task completes") {
val conf = new SparkConf().setAppName("test").setMaster("local")
val sc = new SparkContext(conf)

try {
RocksDBSuite.withSingletonDB {
// Load a RocksDB instance, that is, get a lock inside a task and then fail
quietly {
intercept[Exception] {
sc.makeRDD[Int](1 to 1, 1).map { i =>
RocksDBSuite.singleton.load(0)
throw new Exception("fail this task to test lock release")
}.count()
}
}

// Test whether you can load again, that is, will it successfully lock again
RocksDBSuite.singleton.load(0)
}
} finally {
sc.stop()
}
}

test("ensure that concurrent update and cleanup consistent versions") {
quietly {
val numThreads = 20
val numUpdatesInEachThread = 20
val remoteDir = Utils.createTempDir().toString
@volatile var exception: Exception = null
val updatingThreads = Array.fill(numThreads) {
new Thread() {
override def run(): Unit = {
try {
for (version <- 0 to numUpdatesInEachThread) {
withDB(
remoteDir,
version = version) { db =>
val prevValue = Option(toStr(db.get("a"))).getOrElse("0").toInt
db.put("a", (prevValue + 1).toString)
db.commit()
}
}
} catch {
case e: Exception =>
val newException = new Exception(s"ThreadId ${this.getId} failed", e)
// Record only the first failure seen across threads
if (exception == null) {
exception = newException
}
throw e
}
}
}
}
val cleaningThread = new Thread() {
override def run(): Unit = {
try {
withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
while (!this.isInterrupted) {
db.cleanup()
Thread.sleep(1)
}
}
} catch {
case e: Exception =>
val newException = new Exception(s"ThreadId ${this.getId} failed", e)
// Record only the first failure seen across threads
if (exception == null) {
exception = newException
}
throw e
}
}
}
updatingThreads.foreach(_.start())
cleaningThread.start()
updatingThreads.foreach(_.join())
cleaningThread.interrupt()
cleaningThread.join()
if (exception != null) {
fail(exception)
}
withDB(remoteDir, numUpdatesInEachThread) { db =>
assert(toStr(db.get("a")) === numUpdatesInEachThread.toString)
}
}
}

test("checkpoint metadata serde roundtrip") {
def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
assert(metadata.json == json)
@@ -304,3 +497,24 @@ class RocksDBSuite extends SparkFunSuite {

def listFiles(file: String): Seq[File] = listFiles(new File(file))
}

object RocksDBSuite {
@volatile var singleton: RocksDB = _

def withSingletonDB[T](func: => T): T = {
try {
singleton = new RocksDB(
dfsRootDir = Utils.createTempDir().getAbsolutePath,
conf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
hadoopConf = new Configuration(),
loggingId = s"[Thread-${Thread.currentThread.getId}]")

func
} finally {
if (singleton != null) {
singleton.close()
singleton = null
}
}
}
}