@@ -324,55 +324,49 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 
   /**
    * This test ensures that DAGScheduler build stage graph correctly.
-   * Here, we submit an RDD[F] having a linage of RDDs as follows:
    *
-   *                  <--------------------
-   *                 /                     \
-   * [A] <--(1)-- [B] <--(2)-- [C] <--(3)-- [D] <--(4)-- [E] <--(5)-- [F]
-   *                              \                      /
-   *                               <--------------------
+   * Suppose you have the following DAG:
+   *
+   * [A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
+   *             \                /
+   *               <-------------
+   *
+   * Here, RDD B has a shuffle dependency on RDD A, and RDD C has shuffle dependency on both B and A.
+   * The shuffle dependency IDs are numbers in the DAGScheduler, but to make the example easier to
+   * understand, let's call the shuffled data from A shuffle dependency ID s_A and the shuffled data
+   * from B shuffle dependency ID s_B.
    *
-   * then check if all stages have correct parent stages.
    * Note: [] means an RDD, () means a shuffle dependency.
    */
-  test("[SPARK-13902] parent stages") {
+  test("[SPARK-13902] not to create duplicate stage.") {
     val rddA = new MyRDD(sc, 1, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
+    val s_A = shuffleDepA.shuffleId
 
-    val shuffleDef1 = new ShuffleDependency(rddA, new HashPartitioner(1))
-    val rddB = new MyRDD(sc, 1, List(shuffleDef1), tracker = mapOutputTracker)
+    val rddB = new MyRDD(sc, 1, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
+    val s_B = shuffleDepB.shuffleId
 
-    val shuffleDef2 = new ShuffleDependency(rddB, new HashPartitioner(1))
-    val rddC = new MyRDD(sc, 1, List(shuffleDef2), tracker = mapOutputTracker)
+    val rddC = new MyRDD(sc, 1, List(shuffleDepA, shuffleDepB), tracker = mapOutputTracker)
+    val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
+    val s_C = shuffleDepC.shuffleId
 
-    val shuffleDef3 = new ShuffleDependency(rddC, new HashPartitioner(1))
-    val rddD = new MyRDD(sc, 1, List(shuffleDef3, new OneToOneDependency(rddB)),
-      tracker = mapOutputTracker)
+    val rddD = new MyRDD(sc, 1, List(shuffleDepC), tracker = mapOutputTracker)
 
-    val shuffleDef4 = new ShuffleDependency(rddD, new HashPartitioner(1))
-    val rddE = new MyRDD(sc, 1, List(new OneToOneDependency(rddC), shuffleDef4),
-      tracker = mapOutputTracker)
-
-    val shuffleDef5 = new ShuffleDependency(rddE, new HashPartitioner(1))
-    val rddF = new MyRDD(sc, 1, List(shuffleDef5),
-      tracker = mapOutputTracker)
-    submit(rddF, Array(0))
+    submit(rddD, Array(0))
 
-    assert(scheduler.shuffleToMapStage.size === 5)
+    assert(scheduler.shuffleToMapStage.size === 3)
     assert(scheduler.activeJobs.size === 1)
 
-    val mapStage1 = scheduler.shuffleToMapStage(shuffleDef1.shuffleId)
-    val mapStage2 = scheduler.shuffleToMapStage(shuffleDef2.shuffleId)
-    val mapStage3 = scheduler.shuffleToMapStage(shuffleDef3.shuffleId)
-    val mapStage4 = scheduler.shuffleToMapStage(shuffleDef4.shuffleId)
-    val mapStage5 = scheduler.shuffleToMapStage(shuffleDef5.shuffleId)
+    val mapStageA = scheduler.shuffleToMapStage(s_A)
+    val mapStageB = scheduler.shuffleToMapStage(s_B)
+    val mapStageC = scheduler.shuffleToMapStage(s_C)
     val finalStage = scheduler.activeJobs.head.finalStage
 
-    assert(mapStage1.parents.isEmpty)
-    assert(mapStage2.parents === List(mapStage1))
-    assert(mapStage3.parents === List(mapStage2))
-    assert(mapStage4.parents === List(mapStage1, mapStage3))
-    assert(mapStage5.parents === List(mapStage2, mapStage4))
-    assert(finalStage.parents === List(mapStage5))
+    assert(mapStageA.parents.isEmpty)
+    assert(mapStageB.parents === List(mapStageA))
+    assert(mapStageC.parents === List(mapStageA, mapStageB))
+    assert(finalStage.parents === List(mapStageC))
   }
 
   test("zero split job") {
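
The updated assertions rely on one property of the scheduler's bookkeeping: shuffleToMapStage is keyed by shuffle ID, so a shuffle dependency that is reachable through several paths (s_A is reachable from D both directly through C and indirectly through B) still maps to a single map stage. The following standalone Scala sketch illustrates that lookup-or-create pattern under the same DAG shape as the test; SketchStage and SketchScheduler are hypothetical names for illustration only, not the real DAGScheduler code.

import scala.collection.mutable

// Hypothetical stand-ins for ShuffleMapStage and the scheduler's stage registry.
case class SketchStage(shuffleId: Int, parents: List[SketchStage])

class SketchScheduler {
  // One entry per shuffle id; this is what keeps the stage graph free of duplicates.
  val shuffleToMapStage = new mutable.HashMap[Int, SketchStage]

  // Return the stage already registered for this shuffle id, or create it exactly once.
  def getOrCreateMapStage(shuffleId: Int)(parents: => List[SketchStage]): SketchStage =
    shuffleToMapStage.getOrElseUpdate(shuffleId, SketchStage(shuffleId, parents))
}

object SketchDemo extends App {
  val scheduler = new SketchScheduler

  // Mirror the DAG in the test: s_A has no parents, s_B depends on s_A,
  // and s_C depends on both s_A and s_B.
  val stageA = scheduler.getOrCreateMapStage(0)(Nil)
  val stageB = scheduler.getOrCreateMapStage(1)(List(scheduler.getOrCreateMapStage(0)(Nil)))
  val stageC = scheduler.getOrCreateMapStage(2)(
    List(scheduler.getOrCreateMapStage(0)(Nil), scheduler.getOrCreateMapStage(1)(Nil)))

  // Even though the stage for shuffle id 0 was requested three times,
  // only three stages exist in total and stageC's parents are the shared instances.
  assert(scheduler.shuffleToMapStage.size == 3)
  assert(stageC.parents == List(stageA, stageB))
}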