Skip to content

Commit 74d8cf1

Browse files
iRaksondongjoon-hyun
authored andcommitted
[SPARK-29152][2.4][CORE] Executor Plugin shutdown when dynamic allocation is enabled
### What changes were proposed in this pull request? Added `shutdownHook` for shutdown method of executor plugin. This will ensure that shutdown method will be called always. ### Why are the changes needed? Whenever executors are not going down gracefully, i.e getting killed due to idle time or getting killed forcefully, shutdown method of executors plugin is not getting called. Shutdown method can be used to release any resources that plugin has acquired during its initialisation. So its important to make sure that every time a executor goes down shutdown method of plugin gets called. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Tested Manually Closes #26841 from iRakson/SPARK-29152_2.4. Authored-by: root1 <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 0884766 commit 74d8cf1

File tree

1 file changed

+23
-16
lines changed

1 file changed

+23
-16
lines changed

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import java.net.{URI, URL}
2424
import java.nio.ByteBuffer
2525
import java.util.Properties
2626
import java.util.concurrent._
27+
import java.util.concurrent.atomic.AtomicBoolean
2728
import javax.annotation.concurrent.GuardedBy
2829

2930
import scala.collection.JavaConverters._
@@ -72,6 +73,10 @@ private[spark] class Executor(
7273

7374
private val conf = env.conf
7475

76+
private val executorShutdown = new AtomicBoolean(false)
77+
ShutdownHookManager.addShutdownHook(
78+
() => stop()
79+
)
7580
// No ip or host:port - just hostname
7681
Utils.checkHost(executorHostname)
7782
// must not have port specified.
@@ -244,24 +249,26 @@ private[spark] class Executor(
244249
}
245250

246251
def stop(): Unit = {
247-
env.metricsSystem.report()
248-
heartbeater.shutdown()
249-
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
250-
threadPool.shutdown()
251-
252-
// Notify plugins that executor is shutting down so they can terminate cleanly
253-
Utils.withContextClassLoader(replClassLoader) {
254-
executorPlugins.foreach { plugin =>
255-
try {
256-
plugin.shutdown()
257-
} catch {
258-
case e: Exception =>
259-
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
252+
if (!executorShutdown.getAndSet(true)) {
253+
env.metricsSystem.report()
254+
heartbeater.shutdown()
255+
heartbeater.awaitTermination(10, TimeUnit.SECONDS)
256+
threadPool.shutdown()
257+
258+
// Notify plugins that executor is shutting down so they can terminate cleanly
259+
Utils.withContextClassLoader(replClassLoader) {
260+
executorPlugins.foreach { plugin =>
261+
try {
262+
plugin.shutdown()
263+
} catch {
264+
case e: Exception =>
265+
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
266+
}
260267
}
261268
}
262-
}
263-
if (!isLocal) {
264-
env.stop()
269+
if (!isLocal) {
270+
env.stop()
271+
}
265272
}
266273
}
267274

0 commit comments

Comments
 (0)