From 9886f697bc3ab6fe83aa2aa4ba8a3e92ae0ec051 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 1 Apr 2015 09:49:47 -0700 Subject: [PATCH 1/4] [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops. This fixes the thread leak. I also changed the unit test to keep track of allocated contexts and making sure they're closed after tests are run; this is needed since some tests use this pattern: val sc = createContext() doSomethingThatMayThrow() sc.stop() --- .../spark/ExecutorAllocationManager.scala | 38 +++++++++--------- .../scala/org/apache/spark/SparkContext.scala | 3 +- .../ExecutorAllocationManagerSuite.scala | 39 ++++++++++++------- 3 files changed, 46 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 21c6e6ffa6666..c0659ed29e40d 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -17,10 +17,12 @@ package org.apache.spark +import java.util.concurrent.{TimeUnit, Executors} + import scala.collection.mutable import org.apache.spark.scheduler._ -import org.apache.spark.util.{SystemClock, Clock} +import org.apache.spark.util.{Clock, SystemClock, Utils} /** * An agent that dynamically allocates and removes executors based on the workload. @@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager( // Listener for Spark events that impact the allocation policy private val listener = new ExecutorAllocationListener + // Executor that handles the scheduling task. + private val executor = Executors.newSingleThreadScheduledExecutor( + Utils.namedThreadFactory("spark-dynamic-executor-allocation")) + /** * Verify that the settings specified through the config are valid. * If not, throw an appropriate exception. @@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager( } /** - * Register for scheduler callbacks to decide when to add and remove executors. + * Register for scheduler callbacks to decide when to add and remove executors, and start + * the scheduling task. */ def start(): Unit = { listenerBus.addListener(listener) - startPolling() + + val scheduleTask = new Runnable() { + override def run(): Unit = Utils.logUncaughtExceptions(schedule()) + } + executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) } /** - * Start the main polling thread that keeps track of when to add and remove executors. + * Stop the allocation manager. */ - private def startPolling(): Unit = { - val t = new Thread { - override def run(): Unit = { - while (true) { - try { - schedule() - } catch { - case e: Exception => logError("Exception in dynamic executor allocation thread!", e) - } - Thread.sleep(intervalMillis) - } - } - } - t.setName("spark-dynamic-executor-allocation") - t.setDaemon(true) - t.start() + def stop(): Unit = { + executor.shutdown() + executor.awaitTermination(10, TimeUnit.SECONDS) } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a70be16f77eeb..1f902cded6698 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1138,7 +1138,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli * Return whether dynamically adjusting the amount of resources allocated to * this application is supported. This is currently only available for YARN. */ - private[spark] def supportDynamicAllocation = + private[spark] def supportDynamicAllocation = master.contains("yarn") || dynamicAllocationTesting /** @@ -1406,6 +1406,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli dagScheduler = null listenerBus.stop() eventLogger.foreach(_.stop()) + executorAllocationManager.foreach(_.stop()) env.actorSystem.stop(heartbeatReceiver) progressBar.foreach(_.stop()) taskScheduler = null diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index abfcee75728dc..b69c91a1219ae 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark import scala.collection.mutable -import org.scalatest.{FunSuite, PrivateMethodTester} +import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester} import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo @@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock /** * Test add and remove behavior of ExecutorAllocationManager. */ -class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { +class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter { import ExecutorAllocationManager._ import ExecutorAllocationManagerSuite._ + private val contexts = new mutable.ListBuffer[SparkContext]() + + before { + contexts.clear() + } + + after { + contexts.foreach(_.stop()) + } + test("verify min/max executors") { val conf = new SparkConf() .setMaster("local") @@ -665,16 +675,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext { assert(removeTimes(manager).contains("executor-2")) assert(!removeTimes(manager).contains("executor-1")) } -} - -/** - * Helper methods for testing ExecutorAllocationManager. - * This includes methods to access private methods and fields in ExecutorAllocationManager. - */ -private object ExecutorAllocationManagerSuite extends PrivateMethodTester { - private val schedulerBacklogTimeout = 1L - private val sustainedSchedulerBacklogTimeout = 2L - private val executorIdleTimeout = 3L private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = { val conf = new SparkConf() @@ -688,9 +688,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester { sustainedSchedulerBacklogTimeout.toString) .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString) .set("spark.dynamicAllocation.testing", "true") - new SparkContext(conf) + val sc = new SparkContext(conf) + contexts += sc + sc } +} + +/** + * Helper methods for testing ExecutorAllocationManager. + * This includes methods to access private methods and fields in ExecutorAllocationManager. + */ +private object ExecutorAllocationManagerSuite extends PrivateMethodTester { + private val schedulerBacklogTimeout = 1L + private val sustainedSchedulerBacklogTimeout = 2L + private val executorIdleTimeout = 3L + private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = { new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details") } From cc5a7441a8726af9f8a3ac8a04883172a107c109 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 1 Apr 2015 10:39:17 -0700 Subject: [PATCH 2/4] Stop alloc manager before scheduler. --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1f902cded6698..e257c468c59e2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1402,11 +1402,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli env.metricsSystem.report() metadataCleaner.cancel() cleaner.foreach(_.stop()) + executorAllocationManager.foreach(_.stop()) dagScheduler.stop() dagScheduler = null listenerBus.stop() eventLogger.foreach(_.stop()) - executorAllocationManager.foreach(_.stop()) env.actorSystem.stop(heartbeatReceiver) progressBar.foreach(_.stop()) taskScheduler = null From 5711512fd708ccefc1f583cd0f6edc12d6a5dc09 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 1 Apr 2015 12:55:13 -0700 Subject: [PATCH 3/4] More exception safety. --- .../org/apache/spark/ExecutorAllocationManagerSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index b69c91a1219ae..3ded1e4af8742 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -49,18 +49,19 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit .set("spark.dynamicAllocation.enabled", "true") .set("spark.dynamicAllocation.testing", "true") val sc0 = new SparkContext(conf) + contexts += sc0 assert(sc0.executorAllocationManager.isDefined) sc0.stop() // Min < 0 val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1") - intercept[SparkException] { new SparkContext(conf1) } + intercept[SparkException] { contexts += new SparkContext(conf1) } SparkEnv.get.stop() SparkContext.clearActiveContext() // Max < 0 val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1") - intercept[SparkException] { new SparkContext(conf2) } + intercept[SparkException] { contexts += new SparkContext(conf2) } SparkEnv.get.stop() SparkContext.clearActiveContext() From 652c73b7d35ab80f84d11879c7d5f1d6c613d8cd Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 1 Apr 2015 14:17:27 -0700 Subject: [PATCH 4/4] Nits. --- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index c0659ed29e40d..9385f557c4614 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -17,7 +17,7 @@ package org.apache.spark -import java.util.concurrent.{TimeUnit, Executors} +import java.util.concurrent.{Executors, TimeUnit} import scala.collection.mutable