diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 8cd98e47b8a4..9c035c14ba6f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -24,6 +24,7 @@ import java.net.{URI, URL} import java.nio.ByteBuffer import java.util.Properties import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ @@ -65,6 +66,10 @@ private[spark] class Executor( logInfo(s"Starting executor ID $executorId on host $executorHostname") + private val executorShutdown = new AtomicBoolean(false) + ShutdownHookManager.addShutdownHook( + () => stop() + ) // Application dependencies (added through SparkContext) that we've fetched so far on this node. // Each map holds the master's timestamp for the version of that file or JAR we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() @@ -249,27 +254,29 @@ private[spark] class Executor( } def stop(): Unit = { - env.metricsSystem.report() - try { - metricsPoller.stop() - } catch { - case NonFatal(e) => - logWarning("Unable to stop executor metrics poller", e) - } - try { - heartbeater.stop() - } catch { - case NonFatal(e) => - logWarning("Unable to stop heartbeater", e) - } - threadPool.shutdown() + if (!executorShutdown.getAndSet(true)) { + env.metricsSystem.report() + try { + metricsPoller.stop() + } catch { + case NonFatal(e) => + logWarning("Unable to stop executor metrics poller", e) + } + try { + heartbeater.stop() + } catch { + case NonFatal(e) => + logWarning("Unable to stop heartbeater", e) + } + threadPool.shutdown() - // Notify plugins that executor is shutting down so they can terminate cleanly - Utils.withContextClassLoader(replClassLoader) { - plugins.foreach(_.shutdown()) - } - if (!isLocal) { - env.stop() + // Notify plugins that executor is shutting down so they can terminate cleanly + Utils.withContextClassLoader(replClassLoader) { + plugins.foreach(_.shutdown()) + } + if (!isLocal) { + env.stop() + } } }