From d6effee4ee967f15210d0d57526beab4e3f9c8e2 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 5 Mar 2014 13:30:27 +0530 Subject: [PATCH 1/4] SPARK-1165 Implemented RDD.intersection in python. --- python/pyspark/rdd.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index be23f87f5ed2..f667916d550c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -319,6 +319,19 @@ def union(self, other): return RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx, self.ctx.serializer) + def intersection(self, other): + """ + Return the intersection of this RDD and another one. + + >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) + >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) + >>> rdd1.intersection(rdd2).collect() + [1, 2, 3] + """ + return self.map(lambda v: (v, None)).cogroup( + other.map(lambda v: (v, None))).filter( + lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)).keys() + def _reserialize(self): if self._jrdd_deserializer == self.ctx.serializer: return self From d0c71f3a24ea1cec336c9bb4820a6f3fb317953a Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Wed, 5 Mar 2014 14:10:01 +0530 Subject: [PATCH 2/4] SPARK-1165 RDD.intersection in java --- .../apache/spark/api/java/JavaDoubleRDD.scala | 6 ++++ .../apache/spark/api/java/JavaPairRDD.scala | 8 +++++ .../org/apache/spark/api/java/JavaRDD.scala | 7 +++++ .../java/org/apache/spark/JavaAPISuite.java | 31 +++++++++++++++++++ python/pyspark/rdd.py | 3 ++ 5 files changed, 55 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index d1787061bc64..948f95e8e6f9 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -140,6 +140,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja */ def union(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.union(other.srdd)) + /** + * Return the intersection of this RDD and another one. The output will not contain any duplicate + * elements, even if the input RDDs did. + */ + def intersection(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.intersection(other.srdd)) + // Double RDD functions /** Add up the elements in this RDD. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 857626fe84af..61749fb3c88e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -126,6 +126,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def union(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.union(other.rdd)) + /** + * Return the intersection of this RDD and another one. The output will not contain any duplicate + * elements, even if the input RDDs did. + */ + def intersection(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] = + new JavaPairRDD[K, V](rdd.intersection(other.rdd)) + + // first() has to be overridden here so that the generated method has the signature // 'public scala.Tuple2 first()'; if the trait's definition is used, // then the method has the signature 'public java.lang.Object first()', diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index e973c46edd1c..23ce8652c5c8 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -106,6 +106,13 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) */ def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd)) + + /** + * Return the intersection of this RDD and another one. The output will not contain any duplicate + * elements, even if the input RDDs did. + */ + def intersection(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.intersection(other.rdd)) + /** * Return an RDD with the elements from `this` that are not in `other`. * diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index c7d0e2d57772..40e853c39ca9 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -110,6 +110,37 @@ public void sparkContextUnion() { Assert.assertEquals(4, pUnion.count()); } + @SuppressWarnings("unchecked") + @Test + public void intersection() { + List ints1 = Arrays.asList(1, 10, 2, 3, 4, 5); + List ints2 = Arrays.asList(1, 6, 2, 3, 7, 8); + JavaRDD s1 = sc.parallelize(ints1); + JavaRDD s2 = sc.parallelize(ints2); + + JavaRDD intersections = s1.intersection(s2); + Assert.assertEquals(3, intersections.count()); + + ArrayList list = new ArrayList(); + JavaRDD empty = sc.parallelize(list); + JavaRDD emptyIntersection = empty.intersection(s2); + Assert.assertEquals(0, emptyIntersection.count()); + + List doubles = Arrays.asList(1.0, 2.0); + JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles); + JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles); + JavaDoubleRDD dIntersection = d1.intersection(d2); + Assert.assertEquals(2, dIntersection.count()); + + List> pairs = new ArrayList>(); + pairs.add(new Tuple2(1, 2)); + pairs.add(new Tuple2(3, 4)); + JavaPairRDD p1 = sc.parallelizePairs(pairs); + JavaPairRDD p2 = sc.parallelizePairs(pairs); + JavaPairRDD pIntersection = p1.intersection(p2); + Assert.assertEquals(2, pIntersection.count()); + } + @Test public void sortByKey() { List> pairs = new ArrayList>(); diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f667916d550c..389c1db17b0c 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -323,6 +323,9 @@ def intersection(self, other): """ Return the intersection of this RDD and another one. + Note: The output will not contain any duplicate elements, even if the + input RDDs did. + >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) >>> rdd1.intersection(rdd2).collect() From 1fea8133669f2e7926364659179fa6023a3d06f6 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Fri, 7 Mar 2014 06:55:36 +0530 Subject: [PATCH 3/4] correct the lines wrapping --- python/pyspark/rdd.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 389c1db17b0c..c447a1e17571 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -331,9 +331,10 @@ def intersection(self, other): >>> rdd1.intersection(rdd2).collect() [1, 2, 3] """ - return self.map(lambda v: (v, None)).cogroup( - other.map(lambda v: (v, None))).filter( - lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)).keys() + return self.map(lambda v: (v, None)) \ + .cogroup(other.map(lambda v: (v, None))) \ + .filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \ + .keys() def _reserialize(self): if self._jrdd_deserializer == self.ctx.serializer: From 9b015e9c8b095f71ebbbbb6f277ae0520e589944 Mon Sep 17 00:00:00 2001 From: Prashant Sharma Date: Sat, 8 Mar 2014 08:02:06 +0530 Subject: [PATCH 4/4] Added a note, shuffle is required for intersection. --- .../scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 2 ++ .../scala/org/apache/spark/api/java/JavaPairRDD.scala | 2 ++ .../main/scala/org/apache/spark/api/java/JavaRDD.scala | 2 ++ python/pyspark/rdd.py | 8 ++++---- 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index 948f95e8e6f9..f816bb43a5b4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -143,6 +143,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja /** * Return the intersection of this RDD and another one. The output will not contain any duplicate * elements, even if the input RDDs did. + * + * Note that this method performs a shuffle internally. */ def intersection(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.intersection(other.srdd)) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 61749fb3c88e..0ff428c12035 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -129,6 +129,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** * Return the intersection of this RDD and another one. The output will not contain any duplicate * elements, even if the input RDDs did. + * + * Note that this method performs a shuffle internally. */ def intersection(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.intersection(other.rdd)) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 23ce8652c5c8..91bf404631f4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -110,6 +110,8 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) /** * Return the intersection of this RDD and another one. The output will not contain any duplicate * elements, even if the input RDDs did. + * + * Note that this method performs a shuffle internally. */ def intersection(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.intersection(other.rdd)) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index c447a1e17571..d8dc0cee1c2f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -321,10 +321,10 @@ def union(self, other): def intersection(self, other): """ - Return the intersection of this RDD and another one. - - Note: The output will not contain any duplicate elements, even if the - input RDDs did. + Return the intersection of this RDD and another one. The output will not + contain any duplicate elements, even if the input RDDs did. + + Note that this method performs a shuffle internally. >>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5]) >>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])