From 17a7c4782f39a187fa7bb8d6aabd4dc841b531b0 Mon Sep 17 00:00:00 2001 From: root1 Date: Wed, 4 Dec 2019 18:31:25 +0530 Subject: [PATCH] [SPARK-29152]Executor Plugin shutdown when dynamic allocation is enabled --- .../org/apache/spark/executor/Executor.scala | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 8cd98e47b8a4..9c035c14ba6f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -24,6 +24,7 @@ import java.net.{URI, URL} import java.nio.ByteBuffer import java.util.Properties import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ @@ -65,6 +66,10 @@ private[spark] class Executor( logInfo(s"Starting executor ID $executorId on host $executorHostname") + private val executorShutdown = new AtomicBoolean(false) + ShutdownHookManager.addShutdownHook( + () => stop() + ) // Application dependencies (added through SparkContext) that we've fetched so far on this node. // Each map holds the master's timestamp for the version of that file or JAR we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() @@ -249,27 +254,29 @@ private[spark] class Executor( } def stop(): Unit = { - env.metricsSystem.report() - try { - metricsPoller.stop() - } catch { - case NonFatal(e) => - logWarning("Unable to stop executor metrics poller", e) - } - try { - heartbeater.stop() - } catch { - case NonFatal(e) => - logWarning("Unable to stop heartbeater", e) - } - threadPool.shutdown() + if (!executorShutdown.getAndSet(true)) { + env.metricsSystem.report() + try { + metricsPoller.stop() + } catch { + case NonFatal(e) => + logWarning("Unable to stop executor metrics poller", e) + } + try { + heartbeater.stop() + } catch { + case NonFatal(e) => + logWarning("Unable to stop heartbeater", e) + } + threadPool.shutdown() - // Notify plugins that executor is shutting down so they can terminate cleanly - Utils.withContextClassLoader(replClassLoader) { - plugins.foreach(_.shutdown()) - } - if (!isLocal) { - env.stop() + // Notify plugins that executor is shutting down so they can terminate cleanly + Utils.withContextClassLoader(replClassLoader) { + plugins.foreach(_.shutdown()) + } + if (!isLocal) { + env.stop() + } } }