From 31fad2b914a788dd59372dce29ea404b61334b09 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 3 Jun 2021 18:49:54 +0800
Subject: [PATCH 1/9] implementation for the load path

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b1df7bd03d98..06fe9b37b9ff 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,8 +34,8 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.zip.{GZIPInputStream, ZipInputStream}
 
 import scala.annotation.tailrec
+import scala.collection.{mutable, Map, Seq}
 import scala.collection.JavaConverters._
-import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag

From a5aa59153ce3c376cf1c25fd73d0dc5b7fde617f Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 16 Jun 2021 17:02:23 +0800
Subject: [PATCH 2/9] Add the implementation for RocksDB instance

---
 .../apache/spark/sql/util/RocksDBLoader.scala | 61 +++++++++++++++++++
 1 file changed, 61 insertions(+)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala
new file mode 100644
index 000000000000..55ff9e164201
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import org.rocksdb._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A wrapper for RocksDB library loading using an uninterruptible thread, as the native RocksDB
+ * code will throw an error when interrupted.
+ */
+object RocksDBLoader extends Logging {
+  /**
+   * Keeps track of the exception thrown from the loading thread, if any.
+   */
+  private var exception: Option[Throwable] = null
+
+  private val loadLibraryThread = new UninterruptibleThread("RocksDB") {
+    override def run(): Unit = {
+      try {
+        runUninterruptibly {
+          RocksDB.loadLibrary()
+          exception = None
+        }
+      } catch {
+        case e: Throwable =>
+          exception = Some(e)
+      }
+    }
+  }
+
+  def loadLibrary(): Unit = synchronized {
+    if (exception == null) {
+      loadLibraryThread.start()
+      logInfo("RocksDB library loading thread started")
+      loadLibraryThread.join()
+      exception.foreach(throw _)
+      logInfo("RocksDB library loading thread finished successfully")
+    } else {
+      exception.foreach(throw _)
+    }
+  }
+}
+

From c29acc9b0b99d62c06cca3b2f775e0d8d55201d2 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 16 Jun 2021 18:42:21 +0800
Subject: [PATCH 3/9] Cleanup support for RocksDB instance

---
 .../execution/streaming/state/RocksDB.scala   |   7 +
 .../streaming/state/RocksDBFileManager.scala  | 103 +++++++++
 .../streaming/state/RocksDBSuite.scala        | 218 +++++++++++++++++-
 3 files changed, 327 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 82aa1663f85d..f6400180014e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -253,6 +253,13 @@ class RocksDB(
     logInfo(s"Rolled back to $loadedVersion")
   }
 
