From 64a9ed2b599c7ca52ab993943a87d45c44c34eaf Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Fri, 14 Aug 2015 16:14:58 -0700
Subject: [PATCH 1/2] Remove unnecessary code + minor code rewrites

---
 .../apache/spark/scheduler/DAGScheduler.scala | 50 ++++---------------
 1 file changed, 11 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f1c63d08766c..d8465e74dfe6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -237,11 +237,12 @@ class DAGScheduler(
       case Some(stage) => stage
       case None =>
         // We are going to register ancestor shuffle dependencies
-        registerShuffleDependencies(shuffleDep, firstJobId)
+        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
+          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
+        }
         // Then register current shuffleDep
         val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
-
         stage
     }
   }
@@ -352,16 +353,6 @@ class DAGScheduler(
     parents.toList
   }
 
-  /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
-  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
-    val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
-    while (parentsWithNoMapStage.nonEmpty) {
-      val currentShufDep = parentsWithNoMapStage.pop()
-      val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
-      shuffleToMapStage(currentShufDep.shuffleId) = stage
-    }
-  }
-
   /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
   private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
     val parents = new Stack[ShuffleDependency[_, _, _]]
@@ -378,11 +369,9 @@ class DAGScheduler(
             if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
               parents.push(shufDep)
             }
-
-            waitingForVisit.push(shufDep.rdd)
           case _ =>
-            waitingForVisit.push(dep.rdd)
         }
+        waitingForVisit.push(dep.rdd)
       }
     }
   }
@@ -1039,39 +1028,22 @@ class DAGScheduler(
               //       we registered these map outputs.
               mapOutputTracker.registerMapOutputs(
                 shuffleStage.shuffleDep.shuffleId,
-                shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
+                shuffleStage.outputLocs.map(_.headOption.orNull).toArray,
                 changeEpoch = true)
 
               clearCacheLocs()
+
+              // Some tasks had failed; let's resubmit this shuffleStage
+              // TODO: Lower-level scheduler should also deal with this
               if (shuffleStage.outputLocs.contains(Nil)) {
-                // Some tasks had failed; let's resubmit this shuffleStage
-                // TODO: Lower-level scheduler should also deal with this
                 logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                   ") because some of its tasks had failed: " +
                   shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
                       .map(_._2).mkString(", "))
                 submitStage(shuffleStage)
-              } else {
-                val newlyRunnable = new ArrayBuffer[Stage]
-                for (shuffleStage <- waitingStages) {
-                  logInfo("Missing parents for " + shuffleStage + ": " +
-                    getMissingParentStages(shuffleStage))
-                }
-                for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
-                {
-                  newlyRunnable += shuffleStage
-                }
-                waitingStages --= newlyRunnable
-                runningStages ++= newlyRunnable
-                for {
-                  shuffleStage <- newlyRunnable.sortBy(_.id)
-                  jobId <- activeJobForStage(shuffleStage)
-                } {
-                  logInfo("Submitting " + shuffleStage + " (" +
-                    shuffleStage.rdd + "), which is now runnable")
-                  submitMissingTasks(shuffleStage, jobId)
-                }
               }
+
+              // Note: newly runnable stages will be submitted below when we submit waiting stages
             }
         }
 
@@ -1169,7 +1141,7 @@ class DAGScheduler(
       // TODO: This will be really slow if we keep accumulating shuffle map stages
       for ((shuffleId, stage) <- shuffleToMapStage) {
         stage.removeOutputsOnExecutor(execId)
-        val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
+        val locs = stage.outputLocs.map(_.headOption.orNull).toArray
         mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
       }
       if (shuffleToMapStage.isEmpty) {

From 57abca30cb595ee89f096aa5c2090681e3e78889 Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Thu, 3 Sep 2015 16:02:20 -0700
Subject: [PATCH 2/2] Move comment back into if case

---
 .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ecda655444f1..09e963f5cdf6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1046,9 +1046,9 @@ class DAGScheduler(
 
               clearCacheLocs()
 
-              // Some tasks had failed; let's resubmit this shuffleStage
-              // TODO: Lower-level scheduler should also deal with this
               if (shuffleStage.outputLocs.contains(Nil)) {
+                // Some tasks had failed; let's resubmit this shuffleStage
+                // TODO: Lower-level scheduler should also deal with this
                 logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                   ") because some of its tasks had failed: " +
                   shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
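
A note on the recurring one-liner in the patch above: it swaps the explicit emptiness check for `headOption.orNull`. The following is a minimal, self-contained Scala sketch of that equivalence, not part of the patch; it uses a plain `Seq[List[String]]` as a simplified stand-in for `shuffleStage.outputLocs` (which in the real code holds lists of MapStatus entries), so the names and values here are illustrative only.

// Illustrative sketch only: a simplified stand-in for shuffleStage.outputLocs.
object HeadOptionOrNullSketch {
  def main(args: Array[String]): Unit = {
    // One entry per map partition; an empty list means no registered output location.
    val outputLocs: Seq[List[String]] = Seq(List("hostA"), Nil, List("hostB", "hostC"))

    // Form removed by the patch: explicit emptiness check.
    val before = outputLocs.map(list => if (list.isEmpty) null else list.head)

    // Form added by the patch: headOption yields Some(head) or None,
    // and orNull collapses None to null.
    val after = outputLocs.map(_.headOption.orNull)

    assert(before == after)
    println(after)  // List(hostA, null, hostB)
  }
}

Both forms produce the same array of "first location or null" entries that mapOutputTracker.registerMapOutputs expects; the patch only tightens the notation.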