Skip to content

Commit 89a59b6

Browse files
committed
more printlns ...
1 parent 9601b47 commit 89a59b6

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -835,6 +835,7 @@ class DAGScheduler(
835835
stage.pendingTasks.clear()
836836

837837
// First figure out the indexes of partition ids to compute.
838+
println(s"finding partitions to compute for $stage")
838839
val partitionsToCompute: Seq[Int] = {
839840
stage match {
840841
case stage: ShuffleMapStage =>
@@ -928,6 +929,7 @@ class DAGScheduler(
928929
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
929930
}
930931
logDebug(debugString)
932+
println(debugString)
931933
}
932934
}
933935

@@ -1083,7 +1085,8 @@ class DAGScheduler(
10831085
{
10841086
newlyRunnable += shuffleStage
10851087
}
1086-
println(s"newly runnable stages = $newlyRunnable")
1088+
val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)}
1089+
println(s"newly runnable stages = $newlyRunnableWithJob")
10871090
waitingStages --= newlyRunnable
10881091
runningStages ++= newlyRunnable
10891092
for {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,13 @@ class DAGSchedulerSuite
563563
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
564564
val shuffleId = shuffleDep.shuffleId
565565
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
566-
submit(reduceRdd, Array(0, 1))
566+
val jobId = submit(reduceRdd, Array(0, 1))
567+
println(s"late fetch failure: jobId = $jobId")
568+
println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
569+
println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
570+
println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
571+
println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
572+
println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
567573

568574
val mapStageId = 0
569575
def countSubmittedMapStageAttempts(): Int = {
@@ -574,6 +580,11 @@ class DAGSchedulerSuite
574580
assert(countSubmittedMapStageAttempts() === 1)
575581

576582
println("late fetch failure: taskSets = " + taskSets)
583+
println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
584+
println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
585+
println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
586+
println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
587+
println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
577588
complete(taskSets(0), Seq(
578589
(Success, makeMapStatus("hostA", 1)),
579590
(Success, makeMapStatus("hostB", 1))))
@@ -582,6 +593,13 @@ class DAGSchedulerSuite
582593
Array("hostA", "hostB"))
583594

584595
println("late fetch failure: taskSets = " + taskSets)
596+
println("late fetch failure: submittedStages = " + sparkListener.submittedStageInfos)
597+
println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
598+
println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
599+
println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
600+
println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
601+
println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
602+
585603
// The first result task fails, with a fetch failure for the output from the first mapper.
586604
runEvent(CompletionEvent(
587605
taskSets(1).tasks(0),

0 commit comments

Comments
 (0)