|
18 | 18 | package org.apache.spark |
19 | 19 |
|
20 | 20 | import java.lang.ref.{ReferenceQueue, WeakReference} |
21 | | -import java.util.concurrent.{ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} |
| 21 | +import java.util.Collections |
| 22 | +import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, ScheduledExecutorService, TimeUnit} |
22 | 23 |
|
23 | 24 | import scala.collection.JavaConverters._ |
24 | 25 |
|
@@ -58,7 +59,12 @@ private class CleanupTaskWeakReference( |
58 | 59 | */ |
59 | 60 | private[spark] class ContextCleaner(sc: SparkContext) extends Logging { |
60 | 61 |
|
61 | | - private val referenceBuffer = new ConcurrentLinkedQueue[CleanupTaskWeakReference]() |
| 62 | + /** |
| 63 | + * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they |
| 64 | + * have not yet been taken off the reference queue and processed by this cleaner. |
| 65 | + */ |
| 66 | + private val referenceBuffer = |
| 67 | + Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap) |
62 | 68 |
|
63 | 69 | private val referenceQueue = new ReferenceQueue[AnyRef] |
64 | 70 |
|
@@ -176,10 +182,10 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { |
176 | 182 | .map(_.asInstanceOf[CleanupTaskWeakReference]) |
177 | 183 | // Synchronize here to avoid being interrupted on stop() |
178 | 184 | synchronized { |
179 | | - reference.map(_.task).foreach { task => |
180 | | - logDebug("Got cleaning task " + task) |
181 | | - referenceBuffer.remove(reference.get) |
182 | | - task match { |
| 185 | + reference.foreach { ref => |
| 186 | + logDebug("Got cleaning task " + ref.task) |
| 187 | + referenceBuffer.remove(ref) |
| 188 | + ref.task match { |
183 | 189 | case CleanRDD(rddId) => |
184 | 190 | doCleanupRDD(rddId, blocking = blockOnCleanupTasks) |
185 | 191 | case CleanShuffle(shuffleId) => |
|
0 commit comments