Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion core/src/main/scala/org/apache/spark/ContextCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
package org.apache.spark

import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.{TimeUnit, ScheduledExecutorService}

import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.util.Utils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Classes that represent cleaning tasks.
Expand Down Expand Up @@ -66,6 +67,20 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {

/**
 * Thread whose body runs the `keepCleaning()` loop. It is marked as a daemon, named and
 * started elsewhere in this class.
 */
private val cleaningThread = new Thread() {
  // Explicit `: Unit =` result type instead of the deprecated procedure syntax.
  override def run(): Unit = keepCleaning()
}

/**
 * Scheduled executor on which the periodic `System.gc()` task is run. It is shut down after
 * the cleaning thread has been joined.
 */
private val periodicGCService: ScheduledExecutorService = {
  val gcThreadName = "context-cleaner-periodic-gc"
  ThreadUtils.newDaemonSingleThreadScheduledExecutor(gcThreadName)
}

/**
 * Interval, in seconds, between garbage collections explicitly requested in this JVM. Read
 * from `spark.cleaner.periodicGC.interval`, defaulting to "30min".
 *
 * The context cleaner only performs cleanups once weak references have been garbage
 * collected. A long-running application with a large driver JVM under little memory pressure
 * may collect very rarely, or never; if nothing is ever cleaned, executors may eventually run
 * out of disk space.
 */
private val periodicGCInterval: Long = {
  val conf = sc.conf
  conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
}

/**
* Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
* is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
Expand Down Expand Up @@ -104,6 +119,9 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
periodicGCService.scheduleAtFixedRate(new Runnable {
override def run(): Unit = System.gc()
}, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}

/**
Expand All @@ -119,6 +137,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
cleaningThread.interrupt()
}
cleaningThread.join()
periodicGCService.shutdown()
}

/** Register a RDD for cleanup when it is garbage collected. */
Expand Down