@@ -811,6 +811,58 @@ class DAGSchedulerSuite
811811 assert(stage2TaskSet.stageAttemptId == 0 )
812812 }
813813
814+ /**
815+ * Scenario: we lose an executor after some shuffle map tasks have completed on it. Those tasks
816+ * are reported as Resubmitted, and when every map task has finished the job continues normally.
     *
     * Sequence exercised below: two tasks of the first task set succeed for hostA, an ExecutorLost
     * event is delivered, the same two tasks come back as Resubmitted, and then three Success
     * results for hostB are passed to complete(). The test finally checks that map output can be
     * looked up for every partition of the reduce RDD and that a second task set (stage 1,
     * attempt 0) has been submitted.
817+ */
818+ test(" register map outputs correctly after ExecutorLost and task Resubmitted" ) {
819+ val firstRDD = new MyRDD (sc, 3 , Nil )
820+ val firstShuffleDep = new ShuffleDependency (firstRDD, null )
821+ val reduceRdd = new MyRDD (sc, 5 , List (firstShuffleDep))
822+ submit(reduceRdd, Array (0 ))
823+
824+ // complete tasks 0 and 1 of the first task set successfully, both with a map status for hostA
825+ runEvent(CompletionEvent (
826+ taskSets(0 ).tasks(0 ), Success ,
827+ makeMapStatus(" hostA" , reduceRdd.partitions.length), null , createFakeTaskInfo(), null ))
828+ runEvent(CompletionEvent (
829+ taskSets(0 ).tasks(1 ), Success ,
830+ makeMapStatus(" hostA" , reduceRdd.partitions.length), null , createFakeTaskInfo(), null ))
831+
832+ // now that host goes down (NOTE(review): presumably makeMapStatus ties hostA to this executor id -- confirm)
833+ runEvent(ExecutorLost (" exec-hostA" ))
834+
835+ // so the same two tasks are reported back with reason Resubmitted (no result payload)
836+ runEvent(CompletionEvent (
837+ taskSets(0 ).tasks(0 ), Resubmitted , null , null , createFakeTaskInfo(), null ))
838+ runEvent(CompletionEvent (
839+ taskSets(0 ).tasks(1 ), Resubmitted , null , null , createFakeTaskInfo(), null ))
840+
841+ // now complete everything on a different host: three Success results, all for hostB
842+ complete(taskSets(0 ), Seq (
843+ (Success , makeMapStatus(" hostB" , reduceRdd.partitions.length)),
844+ (Success , makeMapStatus(" hostB" , reduceRdd.partitions.length)),
845+ (Success , makeMapStatus(" hostB" , reduceRdd.partitions.length))
846+ ))
847+
848+ // now we should submit stage 1, and the map output from stage 0 should be registered
849+
850+ // check that we have map output for stage 0 for every partition index of reduceRdd
851+ (0 until reduceRdd.partitions.length).foreach { reduceIdx =>
852+ val statuses = mapOutputTracker.getMapSizesByExecutorId(0 , reduceIdx)
853+ // really the lookup above should already have thrown an exception rather than let either of
854+ // these asserts fail, but to be extra defensive we double check that the statuses are OK
855+ assert(statuses != null )
856+ assert(statuses.nonEmpty)
857+ }
858+
859+ // and check that stage 1 has been submitted: a second task set exists, for stage 1, attempt 0
860+ assert(taskSets.size == 2 )
861+ val stage1TaskSet = taskSets(1 )
862+ assert(stage1TaskSet.stageId == 1 )
863+ assert(stage1TaskSet.stageAttemptId == 0 )
864+ }
865+
814866 /**
815867 * Makes sure that failures of stage used by multiple jobs are correctly handled.
816868 *
0 commit comments