Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,19 @@ abstract class RDD[T: ClassTag](
* partitions* and the *same number of elements in each partition* (e.g. one was made through
* a map on the other).
*/
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = {
zipPartitions(other, true) { (thisIter, otherIter) =>
new Iterator[(T, U)] {
def hasNext = (thisIter.hasNext, otherIter.hasNext) match {
case (true, true) => true
case (false, false) => false
case _ => throw new SparkException("Can only zip RDDs with " +
"same number of elements in each partition")
}
def next = (thisIter.next, otherIter.next)
}
}
}

/**
* Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
Expand Down
87 changes: 0 additions & 87 deletions core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala

This file was deleted.

26 changes: 14 additions & 12 deletions core/src/test/scala/org/apache/spark/CheckpointSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,26 +167,28 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
})
}

test("ZippedRDD") {
testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
test("ZippedPartitionsRDD") {
testRDD(rdd => rdd.zip(rdd.map(x => x)))
testRDDPartitions(rdd => rdd.zip(rdd.map(x => x)))

// Test that the ZippedPartition updates parent partitions
// after the parent RDD has been checkpointed and parent partitions have been changed.
// Note that this test is very specific to the current implementation of ZippedRDD.
// Test that ZippedPartitionsRDD updates parent partitions after parent RDDs have
// been checkpointed and parent partitions have been changed.
// Note that this test is very specific to the implementation of ZippedPartitionsRDD.
val rdd = generateFatRDD()
val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x))
val zippedRDD = rdd.zip(rdd.map(x => x)).asInstanceOf[ZippedPartitionsRDD2[_, _, _]]
zippedRDD.rdd1.checkpoint()
zippedRDD.rdd2.checkpoint()
val partitionBeforeCheckpoint =
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartitionsPartition])
zippedRDD.count()
val partitionAfterCheckpoint =
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartitionsPartition])
assert(
partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass &&
partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass,
"ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed"
partitionAfterCheckpoint.partitions(0).getClass !=
partitionBeforeCheckpoint.partitions(0).getClass &&
partitionAfterCheckpoint.partitions(1).getClass !=
partitionBeforeCheckpoint.partitions(1).getClass,
"ZippedPartitionsRDD partition 0 (or 1) not updated after parent RDDs are checkpointed"
)
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ class RDDSuite extends FunSuite with SharedSparkContext {
intercept[IllegalArgumentException] {
nums.zip(sc.parallelize(1 to 4, 1)).collect()
}

intercept[SparkException] {
nums.zip(sc.parallelize(1 to 5, 2)).collect()
}
}

test("partition pruning") {
Expand Down
2 changes: 2 additions & 0 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ object MimaExcludes {
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.api.java.JavaDoubleRDD.countApproxDistinct$default$1")
) ++
MimaBuild.excludeSparkClass("rdd.ZippedRDD") ++
MimaBuild.excludeSparkClass("rdd.ZippedPartition") ++
MimaBuild.excludeSparkClass("util.SerializableHyperLogLog")
case v if v.startsWith("1.0") =>
Seq(
Expand Down