Skip to content

Commit 7c07d17

Browse files
robbinspgzsxwing
authored andcommitted
[SPARK-15606][CORE] Use non-blocking removeExecutor call to avoid deadlocks
## What changes were proposed in this pull request? Set minimum number of dispatcher threads to 3 to avoid deadlocks on machines with only 2 cores ## How was this patch tested? Spark test builds Author: Pete Robbins <[email protected]> Closes #13355 from robbinspg/SPARK-13906.
1 parent 63b7f12 commit 7c07d17

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
295295
// manager to reregister itself. If that happens, the block manager master will know
296296
// about the executor, but the scheduler will not. Therefore, we should remove the
297297
// executor from the block manager when we hit this case.
298-
scheduler.sc.env.blockManager.master.removeExecutor(executorId)
298+
scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
299299
logInfo(s"Asked to remove non-existent executor $executorId")
300300
}
301301
}

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ class BlockManagerMaster(
4242
logInfo("Removed " + execId + " successfully in removeExecutor")
4343
}
4444

45+
/** Request removal of a dead executor from the driver endpoint.
46+
* This is only called on the driver side. Non-blocking
47+
*/
48+
def removeExecutorAsync(execId: String) {
49+
driverEndpoint.ask[Boolean](RemoveExecutor(execId))
50+
logInfo("Removal of executor " + execId + " requested")
51+
}
52+
4553
/** Register the BlockManager's id with the driver. */
4654
def registerBlockManager(
4755
blockManagerId: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit = {

0 commit comments

Comments
 (0)