14 changes: 13 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,8 @@

package org.apache.spark.api.java

import java.util.{Comparator, List => JList}
import java.util.{Comparator, Iterator => JIterator, List => JList}
import java.lang.{Iterable => JIterable}

import scala.Tuple2
import scala.collection.JavaConversions._
@@ -281,6 +282,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new java.util.ArrayList(arr)
}

/**
* Return an iterator that contains all of the elements in this RDD.
*
 * The iterator will consume as much memory as the largest partition in this RDD.
*/
def toLocalIterator(): JIterator[T] = {
import scala.collection.JavaConversions._
rdd.toLocalIterator
}

/**
* Return an array that contains all of the elements in this RDD.
*/
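For context, the wrapper body does no work of its own: with `scala.collection.JavaConversions._` in scope, the Scala `Iterator[T]` returned by `rdd.toLocalIterator` is implicitly adapted to a `java.util.Iterator[T]`. A minimal script-style sketch of that conversion in isolation (the `asJava` helper is illustrative, not part of this PR):

```scala
import java.util.{Iterator => JIterator}
import scala.collection.JavaConversions._

// With JavaConversions in scope, a Scala Iterator[T] converts implicitly
// (via asJavaIterator) to java.util.Iterator[T]. Nothing is copied; the
// Java iterator simply delegates to the Scala one.
def asJava[T](it: Iterator[T]): JIterator[T] = it

val jit: JIterator[Int] = asJava(Iterator(1, 2, 3))
while (jit.hasNext) println(jit.next()) // prints 1, 2, 3 lazily
```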
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -663,6 +663,18 @@ abstract class RDD[T: ClassTag](
Array.concat(results: _*)
}

/**
* Return an iterator that contains all of the elements in this RDD.
*
 * The iterator will consume as much memory as the largest partition in this RDD.

> **Contributor** (review comment): This should say "largest partition in the RDD" - not a big deal, we can fix on merge.

*/
def toLocalIterator: Iterator[T] = {
def collectPartition(p: Int): Array[T] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
}
(0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}

/**
* Return an array that contains all of the elements in this RDD.
*/
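The key property here is that `sc.runJob` is called once per partition and `Iterator.flatMap` is lazy, so the driver holds at most one partition's array at a time. A Spark-free sketch of that laziness (`collectPartition` below is a stand-in for the method above, with `println` in place of `sc.runJob`):

```scala
// Iterator.flatMap pulls from collectPartition only when the consumer
// advances past the current partition's elements, so at most one
// partition's array is materialized at a time.
def collectPartition(p: Int): Array[Int] = {
  println(s"fetching partition $p") // stands in for sc.runJob
  Array(p * 10, p * 10 + 1)
}

val it = (0 until 3).iterator.flatMap(p => collectPartition(p))
println(it.next()) // prints "fetching partition 0", then 0
println(it.next()) // prints 1; partition 0 is still buffered, no new fetch
println(it.next()) // prints "fetching partition 1", then 10
```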
9 changes: 9 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -24,6 +24,7 @@

import scala.Tuple2;

import com.google.common.collect.Lists;
import com.google.common.base.Optional;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
@@ -149,6 +150,14 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}

@Test
public void toLocalIterator() {
List<Integer> correct = Arrays.asList(1, 2, 3, 4);
JavaRDD<Integer> rdd = sc.parallelize(correct);
List<Integer> result = Lists.newArrayList(rdd.toLocalIterator());
Assert.assertTrue(correct.equals(result));
}

@SuppressWarnings("unchecked")
@Test
public void lookup() {
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -33,6 +33,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("basic operations") {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct().count() === 4)
assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
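Both tests check the same contract: partitions are drained in index order, so `toLocalIterator` agrees element-for-element with `collect()`. A one-line version of that check (assumes an active `SparkContext` named `sc`, as the suites provide):

```scala
val nums = sc.makeRDD(1 to 8, 4)
// Same elements, same order as collect(), fetched one partition at a time.
assert(nums.toLocalIterator.toList == nums.collect().toList)
```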