Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ private[spark] class ExecutorAllocationManager(
val taskIndex = taskEnd.taskInfo.index
val stageId = taskEnd.stageId
allocationManager.synchronized {
numRunningTasks -= 1
if (stageIdToNumTasks.contains(stageId)) {
numRunningTasks -= 1
}
// If the executor is no longer running any scheduled tasks, mark it as idle
if (executorIdToTaskIds.contains(executorId)) {
executorIdToTaskIds(executorId) -= taskId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,33 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager) === Map.empty)
}

test("SPARK-18981: maxNumExecutorsNeeded should properly handle speculated tasks") {
sc = createSparkContext()
val manager = sc.executorAllocationManager.get
assert(maxNumExecutorsNeeded(manager) === 0)

val stageInfo = createStageInfo(0, 1)
sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo))
assert(maxNumExecutorsNeeded(manager) === 1)

val taskInfo = createTaskInfo(0, 0, "executor-1")
val speculatedTaskInfo = createTaskInfo(1, 0, "executor-1")
sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo))
assert(maxNumExecutorsNeeded(manager) === 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test looks wrong - is taskIndex higher than numTasks?
It would be better for the test to:

  • Launch stage with 1 task.
  • Launch a normal task and 1 speculative task - with the same taskIndex, but different taskIds
  • Finish normal task.
  • Ensure stage is completed.
  • Now finish the speculative task and check that the bug is not reproduced (it should be reproduced without this fix).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the warning 'No stages are running, but numRunningTasks != 0' is printed, and at that point #numRunningTasks is set to 0. But the speculated task's end event arrives after that, so #numRunningTasks is decremented by 1 once more and becomes negative.
The tests are wrong; I will fix them.


sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, speculatedTaskInfo))
assert(maxNumExecutorsNeeded(manager) === 2)

sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, taskInfo, null))
assert(maxNumExecutorsNeeded(manager) === 1)

sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo))
assert(maxNumExecutorsNeeded(manager) === 0)

sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, TaskKilled, speculatedTaskInfo, null))
assert(maxNumExecutorsNeeded(manager) === 0)
}

private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
Expand Down