From f18356e8ac4f62e3d08359f8bd97956d7042bbac Mon Sep 17 00:00:00 2001 From: Brennon York Date: Mon, 23 Feb 2015 13:58:13 -0800 Subject: [PATCH 1/7] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)] --- graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 4 +++- .../scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 09ae3f9f6c09b..93a38495ed47e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -124,8 +124,10 @@ abstract class VertexRDD[VD]( /** * Hides vertices that are the same between `this` and `other`; for vertices that are different, * keeps the values from `other`. + * + * @param other the other VertexRDD with which to diff. */ - def diff(other: VertexRDD[VD]): VertexRDD[VD] + def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] /** * Left joins this RDD with another VertexRDD with the same index. This function will fail if diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 6dad167fa7411..41e677c38bf26 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -103,9 +103,9 @@ class VertexRDDImpl[VD] private[graphx] ( override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) - override def diff(other: VertexRDD[VD]): VertexRDD[VD] = { + override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = { val newPartitionsRDD = partitionsRDD.zipPartitions( - other.partitionsRDD, preservesPartitioning = true + VertexRDD(other).partitionsRDD, preservesPartitioning = true ) { (thisIter, otherIter) => val thisPart = thisIter.next() val otherPart = otherIter.next() From 93186f3e9d2b09219f1136772f68cd8c998a1b0e Mon Sep 17 00:00:00 2001 From: Brennon York Date: Tue, 24 Feb 2015 11:17:49 -0800 Subject: [PATCH 2/7] added back the original diff method to sustain binary compatibility --- .../scala/org/apache/spark/graphx/VertexRDD.scala | 10 ++++++++++ .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 93a38495ed47e..f9fc425ee9493 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -129,6 +129,16 @@ abstract class VertexRDD[VD]( */ def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] + /** + * Hides vertices that are the same between `this` and `other`; for vertices that are different, + * keeps the values from `other`. + * + * SPARK-5922: Deprecates this method call in place of diff(other: RDD[(VertexID, VD)]) + * + * @param other the other VertexRDD with which to diff. + */ + def diff(other: VertexRDD[VD]): VertexRDD[VD] + /** * Left joins this RDD with another VertexRDD with the same index. This function will fail if * both VertexRDDs do not share the same index. The resulting vertex set contains an entry for diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 41e677c38bf26..6155cfdbe118f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -114,6 +114,17 @@ class VertexRDDImpl[VD] private[graphx] ( this.withPartitionsRDD(newPartitionsRDD) } + override def diff(other: VertexRDD[VD]): VertexRDD[VD] = { + val newPartitionsRDD = partitionsRDD.zipPartitions( + VertexRDD(other).partitionsRDD, preservesPartitioning = true + ) { (thisIter, otherIter) => + val thisPart = thisIter.next() + val otherPart = otherIter.next() + Iterator(thisPart.diff(otherPart)) + } + this.withPartitionsRDD(newPartitionsRDD) + } + override def leftZipJoin[VD2: ClassTag, VD3: ClassTag] (other: VertexRDD[VD2])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] = { val newPartitionsRDD = partitionsRDD.zipPartitions( From 2c678c6cc696d242be52b31f2def971d541141e4 Mon Sep 17 00:00:00 2001 From: Brennon York Date: Tue, 24 Feb 2015 16:27:50 -0800 Subject: [PATCH 3/7] added mima exclude to exclude new public diff method from VertexRDD --- project/MimaExcludes.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index ee6229aa6bbe1..3dcbf85eaf11a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -153,6 +153,9 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock") + ) ++ Seq( + // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff") ) case v if v.startsWith("1.2") => From 2af0b8840782710988011e748fabbc91e95bf21c Mon Sep 17 00:00:00 2001 From: Brennon York Date: Wed, 25 Feb 2015 15:11:26 -0800 Subject: [PATCH 4/7] removed deprecation line --- graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index f9fc425ee9493..1835a2cb25857 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -133,8 +133,6 @@ abstract class VertexRDD[VD]( * Hides vertices that are the same between `this` and `other`; for vertices that are different, * keeps the values from `other`. * - * SPARK-5922: Deprecates this method call in place of diff(other: RDD[(VertexID, VD)]) - * * @param other the other VertexRDD with which to diff. */ def diff(other: VertexRDD[VD]): VertexRDD[VD] From aac1810fe6ba0fed4ff663b96715c957f477fd72 Mon Sep 17 00:00:00 2001 From: Brennon York Date: Fri, 13 Mar 2015 10:59:54 -0700 Subject: [PATCH 5/7] updated to aggregateUsingIndex and added test to ensure that method works properly --- .../apache/spark/graphx/impl/VertexRDDImpl.scala | 2 +- .../org/apache/spark/graphx/VertexRDDSuite.scala | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 6f7a6b088419f..125692ddaad83 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -104,7 +104,7 @@ class VertexRDDImpl[VD] private[graphx] ( this.mapVertexPartitions(_.map(f)) override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = { - diff(VertexRDD(other)) + diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a)) } override def diff(other: VertexRDD[VD]): VertexRDD[VD] = { diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 131959cea3ef7..03eae715be3cb 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.graphx +import org.apache.spark.rdd.RDD import org.scalatest.FunSuite import org.apache.spark.SparkContext @@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { } } + test("diff with RDD[(VertexId, VD)]") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n).cache() + val flipEvens: RDD[(VertexId, Int)] = + sc.parallelize(0L to 100L) + .map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache() + // diff should keep only the changed vertices + assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet) + } + } + test("leftJoin") { withSpark { sc => val n = 100 From f86375c151f76c49bd336d96007843ef53f94897 Mon Sep 17 00:00:00 2001 From: Brennon York Date: Sat, 14 Mar 2015 09:41:09 -0700 Subject: [PATCH 6/7] fixed minor include line --- .../src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 03eae715be3cb..c41d50957aac2 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -17,9 +17,9 @@ package org.apache.spark.graphx -import org.apache.spark.rdd.RDD import org.scalatest.FunSuite +import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext import org.apache.spark.storage.StorageLevel From e800f08639092fdc933bee9fbc8c23a98f387281 Mon Sep 17 00:00:00 2001 From: Brennon York Date: Sun, 15 Mar 2015 11:13:00 -0700 Subject: [PATCH 7/7] fixed merge conflicts --- .../test/scala/org/apache/spark/graphx/VertexRDDSuite.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 4e99008643030..4f7a442ab503d 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -19,12 +19,8 @@ package org.apache.spark.graphx import org.scalatest.FunSuite -<<<<<<< HEAD -import org.apache.spark.rdd.RDD -import org.apache.spark.SparkContext -======= import org.apache.spark.{HashPartitioner, SparkContext} ->>>>>>> upstream/master +import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel class VertexRDDSuite extends FunSuite with LocalSparkContext {