[SPARK-35785][SS] Cleanup support for RocksDB instance #32933
Changes from all commits

RocksDBFileManager.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable

import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -198,6 +199,99 @@ class RocksDBFileManager(
    }
  }

  /**
   * Delete old versions by deleting the associated version and SST files.
   * At a high level, this method finds which versions to delete and which SST files were
   * last used in those versions. It is safe to delete these SST files because an SST file
   * can be reused only in successive versions. Therefore, if an SST file F was last used in
   * version V, then it won't be used in version V+1 or later, and if version V can be
   * deleted, then F can safely be deleted as well.
   *
   * To find old files, it does the following.
   * - List all the existing [version].zip files.
   * - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
   * - Accordingly decide which versions should be deleted.
   * - Resolve all SST files of all the existing versions, if not already resolved.
   * - Find the latest version in which each SST file was used.
   * - Delete the files that were last used in the to-be-deleted versions, as we will not
   *   need those files any more.
   *
   * Note that it only deletes files that it knows are safe to delete.
   * It may not delete the following files.
   * - Partially written SST files.
   * - SST files that were used in a version, but that version got overwritten with a
   *   different set of SST files.
   */
  def deleteOldVersions(numVersionsToRetain: Int): Unit = {
    val path = new Path(dfsRootDir)

    // All versions present in DFS, sorted
    val sortedVersions = fm.list(path, onlyZipFiles)
      .map(_.getPath.getName.stripSuffix(".zip"))
      .map(_.toLong)
      .sorted

    // Return if no versions generated yet
    if (sortedVersions.isEmpty) return

    // Find the versions to delete
    val maxVersionPresent = sortedVersions.last
    val minVersionPresent = sortedVersions.head
    val minVersionToRetain =
      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]

    // Return if no version to delete
    if (versionsToDelete.isEmpty) return

    logInfo(
      s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
      s"cleaning up all versions older than $minVersionToRetain to retain last " +
      s"$numVersionsToRetain versions")

    // Resolve RocksDB files for all the versions and find the max version each file is used
    val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
    sortedVersions.foreach { version =>
      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
        versionToRocksDBFiles.put(version, newResolvedFiles)
        newResolvedFiles
      }
      files.foreach(f => fileToMaxUsedVersion(f) = version)
    }

    // Best effort attempt to delete SST files that were last used in to-be-deleted versions
    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
    logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
    var failedToDelete = 0
    filesToDelete.foreach { case (file, maxUsedVersion) =>
      try {
        val dfsFile = dfsFilePath(file.dfsFileName)
        fm.delete(dfsFile)
        logDebug(s"Deleted file $file that was last used in version $maxUsedVersion")
      } catch {
        case e: Exception =>
          failedToDelete += 1
          logWarning(s"Error deleting file $file, last used in version $maxUsedVersion", e)
      }
    }

    // Delete the version files and forget about them
    versionsToDelete.foreach { version =>
      val versionFile = dfsBatchZipFile(version)
      try {
        fm.delete(versionFile)
        versionToRocksDBFiles.remove(version)
        logDebug(s"Deleted version $version")
      } catch {
        case e: Exception =>
          logWarning(s"Error deleting version file $versionFile for version $version", e)
      }
    }
    logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete " +
      s"$failedToDelete files) not used in versions >= $minVersionToRetain")
  }
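To make the retention arithmetic concrete, here is a standalone sketch with hypothetical numbers (50 versions pruned down to 10, mirroring the cleanup test below); sortedVersions, minVersionToRetain, and versionsToDelete correspond to the values computed in deleteOldVersions:

    // Hypothetical walkthrough of the retention arithmetic (not part of the diff).
    val sortedVersions: Seq[Long] = (1L to 50L)  // version zips present in DFS
    val numVersionsToRetain = 10
    val minVersionToRetain =
      math.max(sortedVersions.head, sortedVersions.last - numVersionsToRetain + 1)  // 41
    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet   // 1..40
    // An SST file whose last-used version is <= 40 is deleted; a file also used in
    // any retained version (>= 41) maps to that later version and is kept.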
  /** Save immutable files to DFS directory */
  private def saveImmutableFilesToDfs(
      version: Long,
@@ -295,6 +389,16 @@ class RocksDBFileManager(
      s"$filesReused files reused.")
  }
  /** Get the SST files required for a version from the version zip file in DFS */
  private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
    Utils.deleteRecursively(localTempDir)
    localTempDir.mkdirs()
    Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localTempDir)
    val metadataFile = localMetadataFile(localTempDir)
    val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
    metadata.immutableFiles
  }
  /**
   * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
   * Any error while writing will ensure that the file is not written.
@@ -347,6 +451,8 @@ class RocksDBFileManager(

  private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")

  override protected def logName: String = s"${super.logName} $loggingId"

  private def dfsFilePath(fileName: String): Path = {
    if (isSstFile(fileName)) {
      new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
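For orientation, the DFS layout these path helpers imply looks roughly like this (names illustrative; version zips sit at the checkpoint root via dfsBatchZipFile, immutable files under the SSTs subdirectory via dfsFilePath):

    <dfsRootDir>/
      1.zip                        // version 1: metadata plus its list of immutable files
      2.zip
      ...
      SSTs/
        000008-<unique-suffix>.sst // illustrative unique DFS file name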
RocksDBSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{ThreadUtils, Utils}

class RocksDBSuite extends SparkFunSuite {
@@ -102,6 +102,72 @@ class RocksDBSuite extends SparkFunSuite {
    }
  }
| test("RocksDB: cleanup old files") { | ||
| val remoteDir = Utils.createTempDir().toString | ||
| val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10) | ||
|
|
||
| def versionsPresent: Seq[Long] = { | ||
| remoteDir.listFiles.filter(_.getName.endsWith(".zip")) | ||
| .map(_.getName.stripSuffix(".zip")) | ||
| .map(_.toLong) | ||
| .sorted | ||
| } | ||
|
|
||
| withDB(remoteDir, conf = conf) { db => | ||
| // Generate versions without cleaning up | ||
| for (version <- 1 to 50) { | ||
| db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ... | ||
| db.commit() | ||
| } | ||
|
|
||
| // Clean up and verify version files and SST files were deleted | ||
| require(versionsPresent === (1L to 50L)) | ||
| val sstDir = new File(remoteDir, "SSTs") | ||
| val numSstFiles = listFiles(sstDir).length | ||
| db.cleanup() | ||
| assert(versionsPresent === (41L to 50L)) | ||
| assert(listFiles(sstDir).length < numSstFiles) | ||
|
||
|
|
||
| // Verify data in retained vesions. | ||
| versionsPresent.foreach { version => | ||
| db.load(version) | ||
| val data = db.iterator().map(toStr).toSet | ||
| assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| test("RocksDB: handle commit failures and aborts") { | ||
| val hadoopConf = new Configuration() | ||
| hadoopConf.set( | ||
| SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, | ||
| classOf[CreateAtomicTestManager].getName) | ||
| val remoteDir = Utils.createTempDir().getAbsolutePath | ||
| val conf = RocksDBConf().copy(compactOnCommit = true) | ||
| withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db => | ||
| // Disable failure of output stream and generate versions | ||
| CreateAtomicTestManager.shouldFailInCreateAtomic = false | ||
| for (version <- 1 to 10) { | ||
| db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ... | ||
| db.commit() | ||
| } | ||
| val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet | ||
|
|
||
| // Fail commit for next version and verify that reloading resets the files | ||
| CreateAtomicTestManager.shouldFailInCreateAtomic = true | ||
| db.put("11", "11") | ||
| intercept[IOException] { quietly { db.commit() } } | ||
| assert(db.load(10).iterator().map(toStr).toSet === version10Data) | ||
| CreateAtomicTestManager.shouldFailInCreateAtomic = false | ||
|
|
||
| // Abort commit for next version and verify that reloading resets the files | ||
| db.load(10) | ||
| db.put("11", "11") | ||
| db.rollback() | ||
| assert(db.load(10).iterator().map(toStr).toSet === version10Data) | ||
| } | ||
| } | ||
|
|
||
| test("RocksDBFileManager: upload only new immutable files") { | ||
| withTempDir { dir => | ||
| val dfsRootDir = dir.getAbsolutePath | ||
|
|
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite { | |
| } | ||
| } | ||
|
|
||
| test("disallow concurrent updates to the same RocksDB instance") { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test seems not related to clean up change here? Looks like more related to RocksDB instance PR.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yea, this is the test for rollback.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. okay |
    quietly {
      withDB(
        Utils.createTempDir().toString,
        conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
        // DB has been loaded, so the current thread has already acquired the lock
        // on the RocksDB instance

        db.load(0)  // Current thread should be able to load again

        // Another thread should not be able to load while current thread is using it
        val ex = intercept[IllegalStateException] {
          ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
        }
        // Assert that the error message contains the stack trace
        assert(ex.getMessage.contains("Thread holding the lock has trace:"))
        assert(ex.getMessage.contains("runInNewThread"))

        // Commit should release the instance, allowing other threads to load a new version
        db.commit()
        ThreadUtils.runInNewThread("concurrent-test-thread-2") {
          db.load(1)
          db.commit()
        }

        // Another thread should not be able to load while current thread is using it
        db.load(2)
        intercept[IllegalStateException] {
          ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
        }

        // Rollback should release the instance, allowing other threads to load a new version
        db.rollback()
        ThreadUtils.runInNewThread("concurrent-test-thread-3") {
          db.load(1)
          db.commit()
        }
      }
    }
  }
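The two assertions on ex.getMessage imply that the lock holder's stack trace is captured at acquisition time. A self-contained sketch of that locking pattern (assumed shape and names, not the PR's actual implementation):

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock

    // Sketch: remember where the lock was taken so a contending thread can
    // report the holder's stack trace in its failure message.
    class InstanceLock(timeoutMs: Long) {
      private val lock = new ReentrantLock()
      @volatile private var holderTrace: String = ""

      def acquire(): Unit = {
        if (!lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
          throw new IllegalStateException(
            s"Cannot acquire lock. Thread holding the lock has trace:\n$holderTrace")
        }
        holderTrace = Thread.currentThread.getStackTrace.mkString("\n")
      }

      def release(): Unit = lock.unlock()
    }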
| test("ensure concurrent access lock is released after Spark task completes") { | ||
| val conf = new SparkConf().setAppName("test").setMaster("local") | ||
| val sc = new SparkContext(conf) | ||
|
|
||
| try { | ||
| RocksDBSuite.withSingletonDB { | ||
| // Load a RocksDB instance, that is, get a lock inside a task and then fail | ||
| quietly { | ||
| intercept[Exception] { | ||
| sc.makeRDD[Int](1 to 1, 1).map { i => | ||
| RocksDBSuite.singleton.load(0) | ||
| throw new Exception("fail this task to test lock release") | ||
| }.count() | ||
| } | ||
| } | ||
|
|
||
| // Test whether you can load again, that is, will it successfully lock again | ||
| RocksDBSuite.singleton.load(0) | ||
| } | ||
| } finally { | ||
| sc.stop() | ||
| } | ||
| } | ||
|
|
||
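For the singleton to be loadable again after the failed task, the lock acquired inside the task must be released when the task completes. A minimal sketch of that pattern, assuming a pair of acquire/release callbacks (illustrative names, not the PR's actual code):

    import org.apache.spark.TaskContext

    // Sketch: acquire the instance lock, then register a completion listener so
    // the lock is released even when the task body throws.
    def acquireForTask(acquire: () => Unit, release: () => Unit): Unit = {
      acquire()
      Option(TaskContext.get()).foreach { ctx =>
        ctx.addTaskCompletionListener[Unit] { _ => release() }
      }
    }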
| test("ensure that concurrent update and cleanup consistent versions") { | ||
| quietly { | ||
| val numThreads = 20 | ||
| val numUpdatesInEachThread = 20 | ||
| val remoteDir = Utils.createTempDir().toString | ||
| @volatile var exception: Exception = null | ||
| val updatingThreads = Array.fill(numThreads) { | ||
| new Thread() { | ||
| override def run(): Unit = { | ||
| try { | ||
| for (version <- 0 to numUpdatesInEachThread) { | ||
| withDB( | ||
|
Member: Hm, what is this test used for? Each …

Member (Author): This is to simulate the multi-thread scenario of updating and cleaning up old versions. It will not conflict, since we call commit in each update thread and the version gets updated on each commit.

Member: Hmm, will that happen? I think …

Contributor: This looks to be more likely simulating the case where multiple streaming queries with the same checkpoint run concurrently. SST files shouldn't conflict, as we make the file names unique, and for metadata files we use …

Member (Author): Yea, I think the purpose of this test is to make sure no error is thrown and the result is correct in the end.

Member: @xuanyuanking and me discussed this test offline. Seems there is something wrong with …
                  remoteDir,
                  version = version) { db =>
                  val prevValue = Option(toStr(db.get("a"))).getOrElse("0").toInt
                  db.put("a", (prevValue + 1).toString)
                  db.commit()
                }
              }
            } catch {
              case e: Exception =>
                val newException = new Exception(s"ThreadId ${this.getId} failed", e)
                if (exception != null) {
                  exception = newException
                }
                throw e
            }
          }
        }
      }
      val cleaningThread = new Thread() {
        override def run(): Unit = {
          try {
            withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
              while (!this.isInterrupted) {
                db.cleanup()
                Thread.sleep(1)
              }
            }
          } catch {
            case e: Exception =>
              val newException = new Exception(s"ThreadId ${this.getId} failed", e)
              if (exception != null) {
                exception = newException
              }
              throw e
          }
        }
      }
      updatingThreads.foreach(_.start())
      cleaningThread.start()
      updatingThreads.foreach(_.join())
      cleaningThread.interrupt()
      cleaningThread.join()
      if (exception != null) {
        fail(exception)
      }
      withDB(remoteDir, numUpdatesInEachThread) { db =>
        assert(toStr(db.get("a")) === numUpdatesInEachThread.toString)
      }
    }
  }
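Regarding the non-conflicting SST uploads mentioned in the review thread above: a minimal sketch of one way to derive collision-free DFS names (illustrative only; the PR's actual naming logic lives in RocksDBFileManager and is not shown in this diff):

    import java.util.UUID

    // Illustrative: embed a random UUID so concurrent writers uploading the
    // same local file name never collide in DFS.
    def uniqueDfsFileName(localFileName: String): String =
      s"${UUID.randomUUID()}-$localFileName"  // e.g. "3f9c...-000008.sst"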
| test("checkpoint metadata serde roundtrip") { | ||
| def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = { | ||
| assert(metadata.json == json) | ||
|
|
@@ -304,3 +497,24 @@ class RocksDBSuite extends SparkFunSuite {

  def listFiles(file: String): Seq[File] = listFiles(new File(file))
}

object RocksDBSuite {
  @volatile var singleton: RocksDB = _

  def withSingletonDB[T](func: => T): T = {
    try {
      singleton = new RocksDB(
        dfsRootDir = Utils.createTempDir().getAbsolutePath,
        conf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
        hadoopConf = new Configuration(),
        loggingId = s"[Thread-${Thread.currentThread.getId}]")

      func
    } finally {
      if (singleton != null) {
        singleton.close()
        singleton = null
      }
    }
  }
}
Member: When will we call this method?

Member (Author): It will be called in RocksDBStateStoreProvider.doMaintenance. I'll submit the state store provider PR (the last one) today.

Member: okay.
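For context, a sketch of the call site described above (hypothetical shape; the actual RocksDBStateStoreProvider arrives in the follow-up PR):

    // Hypothetical provider shape: periodic maintenance delegates old-version
    // cleanup to the RocksDB instance introduced in this PR series.
    class SketchStateStoreProvider(db: RocksDB) {
      def doMaintenance(): Unit = {
        db.cleanup()  // prunes version zips and SST files beyond minVersionsToRetain
      }
    }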