Skip to content

Commit c17334e

Browse files
zsxwingMarcelo Vanzin
authored andcommitted
[SPARK-17316][CORE] Make CoarseGrainedSchedulerBackend.removeExecutor non-blocking
## What changes were proposed in this pull request? StandaloneSchedulerBackend.executorRemoved is a blocking call right now. It may cause some deadlock since it's called inside StandaloneAppClient.ClientEndpoint. This PR just changed CoarseGrainedSchedulerBackend.removeExecutor to be non-blocking. It's safe since the only two usages (StandaloneSchedulerBackend and YarnSchedulerEndpoint) don't need the return value). ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <[email protected]> Closes #14882 from zsxwing/SPARK-17316. (cherry picked from commit 9bcb33c) Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 021aa28 commit c17334e

File tree

1 file changed

+9
-8
lines changed

1 file changed

+9
-8
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -406,14 +406,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
406406
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
407407
}
408408

409-
// Called by subclasses when notified of a lost worker
410-
def removeExecutor(executorId: String, reason: ExecutorLossReason) {
411-
try {
412-
driverEndpoint.askWithRetry[Boolean](RemoveExecutor(executorId, reason))
413-
} catch {
414-
case e: Exception =>
415-
throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
416-
}
409+
/**
410+
* Called by subclasses when notified of a lost worker. It just fires the message and returns
411+
* at once.
412+
*/
413+
protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
414+
// Only log the failure since we don't care about the result.
415+
driverEndpoint.ask(RemoveExecutor(executorId, reason)).onFailure { case t =>
416+
logError(t.getMessage, t)
417+
}(ThreadUtils.sameThread)
417418
}
418419

419420
def sufficientResourcesRegistered(): Boolean = true

0 commit comments

Comments
 (0)