Skip to content

Commit 1696a44

Browse files
zhpengg authored and aarondav committed
[SPARK-1901] worker should make sure executor has exited before updating executor's info
https://issues.apache.org/jira/browse/SPARK-1901

Author: Zhen Peng <[email protected]>

Closes #854 from zhpengg/bugfix-worker-kills-executor and squashes the following commits:

21d380b [Zhen Peng] add some error messages
506cea6 [Zhen Peng] add some docs for killProcess()
a0b9860 [Zhen Peng] [SPARK-1901] worker should make sure executor has exited before updating executor's info
1 parent 80721fb commit 1696a44

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,23 @@ private[spark] class ExecutorRunner(
6161
// Shutdown hook that kills actors on shutdown.
6262
shutdownHook = new Thread() {
6363
override def run() {
64-
killProcess()
64+
killProcess(Some("Worker shutting down"))
6565
}
6666
}
6767
Runtime.getRuntime.addShutdownHook(shutdownHook)
6868
}
6969

70-
private def killProcess() {
70+
/**
71+
* kill executor process, wait for exit and notify worker to update resource status
72+
*
73+
* @param message the exception message which caused the executor's death
74+
*/
75+
private def killProcess(message: Option[String]) {
7176
if (process != null) {
7277
logInfo("Killing process!")
7378
process.destroy()
74-
process.waitFor()
79+
val exitCode = process.waitFor()
80+
worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode))
7581
}
7682
}
7783

@@ -82,7 +88,6 @@ private[spark] class ExecutorRunner(
8288
workerThread.interrupt()
8389
workerThread = null
8490
state = ExecutorState.KILLED
85-
worker ! ExecutorStateChanged(appId, execId, state, None, None)
8691
Runtime.getRuntime.removeShutdownHook(shutdownHook)
8792
}
8893
}
@@ -148,14 +153,13 @@ private[spark] class ExecutorRunner(
148153
} catch {
149154
case interrupted: InterruptedException => {
150155
logInfo("Runner thread for executor " + fullId + " interrupted")
151-
killProcess()
156+
state = ExecutorState.KILLED
157+
killProcess(None)
152158
}
153159
case e: Exception => {
154160
logError("Error running executor", e)
155-
killProcess()
156161
state = ExecutorState.FAILED
157-
val message = e.getClass + ": " + e.getMessage
158-
worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
162+
killProcess(Some(e.toString))
159163
}
160164
}
161165
}

0 commit comments

Comments
 (0)