@@ -88,7 +88,6 @@ class DAGSchedulerSuite
8888 // normally done by TaskSetManager
8989 taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
9090 taskSets += taskSet
91- println(s " submitting taskSet $taskSet. taskSets = $taskSets" )
9291 }
9392 override def cancelTasks (stageId : Int , interruptThread : Boolean ) {
9493 cancelledStages += stageId
@@ -558,18 +557,11 @@ class DAGSchedulerSuite
558557 /** This tests the case where another FetchFailed comes in while the map stage is getting
559558 * re-run. */
560559 test(" late fetch failures don't cause multiple concurrent attempts for the same map stage" ) {
561- println(" begin late fetch failure" )
562560 val shuffleMapRdd = new MyRDD (sc, 2 , Nil )
563561 val shuffleDep = new ShuffleDependency (shuffleMapRdd, null )
564562 val shuffleId = shuffleDep.shuffleId
565563 val reduceRdd = new MyRDD (sc, 2 , List (shuffleDep))
566- val jobId = submit(reduceRdd, Array (0 , 1 ))
567- println(s " late fetch failure: jobId = $jobId" )
568- println(s " late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}" )
569- println(s " late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}" )
570- println(s " late fetch failure: waitingStages = ${scheduler.waitingStages}" )
571- println(s " late fetch failure: runningStages = ${scheduler.runningStages}" )
572- println(s " late fetch failure: failedStages = ${scheduler.failedStages}" )
564+ submit(reduceRdd, Array (0 , 1 ))
573565
574566 val mapStageId = 0
575567 def countSubmittedMapStageAttempts (): Int = {
@@ -579,26 +571,14 @@ class DAGSchedulerSuite
579571 // The map stage should have been submitted.
580572 assert(countSubmittedMapStageAttempts() === 1 )
581573
582- println(" late fetch failure: taskSets = " + taskSets)
583- println(s " late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}" )
584- println(s " late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}" )
585- println(s " late fetch failure: waitingStages = ${scheduler.waitingStages}" )
586- println(s " late fetch failure: runningStages = ${scheduler.runningStages}" )
587- println(s " late fetch failure: failedStages = ${scheduler.failedStages}" )
588574 complete(taskSets(0 ), Seq (
589- (Success , makeMapStatus(" hostA" , 1 )),
590- (Success , makeMapStatus(" hostB" , 1 ))))
575+ (Success , makeMapStatus(" hostA" , 2 )),
576+ (Success , makeMapStatus(" hostB" , 2 ))))
591577 // The MapOutputTracker should know about both map output locations.
592578 assert(mapOutputTracker.getServerStatuses(shuffleId, 0 ).map(_._1.host) ===
593579 Array (" hostA" , " hostB" ))
594-
595- println(" late fetch failure: taskSets = " + taskSets)
596- println(" late fetch failure: submittedStages = " + sparkListener.submittedStageInfos)
597- println(s " late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}" )
598- println(s " late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}" )
599- println(s " late fetch failure: waitingStages = ${scheduler.waitingStages}" )
600- println(s " late fetch failure: runningStages = ${scheduler.runningStages}" )
601- println(s " late fetch failure: failedStages = ${scheduler.failedStages}" )
580+ assert(mapOutputTracker.getServerStatuses(shuffleId, 1 ).map(_._1.host) ===
581+ Array (" hostA" , " hostB" ))
602582
603583 // The first result task fails, with a fetch failure for the output from the first mapper.
604584 runEvent(CompletionEvent (
@@ -643,7 +623,6 @@ class DAGSchedulerSuite
643623 */
644624 test(" extremely late fetch failures don't cause multiple concurrent attempts for " +
645625 " the same stage" ) {
646- println(" begin extremely late fetch failure" )
647626 val shuffleMapRdd = new MyRDD (sc, 2 , Nil )
648627 val shuffleDep = new ShuffleDependency (shuffleMapRdd, null )
649628 val shuffleId = shuffleDep.shuffleId
@@ -661,17 +640,15 @@ class DAGSchedulerSuite
661640 sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS )
662641 assert(countSubmittedMapStageAttempts() === 1 )
663642
664- println(" extremely late fetch failure: taskSets = " + taskSets)
665643 // Complete the map stage.
666644 complete(taskSets(0 ), Seq (
667- (Success , makeMapStatus(" hostA" , 1 )),
668- (Success , makeMapStatus(" hostB" , 1 ))))
645+ (Success , makeMapStatus(" hostA" , 2 )),
646+ (Success , makeMapStatus(" hostB" , 2 ))))
669647
670648 // The reduce stage should have been submitted.
671649 sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS )
672650 assert(countSubmittedReduceStageAttempts() === 1 )
673651
674- println(" extremely late fetch failure: taskSets = " + taskSets)
675652 // The first result task fails, with a fetch failure for the output from the first mapper.
676653 runEvent(CompletionEvent (
677654 taskSets(1 ).tasks(0 ),
0 commit comments