From c4d37111c3bbf1db4eddb7dbaaff487fe82d81ae Mon Sep 17 00:00:00 2001 From: iRakson Date: Sat, 18 Apr 2020 22:46:58 +0530 Subject: [PATCH] call stop() before killing executors --- .../org/apache/spark/executor/Executor.scala | 46 ++++++++----------- .../CoarseGrainedSchedulerBackend.scala | 2 + 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 2bfa1cea4b26..fce8daceb3f7 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -69,10 +69,6 @@ private[spark] class Executor( logInfo(s"Starting executor ID $executorId on host $executorHostname") - private val executorShutdown = new AtomicBoolean(false) - ShutdownHookManager.addShutdownHook( - () => stop() - ) // Application dependencies (added through SparkContext) that we've fetched so far on this node. // Each map holds the master's timestamp for the version of that file or JAR we got. private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]() @@ -282,29 +278,27 @@ private[spark] class Executor( } def stop(): Unit = { - if (!executorShutdown.getAndSet(true)) { - env.metricsSystem.report() - try { - metricsPoller.stop() - } catch { - case NonFatal(e) => - logWarning("Unable to stop executor metrics poller", e) - } - try { - heartbeater.stop() - } catch { - case NonFatal(e) => - logWarning("Unable to stop heartbeater", e) - } - threadPool.shutdown() + env.metricsSystem.report() + try { + metricsPoller.stop() + } catch { + case NonFatal(e) => + logWarning("Unable to stop executor metrics poller", e) + } + try { + heartbeater.stop() + } catch { + case NonFatal(e) => + logWarning("Unable to stop heartbeater", e) + } + threadPool.shutdown() - // Notify plugins that executor is shutting down so they can terminate cleanly - Utils.withContextClassLoader(replClassLoader) { - plugins.foreach(_.shutdown()) - } - if (!isLocal) { - env.stop() - } + // Notify plugins that executor is shutting down so they can terminate cleanly + Utils.withContextClassLoader(replClassLoader) { + plugins.foreach(_.shutdown()) + } + if (!isLocal) { + env.stop() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 701d69ba4349..f5475b87931c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -769,6 +769,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp val killExecutors: Boolean => Future[Boolean] = if (executorsToKill.nonEmpty) { + executorsToKill.foreach(id => + executorDataMap.get(id).foreach(_.executorEndpoint.send(StopExecutor))) _ => doKillExecutors(executorsToKill) } else { _ => Future.successful(false)