From a0b9860ed0312d12bbe64bcf60ce2ea2ffce88fb Mon Sep 17 00:00:00 2001 From: Zhen Peng Date: Thu, 22 May 2014 16:09:39 +0800 Subject: [PATCH 1/3] [SPARK-1901] worker should make sure executor has exited before updating executor's info --- .../spark/deploy/worker/ExecutorRunner.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2051403682737..7bf89d987ce13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -61,17 +61,18 @@ private[spark] class ExecutorRunner( // Shutdown hook that kills actors on shutdown. shutdownHook = new Thread() { override def run() { - killProcess() + killProcess(None) } } Runtime.getRuntime.addShutdownHook(shutdownHook) } - private def killProcess() { + private def killProcess(message: Option[String]) { if (process != null) { logInfo("Killing process!") process.destroy() - process.waitFor() + val exitCode = process.waitFor() + worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode)) } } @@ -82,7 +83,6 @@ private[spark] class ExecutorRunner( workerThread.interrupt() workerThread = null state = ExecutorState.KILLED - worker ! ExecutorStateChanged(appId, execId, state, None, None) Runtime.getRuntime.removeShutdownHook(shutdownHook) } } @@ -148,14 +148,13 @@ private[spark] class ExecutorRunner( } catch { case interrupted: InterruptedException => { logInfo("Runner thread for executor " + fullId + " interrupted") - killProcess() + state = ExecutorState.KILLED + killProcess(None) } case e: Exception => { logError("Error running executor", e) - killProcess() state = ExecutorState.FAILED - val message = e.getClass + ": " + e.getMessage - worker ! ExecutorStateChanged(appId, execId, state, Some(message), None) + killProcess(Some(e.getClass + ":" + e.getMessage)) } } } From 506cea60ecd2bfa0246987d54592df62aaa9ef37 Mon Sep 17 00:00:00 2001 From: Zhen Peng Date: Fri, 23 May 2014 10:18:53 +0800 Subject: [PATCH 2/3] add some docs for killProcess() --- .../org/apache/spark/deploy/worker/ExecutorRunner.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 7bf89d987ce13..277ea4db095db 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -67,6 +67,11 @@ private[spark] class ExecutorRunner( Runtime.getRuntime.addShutdownHook(shutdownHook) } + /** + * kill executor process, wait for exit and notify worker to update resource status + * + * @param message the exception message which caused the executor's death + */ private def killProcess(message: Option[String]) { if (process != null) { logInfo("Killing process!") From 21d380b8aeb5e7d1db4a584cff2604d228f806b6 Mon Sep 17 00:00:00 2001 From: Zhen Peng Date: Thu, 29 May 2014 10:43:11 +0800 Subject: [PATCH 3/3] add some error messages --- .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 277ea4db095db..d27e0e1f15c65 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -61,7 +61,7 @@ private[spark] class ExecutorRunner( // Shutdown hook that kills actors on shutdown. shutdownHook = new Thread() { override def run() { - killProcess(None) + killProcess(Some("Worker shutting down")) } } Runtime.getRuntime.addShutdownHook(shutdownHook) @@ -159,7 +159,7 @@ private[spark] class ExecutorRunner( case e: Exception => { logError("Error running executor", e) state = ExecutorState.FAILED - killProcess(Some(e.getClass + ":" + e.getMessage)) + killProcess(Some(e.toString)) } } }