@@ -603,10 +603,12 @@ private[spark] class ExecutorAllocationManager(

private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
// Number of tasks currently running on the cluster including speculative tasks.
// Should be 0 when no stages are active.
private var numRunningTasks: Int = _

// Structure to track the number of running tasks per stage on each executor. This is used to
// 1) calculate the total number of running tasks; 2) determine whether an executor is still
// running tasks, and if not, mark it as idle.
private val executorIdToStageAndNumTasks =
new mutable.HashMap[String, mutable.HashMap[Int, Int]]

// Number of speculative tasks to be scheduled in each stage
private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
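As a side note for reviewers, here is a minimal, self-contained sketch (not part of the patch; TaskCountSketch and its methods are invented for illustration) of how the nested executorIdToStageAndNumTasks map is meant to evolve as tasks start and finish:

import scala.collection.mutable

object TaskCountSketch {
  // executor id -> (stage id -> number of tasks currently running)
  private val executorIdToStageAndNumTasks =
    new mutable.HashMap[String, mutable.HashMap[Int, Int]]

  def taskStarted(executorId: String, stageId: Int): Unit = {
    val stageToNumTasks = executorIdToStageAndNumTasks.getOrElseUpdate(
      executorId, new mutable.HashMap[Int, Int])
    stageToNumTasks(stageId) = stageToNumTasks.getOrElse(stageId, 0) + 1
  }

  def taskEnded(executorId: String, stageId: Int): Unit = {
    executorIdToStageAndNumTasks.get(executorId).foreach { stageToNumTasks =>
      val remaining = stageToNumTasks.getOrElse(stageId, 0) - 1
      if (remaining <= 0) stageToNumTasks -= stageId
      else stageToNumTasks(stageId) = remaining
      if (stageToNumTasks.isEmpty) executorIdToStageAndNumTasks -= executorId
    }
  }

  def main(args: Array[String]): Unit = {
    taskStarted("executor-1", 0)
    taskStarted("executor-1", 0)
    taskEnded("executor-1", 0)
    // One task from stage 0 is still running on executor-1:
    println(executorIdToStageAndNumTasks) // HashMap(executor-1 -> HashMap(0 -> 1))
  }
}
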
@@ -656,29 +658,33 @@ private[spark] class ExecutorAllocationManager(
stageIdToSpeculativeTaskIndices -= stageId
stageIdToExecutorPlacementHints -= stageId

// Remove the completed stage from the per-executor running task counts
executorIdToStageAndNumTasks.foreach { case (_, stageToNumTasks) =>
stageToNumTasks -= stageId
}
val idleExecutorIds = executorIdToStageAndNumTasks.filter(_._2.isEmpty).keySet
idleExecutorIds.foreach { id =>
executorIdToStageAndNumTasks.remove(id)
allocationManager.onExecutorIdle(id)
}

// Update the executor placement hints
updateExecutorPlacementHints()

// If this is the last stage with pending tasks, mark the scheduler queue as empty
// This is needed in case the stage is aborted for any reason
if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
allocationManager.onSchedulerQueueEmpty()
if (numRunningTasks != 0) {
logWarning("No stages are running, but numRunningTasks != 0")
numRunningTasks = 0
}
}
}
}
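
The sweep above is what keeps executors from leaking when a stage is aborted and some SparkListenerTaskEnd events never arrive: every per-executor count for the completed stage is dropped, and executors whose maps become empty are reported idle. A rough standalone illustration of that sweep, with the map contents and the markIdle stand-in invented for the example:

import scala.collection.mutable

object StageCompletionSweepSketch {
  def main(args: Array[String]): Unit = {
    // executor-1 only runs tasks of the completed/aborted stage 0,
    // executor-2 also runs tasks of stage 1.
    val executorIdToStageAndNumTasks = mutable.HashMap(
      "executor-1" -> mutable.HashMap(0 -> 2),
      "executor-2" -> mutable.HashMap(0 -> 1, 1 -> 3))

    def markIdle(id: String): Unit = println(s"$id is now idle") // stand-in for onExecutorIdle

    val completedStageId = 0
    executorIdToStageAndNumTasks.foreach { case (_, stageToNumTasks) =>
      stageToNumTasks -= completedStageId
    }
    val idleExecutorIds = executorIdToStageAndNumTasks.filter(_._2.isEmpty).keySet
    idleExecutorIds.foreach { id =>
      executorIdToStageAndNumTasks.remove(id)
      markIdle(id) // prints "executor-1 is now idle"; executor-2 still runs stage 1 tasks
    }
  }
}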

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
val stageId = taskStart.stageId
val taskId = taskStart.taskInfo.taskId
val taskIndex = taskStart.taskInfo.index
val executorId = taskStart.taskInfo.executorId

allocationManager.synchronized {
numRunningTasks += 1
// This guards against the race condition in which the `SparkListenerTaskStart`
// event is posted before the `SparkListenerBlockManagerAdded` event, which is
// possible because these events are posted in different threads. (see SPARK-4951)
@@ -698,24 +704,30 @@ private[spark] class ExecutorAllocationManager(
}

// Mark the executor on which this task is scheduled as busy
executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
val stageToNumTasks = executorIdToStageAndNumTasks.getOrElseUpdate(
executorId, new mutable.HashMap[Int, Int])
stageToNumTasks(stageId) = stageToNumTasks.getOrElse(stageId, 0) + 1
allocationManager.onExecutorBusy(executorId)
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val executorId = taskEnd.taskInfo.executorId
val taskId = taskEnd.taskInfo.taskId
val taskIndex = taskEnd.taskInfo.index
val stageId = taskEnd.stageId
allocationManager.synchronized {
numRunningTasks -= 1
// If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
if (executorIdToTaskIds(executorId).isEmpty) {
executorIdToTaskIds -= executorId
allocationManager.onExecutorIdle(executorId)
if (executorIdToStageAndNumTasks.contains(executorId) &&
executorIdToStageAndNumTasks(executorId).contains(stageId)) {
val taskNum = executorIdToStageAndNumTasks(executorId)(stageId) - 1
if (taskNum <= 0) {
executorIdToStageAndNumTasks(executorId) -= stageId
if (executorIdToStageAndNumTasks(executorId).isEmpty) {
executorIdToStageAndNumTasks -= executorId
allocationManager.onExecutorIdle(executorId)
}
} else {
executorIdToStageAndNumTasks(executorId)(stageId) = taskNum
}
}

@@ -787,15 +799,17 @@ private[spark] class ExecutorAllocationManager(
/**
* The number of tasks currently running across all stages.
*/
def totalRunningTasks(): Int = numRunningTasks
def totalRunningTasks(): Int = {
executorIdToStageAndNumTasks.values.map(_.values.sum).sum
}

/**
* Return true if an executor is not currently running a task, and false otherwise.
*
* Note: This is not thread-safe without the caller owning the `allocationManager` lock.
*/
def isExecutorIdle(executorId: String): Boolean = {
!executorIdToTaskIds.contains(executorId)
!executorIdToStageAndNumTasks.contains(executorId)
}
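
For a quick sanity check of these two accessors, a small worked example with invented numbers:

import scala.collection.mutable

object AccessorSketch {
  def main(args: Array[String]): Unit = {
    val executorIdToStageAndNumTasks = mutable.HashMap(
      "executor-1" -> mutable.HashMap(0 -> 2, 1 -> 1),
      "executor-2" -> mutable.HashMap(1 -> 4))

    // Same computation as totalRunningTasks(): 2 + 1 + 4 = 7
    println(executorIdToStageAndNumTasks.values.map(_.values.sum).sum) // 7

    // Same check as isExecutorIdle(): an executor with no entry is considered idle
    println(!executorIdToStageAndNumTasks.contains("executor-3")) // true
    println(!executorIdToStageAndNumTasks.contains("executor-1")) // false
  }
}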

/**
@@ -1029,6 +1029,43 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager) === Map.empty)
}

test("correctly handle task failure introduces stage abortion scenario") {
sc = createSparkContext()
val manager = sc.executorAllocationManager.get
assert(maxNumExecutorsNeeded(manager) === 0)

val stageInfo = createStageInfo(0, 3)
post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo))
assert(maxNumExecutorsNeeded(manager) === 3)

val taskInfos = Seq(
createTaskInfo(1, 1, "executor-1"),
createTaskInfo(2, 2, "executor-1"),
createTaskInfo(3, 3, "executor-1"))

taskInfos.foreach(task => post(sc.listenerBus, SparkListenerTaskStart(0, 0, task)))
assert(maxNumExecutorsNeeded(manager) === 3)

// Simulate task 1 failing with an exception, as if it had already failed more than 4 times
post(sc.listenerBus,
SparkListenerTaskEnd(
0, 0, null, ExceptionFailure(null, null, null, null, None), taskInfos(0), null))
// 1 pending + 2 running
assert(maxNumExecutorsNeeded(manager) === 3)

// The stage will be aborted once a task has failed more than 4 times
post(sc.listenerBus, SparkListenerStageCompleted(stageInfo))
// Once the stage is completed, all of its tasks should be considered finished
assert(maxNumExecutorsNeeded(manager) === 0)

// Simulate task 2 being killed intentionally because of the stage abortion
post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, TaskKilled(""), taskInfos(1), null))
assert(maxNumExecutorsNeeded(manager) === 0)

// The TaskEnd event may never be delivered; we should still guarantee that executor-1 can be removed.
assert(removeTimes(manager).keySet === Set("executor-1"))
}

private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,