@@ -237,11 +237,12 @@ class DAGScheduler(
237237 case Some (stage) => stage
238238 case None =>
239239 // We are going to register ancestor shuffle dependencies
240- registerShuffleDependencies(shuffleDep, firstJobId)
240+ getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
241+ shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
242+ }
241243 // Then register current shuffleDep
242244 val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
243245 shuffleToMapStage(shuffleDep.shuffleId) = stage
244-
245246 stage
246247 }
247248 }
@@ -352,16 +353,6 @@ class DAGScheduler(
352353 parents.toList
353354 }
354355
355- /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
356- private def registerShuffleDependencies (shuffleDep : ShuffleDependency [_, _, _], firstJobId : Int ) {
357- val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
358- while (parentsWithNoMapStage.nonEmpty) {
359- val currentShufDep = parentsWithNoMapStage.pop()
360- val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
361- shuffleToMapStage(currentShufDep.shuffleId) = stage
362- }
363- }
364-
365356 /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
366357 private def getAncestorShuffleDependencies (rdd : RDD [_]): Stack [ShuffleDependency [_, _, _]] = {
367358 val parents = new Stack [ShuffleDependency [_, _, _]]
@@ -378,11 +369,9 @@ class DAGScheduler(
378369 if (! shuffleToMapStage.contains(shufDep.shuffleId)) {
379370 parents.push(shufDep)
380371 }
381-
382- waitingForVisit.push(shufDep.rdd)
383372 case _ =>
384- waitingForVisit.push(dep.rdd)
385373 }
374+ waitingForVisit.push(dep.rdd)
386375 }
387376 }
388377 }
@@ -1039,39 +1028,22 @@ class DAGScheduler(
10391028 // we registered these map outputs.
10401029 mapOutputTracker.registerMapOutputs(
10411030 shuffleStage.shuffleDep.shuffleId,
1042- shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head ).toArray,
1031+ shuffleStage.outputLocs.map(_.headOption.orNull ).toArray,
10431032 changeEpoch = true )
10441033
10451034 clearCacheLocs()
1035+
1036+ // Some tasks had failed; let's resubmit this shuffleStage
1037+ // TODO: Lower-level scheduler should also deal with this
10461038 if (shuffleStage.outputLocs.contains(Nil )) {
1047- // Some tasks had failed; let's resubmit this shuffleStage
1048- // TODO: Lower-level scheduler should also deal with this
10491039 logInfo(" Resubmitting " + shuffleStage + " (" + shuffleStage.name +
10501040 " ) because some of its tasks had failed: " +
10511041 shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
10521042 .map(_._2).mkString(" , " ))
10531043 submitStage(shuffleStage)
1054- } else {
1055- val newlyRunnable = new ArrayBuffer [Stage ]
1056- for (shuffleStage <- waitingStages) {
1057- logInfo(" Missing parents for " + shuffleStage + " : " +
1058- getMissingParentStages(shuffleStage))
1059- }
1060- for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
1061- {
1062- newlyRunnable += shuffleStage
1063- }
1064- waitingStages --= newlyRunnable
1065- runningStages ++= newlyRunnable
1066- for {
1067- shuffleStage <- newlyRunnable.sortBy(_.id)
1068- jobId <- activeJobForStage(shuffleStage)
1069- } {
1070- logInfo(" Submitting " + shuffleStage + " (" +
1071- shuffleStage.rdd + " ), which is now runnable" )
1072- submitMissingTasks(shuffleStage, jobId)
1073- }
10741044 }
1045+
1046+ // Note: newly runnable stages will be submitted below when we submit waiting stages
10751047 }
10761048 }
10771049
@@ -1169,7 +1141,7 @@ class DAGScheduler(
11691141 // TODO: This will be really slow if we keep accumulating shuffle map stages
11701142 for ((shuffleId, stage) <- shuffleToMapStage) {
11711143 stage.removeOutputsOnExecutor(execId)
1172- val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head ).toArray
1144+ val locs = stage.outputLocs.map(_.headOption.orNull ).toArray
11731145 mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true )
11741146 }
11751147 if (shuffleToMapStage.isEmpty) {
0 commit comments