+  def cleanup(): Unit = {
+    val cleanupTime = timeTakenMs {
+      fileManager.deleteOldVersions(conf.minVersionsToRetain)
+    }
+    logInfo(s"Cleaned old data, time taken: $cleanupTime ms")
+  }
+
   /** Release all resources */
   def close(): Unit = {
     try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 4731f4d5a06c..dff515633c86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -24,6 +24,7 @@ import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
+import scala.collection.{mutable, Seq}
 import scala.collection.JavaConverters._
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
@@ -198,6 +199,96 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Delete old versions by deleting the associated version and SST files.
+   * At a high-level, this method finds which versions to delete, and which SST files that were
+   * last used in those versions. Its safe to delete these SST files because a SST file can
+   * be reused only in successive versions. Therefore, if a SST file F was last used in version
+   * V, then it wont be used in version V+1 or later, and if version V can be deleted, then
+   * F can safely be deleted as well.
+   *
+   * To find old files, it does the following.
+   * - List all the existing [version].zip files
+   * - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
+   * - Accordingly decide which versions should be deleted.
+   * - Resolve all SSTs files of all the existing versions, if not already resolved.
+   * - Find what was the latest version in which each SST file was used.
+   * - Delete the files that were last used in the to-be-deleted versions as we will not
+   *   need those files any more.
+   *
+   * Note that it only deletes files that it knows are safe to delete.
+   * It may not delete the following files.
+   * - Partially written SST files
+   * - SST files that were used in a version, but that version got overwritten with a different
+   *   set of SST files.
+   */
+  def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+    val path = new Path(dfsRootDir)
+
+    // All versions present in DFS, sorted
+    val sortedVersions = fm.list(path, onlyZipFiles)
+      .map(_.getPath.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+
+    // Return if no versions generated yet
+    if (sortedVersions.isEmpty) return
+
+    // Find the versions to delete
+    val maxVersionPresent = sortedVersions.last
+    val minVersionPresent = sortedVersions.head
+    val minVersionToRetain =
+      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
+    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]
+
+    // Return if no version to delete
+    if (versionsToDelete.isEmpty) return
+
+    logInfo(
+      s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
+        s"cleaning up all versions older than $minVersionToRetain to retain last " +
+        s"$numVersionsToRetain versions")
+
+    // Resolve RocksDB files for all the versions and find the max version each file is used
+    val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
+    sortedVersions.foreach { version =>
+      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
+        versionToRocksDBFiles.put(version, newResolvedFiles)
+        newResolvedFiles
+      }
+      files.foreach(f => fileToMaxUsedVersion(f) = version)
+    }
+
+    // Best effort attempt to delete SST files that were last used in to-be-deleted versions
+    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v)}
+    logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
+    filesToDelete.foreach { case (file, maxUsedVersion) =>
+      try {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        fm.delete(dfsFile)
+        logDebug(s"Deleted file $file that was last used in version $maxUsedVersion")
+      } catch {
+        case e: Exception =>
+          logWarning(s"Error deleting file $file, last used in version $maxVersionPresent", e)
+      }
+    }
+
+    // Delete the version files and forget about them
+    versionsToDelete.foreach { version =>
+      try {
+        val versionFile = dfsBatchZipFile(version)
+        fm.delete(versionFile)
+        versionToRocksDBFiles.remove(version)
+        logDebug(s"Deleted version $version")
+      } catch {
+        case e: Exception =>
+          logWarning(s"Error deleting version file $version", e)
+      }
+    }
+    logInfo(s"Deleted ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
+  }
+
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
@@ -295,6 +386,16 @@ class RocksDBFileManager(
       s"$filesReused files reused.")
   }
 
