Commit ca6acf0

xuanyuanking authored and viirya committed
[SPARK-35785][SS] Cleanup support for RocksDB instance
### What changes were proposed in this pull request?
Add the functionality of cleaning up files of old versions for the RocksDB instance and RocksDBFileManager.

### Why are the changes needed?
Part of the implementation of the RocksDB state store.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
New unit test added.

Closes #32933 from xuanyuanking/SPARK-35785.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
1 parent 79a6e00 commit ca6acf0

File tree: 3 files changed, +328 -1 lines changed
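For orientation, the cleanup added here operates on two kinds of files that RocksDBFileManager keeps in the checkpoint directory on DFS: one `<version>.zip` file per committed version, and immutable SST files under an `SSTs` subdirectory that can be shared by consecutive versions. The short Scala sketch below is illustrative only and not part of this commit; the root path and SST file name are hypothetical, while the `.zip` naming and the `SSTs` subdirectory come from the diff and tests that follow.

import org.apache.hadoop.fs.Path

val dfsRootDir = "/checkpoint/state/0/0"                 // hypothetical checkpoint root
val versionZip = new Path(dfsRootDir, "42.zip")          // metadata and small files for version 42
val sstDir     = new Path(dfsRootDir, "SSTs")            // immutable SST files shared across versions
val oneSstFile = new Path(sstDir, "00007-example.sst")   // hypothetical SST file name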

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 7 additions & 0 deletions
@@ -253,6 +253,13 @@ class RocksDB(
     logInfo(s"Rolled back to $loadedVersion")
   }
 
+  def cleanup(): Unit = {
+    val cleanupTime = timeTakenMs {
+      fileManager.deleteOldVersions(conf.minVersionsToRetain)
+    }
+    logInfo(s"Cleaned old data, time taken: $cleanupTime ms")
+  }
+
   /** Release all resources */
   def close(): Unit = {
     try {
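The new `cleanup()` method simply delegates to `RocksDBFileManager.deleteOldVersions` with `conf.minVersionsToRetain` and logs the time taken. As a rough usage sketch (not part of this diff; the constructor arguments and String put/get helpers follow what RocksDBSuite below uses), a caller would commit a series of versions and then invoke `cleanup()` to prune old ones:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.Utils

// Illustrative sketch only, assuming the suite-style constructor and helpers.
val db = new RocksDB(
  dfsRootDir = Utils.createTempDir().getAbsolutePath,
  conf = RocksDBConf().copy(minVersionsToRetain = 10),
  hadoopConf = new Configuration(),
  loggingId = "[example]")
try {
  db.load(0)                                    // acquire the instance at version 0
  for (version <- 1 to 50) {
    db.put(version.toString, version.toString)  // assumes the String put helper seen in the suite
    db.commit()                                 // writes <version>.zip and new SST files to DFS
  }
  db.cleanup()  // keeps the last 10 versions (41..50), deletes older zips and unused SST files
} finally {
  db.close()
}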

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 106 additions & 0 deletions
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
@@ -198,6 +199,99 @@ class RocksDBFileManager(
     }
   }
 
+  /**
+   * Delete old versions by deleting the associated version and SST files.
+   * At a high level, this method finds which versions to delete, and which SST files were
+   * last used in those versions. It's safe to delete these SST files because an SST file can
+   * be reused only in successive versions. Therefore, if an SST file F was last used in version
+   * V, then it won't be used in version V+1 or later, and if version V can be deleted, then
+   * F can safely be deleted as well.
+   *
+   * To find old files, it does the following.
+   * - List all the existing [version].zip files
+   * - Find the min version that needs to be retained based on the given `numVersionsToRetain`.
+   * - Accordingly decide which versions should be deleted.
+   * - Resolve all SST files of all the existing versions, if not already resolved.
+   * - Find what was the latest version in which each SST file was used.
+   * - Delete the files that were last used in the to-be-deleted versions as we will not
+   *   need those files any more.
+   *
+   * Note that it only deletes files that it knows are safe to delete.
+   * It may not delete the following files.
+   * - Partially written SST files
+   * - SST files that were used in a version, but that version got overwritten with a different
+   *   set of SST files.
+   */
+  def deleteOldVersions(numVersionsToRetain: Int): Unit = {
+    val path = new Path(dfsRootDir)
+
+    // All versions present in DFS, sorted
+    val sortedVersions = fm.list(path, onlyZipFiles)
+      .map(_.getPath.getName.stripSuffix(".zip"))
+      .map(_.toLong)
+      .sorted
+
+    // Return if no versions generated yet
+    if (sortedVersions.isEmpty) return
+
+    // Find the versions to delete
+    val maxVersionPresent = sortedVersions.last
+    val minVersionPresent = sortedVersions.head
+    val minVersionToRetain =
+      math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)
+    val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet[Long]
+
+    // Return if no version to delete
+    if (versionsToDelete.isEmpty) return
+
+    logInfo(
+      s"Versions present: (min $minVersionPresent, max $maxVersionPresent), " +
+        s"cleaning up all versions older than $minVersionToRetain to retain last " +
+        s"$numVersionsToRetain versions")
+
+    // Resolve RocksDB files for all the versions and find the max version each file is used
+    val fileToMaxUsedVersion = new mutable.HashMap[RocksDBImmutableFile, Long]
+    sortedVersions.foreach { version =>
+      val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+        val newResolvedFiles = getImmutableFilesFromVersionZip(version)
+        versionToRocksDBFiles.put(version, newResolvedFiles)
+        newResolvedFiles
+      }
+      files.foreach(f => fileToMaxUsedVersion(f) = version)
+    }
+
+    // Best effort attempt to delete SST files that were last used in to-be-deleted versions
+    val filesToDelete = fileToMaxUsedVersion.filter { case (_, v) => versionsToDelete.contains(v) }
+    logInfo(s"Deleting ${filesToDelete.size} files not used in versions >= $minVersionToRetain")
+    var failedToDelete = 0
+    filesToDelete.foreach { case (file, maxUsedVersion) =>
+      try {
+        val dfsFile = dfsFilePath(file.dfsFileName)
+        fm.delete(dfsFile)
+        logDebug(s"Deleted file $file that was last used in version $maxUsedVersion")
+      } catch {
+        case e: Exception =>
+          failedToDelete += 1
+          logWarning(s"Error deleting file $file, last used in version $maxUsedVersion", e)
+      }
+    }
+
+    // Delete the version files and forget about them
+    versionsToDelete.foreach { version =>
+      val versionFile = dfsBatchZipFile(version)
+      try {
+        fm.delete(versionFile)
+        versionToRocksDBFiles.remove(version)
+        logDebug(s"Deleted version $version")
+      } catch {
+        case e: Exception =>
+          logWarning(s"Error deleting version file $versionFile for version $version", e)
+      }
+    }
+    logInfo(s"Deleted ${filesToDelete.size - failedToDelete} files (failed to delete " +
+      s"$failedToDelete files) not used in versions >= $minVersionToRetain")
+  }
+
   /** Save immutable files to DFS directory */
   private def saveImmutableFilesToDfs(
       version: Long,
@@ -295,6 +389,16 @@ class RocksDBFileManager(
         s"$filesReused files reused.")
   }
 
+  /** Get the SST files required for a version from the version zip file in DFS */
+  private def getImmutableFilesFromVersionZip(version: Long): Seq[RocksDBImmutableFile] = {
+    Utils.deleteRecursively(localTempDir)
+    localTempDir.mkdirs()
+    Utils.unzipFilesFromFile(fs, dfsBatchZipFile(version), localTempDir)
+    val metadataFile = localMetadataFile(localTempDir)
+    val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
+    metadata.immutableFiles
+  }
+
   /**
    * Compress files to a single zip file in DFS. Only the file names are embedded in the zip.
    * Any error while writing will ensure that the file is not written.
@@ -347,6 +451,8 @@ class RocksDBFileManager(
 
   private def localMetadataFile(parentDir: File): File = new File(parentDir, "metadata")
 
+  override protected def logName: String = s"${super.logName} $loggingId"
+
   private def dfsFilePath(fileName: String): Path = {
     if (isSstFile(fileName)) {
       new Path(new Path(dfsRootDir, SST_FILES_DFS_SUBDIR), fileName)
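To make the retention arithmetic in `deleteOldVersions` above concrete, here is a small worked example (not part of this diff). With versions 1 through 50 present in DFS and `numVersionsToRetain = 10`, the method keeps versions 41..50 and deletes 1..40, which is exactly what the "RocksDB: cleanup old files" test below asserts:

// Worked example of the retention arithmetic, using the same names as the method.
val sortedVersions = (1L to 50L).toSeq        // versions found as <version>.zip in DFS
val numVersionsToRetain = 10
val maxVersionPresent = sortedVersions.last   // 50
val minVersionPresent = sortedVersions.head   // 1
val minVersionToRetain =
  math.max(minVersionPresent, maxVersionPresent - numVersionsToRetain + 1)     // 41
val versionsToDelete = sortedVersions.takeWhile(_ < minVersionToRetain).toSet  // {1, ..., 40}
// Any SST file whose last-used version falls in versionsToDelete is deleted as well.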

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 215 additions & 1 deletion
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 class RocksDBSuite extends SparkFunSuite {
 
@@ -102,6 +102,72 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  test("RocksDB: cleanup old files") {
+    val remoteDir = Utils.createTempDir().toString
+    val conf = RocksDBConf().copy(compactOnCommit = true, minVersionsToRetain = 10)
+
+    def versionsPresent: Seq[Long] = {
+      remoteDir.listFiles.filter(_.getName.endsWith(".zip"))
+        .map(_.getName.stripSuffix(".zip"))
+        .map(_.toLong)
+        .sorted
+    }
+
+    withDB(remoteDir, conf = conf) { db =>
+      // Generate versions without cleaning up
+      for (version <- 1 to 50) {
+        db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...
+        db.commit()
+      }
+
+      // Clean up and verify version files and SST files were deleted
+      require(versionsPresent === (1L to 50L))
+      val sstDir = new File(remoteDir, "SSTs")
+      val numSstFiles = listFiles(sstDir).length
+      db.cleanup()
+      assert(versionsPresent === (41L to 50L))
+      assert(listFiles(sstDir).length < numSstFiles)
+
+      // Verify data in retained versions.
+      versionsPresent.foreach { version =>
+        db.load(version)
+        val data = db.iterator().map(toStr).toSet
+        assert(data === (1L to version).map(_.toString).map(x => x -> x).toSet)
+      }
+    }
+  }
+
+  test("RocksDB: handle commit failures and aborts") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set(
+      SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key,
+      classOf[CreateAtomicTestManager].getName)
+    val remoteDir = Utils.createTempDir().getAbsolutePath
+    val conf = RocksDBConf().copy(compactOnCommit = true)
+    withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+      // Disable failure of output stream and generate versions
+      CreateAtomicTestManager.shouldFailInCreateAtomic = false
+      for (version <- 1 to 10) {
+        db.put(version.toString, version.toString)  // update "1" -> "1", "2" -> "2", ...
+        db.commit()
+      }
+      val version10Data = (1L to 10).map(_.toString).map(x => x -> x).toSet
+
+      // Fail commit for next version and verify that reloading resets the files
+      CreateAtomicTestManager.shouldFailInCreateAtomic = true
+      db.put("11", "11")
+      intercept[IOException] { quietly { db.commit() } }
+      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+      CreateAtomicTestManager.shouldFailInCreateAtomic = false
+
+      // Abort commit for next version and verify that reloading resets the files
+      db.load(10)
+      db.put("11", "11")
+      db.rollback()
+      assert(db.load(10).iterator().map(toStr).toSet === version10Data)
+    }
+  }
+
   test("RocksDBFileManager: upload only new immutable files") {
     withTempDir { dir =>
       val dfsRootDir = dir.getAbsolutePath
@@ -207,6 +273,133 @@ class RocksDBSuite extends SparkFunSuite {
     }
   }
 
+  test("disallow concurrent updates to the same RocksDB instance") {
+    quietly {
+      withDB(
+        Utils.createTempDir().toString,
+        conf = RocksDBConf().copy(lockAcquireTimeoutMs = 20)) { db =>
+        // DB has been loaded, so the current thread has already acquired the lock on the RocksDB instance
+
+        db.load(0)  // Current thread should be able to load again
+
+        // Another thread should not be able to load while current thread is using it
+        val ex = intercept[IllegalStateException] {
+          ThreadUtils.runInNewThread("concurrent-test-thread-1") { db.load(0) }
+        }
+        // Assert that the error message contains the stack trace
+        assert(ex.getMessage.contains("Thread holding the lock has trace:"))
+        assert(ex.getMessage.contains("runInNewThread"))
+
+        // Commit should release the instance allowing other threads to load new version
+        db.commit()
+        ThreadUtils.runInNewThread("concurrent-test-thread-2") {
+          db.load(1)
+          db.commit()
+        }
+
+        // Another thread should not be able to load while current thread is using it
+        db.load(2)
+        intercept[IllegalStateException] {
+          ThreadUtils.runInNewThread("concurrent-test-thread-2") { db.load(2) }
+        }
+
+        // Rollback should release the instance allowing other threads to load new version
+        db.rollback()
+        ThreadUtils.runInNewThread("concurrent-test-thread-3") {
+          db.load(1)
+          db.commit()
+        }
+      }
+    }
+  }
+
+  test("ensure concurrent access lock is released after Spark task completes") {
+    val conf = new SparkConf().setAppName("test").setMaster("local")
+    val sc = new SparkContext(conf)
+
+    try {
+      RocksDBSuite.withSingletonDB {
+        // Load a RocksDB instance, that is, get a lock inside a task and then fail
+        quietly {
+          intercept[Exception] {
+            sc.makeRDD[Int](1 to 1, 1).map { i =>
+              RocksDBSuite.singleton.load(0)
+              throw new Exception("fail this task to test lock release")
+            }.count()
+          }
+        }
+
+        // Test whether you can load again, that is, will it successfully lock again
+        RocksDBSuite.singleton.load(0)
+      }
+    } finally {
+      sc.stop()
+    }
+  }
+
+  test("ensure that concurrent update and cleanup consistent versions") {
+    quietly {
+      val numThreads = 20
+      val numUpdatesInEachThread = 20
+      val remoteDir = Utils.createTempDir().toString
+      @volatile var exception: Exception = null
+      val updatingThreads = Array.fill(numThreads) {
+        new Thread() {
+          override def run(): Unit = {
+            try {
+              for (version <- 0 to numUpdatesInEachThread) {
+                withDB(
+                  remoteDir,
+                  version = version) { db =>
+                  val prevValue = Option(toStr(db.get("a"))).getOrElse("0").toInt
+                  db.put("a", (prevValue + 1).toString)
+                  db.commit()
+                }
+              }
+            } catch {
+              case e: Exception =>
+                val newException = new Exception(s"ThreadId ${this.getId} failed", e)
+                if (exception != null) {
+                  exception = newException
+                }
+                throw e
+            }
+          }
+        }
+      }
+      val cleaningThread = new Thread() {
+        override def run(): Unit = {
+          try {
+            withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
+              while (!this.isInterrupted) {
+                db.cleanup()
+                Thread.sleep(1)
+              }
+            }
+          } catch {
+            case e: Exception =>
+              val newException = new Exception(s"ThreadId ${this.getId} failed", e)
+              if (exception != null) {
+                exception = newException
+              }
+              throw e
+          }
+        }
+      }
+      updatingThreads.foreach(_.start())
+      cleaningThread.start()
+      updatingThreads.foreach(_.join())
+      cleaningThread.interrupt()
+      cleaningThread.join()
+      if (exception != null) {
+        fail(exception)
+      }
+      withDB(remoteDir, numUpdatesInEachThread) { db =>
+        assert(toStr(db.get("a")) === numUpdatesInEachThread.toString)
+      }
+    }
+  }
+
   test("checkpoint metadata serde roundtrip") {
     def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
       assert(metadata.json == json)
@@ -304,3 +497,24 @@ class RocksDBSuite extends SparkFunSuite {
 
   def listFiles(file: String): Seq[File] = listFiles(new File(file))
 }
+
+object RocksDBSuite {
+  @volatile var singleton: RocksDB = _
+
+  def withSingletonDB[T](func: => T): T = {
+    try {
+      singleton = new RocksDB(
+        dfsRootDir = Utils.createTempDir().getAbsolutePath,
+        conf = RocksDBConf().copy(compactOnCommit = false, minVersionsToRetain = 100),
+        hadoopConf = new Configuration(),
+        loggingId = s"[Thread-${Thread.currentThread.getId}]")
+
+      func
+    } finally {
+      if (singleton != null) {
+        singleton.close()
+        singleton = null
+      }
+    }
+  }
+}
