
Commit 674eb2a

KaiXinXiaoLei authored and srowen committed
[SPARK-8974] Catch exceptions in allocation schedule task.
I hit a problem: when I submit some tasks, the spark-dynamic-executor-allocation thread should send the "requestTotalExecutors" message so that new executors start, but the thread fails with:

2015-07-14 19:02:17,461 | WARN | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
  at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
  at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
  at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
  at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
  at scala.concurrent.Await$.result(package.scala:107)
  at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
  at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
  at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:57)
  at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:351)
  at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1382)
  at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:343)
  at org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:295)
  at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:248)

After a few minutes a new ApplicationMaster starts and the submitted tasks run to completion. But even after a long time (e.g. ten minutes) the number of executors never drops back down, although I use the default value of "spark.dynamicAllocation.minExecutors". The reason is that the exception propagates out of the periodic schedule task, and ScheduledExecutorService.scheduleAtFixedRate suppresses all subsequent executions once a run throws, so the allocation manager stops adjusting the executor count. This change catches non-fatal exceptions inside the task and logs them as warnings instead of letting them escape.

Author: KaiXinXiaoLei <[email protected]>

Closes apache#7352 from KaiXinXiaoLei/dym and squashes the following commits:

3603631 [KaiXinXiaoLei] change logError to logWarning
efc4f24 [KaiXinXiaoLei] change file
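For context, here is a minimal standalone Scala sketch (not Spark code; the object name, interval, and "simulated timeout" are made up for illustration) of the JDK behaviour behind the symptom: once one run of a scheduleAtFixedRate task throws, all subsequent executions are suppressed, so the periodic loop silently stops.

import java.util.concurrent.{Executors, TimeUnit}

object SuppressedScheduleDemo {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    var runs = 0
    val task = new Runnable {
      override def run(): Unit = {
        runs += 1
        println(s"run #$runs")
        if (runs == 2) {
          // Stands in for the TimeoutException thrown by the RPC ask in the real scenario.
          throw new RuntimeException("simulated timeout")
        }
      }
    }
    // Schedule every 200 ms; only "run #1" and "run #2" are ever printed,
    // because the escaped exception cancels the recurring task.
    executor.scheduleAtFixedRate(task, 0, 200, TimeUnit.MILLISECONDS)
    Thread.sleep(2000)
    executor.shutdownNow()
  }
}

With the patch below applied to Spark's allocation task, the timeout would instead be logged as a warning and the next scheduled run would still happen.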
1 parent b9a922e commit 674eb2a

File tree

1 file changed (+11 −1)


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 11 additions & 1 deletion
@@ -20,6 +20,7 @@ package org.apache.spark
 import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
+import scala.util.control.ControlThrowable
 
 import com.codahale.metrics.{Gauge, MetricRegistry}
 
@@ -211,7 +212,16 @@ private[spark] class ExecutorAllocationManager(
     listenerBus.addListener(listener)
 
     val scheduleTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+      override def run(): Unit = {
+        try {
+          schedule()
+        } catch {
+          case ct: ControlThrowable =>
+            throw ct
+          case t: Throwable =>
+            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        }
+      }
     }
     executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
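The guard added in the diff can be read as a small reusable wrapper. The following is a hypothetical sketch under that reading (SafeRunnable is not part of the patch, and println stands in for Spark's logWarning): log anything the task body throws, but rethrow ControlThrowable, which the Scala library uses for non-local control flow and must not be swallowed.

import scala.util.control.ControlThrowable

object SafeRunnable {
  // Wraps a task body so an exception cannot escape and cancel a recurring schedule.
  def apply(name: String)(body: => Unit): Runnable = new Runnable {
    override def run(): Unit = {
      try {
        body
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          // In Spark this would be logWarning; println keeps the sketch self-contained.
          println(s"Uncaught exception in $name: $t")
      }
    }
  }
}

// Hypothetical usage with a ScheduledExecutorService:
// executor.scheduleAtFixedRate(SafeRunnable("dynamic-allocation")(schedule()), 0, intervalMillis, TimeUnit.MILLISECONDS)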
