
Commit c1f344f

scwf authored and zsxwing committed
[SPARK-17929][CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-17929

`CoarseGrainedSchedulerBackend.reset` currently takes the backend lock:

```
protected def reset(): Unit = synchronized {
  numPendingExecutors = 0
  executorsPendingToRemove.clear()

  // Remove all the lingering executors that should be removed but not yet. The reason might be
  // because (1) disconnected event is not yet received; (2) executors die silently.
  executorDataMap.toMap.foreach { case (eid, _) =>
    driverEndpoint.askWithRetry[Boolean](
      RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
  }
}
```

but `removeExecutor` also needs that lock (`CoarseGrainedSchedulerBackend.this.synchronized`), which causes a deadlock: `reset` holds the lock while `askWithRetry` blocks waiting for the `RemoveExecutor` message to be handled, and the handler's call to `removeExecutor` in turn blocks waiting for the lock that `reset` still holds.

```
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
  logDebug(s"Asked to remove executor $executorId with reason $reason")
  executorDataMap.get(executorId) match {
    case Some(executorInfo) =>
      // This must be synchronized because variables mutated
      // in this block are read when requesting executors
      val killed = CoarseGrainedSchedulerBackend.this.synchronized {
        addressToExecutorId -= executorInfo.executorAddress
        executorDataMap -= executorId
        executorsPendingLossReason -= executorId
        executorsPendingToRemove.remove(executorId).getOrElse(false)
      }
      ...
```

## How was this patch tested?

Manual test.

Author: w00228970 <[email protected]>

Closes #15481 from scwf/spark-17929.
1 parent 7a531e3 commit c1f344f

File tree

1 file changed (+8, -6 lines)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 8 additions & 6 deletions

```diff
@@ -386,15 +386,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
    * be called in the yarn-client mode when AM re-registers after a failure.
    * */
-  protected def reset(): Unit = synchronized {
-    numPendingExecutors = 0
-    executorsPendingToRemove.clear()
+  protected def reset(): Unit = {
+    val executors = synchronized {
+      numPendingExecutors = 0
+      executorsPendingToRemove.clear()
+      Set() ++ executorDataMap.keys
+    }
 
     // Remove all the lingering executors that should be removed but not yet. The reason might be
     // because (1) disconnected event is not yet received; (2) executors die silently.
-    executorDataMap.toMap.foreach { case (eid, _) =>
-      driverEndpoint.askWithRetry[Boolean](
-        RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
+    executors.foreach { eid =>
+      removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
     }
   }
```
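The fix is a snapshot-then-act pattern: collect the executor ids while holding the lock, then perform the blocking removals after releasing it. Below is a minimal, self-contained Scala sketch of the hazard and the fix; the names (`lock`, `askAndWait`, the simplified `removeExecutor`) are illustrative stand-ins, not Spark's actual backend or RPC code.

```scala
import scala.collection.mutable

// Illustrative sketch only: `lock`, `askAndWait`, and this simplified
// `removeExecutor` stand in for Spark's backend lock and RPC machinery.
object ResetDeadlockSketch {
  private val lock = new Object
  private val executorDataMap = mutable.HashMap[String, String]("exec-1" -> "x")

  // Before the fix: reset holds `lock` for the whole loop while each
  // askAndWait blocks until another thread runs removeExecutor. But
  // removeExecutor needs `lock`, so neither thread can make progress.
  def resetBefore(): Unit = lock.synchronized {
    executorDataMap.keys.foreach { eid =>
      askAndWait(eid)
    }
  }

  // After the fix: snapshot the executor ids under the lock, then do the
  // blocking work with the lock released, so removeExecutor can acquire it.
  def resetAfter(): Unit = {
    val executors = lock.synchronized { Set() ++ executorDataMap.keys }
    executors.foreach(eid => removeExecutor(eid))
  }

  // Stand-in for driverEndpoint.askWithRetry: in the real code this waits
  // for a reply that is only sent after removeExecutor has run.
  private def askAndWait(eid: String): Unit = ()

  // Stand-in for the real removeExecutor: mutates shared state under `lock`.
  private def removeExecutor(eid: String): Unit = lock.synchronized {
    executorDataMap -= eid
  }
}
```

Calling the local `removeExecutor` directly instead of blocking on `driverEndpoint.askWithRetry` also removes the wait-for-reply step from the cycle, which is the other half of what the patch changes.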