@@ -224,7 +224,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
224224 assert(fs.exists(path))
225225
226226 // the checkpoint is not cleaned by default (without the configuration set)
227- var postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil )
227+ var postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil , Nil )
228228 rdd = null // Make RDD out of scope
229229 runGC()
230230 postGCTester.assertCleanup()
@@ -245,7 +245,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
245245 assert(fs.exists(RDDCheckpointData .rddCheckpointDataPath(sc, rddId).get))
246246
247247 // Test that GC causes checkpoint data cleanup after dereferencing the RDD
248- postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil )
248+ postGCTester = new CleanerTester (sc, Seq (rddId), Nil , Nil , Seq (rddId) )
249249 rdd = null // Make RDD out of scope
250250 runGC()
251251 postGCTester.assertCleanup()
@@ -406,12 +406,14 @@ class CleanerTester(
406406 sc : SparkContext ,
407407 rddIds : Seq [Int ] = Seq .empty,
408408 shuffleIds : Seq [Int ] = Seq .empty,
409- broadcastIds : Seq [Long ] = Seq .empty)
409+ broadcastIds : Seq [Long ] = Seq .empty,
410+ checkpointIds : Seq [Long ] = Seq .empty)
410411 extends Logging {
411412
412413 val toBeCleanedRDDIds = new HashSet [Int ] with SynchronizedSet [Int ] ++= rddIds
413414 val toBeCleanedShuffleIds = new HashSet [Int ] with SynchronizedSet [Int ] ++= shuffleIds
414415 val toBeCleanedBroadcstIds = new HashSet [Long ] with SynchronizedSet [Long ] ++= broadcastIds
416+ val toBeCheckpointIds = new HashSet [Long ] with SynchronizedSet [Long ] ++= checkpointIds
415417 val isDistributed = ! sc.isLocal
416418
417419 val cleanerListener = new CleanerListener {
@@ -427,12 +429,17 @@ class CleanerTester(
427429
428430 def broadcastCleaned (broadcastId : Long ): Unit = {
429431 toBeCleanedBroadcstIds -= broadcastId
430- logInfo(" Broadcast" + broadcastId + " cleaned" )
432+ logInfo(" Broadcast " + broadcastId + " cleaned" )
431433 }
432434
433435 def accumCleaned (accId : Long ): Unit = {
434436 logInfo(" Cleaned accId " + accId + " cleaned" )
435437 }
438+
439+ def checkpointCleaned (rddId : Long ): Unit = {
440+ toBeCheckpointIds -= rddId
441+ logInfo(" checkpoint " + rddId + " cleaned" )
442+ }
436443 }
437444
438445 val MAX_VALIDATION_ATTEMPTS = 10
@@ -456,7 +463,8 @@ class CleanerTester(
456463
457464 /** Verify that RDDs, shuffles, etc. occupy resources */
458465 private def preCleanupValidate () {
459- assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty, " Nothing to cleanup" )
466+ assert(rddIds.nonEmpty || shuffleIds.nonEmpty || broadcastIds.nonEmpty ||
467+ checkpointIds.nonEmpty, " Nothing to cleanup" )
460468
461469 // Verify the RDDs have been persisted and blocks are present
462470 rddIds.foreach { rddId =>
@@ -547,7 +555,8 @@ class CleanerTester(
547555 private def isAllCleanedUp =
548556 toBeCleanedRDDIds.isEmpty &&
549557 toBeCleanedShuffleIds.isEmpty &&
550- toBeCleanedBroadcstIds.isEmpty
558+ toBeCleanedBroadcstIds.isEmpty &&
559+ toBeCheckpointIds.isEmpty
551560
552561 private def getRDDBlocks (rddId : Int ): Seq [BlockId ] = {
553562 blockManager.master.getMatchingBlockIds( _ match {
0 commit comments