Skip to content

Commit c96d5e2

Browse files
author
Andrew Or
committed
Run a GC every 30 minutes (configurable)
1 parent 6a8cf80 commit c96d5e2

File tree

1 file changed

+20
-1
lines changed

1 file changed

+20
-1
lines changed

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818
package org.apache.spark
1919

2020
import java.lang.ref.{ReferenceQueue, WeakReference}
21+
import java.util.concurrent.{TimeUnit, ScheduledExecutorService}
2122

2223
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
2324

2425
import org.apache.spark.broadcast.Broadcast
2526
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
26-
import org.apache.spark.util.Utils
27+
import org.apache.spark.util.{ThreadUtils, Utils}
2728

2829
/**
2930
* Classes that represent cleaning tasks.
@@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
6667

6768
private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
6869

70+
private val periodicGCService: ScheduledExecutorService =
71+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("context-cleaner-periodic-gc")
72+
73+
/**
74+
* How often to trigger a garbage collection in this JVM.
75+
*
76+
* This context cleaner triggers cleanups only when weak references are garbage collected.
77+
* In long-running applications with large driver JVMs, where there is little memory pressure
78+
* on the driver, this may happen very occasionally or not at all. Not cleaning at all may
79+
* lead to executors running out of disk space after a while.
80+
*/
81+
private val periodicGCInterval =
82+
sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
83+
6984
/**
7085
* Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
7186
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
@@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
104119
cleaningThread.setDaemon(true)
105120
cleaningThread.setName("Spark Context Cleaner")
106121
cleaningThread.start()
122+
periodicGCService.scheduleAtFixedRate(new Runnable {
123+
override def run(): Unit = System.gc()
124+
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
107125
}
108126

109127
/**
@@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
119137
cleaningThread.interrupt()
120138
}
121139
cleaningThread.join()
140+
periodicGCService.shutdown()
122141
}
123142

124143
/** Register a RDD for cleanup when it is garbage collected. */

0 commit comments

Comments
 (0)