From 3681fae6b5364a5cf55700e1510473d8d9b77cd3 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 14 Oct 2016 17:24:30 +0800 Subject: [PATCH 1/5] use send --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index edc3c199376ef..ef243714b8a31 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -393,7 +393,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. executorDataMap.toMap.foreach { case (eid, _) => - driverEndpoint.askWithRetry[Boolean]( + driverEndpoint.send( RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) } } From 2997ccb25dd1bb7dfcef44054f91d5d1132cd686 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Mon, 17 Oct 2016 13:57:04 +0800 Subject: [PATCH 2/5] do not call ask, and copy the code of remove executor --- .../CoarseGrainedSchedulerBackend.scala | 30 ++++++++++++++----- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ef243714b8a31..e2a7f86064097 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,19 +92,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 + // Executors that have been lost, but for which we don't yet know the real exit reason. + protected val executorsPendingLossReason = new HashSet[String] + + protected val addressToExecutorId = new HashMap[RpcAddress, String] + class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { - // Executors that have been lost, but for which we don't yet know the real exit reason. - protected val executorsPendingLossReason = new HashSet[String] - // If this DriverEndpoint is changed to support multiple threads, // then this may need to be changed so that we don't share the serializer // instance across threads private val ser = SparkEnv.get.closureSerializer.newInstance() - protected val addressToExecutorId = new HashMap[RpcAddress, String] - private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") @@ -392,9 +392,23 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. 
- executorDataMap.toMap.foreach { case (eid, _) => - driverEndpoint.send( - RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) + // Note: here copy the code of remove executor from DriverEndpoint to avoid deadlock(reset + // and removeExecutor both to get the lock of schedulerbackend.) + val reason = SlaveLost("Stale executor after cluster manager re-registered.") + executorDataMap.toMap.foreach { case (executorId, executorInfo) => + executorInfo.executorEndpoint.send(StopExecutor) + logDebug(s"Asked to remove executor $executorId with reason $reason") + val killed = { + addressToExecutorId -= executorInfo.executorAddress + executorDataMap -= executorId + executorsPendingLossReason -= executorId + executorsPendingToRemove.remove(executorId).getOrElse(false) + } + totalCoreCount.addAndGet(-executorInfo.totalCores) + totalRegisteredExecutors.addAndGet(-1) + scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason) + listenerBus.post( + SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString)) } } From af6072a9846e418ece86e03c94a1a8da0f0cd928 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 19 Oct 2016 09:15:50 +0800 Subject: [PATCH 3/5] Revert "do not call ask, and copy the code of remove executor" This reverts commit 2997ccb25dd1bb7dfcef44054f91d5d1132cd686. --- .../CoarseGrainedSchedulerBackend.scala | 30 +++++-------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e2a7f86064097..ef243714b8a31 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -92,19 +92,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // The num of current max ExecutorId used to re-register appMaster @volatile protected var currentExecutorIdCounter = 0 - // Executors that have been lost, but for which we don't yet know the real exit reason. - protected val executorsPendingLossReason = new HashSet[String] - - protected val addressToExecutorId = new HashMap[RpcAddress, String] - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) extends ThreadSafeRpcEndpoint with Logging { + // Executors that have been lost, but for which we don't yet know the real exit reason. + protected val executorsPendingLossReason = new HashSet[String] + // If this DriverEndpoint is changed to support multiple threads, // then this may need to be changed so that we don't share the serializer // instance across threads private val ser = SparkEnv.get.closureSerializer.newInstance() + protected val addressToExecutorId = new HashMap[RpcAddress, String] + private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") @@ -392,23 +392,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. - // Note: here copy the code of remove executor from DriverEndpoint to avoid deadlock(reset - // and removeExecutor both to get the lock of schedulerbackend.) 
- val reason = SlaveLost("Stale executor after cluster manager re-registered.") - executorDataMap.toMap.foreach { case (executorId, executorInfo) => - executorInfo.executorEndpoint.send(StopExecutor) - logDebug(s"Asked to remove executor $executorId with reason $reason") - val killed = { - addressToExecutorId -= executorInfo.executorAddress - executorDataMap -= executorId - executorsPendingLossReason -= executorId - executorsPendingToRemove.remove(executorId).getOrElse(false) - } - totalCoreCount.addAndGet(-executorInfo.totalCores) - totalRegisteredExecutors.addAndGet(-1) - scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason) - listenerBus.post( - SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString)) + executorDataMap.toMap.foreach { case (eid, _) => + driverEndpoint.send( + RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) } } From 7d86054cf97c81abfcbf3737a555bbd91c77e8a4 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 19 Oct 2016 13:51:58 +0800 Subject: [PATCH 4/5] comment fix --- .../cluster/CoarseGrainedSchedulerBackend.scala | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ef243714b8a31..ef85110cc7632 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -145,6 +145,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Ignoring the task kill since the executor is not registered. logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } + + case RemoveExecutor(executorId, reason) => + removeExecutor(executorId, reason) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -386,13 +389,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only * be called in the yarn-client mode when AM re-registers after a failure. * */ - protected def reset(): Unit = synchronized { - numPendingExecutors = 0 - executorsPendingToRemove.clear() + protected def reset(): Unit = { + val executors = synchronized { + numPendingExecutors = 0 + executorsPendingToRemove.clear() + Set() ++ executorDataMap.keys + } // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. 
- executorDataMap.toMap.foreach { case (eid, _) => + executors.foreach { eid => driverEndpoint.send( RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) } From 7bf3bf8606f261e2eb1bebd3c6e3c3ff8600e140 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Fri, 21 Oct 2016 08:49:41 +0800 Subject: [PATCH 5/5] comment fix --- .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index ef85110cc7632..037c71319bea8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -145,9 +145,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Ignoring the task kill since the executor is not registered. logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.") } - - case RemoveExecutor(executorId, reason) => - removeExecutor(executorId, reason) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { @@ -399,8 +396,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Remove all the lingering executors that should be removed but not yet. The reason might be // because (1) disconnected event is not yet received; (2) executors die silently. executors.foreach { eid => - driverEndpoint.send( - RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))) + removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")) } }
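Taken together, the five commits above are about how reset() talks to the DriverEndpoint. In the RpcEndpointRef API of this period, askWithRetry[T] blocks the calling thread until the endpoint replies (retrying on timeout) and is answered from receiveAndReply, while send is one-way: it hands the message to the endpoint's receive loop and returns immediately. A minimal sketch of the two call shapes, reusing the message and names from the hunks above:

    // Blocking request/reply (the original code): the caller waits for the endpoint's answer.
    val removed: Boolean = driverEndpoint.askWithRetry[Boolean](
      RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))

    // One-way message (patch 1): enqueue for the endpoint's receive loop and return at once.
    driverEndpoint.send(
      RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))

Because a sent message is dispatched to receive rather than receiveAndReply, patch 4 briefly adds a RemoveExecutor case to receive, and patch 5 drops it again in favour of the backend's existing removeExecutor helper, which (in the Spark sources of this period) forwards the request to the driver endpoint with a non-blocking ask.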
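The deadlock the series works around is the one named in the patch 2 comment: reset() used to run entirely inside the scheduler backend's monitor, and the DriverEndpoint's handling of RemoveExecutor needs that same monitor to update its bookkeeping, so a blocking askWithRetry issued from inside reset() could never be answered. Below is a small, self-contained model of that shape; every name in it (ResetDeadlockSketch, endpointLoop, backendLock, handleRemoveExecutor) is illustrative only and not Spark code:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    // Toy model of the backend monitor plus a single-threaded message loop,
    // standing in for CoarseGrainedSchedulerBackend and its DriverEndpoint.
    object ResetDeadlockSketch {
      private val endpointLoop = Executors.newSingleThreadExecutor() // the endpoint's only thread
      private val backendLock  = new Object                          // the backend's monitor

      // The handler needs the backend lock, just as removing an executor does in Spark.
      private def handleRemoveExecutor(id: String): Boolean = backendLock.synchronized {
        println(s"removed $id")
        true
      }

      // Like askWithRetry: block the caller until the endpoint has produced a reply.
      def askRemove(id: String): Boolean = {
        val reply = Promise[Boolean]()
        endpointLoop.submit(new Runnable {
          override def run(): Unit = reply.trySuccess(handleRemoveExecutor(id))
        })
        Await.result(reply.future, 10.seconds)
      }

      // Like send: enqueue the message and return immediately.
      def sendRemove(id: String): Unit = {
        endpointLoop.submit(new Runnable {
          override def run(): Unit = handleRemoveExecutor(id)
        })
      }

      def main(args: Array[String]): Unit = {
        backendLock.synchronized {
          // Old shape: hold the lock and wait for the endpoint, which needs the same
          // lock -> neither thread can proceed (here the Await would just time out).
          // askRemove("exec-1")

          // Patched shape: fire-and-forget; the endpoint runs once the lock is released.
          sendRemove("exec-1")
        }
        endpointLoop.shutdown()
      }
    }

The sendRemove path mirrors the fix: the message is queued while the lock is still held, but the handler only runs after the caller has left the synchronized block.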
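For reference, the net effect of the whole series on reset() — assembled from the hunks above, so it should correspond to the file once patch 5 is applied — is that the monitor is held only long enough to snapshot the executor ids, and the removals happen outside it through removeExecutor:

    protected def reset(): Unit = {
      val executors = synchronized {
        numPendingExecutors = 0
        executorsPendingToRemove.clear()
        Set() ++ executorDataMap.keys
      }

      // Remove all the lingering executors that should be removed but not yet. The reason might be
      // because (1) disconnected event is not yet received; (2) executors die silently.
      executors.foreach { eid =>
        removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
      }
    }

The addressToExecutorId and executorsPendingLossReason moves introduced in patch 2 and the extra receive case from patch 4 are both reverted later in the series, so neither appears in this final form.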