+  /** Get the SST files required for a version from the version zip file in DFS */
+  private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
+    Utils.deleteRecursively(localTempDir)
+    localTempDir.mkdirs()
+    Utils.unzipFromFile(fs, dfsBatchZipFile(version), localTempDir)
+    val metadataFile = localMetadataFile(localTempDir)
+    val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
+    metadata.immutableFiles
+  }
+
   /**
    * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
    * Any error while writing will ensure that the file is not written.
@@ -347,6 +448,8 @@ class RocksDBFileManager(
 
   private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")
 
+  override protected def logName: String = s"${super.logName} $loggingId"
+
   private def dfsFilePath(fileName: String): Path = {
     if (isSstFile(fileName)) {
       new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index a11eb8a33d82..d4287829f202 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 class RocksDBSuite extends SparkFunSuite {
 
@@ -102,6 +102,72 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  test("RocksDB: cleanup old files") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10)
+
+    def versionsPresent: Seq[Long] = {
+      remoteDir.listFiles.filter(_.getName.endsWith(".zip"))
+        .map(_.getName.stripSuffix(".zip"))
+        .map(_.toLong)
+        .sorted
+    }
+
+    withDB(remoteDir, conf = conf) { db =>
+      // Generate versions without cleaning up
+      for (version <- 1 to 50) {
+        db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
+        db.commit()
+      }
+
+      // Clean up and verify version files and SST files were deleted
+      require(versionsPresent === (1L to 50L))
+      val sstDir = new File(remoteDir, "SSTs")
+      val numSstFiles = listFiles(sstDir).length
+      db.cleanup()
+      assert(versionsPresent === (41L to 50L))
+      assert(listFiles(sstDir).length < numSstFiles)
+
+      // Verify data in retained versions.
+      versionsPresent.foreach { version =>
+        db.load(version)
+        val data = db.iterator().map(toStr).toSet
+        assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet)
+      }
+    }
+  }
+
+  test("RocksDB: handle commit failures and aborts") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+      classOf[CreateAtomicTestManager].getName)
+    val remoteDir = Utils.createTempDir().getAbsolutePath
+    val conf = RocksDBConf().copy(compactOnCommit = true)
+    withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+      // Disable failure of output stream and generate versions
+      CreateAtomicTestManager.shouldFailInCreateAtomic = false
+      for (version <- 1 to 10) {
+        db.put(version.toString, version.toString) // update "1" -> "1", "2" -> "2", ...
+        db.commit()
+      }
+      val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+      // Fail commit for next version and verify that reloading resets the files
+      CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      db.put("11", "11")
+      intercept[IOException] { quietly { db.commit() } }
+      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      CreateAtomicTestManager.shouldFailInCreateAtomic = false
+
+      // Abort commit for next version and verify that reloading resets the files
+      db.load(10)
+      db.put("11", "11")
+      db.rollback()
+      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+    }
+  }
+
   test("RocksDBFileManager: upload only new immutable files") {
     withTempDir { dir =>
       val dfsRootDir = dir.getAbsolutePath
@@ -207,6 +273,134 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {
+    quietly {
+      withDB(
+        Utils.createTempDir().toString,
+        conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+        // DB has been loaded so current thread has already acquired the lock on the RocksDB instance
+
+        db.load(0) // Current thread should be able to load again
+
+        // Another thread should not be able to load while current thread is using it
+        val ex = intercept[IllegalStateException] {
+          ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
+        }
+        // Assert that the error message contains the stack trace
+        assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+        assert(ex.getMessage.contains("runInNewThread"))
+
+        // Commit should release the instance allowing other threads to load new version
+        db.commit()
+        ThreadUtils.runInNewThread("concurrent-test-thread-2") {
+          db.load(1)
+          db.commit()
+        }
+
+        // Another thread should not be able to load while current thread is using it
+        db.load(2)
+        intercept[IllegalStateException] {
+          ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
+        }
+
+        // Rollback should release the instance allowing other threads to load new version
+        db.rollback()
+        ThreadUtils.runInNewThread("concurrent-test-thread-3") {
+          db.load(1)
+          db.commit()
+        }
+      }
+    }
+  }
+
+  test("ensure concurrent access lock is released after Spark task completes") {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    val sc = new SparkContext(conf)
+
+    try {
+      RocksDBSuite.withSingletonDB {
+        // Load a RocksDB instance, that is, get a lock inside a task and then fail
+        quietly {
+          intercept[Exception] {
+            sc.makeRDD[Int](1 to 1, 1).map { i =>
+              RocksDBSuite.singleton.load(0)
+              throw new Exception("fail this task to test lock release")
+            }.count()
+          }
+        }
+
+        // Test whether we can load again, that is, whether the lock was released
+        RocksDBSuite.singleton.load(0)
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("ensure that concurrent update and cleanup consistent versions") {
+    quietly {
+      val numThreads = 20
+      val numUpdatesInEachThread = 20
+      val remoteDir = Utils.createTempDir().toString
+      @volatile var exception: Exception = null
+      val updatingThreads = Array.fill(numThreads) {
+        new Thread() {
+          override def run() {
+            try {
+              for (version <- 0 to numUpdatesInEachThread) {
+                withDB(
+                  remoteDir,
+                  version = version) { db =>
+                  val prevValue = Option(toStr(db.get("a"))).getOrElse("0").toInt
+                  db.put("a", (prevValue + 1).toString)
+                  db.commit()
+                }
+              }
+            } catch {
+              case e: Exception =>
+                val newException = new Exception(s"ThreadId ${this.getId} failed", e)
+                if (exception == null) {
+                  exception = newException
+                }
+                throw e
+            }
+
+          }
+        }
+      }
+      val cleaningThread = new Thread() {
+        override def run(): Unit = {
+          try {
+            withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
+              while (!this.isInterrupted) {
+                db.cleanup()
+                Thread.sleep(1)
+              }
+            }
+          } catch {
+            case e: Exception =>
+              val newException = new Exception(s"ThreadId ${this.getId} failed", e)
+              if (exception == null) {
+                exception = newException
+              }
+              throw e
+          }
+        }
+      }
+      updatingThreads.foreach(_.start())
+      cleaningThread.start()
+      updatingThreads.foreach(_.join())
+      cleaningThread.interrupt()
+      cleaningThread.join()
+      if (exception != null) {
+        fail(exception)
+      }
+      withDB(remoteDir, numUpdatesInEachThread) { db =>
+        assert(toStr(db.get("a")) === numUpdatesInEachThread.toString)
+      }
+    }
+  }
+
   test("checkpoint metadata serde roundtrip") {
     def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
       assert(metadata.json == json)
@@ -304,3 +498,25 @@ class RocksDBSuite extends SparkFunSuite {
 
   def listFiles(file: String): Seq[File] = listFiles(new File(file))
 }
+
+
+object RocksDBSuite {
+  @volatile var singleton: RocksDB = _
+
+  def withSingletonDB[T](func: => T): T = {
+    try {
+      singleton = new RocksDB(
+        dfsRootDir = Utils.createTempDir().getAbsolutePath,
+        conf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
+        hadoopConf = new Configuration(),
+        loggingId = s"[Thread-${Thread.currentThread.getId}]")
+
+      func
+    } finally {
+      if (singleton != null) {
+        singleton.close()
+        singleton = null
+      }
+    }
+  }
+}

From d975fcb887b303a8dfa1a96e81c364a2c32c4980 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Mon, 28 Jun 2021 20:37:45 +0800
Subject: [PATCH 4/9] fix

---
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index dff515633c86..10ca692cbcc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -24,7 +24,6 @@ import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
-import scala.collection.{mutable, Seq}
 import scala.collection.JavaConverters._
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include

From 49c457272ed2ad767cc6c09a12533064ee855b41 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 30 Jun 2021 17:37:22 +0800
Subject: [PATCH 5/9] fix

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 06fe9b37b9ff..b1df7bd03d98 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,8 +34,8 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.zip.{GZIPInputStream, ZipInputStream}
 
 import scala.annotation.tailrec
-import scala.collection.{mutable, Map, Seq}
 import scala.collection.JavaConverters._
+import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag

From c9d274d793bd814b910ce92105726a63503c957d Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Wed, 30 Jun 2021 22:05:41 +0800
Subject: [PATCH 6/9] address comments

---
 .../streaming/state/RocksDBFileManager.scala  | 20 +++++++++++--------
 .../streaming/state/RocksDBSuite.scala        |  1 -
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 10ca692cbcc8..bfdace3949f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -201,9 +202,9 @@ class RocksDBFileManager(
   /**
    * Delete old versions by deleting the associated version and SST files.
    * At a high-level, this method finds which versions to delete, and which SST files that were
-   * last used in those versions. Its safe to delete these SST files because a SST file can
+   * last used in those versions. It's safe to delete these SST files because a SST file can
    * be reused only in successive versions. Therefore, if a SST file F was last used in version
-   * V, then it wont be used in version V+1 or later, and if version V can be deleted, then
+   * V, then it won't be used in version V+1 or later, and if version V can be deleted, then
    * F can safely be deleted as well.
    *
    * To find old files, it does the following.
@@ -260,8 +261,9 @@ class RocksDBFileManager(
     }
 
     // Best effort attempt to delete SST files that were last used in to-be-deleted versions
-    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v)}
+    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
     logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
+    var failedToDelete = 0
     filesToDelete.foreach { case (file, maxUsedVersion) =>
       try {
         val dfsFile = dfsFilePath(file.dfsFileName)
@@ -269,23 +271,25 @@ class RocksDBFileManager(
         logDebug(s"Deleted file $file that was last used in version $maxUsedVersion")
       } catch {
         case e: Exception =>
-          logWarning(s"Error deleting file $file, last used in version $maxVersionPresent", e)
+          failedToDelete += 1
+          logWarning(s"Error deleting file $file, last used in version $maxUsedVersion", e)
       }
     }
 
     // Delete the version files and forget about them
     versionsToDelete.foreach { version =>
+      val versionFile = dfsBatchZipFile(version)
       try {
-        val versionFile = dfsBatchZipFile(version)
         fm.delete(versionFile)
         versionToRocksDBFiles.remove(version)
         logDebug(s"Deleted version $version")
       } catch {
         case e: Exception =>
-          logWarning(s"Error deleting version file $version", e)
+          logWarning(s"Error deleting version file $versionFile for version $version", e)
       }
     }
-    logInfo(s"Deleted ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
+    logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete " +
+      s"$failedToDelete files) not used in versions >= $minVersionToRetain")
   }
 
   /** Save immutable files to DFS directory */
