@@ -25,5 +25,5 @@
* Datasets.
*/
public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
-Iterable<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
+Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
}
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;

import java.io.Serializable;
+import java.util.Iterator;

/**
* A function that returns zero or more records of type Double from each input record.
*/
public interface DoubleFlatMapFunction<T> extends Serializable {
-public Iterable<Double> call(T t) throws Exception;
+Iterator<Double> call(T t) throws Exception;
}
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;

import java.io.Serializable;
+import java.util.Iterator;

/**
* A function that returns zero or more output records from each input record.
*/
public interface FlatMapFunction<T, R> extends Serializable {
-Iterable<R> call(T t) throws Exception;
+Iterator<R> call(T t) throws Exception;
}
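
For user code, the migration is mechanical: build the collection as before and return its iterator. A minimal sketch of an implementation against the new contract (class name and data are illustrative, not part of this PR; assumes spark-core on the classpath):

```java
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;

// Illustrative implementation of the new Iterator-returning contract.
public class SplitWords implements FlatMapFunction<String, String> {
  @Override
  public Iterator<String> call(String line) {
    // Old contract: return Arrays.asList(line.split(" "));
    // New contract: unwrap the collection into an iterator explicitly.
    return Arrays.asList(line.split(" ")).iterator();
  }

  public static void main(String[] args) {
    Iterator<String> words = new SplitWords().call("to be or not to be");
    while (words.hasNext()) {
      System.out.println(words.next());
    }
  }
}
```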
@@ -18,10 +18,11 @@
package org.apache.spark.api.java.function;

import java.io.Serializable;
+import java.util.Iterator;

/**
* A function that takes two inputs and returns zero or more output records.
*/
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
-Iterable<R> call(T1 t1, T2 t2) throws Exception;
+Iterator<R> call(T1 t1, T2 t2) throws Exception;
}
@@ -24,5 +24,5 @@
* A function that returns zero or more output records from each grouping key and its values.
*/
public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
-Iterable<R> call(K key, Iterator<V> values) throws Exception;
+Iterator<R> call(K key, Iterator<V> values) throws Exception;
}
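
The Dataset-side interfaces change the same way. A hedged sketch of `flatMapGroups` usage under the new signature (assuming the Spark 2.x `SparkSession`/`Encoders` API; all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapGroupsFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class GroupsSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("GroupsSketch").master("local[2]").getOrCreate();
    Dataset<String> words = spark.createDataset(
        Arrays.asList("apple", "avocado", "banana"), Encoders.STRING());

    // Group by first letter, then emit one "key:value" record per word.
    Dataset<String> tagged = words
        .groupByKey((MapFunction<String, String>) w -> w.substring(0, 1), Encoders.STRING())
        .flatMapGroups((FlatMapGroupsFunction<String, String, String>) (key, values) -> {
          List<String> out = new ArrayList<>();
          while (values.hasNext()) {
            out.add(key + ":" + values.next());
          }
          return out.iterator(); // an Iterator, not an Iterable, after this change
        }, Encoders.STRING());

    tagged.show();
    spark.stop();
  }
}
```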
@@ -24,5 +24,5 @@
* Base interface for function used in Dataset's mapPartitions.
*/
public interface MapPartitionsFunction<T, U> extends Serializable {
-Iterable<U> call(Iterator<T> input) throws Exception;
+Iterator<U> call(Iterator<T> input) throws Exception;
}
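
Returning `Iterator` rather than `Iterable` also lets a `MapPartitionsFunction` stream its output instead of buffering an entire partition in a `List`, which the old signature effectively encouraged. An illustrative sketch, not taken from this PR:

```java
import java.util.Iterator;

import org.apache.spark.api.java.function.MapPartitionsFunction;

// Wraps the input iterator instead of copying the partition into a List,
// so records are transformed one at a time as the consumer pulls them.
public class Lengths implements MapPartitionsFunction<String, Integer> {
  @Override
  public Iterator<Integer> call(final Iterator<String> input) {
    return new Iterator<Integer>() {
      @Override public boolean hasNext() { return input.hasNext(); }
      @Override public Integer next() { return input.next().length(); }
      @Override public void remove() { throw new UnsupportedOperationException(); }
    };
  }
}
```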
@@ -18,6 +18,7 @@
package org.apache.spark.api.java.function;

import java.io.Serializable;
+import java.util.Iterator;

import scala.Tuple2;

@@ -26,5 +27,5 @@
* key-value pairs are represented as scala.Tuple2 objects.
*/
public interface PairFlatMapFunction<T, K, V> extends Serializable {
-public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+Iterator<Tuple2<K, V>> call(T t) throws Exception;
}
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala (10 additions & 10 deletions)
@@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
-def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}

@@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
-def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}

@@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
-(x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+(x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
preservesPartitioning: Boolean): JavaRDD[U] = {
def fn: (Iterator[T]) => Iterator[U] = {
-(x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+(x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
-(x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+(x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-(x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+(x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
}
@@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
preservesPartitioning: Boolean): JavaDoubleRDD = {
def fn: (Iterator[T]) => Iterator[jl.Double] = {
-(x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+(x: Iterator[T]) => f.call(x.asJava).asScala
}
new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
.map(x => x.doubleValue()))
@@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-(x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+(x: Iterator[T]) => f.call(x.asJava).asScala
}
JavaPairRDD.fromRDD(
rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
@@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
other: JavaRDDLike[U, _],
f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-(x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala
+(x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
}
JavaRDD.fromRDD(
rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])
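
Since each of these function interfaces has a single abstract method, Java 8 call sites can pass lambdas, and migrating one is just a matter of appending `.iterator()` to the old body. A minimal sketch (assumes Spark 2.x with Java 8 and a local master; names are illustrative):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LambdaMigration {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("LambdaMigration").setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("to be or", "not to be"));
    // Under the old signature the lambda body ended at asList(...);
    // under the new one, appending .iterator() is the whole migration.
    JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
    System.out.println(words.collect());
    sc.stop();
  }
}
```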
core/src/test/java/org/apache/spark/JavaAPISuite.java (12 additions & 12 deletions)
@@ -880,8 +880,8 @@ public void flatMap() {
"The quick brown fox jumps over the lazy dog."));
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
-public Iterable<String> call(String x) {
-return Arrays.asList(x.split(" "));
+public Iterator<String> call(String x) {
+return Arrays.asList(x.split(" ")).iterator();
}
});
Assert.assertEquals("Hello", words.first());
@@ -890,12 +890,12 @@ public Iterable<String> call(String x) {
JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
new PairFlatMapFunction<String, String, String>() {
@Override
-public Iterable<Tuple2<String, String>> call(String s) {
+public Iterator<Tuple2<String, String>> call(String s) {
List<Tuple2<String, String>> pairs = new LinkedList<>();
for (String word : s.split(" ")) {
pairs.add(new Tuple2<>(word, word));
}
-return pairs;
+return pairs.iterator();
}
}
);
@@ -904,12 +904,12 @@ public Iterable<Tuple2<String, String>> call(String s) {

JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
@Override
-public Iterable<Double> call(String s) {
+public Iterator<Double> call(String s) {
List<Double> lengths = new LinkedList<>();
for (String word : s.split(" ")) {
lengths.add((double) word.length());
}
-return lengths;
+return lengths.iterator();
}
});
Assert.assertEquals(5.0, doubles.first(), 0.01);
@@ -930,8 +930,8 @@ public void mapsFromPairsToPairs() {
JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
-public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
-return Collections.singletonList(item.swap());
+public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
+return Collections.singletonList(item.swap()).iterator();
}
});
swapped.collect();
@@ -951,12 +951,12 @@ public void mapPartitions() {
JavaRDD<Integer> partitionSums = rdd.mapPartitions(
new FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
-public Iterable<Integer> call(Iterator<Integer> iter) {
+public Iterator<Integer> call(Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
sum += iter.next();
}
-return Collections.singletonList(sum);
+return Collections.singletonList(sum).iterator();
}
});
Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
@@ -1367,8 +1367,8 @@ public void zipPartitions() {
FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
@Override
-public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-return Arrays.asList(Iterators.size(i), Iterators.size(s));
+public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
}
};

docs/streaming-programming-guide.md (2 additions & 2 deletions)
@@ -165,8 +165,8 @@ space into words.
// Split each line into words
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
-@Override public Iterable<String> call(String x) {
-return Arrays.asList(x.split(" "));
+@Override public Iterator<String> call(String x) {
+return Arrays.asList(x.split(" ")).iterator();
}
});
{% endhighlight %}
examples/src/main/java/org/apache/spark/examples/JavaPageRank.java (8 additions & 10 deletions)
@@ -17,7 +17,10 @@

package org.apache.spark.examples;

-
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+import java.util.regex.Pattern;

import scala.Tuple2;

@@ -32,11 +35,6 @@
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

-import java.util.ArrayList;
-import java.util.List;
-import java.util.Iterator;
-import java.util.regex.Pattern;
-
/**
* Computes the PageRank of URLs from an input file. Input file should
* be in format of:
@@ -108,13 +106,13 @@ public Double call(Iterable<String> rs) {
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
-public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+public Iterator<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
int urlCount = Iterables.size(s._1);
-List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+List<Tuple2<String, Double>> results = new ArrayList<>();
for (String n : s._1) {
-results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
+results.add(new Tuple2<>(n, s._2() / urlCount));
}
-return results;
+return results.iterator();
}
});

@@ -27,6 +27,7 @@
import org.apache.spark.api.java.function.PairFunction;

import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

@@ -46,8 +47,8 @@ public static void main(String[] args) throws Exception {

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
-public Iterable<String> call(String s) {
-return Arrays.asList(SPACE.split(s));
+public Iterator<String> call(String s) {
+return Arrays.asList(SPACE.split(s)).iterator();
}
});

@@ -18,6 +18,7 @@
package org.apache.spark.examples.streaming;

import java.util.Arrays;
+import java.util.Iterator;

import scala.Tuple2;

@@ -116,8 +117,8 @@ public static void main(String[] args) {
// compute wordcount
lines.flatMap(new FlatMapFunction<String, String>() {
@Override
-public Iterable<String> call(String s) {
-return Arrays.asList(s.split("\\s+"));
+public Iterator<String> call(String s) {
+return Arrays.asList(s.split("\\s+")).iterator();
}
}).mapToPair(new PairFunction<String, String, Integer>() {
@Override
@@ -17,7 +17,6 @@

package org.apache.spark.examples.streaming;

-import com.google.common.collect.Lists;
import com.google.common.io.Closeables;

import org.apache.spark.SparkConf;
@@ -37,6 +36,8 @@
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;

/**
@@ -74,8 +75,8 @@ public static void main(String[] args) {
new JavaCustomReceiver(args[0], Integer.parseInt(args[1])));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
-public Iterable<String> call(String x) {
-return Lists.newArrayList(SPACE.split(x));
+public Iterator<String> call(String x) {
+return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
@@ -20,11 +20,11 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
+import java.util.Iterator;
import java.util.regex.Pattern;

import scala.Tuple2;

-import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
@@ -87,8 +87,8 @@ public String call(Tuple2<String, String> tuple2) {
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
-public Iterable<String> call(String x) {
-return Lists.newArrayList(SPACE.split(x));
+public Iterator<String> call(String x) {
+return Arrays.asList(SPACE.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(