@@ -650,27 +650,64 @@ class DAGSchedulerSuite
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))

// pretend we were told hostA went away
val oldEpoch = mapOutputTracker.getEpoch
runEvent(ExecutorLost("exec-hostA"))
val newEpoch = mapOutputTracker.getEpoch
assert(newEpoch > oldEpoch)

// now start completing some tasks in the shuffle map stage, under different hosts
// and epochs, and make sure scheduler updates its state correctly
val taskSet = taskSets(0)
+ val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage]
+ assert(shuffleStage.numAvailableOutputs === 0)

// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
-   reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
- // should work because it's a non-failed host
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
-   reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+   taskSet.tasks(0),
+   Success,
+   makeMapStatus("hostA", reduceRdd.partitions.size),
+   null,
+   createFakeTaskInfo(),
+   null))
+ assert(shuffleStage.numAvailableOutputs === 0)
+
+ // should work because it's a non-failed host (so the available map outputs will increase)
+ runEvent(CompletionEvent(
+   taskSet.tasks(0),
+   Success,
+   makeMapStatus("hostB", reduceRdd.partitions.size),
+   null,
+   createFakeTaskInfo(),
+   null))
+ assert(shuffleStage.numAvailableOutputs === 1)

// should be ignored for being too old
- runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
-   reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
- // should work because it's a new epoch
+ runEvent(CompletionEvent(
+   taskSet.tasks(0),
+   Success,
+   makeMapStatus("hostA", reduceRdd.partitions.size),
+   null,
+   createFakeTaskInfo(),
+   null))
+ assert(shuffleStage.numAvailableOutputs === 1)
+
+ // should work because it's a new epoch, which will increase the number of available map
+ // outputs, and also finish the stage
taskSet.tasks(1).epoch = newEpoch
- runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
-   reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
+ runEvent(CompletionEvent(
+   taskSet.tasks(1),
+   Success,
+   makeMapStatus("hostA", reduceRdd.partitions.size),
+   null,
+   createFakeTaskInfo(),
+   null))
+ assert(shuffleStage.numAvailableOutputs === 2)
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))

// finish the next stage normally, which completes the job
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
assert(results === Map(0 -> 42, 1 -> 43))
assertDataStructuresEmpty()
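
For context, the test above exercises the DAGScheduler's epoch-based handling of late map task completions: losing an executor bumps the map output tracker's epoch, a completion that carries an older epoch and reports output on the failed host is ignored, while a completion from a healthy host, or one tagged with the new epoch, still registers its map output and counts toward the stage's available outputs. Below is a minimal, self-contained sketch of that bookkeeping. It is not Spark's actual implementation; names such as EpochFilterSketch, failedEpoch, outputLocs, Completion, hostLost and handleCompletion are illustrative only.

// Sketch of epoch-based filtering of late map outputs (illustrative, not Spark code).
object EpochFilterSketch {
  final case class Completion(partition: Int, host: String, epoch: Long)

  var currentEpoch: Long = 0L
  // host -> epoch at which that host was marked lost
  val failedEpoch = scala.collection.mutable.Map.empty[String, Long]
  // partition -> host currently holding its map output
  val outputLocs = scala.collection.mutable.Map.empty[Int, String]

  // Losing a host bumps the epoch, records the failure, and drops outputs held on that host.
  def hostLost(host: String): Unit = {
    currentEpoch += 1
    failedEpoch(host) = currentEpoch
    val stalePartitions = outputLocs.collect { case (p, h) if h == host => p }
    outputLocs --= stalePartitions
  }

  // A completion on a failed host is ignored unless its epoch is newer than the failure epoch.
  def handleCompletion(c: Completion): Unit = {
    val stale = failedEpoch.get(c.host).exists(c.epoch <= _)
    if (!stale) outputLocs(c.partition) = c.host
  }

  def numAvailableOutputs: Int = outputLocs.size
}

// Mirrors the sequence in the test: hostA is lost, so an old-epoch completion on hostA is
// dropped, a hostB completion is accepted, and a hostA completion carrying the new epoch
// is accepted, leaving two available outputs.
import EpochFilterSketch._
hostLost("hostA")
handleCompletion(Completion(0, "hostA", epoch = 0))              // ignored: too old
handleCompletion(Completion(0, "hostB", epoch = 0))              // accepted: hostB never failed
handleCompletion(Completion(1, "hostA", epoch = currentEpoch))   // accepted: new epoch
assert(numAvailableOutputs == 2)

In the test itself, runEvent(CompletionEvent(...)) plays the role of handleCompletion here, runEvent(ExecutorLost("exec-hostA")) plays the role of hostLost, and shuffleStage.numAvailableOutputs is the counter being asserted on.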