Skip to content

Commit c010a44

Browse files
committed
partitions.length
1 parent 51f3c47 commit c010a44

File tree

1 file changed

+16
-16
lines changed

1 file changed

+16
-16
lines changed

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,8 @@ class DAGSchedulerSuite
453453
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
454454
submit(reduceRdd, Array(0, 1))
455455
complete(taskSets(0), Seq(
456-
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
457-
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
456+
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
457+
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
458458
// the 2nd ResultTask failed
459459
complete(taskSets(1), Seq(
460460
(Success, 42),
@@ -464,7 +464,7 @@ class DAGSchedulerSuite
464464
// ask the scheduler to try it again
465465
scheduler.resubmitFailedStages()
466466
// have the 2nd attempt pass
467-
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
467+
complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length))))
468468
// we can see both result blocks now
469469
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
470470
HashSet("hostA", "hostB"))
@@ -480,8 +480,8 @@ class DAGSchedulerSuite
480480
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
481481
submit(reduceRdd, Array(0, 1))
482482
complete(taskSets(0), Seq(
483-
(Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
484-
(Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
483+
(Success, makeMapStatus("hostA", reduceRdd.partitions.length)),
484+
(Success, makeMapStatus("hostB", reduceRdd.partitions.length))))
485485
// The MapOutputTracker should know about both map output locations.
486486
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
487487
HashSet("hostA", "hostB"))
@@ -659,17 +659,17 @@ class DAGSchedulerSuite
659659
val taskSet = taskSets(0)
660660
// should be ignored for being too old
661661
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
662-
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
662+
reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
663663
// should work because it's a non-failed host
664664
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
665-
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
665+
reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
666666
// should be ignored for being too old
667667
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
668-
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
668+
reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
669669
// should work because it's a new epoch
670670
taskSet.tasks(1).epoch = newEpoch
671671
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
672-
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
672+
reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
673673
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
674674
HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
675675
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -707,9 +707,9 @@ class DAGSchedulerSuite
707707

708708
// things start out smoothly, stage 0 completes with no issues
709709
complete(taskSets(0), Seq(
710-
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
711-
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.size)),
712-
(Success, makeMapStatus("hostA", shuffleMapRdd.partitions.size))
710+
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
711+
(Success, makeMapStatus("hostB", shuffleMapRdd.partitions.length)),
712+
(Success, makeMapStatus("hostA", shuffleMapRdd.partitions.length))
713713
))
714714

715715
// then one executor dies, and a task fails in stage 1
@@ -726,7 +726,7 @@ class DAGSchedulerSuite
726726
val task = stage0Resubmit.tasks(0)
727727
assert(task.partitionId === 2)
728728
runEvent(CompletionEvent(task, Success,
729-
makeMapStatus("hostC", shuffleMapRdd.partitions.size), null, createFakeTaskInfo(), null))
729+
makeMapStatus("hostC", shuffleMapRdd.partitions.length), null, createFakeTaskInfo(), null))
730730

731731
// now here is where things get tricky : we will now have a task set representing
732732
// the second attempt for stage 1, but we *also* have some tasks for the first attempt for
@@ -740,12 +740,12 @@ class DAGSchedulerSuite
740740
// so that we actually have all stage outputs, though no attempt has completed all its
741741
// tasks
742742
runEvent(CompletionEvent(taskSets(3).tasks(0), Success,
743-
makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
743+
makeMapStatus("hostC", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
744744
runEvent(CompletionEvent(taskSets(3).tasks(1), Success,
745-
makeMapStatus("hostC", reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
745+
makeMapStatus("hostC", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
746746
// late task finish from the first attempt
747747
runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
748-
makeMapStatus("hostB", reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
748+
makeMapStatus("hostB", reduceRdd.partitions.length), null, createFakeTaskInfo(), null))
749749

750750
// What should happen now is that we submit stage 2. However, we might not see an error
751751
// b/c of DAGScheduler's error handling (it tends to swallow errors and just log them). But

0 commit comments

Comments
 (0)