2 files changed: +4, -0

main/scala/org/apache/spark/scheduler
test/scala/org/apache/spark/scheduler

@@ -1046,6 +1046,7 @@ class DAGScheduler(

    if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
      markStageAsFinished(shuffleStage)
+     println(s"marking $shuffleStage as finished")
      logInfo("looking for newly runnable stages")
      logInfo("running: " + runningStages)
      logInfo("waiting: " + waitingStages)
@@ -1072,6 +1073,7 @@ class DAGScheduler(
        .map(_._2).mkString(", "))
      submitStage(shuffleStage)
    } else {
+     println(s"looking for newly runnable stage")
      val newlyRunnable = new ArrayBuffer[Stage]
      for (shuffleStage <- waitingStages) {
        logInfo("Missing parents for " + shuffleStage + ": " +
@@ -1081,6 +1083,7 @@ class DAGScheduler(
      {
        newlyRunnable += shuffleStage
      }
+     println(s"newly runnable stages = $newlyRunnable")
      waitingStages --= newlyRunnable
      runningStages ++= newlyRunnable
      for {
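The lines added above (and the one added to the test suite below) are println calls used for ad-hoc tracing; they write straight to stdout rather than going through the logInfo calls on the neighbouring lines. If tracing like this is meant to outlive local debugging, one option is to emit it at debug level through the same Logging trait that supplies those logInfo calls, so it can be switched off via log configuration. The sketch below is illustrative only: SimpleLogging and StageTracer are hypothetical stand-ins, not Spark classes, and the real DAGScheduler already gets logDebug from Spark's Logging trait.

// Hypothetical sketch: routing the same trace information through a
// debug-level logger instead of println. SimpleLogging stands in for
// Spark's Logging trait, which already provides logDebug to DAGScheduler.
trait SimpleLogging {
  protected def debugEnabled: Boolean = true
  // By-name parameter so the message string is only built when debug is enabled.
  protected def logDebug(msg: => String): Unit =
    if (debugEnabled) System.err.println(s"DEBUG: $msg")
}

class StageTracer extends SimpleLogging {
  def onStageFinished(stage: String, newlyRunnable: Seq[String]): Unit = {
    // Same information as the added printlns, but gated by log level,
    // so normal runs are not flooded with stdout output.
    logDebug(s"marking $stage as finished")
    logDebug(s"newly runnable stages = ${newlyRunnable.mkString(", ")}")
  }
}

object StageTracerDemo extends App {
  new StageTracer().onStageFinished("ShuffleMapStage 3", Seq("ResultStage 4"))
}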
@@ -88,6 +88,7 @@ class DAGSchedulerSuite
      // normally done by TaskSetManager
      taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
      taskSets += taskSet
+     println(s"submitting taskSet $taskSet. taskSets = $taskSets")
    }
    override def cancelTasks(stageId: Int, interruptThread: Boolean) {
      cancelledStages += stageId