1818package org .apache .spark
1919
2020import java .lang .ref .{ReferenceQueue , WeakReference }
21+ import java .util .concurrent .{TimeUnit , ScheduledExecutorService }
2122
2223import scala .collection .mutable .{ArrayBuffer , SynchronizedBuffer }
2324
2425import org .apache .spark .broadcast .Broadcast
2526import org .apache .spark .rdd .{RDD , ReliableRDDCheckpointData }
26- import org .apache .spark .util .Utils
27+ import org .apache .spark .util .{ ThreadUtils , Utils }
2728
2829/**
2930 * Classes that represent cleaning tasks.
@@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
6667
/** Thread whose body runs the cleaning loop by calling `keepCleaning()`. */
private val cleaningThread = new Thread() {
  // Explicit `: Unit =` instead of the deprecated procedure syntax `def run() { ... }`.
  override def run(): Unit = keepCleaning()
}
6869
/**
 * Single-threaded scheduled executor used to trigger periodic garbage collections.
 * The thread-name prefix must not carry leading whitespace.
 */
private val periodicGCService: ScheduledExecutorService =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
72+
/**
 * How often to trigger a garbage collection in this JVM, in seconds.
 *
 * This context cleaner triggers cleanups only when weak references are garbage collected.
 * In long-running applications with large driver JVMs, where there is little memory pressure
 * on the driver, this may happen very occasionally or not at all. Not cleaning at all may
 * lead to executors running out of disk space after a while.
 *
 * Read from `spark.cleaner.periodicGC.interval`; defaults to `30min` when the key is unset.
 * The key must match exactly (no surrounding whitespace), otherwise a user-supplied value
 * would never be found and the default would always be used.
 */
private val periodicGCInterval: Long =
  sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
83+
6984 /**
7085 * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
7186 * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
104119 cleaningThread.setDaemon(true )
105120 cleaningThread.setName(" Spark Context Cleaner" )
106121 cleaningThread.start()
122+ periodicGCService.scheduleAtFixedRate(new Runnable {
123+ override def run (): Unit = System .gc()
124+ }, periodicGCInterval, periodicGCInterval, TimeUnit .SECONDS )
107125 }
108126
109127 /**
@@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
119137 cleaningThread.interrupt()
120138 }
121139 cleaningThread.join()
140+ periodicGCService.shutdown()
122141 }
123142
124143 /** Register a RDD for cleanup when it is garbage collected. */
0 commit comments