Skip to content

Commit b528283

Browse files
committed
[SPARK-29152][2.4]Executor Plugin shutdown when dynamic allocation is enabled
1 parent 0884766 commit b528283

File tree

1 file changed

+23
-16
lines changed

1 file changed

+23
-16
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.net.{URI, URL}
2424
import java.nio.ByteBuffer
2525
import java.util.Properties
2626
import java.util.concurrent._
27+
import java.util.concurrent.atomic.AtomicBoolean
2728
import javax.annotation.concurrent.GuardedBy
2829

2930
import scala.collection.JavaConverters._
@@ -72,6 +73,10 @@ private[spark] class Executor(
7273

7374
private val conf = env.conf
7475

76+
private val executorShutdown = new AtomicBoolean(false)
77+
ShutdownHookManager.addShutdownHook(
78+
() => stop()
79+
)
7580
// No ip or host:port - just hostname
7681
Utils.checkHost(executorHostname)
7782
// must not have port specified.
@@ -244,24 +249,26 @@ private[spark] class Executor(
244249
}
245250

246251
def stop(): Unit = {
247-
env.metricsSystem.report()
248-
heartbeater.shutdown()
249-
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
250-
threadPool.shutdown()
251-
252-
// Notify plugins that executor is shutting down so they can terminate cleanly
253-
Utils.withContextClassLoader(replClassLoader) {
254-
executorPlugins.foreach { plugin =>
255-
try {
256-
plugin.shutdown()
257-
} catch {
258-
case e: Exception =>
259-
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
252+
if (!executorShutdown.getAndSet(true)) {
253+
env.metricsSystem.report()
254+
heartbeater.shutdown()
255+
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
256+
threadPool.shutdown()
257+
258+
// Notify plugins that executor is shutting down so they can terminate cleanly
259+
Utils.withContextClassLoader(replClassLoader) {
260+
executorPlugins.foreach { plugin =>
261+
try {
262+
plugin.shutdown()
263+
} catch {
264+
case e: Exception =>
265+
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
266+
}
260267
}
261268
}
262-
}
263-
if (!isLocal) {
264-
env.stop()
269+
if (!isLocal) {
270+
env.stop()
271+
}
265272
}
266273
}
267274

0 commit comments

Comments
 (0)