Skip to content

Commit 00112ba

Browse files
Brennon Yorkankurdave
authored andcommitted
[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing
Fixes the issue whereby when VertexRDD's are `diff`ed, `innerJoin`ed, or `leftJoin`ed and have different partition sizes they fail under the `zipPartitions` method. This fix tests whether the partitions are equal or not and, if not, will repartition the other to match the partition size of the calling VertexRDD. Author: Brennon York <[email protected]> Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits: 0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs (cherry picked from commit 9f603fc) Signed-off-by: Ankur Dave <[email protected]>
1 parent a9abcaa commit 00112ba

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,14 @@ class VertexRDDImpl[VD] private[graphx] (
9494
this.mapVertexPartitions(_.map(f))
9595

9696
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
97+
val otherPartition = other match {
98+
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
99+
other.partitionsRDD
100+
case _ =>
101+
VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
102+
}
97103
val newPartitionsRDD = partitionsRDD.zipPartitions(
98-
other.partitionsRDD, preservesPartitioning = true
104+
otherPartition, preservesPartitioning = true
99105
) { (thisIter, otherIter) =>
100106
val thisPart = thisIter.next()
101107
val otherPart = otherIter.next()
@@ -123,7 +129,7 @@ class VertexRDDImpl[VD] private[graphx] (
123129
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
124130
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin
125131
other match {
126-
case other: VertexRDD[_] =>
132+
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
127133
leftZipJoin(other)(f)
128134
case _ =>
129135
this.withPartitionsRDD[VD3](
@@ -152,7 +158,7 @@ class VertexRDDImpl[VD] private[graphx] (
152158
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
153159
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
154160
other match {
155-
case other: VertexRDD[_] =>
161+
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
156162
innerZipJoin(other)(f)
157163
case _ =>
158164
this.withPartitionsRDD(

0 commit comments

Comments
 (0)