From 33ecb17fb8d3189fd5c0461690b08ac593936f18 Mon Sep 17 00:00:00 2001
From: Egor Pakhomov
Date: Sun, 16 Mar 2014 16:10:53 +0400
Subject: [PATCH 1/6] SPARK-1259 Make RDD locally iterable

---
 .../main/scala/org/apache/spark/rdd/RDD.scala      | 16 ++++++++++++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala      |  1 +
 2 files changed, 17 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3fe56963e0008..1aa494f6a4125 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -663,6 +663,22 @@ abstract class RDD[T: ClassTag](
     Array.concat(results: _*)
   }
 
+  /**
+   * Return a Stream that contains all of the elements in this RDD.
+   *
+   * Iterating over it will consume as much memory as the largest partition in this RDD.
+   */
+  def toStream(): Stream[T] = {
+    def collectPartition(p: Int): Array[T] = sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+    var buffer = Stream.empty[T]
+    for (p <- 0 until this.partitions.length) {
+      buffer = buffer #::: {
+        collectPartition(p).toStream
+      }
+    }
+    buffer
+  }
+
   /**
    * Return an array that contains all of the elements in this RDD.
    */
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 60bcada55245b..ef88f2bc467e3 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
+    assert(nums.toStream().toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?

From 8be3dcf8d21bd4e886b4f21c040e94bc09a4c85b Mon Sep 17 00:00:00 2001
From: Egor Pakhomov
Date: Tue, 18 Mar 2014 12:12:57 +0400
Subject: [PATCH 2/6] SPARK-1259 Make RDD locally iterable

---
 .../apache/spark/api/java/JavaRDDLike.scala        | 18 +++++++++++++++++-
 .../main/scala/org/apache/spark/rdd/RDD.scala      |  8 +++++---
 .../java/org/apache/spark/JavaAPISuite.java        |  9 +++++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala      |  2 +-
 4 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index af0114bee3f49..a9a1f6046cb82 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.api.java
 
-import java.util.{Comparator, List => JList}
+import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.lang.{Iterable => JIterable}
 
 import scala.Tuple2
 import scala.collection.JavaConversions._
@@ -281,6 +282,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new java.util.ArrayList(arr)
   }
 
+  /**
+   * Return a Stream that contains all of the elements in this RDD.
+   *
+   * Iterating over it will consume as much memory as the largest partition in this RDD.
+   */
+  def toLocallyIterable(): JIterable[T] = {
+    new JIterable[T](){
+      def iterator(): JIterator[T] = {
+        import scala.collection.JavaConversions._
+        asJavaIterator(rdd.toLocallyIterable.iterator)
+      }
+    }
+  }
+
+
   /**
    * Return an array that contains all of the elements in this RDD.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 1aa494f6a4125..d787cf110c394 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -668,11 +668,13 @@ abstract class RDD[T: ClassTag](
    *
    * Iterating over it will consume as much memory as the largest partition in this RDD.
    */
-  def toStream(): Stream[T] = {
-    def collectPartition(p: Int): Array[T] = sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+  def toLocallyIterable: Stream[T] = {
+    def collectPartition(p: Int): Array[T] = {
+      sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
+    }
     var buffer = Stream.empty[T]
     for (p <- 0 until this.partitions.length) {
-      buffer = buffer #::: {
+      buffer = buffer append {
         collectPartition(p).toStream
       }
     }
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index c7d0e2d577726..4de856f498bec 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -24,6 +24,7 @@
 
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
 import com.google.common.base.Optional;
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
@@ -149,6 +150,14 @@ public void call(String s) {
     Assert.assertEquals(2, foreachCalls);
   }
 
+  @Test
+  public void toLocallyIterable() {
+    List<Integer> correct = Arrays.asList(1, 2, 3, 4);
+    JavaRDD<Integer> rdd = sc.parallelize(correct);
+    List<Integer> result = Lists.newArrayList(rdd.toLocallyIterable());
+    Assert.assertTrue(correct.equals(result));
+  }
+
   @SuppressWarnings("unchecked")
   @Test
   public void lookup() {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index ef88f2bc467e3..3b3d9b83d85a1 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -33,7 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
-    assert(nums.toStream().toList === List(1, 2, 3, 4))
+    assert(nums.toLocallyIterable.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
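
[Note on PATCH 2: the laziness this relies on comes from Stream -- both the
#::: of PATCH 1 and the Stream.append used here take their right-hand operand
by name, so a partition is only collected from the cluster once iteration has
exhausted the partitions before it. A minimal standalone sketch of that
behaviour follows; fetchPartition is a made-up stub standing in for the
sc.runJob call.]

    object LazyPartitionsSketch {
      // Made-up stub standing in for sc.runJob(...): pretends each
      // "partition" holds two elements and logs when it is materialized.
      def fetchPartition(p: Int): Array[Int] = {
        println(s"fetching partition $p")
        Array(2 * p, 2 * p + 1)
      }

      // Mirrors the toLocallyIterable loop above: Stream.append takes its
      // argument by name, so partition p is fetched only when iteration
      // has consumed partitions 0 .. p-1.
      def toLocallyIterable(numPartitions: Int): Stream[Int] = {
        var buffer = Stream.empty[Int]
        for (p <- 0 until numPartitions) {
          buffer = buffer append {
            fetchPartition(p).toStream
          }
        }
        buffer
      }

      def main(args: Array[String]): Unit = {
        val it = toLocallyIterable(3).iterator
        println(it.next()) // prints "fetching partition 0" then 0; partitions 1 and 2 stay untouched
      }
    }
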
From 6a994eb87a84a0ae4291505925dd6ad35806fe41 Mon Sep 17 00:00:00 2001
From: Egor Pakhomov
Date: Tue, 18 Mar 2014 12:14:43 +0400
Subject: [PATCH 3/6] SPARK-1259 Make RDD locally iterable

---
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index a9a1f6046cb82..9d93323302df6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -291,7 +291,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
     new JIterable[T](){
       def iterator(): JIterator[T] = {
         import scala.collection.JavaConversions._
-        asJavaIterator(rdd.toLocallyIterable.iterator)
+        rdd.toLocallyIterable.iterator
       }
     }
   }

From 08363ef868dd06bca0370a7fd55338507ffa482b Mon Sep 17 00:00:00 2001
From: Egor Pakhomov
Date: Sat, 22 Mar 2014 10:52:53 +0400
Subject: [PATCH 4/6] SPARK-1259 Rename toLocallyIterable to toLocalIterator

---
 .../scala/org/apache/spark/api/java/JavaRDDLike.scala  | 10 +++-------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala     |  4 ++--
 core/src/test/java/org/apache/spark/JavaAPISuite.java  |  4 ++--
 .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala |  2 +-
 4 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 9d93323302df6..66e63e15ecb84 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -287,13 +287,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    *
    * Iterating over it will consume as much memory as the largest partition in this RDD.
    */
-  def toLocallyIterable(): JIterable[T] = {
-    new JIterable[T](){
-      def iterator(): JIterator[T] = {
-        import scala.collection.JavaConversions._
-        rdd.toLocallyIterable.iterator
-      }
-    }
+  def toLocalIterator(): JIterator[T] = {
+    import scala.collection.JavaConversions._
+    rdd.toLocalIterator
   }
 
 
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d787cf110c394..04fdc313f9e66 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -668,7 +668,7 @@ abstract class RDD[T: ClassTag](
    *
    * Iterating over it will consume as much memory as the largest partition in this RDD.
   */
-  def toLocallyIterable: Stream[T] = {
+  def toLocalIterator: Iterator[T] = {
     def collectPartition(p: Int): Array[T] = {
       sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
     }
@@ -678,7 +678,7 @@
         collectPartition(p).toStream
       }
     }
-    buffer
+    buffer.iterator
   }
 
   /**
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 4de856f498bec..9ee555a329677 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -151,10 +151,10 @@ public void call(String s) {
   }
 
   @Test
-  public void toLocallyIterable() {
+  public void toLocalIterator() {
     List<Integer> correct = Arrays.asList(1, 2, 3, 4);
     JavaRDD<Integer> rdd = sc.parallelize(correct);
-    List<Integer> result = Lists.newArrayList(rdd.toLocallyIterable());
+    List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
     Assert.assertTrue(correct.equals(result));
   }
 
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 3b3d9b83d85a1..103c1a59a7195 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -33,7 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
-    assert(nums.toLocallyIterable.toList === List(1, 2, 3, 4))
+    assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?

From 34aa30018d1727723a4b82f67bbb64c3a50a92ce Mon Sep 17 00:00:00 2001
From: Egor Pakhomov
Date: Fri, 4 Apr 2014 11:25:37 +0400
Subject: [PATCH 5/6] Fix toLocalIterator docs

---
 core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala              | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 66e63e15ecb84..952efb1dc2cc2 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -283,7 +283,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   }
 
   /**
-   * Return a Stream that contains all of the elements in this RDD.
+   * Return an iterator that contains all of the elements in this RDD.
    *
    * Iterating over it will consume as much memory as the largest partition in this RDD.
    */
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 04fdc313f9e66..39d67ccf913ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -664,7 +664,7 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Return a Stream that contains all of the elements in this RDD.
+   * Return an iterator that contains all of the elements in this RDD.
    *
    * Iterating over it will consume as much memory as the largest partition in this RDD.
   */

From 8ec8f249f61a1917ab7df1c65e369c68001aee00 Mon Sep 17 00:00:00 2001
From: Egor Pakhomov
Date: Sat, 5 Apr 2014 10:11:04 +0400
Subject: [PATCH 6/6] Make toLocalIterator shorter

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 +-------
 1 file changed, 1 insertion(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 39d67ccf913ba..d8c9925d9a571 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -672,13 +672,7 @@ abstract class RDD[T: ClassTag](
     def collectPartition(p: Int): Array[T] = {
       sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
     }
-    var buffer = Stream.empty[T]
-    for (p <- 0 until this.partitions.length) {
-      buffer = buffer append {
-        collectPartition(p).toStream
-      }
-    }
-    buffer.iterator
+    (0 until partitions.length).iterator.flatMap(i => collectPartition(i))
   }
 
   /**
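
[Summary: with the series applied, an RDD can be consumed on the driver one
partition at a time instead of all at once via collect(). A minimal usage
sketch follows, assuming a build with these patches and a local master; the
data, partition count, and app name are illustrative.]

    import org.apache.spark.{SparkConf, SparkContext}

    object ToLocalIteratorExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("toLocalIterator-example")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 8, 4)

        // collect() materializes all four partitions on the driver at once;
        // toLocalIterator instead runs one job per partition as the iterator
        // advances, so the driver holds at most one partition's worth of data.
        rdd.toLocalIterator.foreach(println)

        sc.stop()
      }
    }

The trade-off in the final flatMap form is the same as in the Stream version:
one scheduled job per partition in exchange for bounded driver memory, which
is the point of the change.
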