Skip to content

Commit 93c8feb

Browse files
committed
Add a failure test for CheckpointingIterator
1 parent 49248c7 commit 93c8feb

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,15 @@ class CheckpointSuite extends SparkFunSuite with LocalSparkContext with Logging
283283
assert(rdd.collect() === (1 to 100))
284284
}
285285

286+
runTest("call RDD.iterator lazily") { reliableCheckpoint: Boolean =>
287+
val parCollection = sc.makeRDD(1 to 10, 1)
288+
checkpoint(parCollection, reliableCheckpoint)
289+
val lazyRDD = new LazyRDD(parCollection)
290+
checkpoint(lazyRDD, reliableCheckpoint)
291+
lazyRDD.take(5)
292+
assert(lazyRDD.collect() === (1 to 10))
293+
}
294+
286295
// Utility test methods
287296

288297
/** Checkpoint the RDD either locally or reliably. */
@@ -501,6 +510,20 @@ class FatRDD(parent: RDD[Int]) extends RDD[Int](parent) {
501510
}
502511
}
503512

513+
class LazyRDD(parent: RDD[Int]) extends RDD[Int](parent) {
514+
515+
protected def getPartitions: Array[Partition] = parent.partitions
516+
517+
def compute(split: Partition, context: TaskContext): Iterator[Int] = new Iterator[Int] {
518+
519+
lazy val iter = parent.iterator(split, context)
520+
521+
override def hasNext: Boolean = iter.hasNext
522+
523+
override def next(): Int = iter.next()
524+
}
525+
}
526+
504527
/** Pair RDD that has large serialized size. */
505528
class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int, Int)](parent) {
506529
val bigData = new Array[Byte](100000)

0 commit comments

Comments
 (0)