@@ -389,7 +393,7 @@ class RocksDBFileManager(
   private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
     Utils.deleteRecursively(localTempDir)
     localTempDir.mkdirs()
-    Utils.unzipFromFile(fs, dfsBatchZipFile(version), localTempDir)
+    Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localTempDir)
     val metadataFile = localMetadataFile(localTempDir)
     val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
     metadata.immutableFiles
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index d4287829f202..3fcb8cfb4bdc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -499,7 +499,6 @@ class RocksDBSuite extends SparkFunSuite {
 
   def listFiles(file: String): Seq[File] = listFiles(new File(file))
 }
-
 
 object RocksDBSuite {
   @volatile var singleton: RocksDB = _

From 41ba563eec7df77e783a27c2b9470ab37dee400e Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 1 Jul 2021 00:19:33 +0800
Subject: [PATCH 7/9] fix

---
 .../spark/sql/execution/streaming/state/RocksDBSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 3fcb8cfb4bdc..4659a3708c23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -345,7 +345,7 @@ class RocksDBSuite extends SparkFunSuite {
       @volatile var exception: Exception = null
       val updatingThreads = Array.fill(numThreads) {
         new Thread() {
-          override def run() {
+          override def run(): Unit = {
             try {
               for (version <- 0 to numUpdatesInEachThread) {
                 withDB(
@@ -364,7 +364,6 @@ class RocksDBSuite extends SparkFunSuite {
                 }
                 throw e
             }
-
           }
         }
       }

From 9c99cdfadb44e499e6cbeab4f1d042914ec4b475 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 1 Jul 2021 15:35:14 +0800
Subject: [PATCH 8/9] delete the redundant code

---
 .../apache/spark/sql/util/RocksDBLoader.scala | 61 -------------------
 1 file changed, 61 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala
deleted file mode 100644
index 55ff9e164201..000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/RocksDBLoader.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import org.rocksdb._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.UninterruptibleThread
-
-/**
- * A wrapper for RocksDB library loading using an uninterruptible thread, as the native RocksDB
- * code will throw an error when interrupted.
- */
-object RocksDBLoader extends Logging {
-  /**
-   * Keeps track of the exception thrown from the loading thread, if any.
-   */
-  private var exception: Option[Throwable] = null
-
-  private val loadLibraryThread = new UninterruptibleThread("RocksDB") {
-    override def run(): Unit = {
-      try {
-        runUninterruptibly {
-          RocksDB.loadLibrary()
-          exception = None
-        }
-      } catch {
-        case e: Throwable =>
-          exception = Some(e)
-      }
-    }
-  }
-
-  def loadLibrary(): Unit = synchronized {
-    if (exception == null) {
-      loadLibraryThread.start()
-      logInfo("RocksDB library loading thread started")
-      loadLibraryThread.join()
-      exception.foreach(throw _)
-      logInfo("RocksDB library loading thread finished successfully")
-    } else {
-      exception.foreach(throw _)
-    }
-  }
-}
-

From ea949836c26be0124e37df0eb641be806628d94d Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Thu, 1 Jul 2021 17:28:31 +0800
Subject: [PATCH 9/9] trigger test