From b5282836a3ab7c2c47651d6b72df1d81a3e4e9c5 Mon Sep 17 00:00:00 2001 From: root1 Date: Wed, 11 Dec 2019 04:33:43 +0530 Subject: [PATCH] [SPARK-29152][2.4]Executor Plugin shutdown when dynamic allocation is enabled --- .../org/apache/spark/executor/Executor.scala | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index f7ff0b8ac333..10317c39ebff 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -24,6 +24,7 @@ import java.net.{URI, URL} import java.nio.ByteBuffer import java.util.Properties import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicBoolean import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ @@ -72,6 +73,10 @@ private[spark] class Executor( private val conf = env.conf + private val executorShutdown = new AtomicBoolean(false) + ShutdownHookManager.addShutdownHook( + () => stop() + ) // No ip or host:port - just hostname Utils.checkHost(executorHostname) // must not have port specified. @@ -244,24 +249,26 @@ private[spark] class Executor( } def stop(): Unit = { - env.metricsSystem.report() - heartbeater.shutdown() - heartbeater.awaitTermination(10, TimeUnit.SECONDS) - threadPool.shutdown() - - // Notify plugins that executor is shutting down so they can terminate cleanly - Utils.withContextClassLoader(replClassLoader) { - executorPlugins.foreach { plugin => - try { - plugin.shutdown() - } catch { - case e: Exception => - logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e) + if (!executorShutdown.getAndSet(true)) { + env.metricsSystem.report() + heartbeater.shutdown() + heartbeater.awaitTermination(10, TimeUnit.SECONDS) + threadPool.shutdown() + + // Notify plugins that executor is shutting down so they can terminate cleanly + Utils.withContextClassLoader(replClassLoader) { + executorPlugins.foreach { plugin => + try { + plugin.shutdown() + } catch { + case e: Exception => + logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e) + } } } - } - if (!isLocal) { - env.stop() + if (!isLocal) { + env.stop() + } } }