From 1d511c810aacc0a517c1f800b527025edb0401ac Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 27 Feb 2014 10:22:30 -0500 Subject: [PATCH 1/5] kill Process in workerThread --- .../spark/deploy/worker/ExecutorRunner.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2edd921066876..4a93a244d4447 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -58,30 +58,28 @@ private[spark] class ExecutorRunner( override def run() { fetchAndRunExecutor() } } workerThread.start() - // Shutdown hook that kills actors on shutdown. shutdownHook = new Thread() { override def run() { - if (process != null) { - logInfo("Shutdown hook killing child process.") - process.destroy() - process.waitFor() - } + killProcess() } } Runtime.getRuntime.addShutdownHook(shutdownHook) } + private def killProcess() { + if (process != null) { + logInfo("Killing process!") + process.destroy() + process.waitFor() + } + } + /** Stop this executor runner, including killing the process it launched */ def kill() { if (workerThread != null) { workerThread.interrupt() workerThread = null - if (process != null) { - logInfo("Killing process!") - process.destroy() - process.waitFor() - } state = ExecutorState.KILLED worker ! ExecutorStateChanged(appId, execId, state, None, None) Runtime.getRuntime.removeShutdownHook(shutdownHook) @@ -126,7 +124,6 @@ private[spark] class ExecutorRunner( // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") process = builder.start() - val header = "Spark Executor Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) @@ -146,9 +143,10 @@ private[spark] class ExecutorRunner( val message = "Command exited with code " + exitCode worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)) } catch { - case interrupted: InterruptedException => + case interrupted: InterruptedException => { logInfo("Runner thread for executor " + fullId + " interrupted") - + killProcess() + } case e: Exception => { logError("Error running executor", e) if (process != null) { From 0accf2f1c5380f6ceadbd385c11145e1c69f6fa3 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 27 Feb 2014 15:29:24 -0500 Subject: [PATCH 2/5] set process to null after killed it --- .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 4a93a244d4447..2afeeb970bf28 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -72,6 +72,7 @@ private[spark] class ExecutorRunner( logInfo("Killing process!") process.destroy() process.waitFor() + process = null } } From eb615baacfbb794cec45424392ce4267b0487511 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Tue, 4 Mar 2014 08:38:28 -0500 Subject: [PATCH 3/5] kill the process when the error happens --- .../org/apache/spark/deploy/worker/ExecutorRunner.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2afeeb970bf28..1be7a3e646895 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -140,6 +140,7 @@ private[spark] class ExecutorRunner( // long-lived processes only. However, in the future, we might restart the executor a few // times on the same machine. val exitCode = process.waitFor() + killProcess() state = ExecutorState.FAILED val message = "Command exited with code " + exitCode worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)) @@ -150,9 +151,7 @@ private[spark] class ExecutorRunner( } case e: Exception => { logError("Error running executor", e) - if (process != null) { - process.destroy() - } + killProcess() state = ExecutorState.FAILED val message = e.getClass + ": " + e.getMessage worker ! ExecutorStateChanged(appId, execId, state, Some(message), None) From 3107aeb0f787c8668168b2a2c3a9f5bc6ed93f37 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 24 Apr 2014 07:36:01 -0400 Subject: [PATCH 4/5] address Aaron's comments --- .../apache/spark/deploy/worker/ExecutorRunner.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 1be7a3e646895..4d01c4ac9982b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -68,17 +68,15 @@ private[spark] class ExecutorRunner( } private def killProcess() { - if (process != null) { - logInfo("Killing process!") - process.destroy() - process.waitFor() - process = null - } + logInfo("Killing process!") + process.destroy() + process.waitFor() } /** Stop this executor runner, including killing the process it launched */ def kill() { if (workerThread != null) { + // the workerThread will kill the child process when interrupted workerThread.interrupt() workerThread = null state = ExecutorState.KILLED From 85767da99b92b1134ab85ca493c439060787326e Mon Sep 17 00:00:00 2001 From: CodingCat Date: Thu, 24 Apr 2014 14:32:25 -0400 Subject: [PATCH 5/5] add null checking and remove unnecessary killProce --- .../org/apache/spark/deploy/worker/ExecutorRunner.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 4d01c4ac9982b..caa0b6d75f159 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -68,9 +68,11 @@ private[spark] class ExecutorRunner( } private def killProcess() { - logInfo("Killing process!") - process.destroy() - process.waitFor() + if (process != null) { + logInfo("Killing process!") + process.destroy() + process.waitFor() + } } /** Stop this executor runner, including killing the process it launched */ @@ -138,7 +140,6 @@ private[spark] class ExecutorRunner( // long-lived processes only. However, in the future, we might restart the executor a few // times on the same machine. val exitCode = process.waitFor() - killProcess() state = ExecutorState.FAILED val message = "Command exited with code " + exitCode worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))