Skip to content

Commit c402a4a

Browse files
kanzhangrxin
authored andcommitted
[SPARK-1817] RDD.zip() should verify partition sizes for each partition
RDD.zip() will throw an exception if it finds partition sizes are not the same. Author: Kan Zhang <[email protected]> Closes apache#944 from kanzhang/SPARK-1817 and squashes the following commits: c073848 [Kan Zhang] [SPARK-1817] Cosmetic updates 524c670 [Kan Zhang] [SPARK-1817] RDD.zip() should verify partition sizes for each partition
1 parent 4ca0625 commit c402a4a

File tree

5 files changed

+33
-100
lines changed

5 files changed

+33
-100
lines changed

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,19 @@ abstract class RDD[T: ClassTag](
654654
* partitions* and the *same number of elements in each partition* (e.g. one was made through
655655
* a map on the other).
656656
*/
657-
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
657+
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
658+
zipPartitions(other, true) { (thisIter, otherIter) =>
659+
new Iterator[(T, U)] {
660+
def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
661+
case (true, true) => true
662+
case (false, false) => false
663+
case _ => throw new SparkException("Can only zip RDDs with " +
664+
"same number of elements in each partition")
665+
}
666+
def next = (thisIter.next, otherIter.next)
667+
}
668+
}
669+
}
658670

659671
/**
660672
* Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by

core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala

Lines changed: 0 additions & 87 deletions
This file was deleted.

core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -167,26 +167,28 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
167167
})
168168
}
169169

170-
test("ZippedRDD") {
171-
testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
172-
testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
170+
test("ZippedPartitionsRDD") {
171+
testRDD(rdd => rdd.zip(rdd.map(x => x)))
172+
testRDDPartitions(rdd => rdd.zip(rdd.map(x => x)))
173173

174-
// Test that the ZippedPartition updates parent partitions
175-
// after the parent RDD has been checkpointed and parent partitions have been changed.
176-
// Note that this test is very specific to the current implementation of ZippedRDD.
174+
// Test that ZippedPartitionsRDD updates parent partitions after parent RDDs have
175+
// been checkpointed and parent partitions have been changed.
176+
// Note that this test is very specific to the implementation of ZippedPartitionsRDD.
177177
val rdd = generateFatRDD()
178-
val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x))
178+
val zippedRDD = rdd.zip(rdd.map(x => x)).asInstanceOf[ZippedPartitionsRDD2[_, _, _]]
179179
zippedRDD.rdd1.checkpoint()
180180
zippedRDD.rdd2.checkpoint()
181181
val partitionBeforeCheckpoint =
182-
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
182+
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartitionsPartition])
183183
zippedRDD.count()
184184
val partitionAfterCheckpoint =
185-
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
185+
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartitionsPartition])
186186
assert(
187-
partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass &&
188-
partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass,
189-
"ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed"
187+
partitionAfterCheckpoint.partitions(0).getClass !=
188+
partitionBeforeCheckpoint.partitions(0).getClass &&
189+
partitionAfterCheckpoint.partitions(1).getClass !=
190+
partitionBeforeCheckpoint.partitions(1).getClass,
191+
"ZippedPartitionsRDD partition 0 (or 1) not updated after parent RDDs are checkpointed"
190192
)
191193
}
192194

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
350350
intercept[IllegalArgumentException] {
351351
nums.zip(sc.parallelize(1 to 4, 1)).collect()
352352
}
353+
354+
intercept[SparkException] {
355+
nums.zip(sc.parallelize(1 to 5, 2)).collect()
356+
}
353357
}
354358

355359
test("partition pruning") {

project/MimaExcludes.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ object MimaExcludes {
5454
ProblemFilters.exclude[MissingMethodProblem](
5555
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1")
5656
) ++
57+
MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
58+
MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
5759
MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")
5860
case v if v.startsWith("1.0") =>
5961
Seq(

0 commit comments

Comments
 (0)