@@ -22,7 +22,7 @@ import java.lang.ref.{ReferenceQueue, WeakReference}
2222import scala .collection .mutable .{ArrayBuffer , SynchronizedBuffer }
2323
2424import org .apache .spark .broadcast .Broadcast
25- import org .apache .spark .rdd .RDD
25+ import org .apache .spark .rdd .{ RDDCheckpointData , RDD }
2626import org .apache .spark .util .Utils
2727
2828/**
@@ -33,6 +33,7 @@ private case class CleanRDD(rddId: Int) extends CleanupTask
// Cleanup task descriptors, one per kind of resource tracked by ContextCleaner.
// Each carries just the id needed to release the corresponding resource.
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask
private case class CleanCheckpoint(rddId: Int) extends CleanupTask
3637
3738/**
3839 * A WeakReference associated with a CleanupTask.
@@ -94,12 +95,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
9495 @ volatile private var stopped = false
9596
9697 /** Attach a listener object to get information of when objects are cleaned. */
97- def attachListener (listener : CleanerListener ) {
98+ def attachListener (listener : CleanerListener ): Unit = {
9899 listeners += listener
99100 }
100101
101102 /** Start the cleaner. */
102- def start () {
103+ def start (): Unit = {
103104 cleaningThread.setDaemon(true )
104105 cleaningThread.setName(" Spark Context Cleaner" )
105106 cleaningThread.start()
@@ -108,7 +109,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
108109 /**
109110 * Stop the cleaning thread and wait until the thread has finished running its current task.
110111 */
111- def stop () {
112+ def stop (): Unit = {
112113 stopped = true
113114 // Interrupt the cleaning thread, but wait until the current task has finished before
114115 // doing so. This guards against the race condition where a cleaning thread may
@@ -121,7 +122,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
121122 }
122123
123124 /** Register a RDD for cleanup when it is garbage collected. */
124- def registerRDDForCleanup (rdd : RDD [_]) {
125+ def registerRDDForCleanup (rdd : RDD [_]): Unit = {
125126 registerForCleanup(rdd, CleanRDD (rdd.id))
126127 }
127128
@@ -130,17 +131,22 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
130131 }
131132
132133 /** Register a ShuffleDependency for cleanup when it is garbage collected. */
133- def registerShuffleForCleanup (shuffleDependency : ShuffleDependency [_, _, _]) {
134+ def registerShuffleForCleanup (shuffleDependency : ShuffleDependency [_, _, _]): Unit = {
134135 registerForCleanup(shuffleDependency, CleanShuffle (shuffleDependency.shuffleId))
135136 }
136137
137138 /** Register a Broadcast for cleanup when it is garbage collected. */
138- def registerBroadcastForCleanup [T ](broadcast : Broadcast [T ]) {
139+ def registerBroadcastForCleanup [T ](broadcast : Broadcast [T ]): Unit = {
139140 registerForCleanup(broadcast, CleanBroadcast (broadcast.id))
140141 }
141142
143+ /** Register a RDDCheckpointData for cleanup when it is garbage collected. */
144+ def registerRDDCheckpointDataForCleanup [T ](rdd : RDD [_], parentId : Int ): Unit = {
145+ registerForCleanup(rdd, CleanCheckpoint (parentId))
146+ }
147+
142148 /** Register an object for cleanup. */
143- private def registerForCleanup (objectForCleanup : AnyRef , task : CleanupTask ) {
149+ private def registerForCleanup (objectForCleanup : AnyRef , task : CleanupTask ): Unit = {
144150 referenceBuffer += new CleanupTaskWeakReference (task, objectForCleanup, referenceQueue)
145151 }
146152
@@ -164,6 +170,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
164170 doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
165171 case CleanAccum (accId) =>
166172 doCleanupAccum(accId, blocking = blockOnCleanupTasks)
173+ case CleanCheckpoint (rddId) =>
174+ doCleanCheckpoint(rddId)
167175 }
168176 }
169177 }
@@ -175,7 +183,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
175183 }
176184
177185 /** Perform RDD cleanup. */
178- def doCleanupRDD (rddId : Int , blocking : Boolean ) {
186+ def doCleanupRDD (rddId : Int , blocking : Boolean ): Unit = {
179187 try {
180188 logDebug(" Cleaning RDD " + rddId)
181189 sc.unpersistRDD(rddId, blocking)
@@ -187,7 +195,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
187195 }
188196
189197 /** Perform shuffle cleanup, asynchronously. */
190- def doCleanupShuffle (shuffleId : Int , blocking : Boolean ) {
198+ def doCleanupShuffle (shuffleId : Int , blocking : Boolean ): Unit = {
191199 try {
192200 logDebug(" Cleaning shuffle " + shuffleId)
193201 mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -200,7 +208,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
200208 }
201209
202210 /** Perform broadcast cleanup. */
203- def doCleanupBroadcast (broadcastId : Long , blocking : Boolean ) {
211+ def doCleanupBroadcast (broadcastId : Long , blocking : Boolean ): Unit = {
204212 try {
205213 logDebug(s " Cleaning broadcast $broadcastId" )
206214 broadcastManager.unbroadcast(broadcastId, true , blocking)
@@ -212,7 +220,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
212220 }
213221
214222 /** Perform accumulator cleanup. */
215- def doCleanupAccum (accId : Long , blocking : Boolean ) {
223+ def doCleanupAccum (accId : Long , blocking : Boolean ): Unit = {
216224 try {
217225 logDebug(" Cleaning accumulator " + accId)
218226 Accumulators .remove(accId)
@@ -223,6 +231,18 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
223231 }
224232 }
225233
234+ /** Perform checkpoint cleanup. */
235+ def doCleanCheckpoint (rddId : Int ): Unit = {
236+ try {
237+ logDebug(" Cleaning rdd checkpoint data " + rddId)
238+ RDDCheckpointData .clearRDDCheckpointData(sc, rddId)
239+ logInfo(" Cleaned rdd checkpoint data " + rddId)
240+ }
241+ catch {
242+ case e : Exception => logError(" Error cleaning rdd checkpoint data " + rddId, e)
243+ }
244+ }
245+
226246 private def blockManagerMaster = sc.env.blockManager.master
227247 private def broadcastManager = sc.env.broadcastManager
228248 private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf [MapOutputTrackerMaster ]
0 commit comments