From dac8aa9c16e0af4495607105c0a610c2f3cd2164 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Aug 2016 12:57:53 -0700
Subject: [PATCH 1/7] Don't block StandaloneSchedulerBackend.executorRemoved

---
 .../cluster/CoarseGrainedSchedulerBackend.scala        | 9 ++++++++-
 .../scheduler/cluster/StandaloneSchedulerBackend.scala | 8 ++++++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 8259923ce31c3..9f1a7cc82d19a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.concurrent.Future
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.internal.Logging
@@ -407,7 +408,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   // Called by subclasses when notified of a lost worker
-  def removeExecutor(executorId: String, reason: ExecutorLossReason) {
+  protected def removeExecutor(executorId: String, reason: ExecutorLossReason) {
     try {
       driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
     } catch {
@@ -416,6 +417,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
+  protected def removeExecutorAsync(
+      executorId: String,
+      reason: ExecutorLossReason): Future[Boolean] = {
+    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason))
+  }
+
   def sufficientResourcesRegistered(): Boolean = true
 
   override def isReady(): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8382fbe9ddb80..e1c26dc87f409 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * A [[SchedulerBackend]] implementation for Spark's standalone cluster manager.
@@ -148,13 +148,17 @@ private[spark] class StandaloneSchedulerBackend(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
 
+  /** Note: this method should not block. See [[StandaloneAppClientListener]] */
   override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
       case None => SlaveLost(message)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
-    removeExecutor(fullId.split("/")(1), reason)
+    // Only log the failure since we don't care about the result.
+    removeExecutorAsync(fullId.split("/")(1), reason).onFailure { case t =>
+      logError(t.getMessage, t)
+    }(ThreadUtils.sameThread)
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
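
The point of patch 1: executorRemoved runs on the StandaloneAppClient listener thread, and askWithRetry blocks that thread until the driver endpoint replies or the retries time out. A plain ask returns a Future immediately, so the listener only attaches a logging callback and moves on. Below is a minimal standalone sketch of that difference using plain Scala futures and the Scala 2.11-era Future.onFailure that the patches themselves use; this is not Spark's RpcEndpointRef API, and the `reply` promise is an invented stand-in for the driver's answer.

    import scala.concurrent.Promise
    import scala.concurrent.ExecutionContext.Implicits.global

    object NonBlockingNotify {
      def main(args: Array[String]): Unit = {
        // Stands in for the Future[Boolean] that a non-blocking ask returns.
        val reply = Promise[Boolean]()

        // Blocking style (what askWithRetry amounts to) would be
        //   Await.result(reply.future, timeout)
        // and would stall the listener thread until the driver answered.

        // Fire-and-forget style: attach a callback and return at once.
        reply.future.onFailure { case t =>
          println(s"removal failed: ${t.getMessage}")
        }

        reply.trySuccess(true) // simulate the driver acknowledging the removal
      }
    }
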
From 142de67e2eacec2be7fd4ef76a40dcafb339575e Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Aug 2016 16:52:38 -0700
Subject: [PATCH 2/7] Add the return type

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9f1a7cc82d19a..c6b788dcf1055 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -408,7 +408,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   // Called by subclasses when notified of a lost worker
-  protected def removeExecutor(executorId: String, reason: ExecutorLossReason) {
+  protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     try {
       driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
     } catch {

From dcd15e8e818d006956b97af7264fc4622847674e Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Aug 2016 17:03:53 -0700
Subject: [PATCH 3/7] Use send for removeExecutor

---
 .../CoarseGrainedSchedulerBackend.scala      | 34 +++++++------------
 .../cluster/StandaloneSchedulerBackend.scala |  6 +---
 2 files changed, 14 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c6b788dcf1055..11d0d4f0973fd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -143,6 +143,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
+
+      case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+        removeExecutor(executorId, reason)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -196,14 +203,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         context.reply(true)
 
-      case RemoveExecutor(executorId, reason) =>
-        // We will remove the executor's state and cannot restore it. However, the connection
-        // between the driver and the executor may be still alive so that the executor won't exit
-        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-        removeExecutor(executorId, reason)
-        context.reply(true)
-
       case RetrieveSparkProps =>
         context.reply(sparkProperties)
     }
@@ -407,20 +406,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
   }
 
-  // Called by subclasses when notified of a lost worker
+  /**
+   * Called by subclasses when notified of a lost worker. It just fires the message and returns
+   * at once.
+   */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
-    try {
-      driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
-    } catch {
-      case e: Exception =>
-        throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
-    }
-  }
-
-  protected def removeExecutorAsync(
-      executorId: String,
-      reason: ExecutorLossReason): Future[Boolean] = {
-    driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason))
+    // Only log the failure since we don't care about the result.
+    driverEndpoint.send(RemoveExecutor(executorId, reason))
   }
 
   def sufficientResourcesRegistered(): Boolean = true
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index e1c26dc87f409..95252d9dc5077 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -148,17 +148,13 @@ private[spark] class StandaloneSchedulerBackend(
       fullId, hostPort, cores, Utils.megabytesToString(memory)))
   }
 
-  /** Note: this method should not block. See [[StandaloneAppClientListener]] */
   override def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
     val reason: ExecutorLossReason = exitStatus match {
      case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
      case None => SlaveLost(message)
    }
     logInfo("Executor %s removed: %s".format(fullId, message))
-    // Only log the failure since we don't care about the result.
-    removeExecutorAsync(fullId.split("/")(1), reason).onFailure { case t =>
-      logError(t.getMessage, t)
-    }(ThreadUtils.sameThread)
+    removeExecutor(fullId.split("/")(1), reason)
   }
 
   override def sufficientResourcesRegistered(): Boolean = {
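
Patch 3 above moves the RemoveExecutor handler from receiveAndReply to receive: a message delivered with send is handled by receive and nobody waits for an answer, while a message delivered with ask is handled by receiveAndReply, which is expected to call context.reply. A schematic model of that split, with simplified stand-in types rather than Spark's actual RpcEnv classes:

    object EndpointModel {
      case class RemoveExecutor(executorId: String, reason: String)

      trait ReplyContext { def reply(response: Any): Unit }

      // One-way messages delivered via send land here; no caller is waiting,
      // so a failure can only surface in the driver's own logs.
      def receive: PartialFunction[Any, Unit] = {
        case RemoveExecutor(id, reason) =>
          println(s"cleaning up state for executor $id ($reason)")
      }

      // Request-reply messages delivered via ask land here; the sender's
      // Future completes when context.reply is called, and fails if it never is.
      def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
        case RemoveExecutor(id, reason) =>
          println(s"cleaning up state for executor $id ($reason)")
          context.reply(true)
      }
    }
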
From 50b831fccd25dc01cdfd16b27b965e4cc9f7f30b Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Aug 2016 17:10:23 -0700
Subject: [PATCH 4/7] Remove wrong comment

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 11d0d4f0973fd..302ae699095b6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -411,7 +411,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * at once.
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
-    // Only log the failure since we don't care about the result.
     driverEndpoint.send(RemoveExecutor(executorId, reason))
   }
 
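
Patch 4 deletes the comment because it no longer describes the code: send is one-way, so there is no Future on which a failure could even be observed, let alone logged. Hypothetical stub signatures, illustrative only, paraphrasing the shape of the two call styles:

    import scala.concurrent.Future

    trait EndpointRefSketch {
      // One-way: returns Unit, so the caller has no handle to log failures from.
      def send(message: Any): Unit

      // Two-way: returns a Future the caller can observe, e.g. via onFailure.
      def ask[T](message: Any): Future[T]
    }
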
From fa724dc3edc33c8876f410d885c4f1ed14e93d28 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Aug 2016 17:11:13 -0700
Subject: [PATCH 5/7] remove imports

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 1 -
 .../spark/scheduler/cluster/StandaloneSchedulerBackend.scala    | 2 +-
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 302ae699095b6..937efbfac624e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.concurrent.Future
 
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.internal.Logging
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 95252d9dc5077..8382fbe9ddb80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.Utils
 
 /**
  * A [[SchedulerBackend]] implementation for Spark's standalone cluster manager.

From 0cfc2829a5b14d726d5399e6c525acc544ee10a7 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Aug 2016 17:20:44 -0700
Subject: [PATCH 6/7] Change back to use ask

---
 .../CoarseGrainedSchedulerBackend.scala | 20 +++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 937efbfac624e..513bd12460791 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -142,13 +142,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
-
-      case RemoveExecutor(executorId, reason) =>
-        // We will remove the executor's state and cannot restore it. However, the connection
-        // between the driver and the executor may be still alive so that the executor won't exit
-        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
-        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
-        removeExecutor(executorId, reason)
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -202,6 +195,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         }
         context.reply(true)
 
+      case RemoveExecutor(executorId, reason) =>
+        // We will remove the executor's state and cannot restore it. However, the connection
+        // between the driver and the executor may be still alive so that the executor won't exit
+        // automatically, so try to tell the executor to stop itself. See SPARK-13519.
+        executorDataMap.get(executorId).foreach(_.executorEndpoint.send(StopExecutor))
+        removeExecutor(executorId, reason)
+        context.reply(true)
+
       case RetrieveSparkProps =>
         context.reply(sparkProperties)
     }
@@ -410,7 +411,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * at once.
    */
   protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
-    driverEndpoint.send(RemoveExecutor(executorId, reason))
+    // Only log the failure since we don't care about the result.
+    driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t =>
+      logError(t.getMessage, t)
+    }
   }
 
   def sufficientResourcesRegistered(): Boolean = true
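
Patch 6 reinstates ask with an onFailure callback, and patch 7 below supplies the ExecutionContext that onFailure takes as an extra argument list. For a callback that only logs, a same-thread context avoids a pointless thread hop. A minimal sketch, assuming behavior equivalent to ThreadUtils.sameThread:

    import scala.concurrent.ExecutionContext

    // Runs each task on the thread that completed the Future: cheap and simple,
    // but only appropriate for short, non-blocking callbacks such as logging.
    object SameThreadSketch extends ExecutionContext {
      override def execute(runnable: Runnable): Unit = runnable.run()
      override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
    }

With such a context, future.onFailure { case t => logError(t.getMessage, t) }(SameThreadSketch) runs the logging on whichever thread completed the ask.
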
From 489dd8d09064fb5594a8413d4f353dd355d89948 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Aug 2016 17:31:29 -0700
Subject: [PATCH 7/7] Add ExecutionContext

---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 513bd12460791..2db3a3bb81f61 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -414,7 +414,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Only log the failure since we don't care about the result.
     driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t =>
       logError(t.getMessage, t)
-    }
+    }(ThreadUtils.sameThread)
   }
 
   def sufficientResourcesRegistered(): Boolean = true
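
Taken together, the series leaves the notification path non-blocking end to end: StandaloneSchedulerBackend.executorRemoved now just calls removeExecutor, the RemoveExecutor handler stays in receiveAndReply so the ask still receives an answer, and failures are only logged. Assembled from the final hunks above, with surrounding context elided, the driver-side result is:

    // CoarseGrainedSchedulerBackend: fire the message, log failures, return at once.
    protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
      // Only log the failure since we don't care about the result.
      driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t =>
        logError(t.getMessage, t)
      }(ThreadUtils.sameThread)
    }
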