
Commit d792f9a

Fix Java function API methods for flatMap and mapPartitions to require producing only an Iterator, not Iterable. Also fix DStream.flatMap to require a function producing TraversableOnce only, not Traversable.
1 parent 659fd9d commit d792f9a
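
For Java callers the practical effect is that flatMap-style functions now return a java.util.Iterator rather than a java.lang.Iterable. A minimal migration sketch (not part of this commit), assuming an existing JavaRDD<String> named lines:

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;

    // Old signature, no longer compiles after this change:
    //   public Iterable<String> call(String s) { return Arrays.asList(s.split(" ")); }

    // New signature: build a collection as before, then return its iterator.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String s) {
        return Arrays.asList(s.split(" ")).iterator();
      }
    });

The same .iterator() adjustment applies to DoubleFlatMapFunction, PairFlatMapFunction, FlatMapFunction2, and the mapPartitions variants changed below.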

29 files changed (+143, -108 lines)


core/src/main/java/org/apache/spark/api/java/function/CoGroupFunction.java

Lines changed: 1 addition & 1 deletion
@@ -25,5 +25,5 @@
  * Datasets.
  */
 public interface CoGroupFunction<K, V1, V2, R> extends Serializable {
-  Iterable<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
+  Iterator<R> call(K key, Iterator<V1> left, Iterator<V2> right) throws Exception;
 }

core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java

Lines changed: 2 additions & 1 deletion
@@ -18,10 +18,11 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * A function that returns zero or more records of type Double from each input record.
  */
 public interface DoubleFlatMapFunction<T> extends Serializable {
-  public Iterable<Double> call(T t) throws Exception;
+  Iterator<Double> call(T t) throws Exception;
 }

core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java

Lines changed: 2 additions & 1 deletion
@@ -18,10 +18,11 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * A function that returns zero or more output records from each input record.
  */
 public interface FlatMapFunction<T, R> extends Serializable {
-  Iterable<R> call(T t) throws Exception;
+  Iterator<R> call(T t) throws Exception;
 }

core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java

Lines changed: 2 additions & 1 deletion
@@ -18,10 +18,11 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 /**
  * A function that takes two inputs and returns zero or more output records.
  */
 public interface FlatMapFunction2<T1, T2, R> extends Serializable {
-  Iterable<R> call(T1 t1, T2 t2) throws Exception;
+  Iterator<R> call(T1 t1, T2 t2) throws Exception;
 }

core/src/main/java/org/apache/spark/api/java/function/FlatMapGroupsFunction.java

Lines changed: 1 addition & 1 deletion
@@ -24,5 +24,5 @@
  * A function that returns zero or more output records from each grouping key and its values.
  */
 public interface FlatMapGroupsFunction<K, V, R> extends Serializable {
-  Iterable<R> call(K key, Iterator<V> values) throws Exception;
+  Iterator<R> call(K key, Iterator<V> values) throws Exception;
 }

core/src/main/java/org/apache/spark/api/java/function/MapPartitionsFunction.java

Lines changed: 1 addition & 1 deletion
@@ -24,5 +24,5 @@
  * Base interface for function used in Dataset's mapPartitions.
  */
 public interface MapPartitionsFunction<T, U> extends Serializable {
-  Iterable<U> call(Iterator<T> input) throws Exception;
+  Iterator<U> call(Iterator<T> input) throws Exception;
 }

core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java

Lines changed: 2 additions & 1 deletion
@@ -18,6 +18,7 @@
 package org.apache.spark.api.java.function;
 
 import java.io.Serializable;
+import java.util.Iterator;
 
 import scala.Tuple2;
 
@@ -26,5 +27,5 @@
  * key-value pairs are represented as scala.Tuple2 objects.
  */
 public interface PairFlatMapFunction<T, K, V> extends Serializable {
-  public Iterable<Tuple2<K, V>> call(T t) throws Exception;
+  Iterator<Tuple2<K, V>> call(T t) throws Exception;
 }

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 10 additions & 10 deletions
@@ -121,7 +121,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * RDD, and then flattening the results.
    */
   def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
-    def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[U] = (x: T) => f.call(x).asScala
     JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
 
@@ -130,7 +130,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * RDD, and then flattening the results.
    */
   def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
-    def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[jl.Double] = (x: T) => f.call(x).asScala
     new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
   }
 
@@ -139,7 +139,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * RDD, and then flattening the results.
    */
   def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
-    def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
+    def fn: (T) => Iterator[(K2, V2)] = (x: T) => f.call(x).asScala
     def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
     JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -149,7 +149,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U])
   }
@@ -160,7 +160,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U],
       preservesPartitioning: Boolean): JavaRDD[U] = {
     def fn: (Iterator[T]) => Iterator[U] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U])
@@ -171,7 +171,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    */
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue()))
   }
@@ -182,7 +182,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
   JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2])
   }
@@ -193,7 +193,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]],
      preservesPartitioning: Boolean): JavaDoubleRDD = {
     def fn: (Iterator[T]) => Iterator[jl.Double] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning)
       .map(x => x.doubleValue()))
@@ -205,7 +205,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2],
      preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = {
     def fn: (Iterator[T]) => Iterator[(K2, V2)] = {
-      (x: Iterator[T]) => f.call(x.asJava).iterator().asScala
+      (x: Iterator[T]) => f.call(x.asJava).asScala
     }
     JavaPairRDD.fromRDD(
       rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2])
@@ -290,7 +290,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
       other: JavaRDDLike[U, _],
       f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
     def fn: (Iterator[T], Iterator[U]) => Iterator[V] = {
-      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala
+      (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).asScala
     }
     JavaRDD.fromRDD(
       rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V])

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 12 additions & 12 deletions
@@ -880,8 +880,8 @@ public void flatMap() {
       "The quick brown fox jumps over the lazy dog."));
     JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> call(String x) {
-        return Arrays.asList(x.split(" "));
+      public Iterator<String> call(String x) {
+        return Arrays.asList(x.split(" ")).iterator();
       }
     });
     Assert.assertEquals("Hello", words.first());
@@ -890,12 +890,12 @@ public Iterable<String> call(String x)
     JavaPairRDD<String, String> pairsRDD = rdd.flatMapToPair(
       new PairFlatMapFunction<String, String, String>() {
         @Override
-        public Iterable<Tuple2<String, String>> call(String s) {
+        public Iterator<Tuple2<String, String>> call(String s) {
           List<Tuple2<String, String>> pairs = new LinkedList<>();
           for (String word : s.split(" ")) {
             pairs.add(new Tuple2<>(word, word));
           }
-          return pairs;
+          return pairs.iterator();
         }
       }
     );
@@ -904,12 +904,12 @@ public Iterable<Tuple2<String, String>> call(String s)
 
     JavaDoubleRDD doubles = rdd.flatMapToDouble(new DoubleFlatMapFunction<String>() {
       @Override
-      public Iterable<Double> call(String s) {
+      public Iterator<Double> call(String s) {
         List<Double> lengths = new LinkedList<>();
         for (String word : s.split(" ")) {
           lengths.add((double) word.length());
         }
-        return lengths;
+        return lengths.iterator();
       }
     });
     Assert.assertEquals(5.0, doubles.first(), 0.01);
@@ -930,8 +930,8 @@ public void mapsFromPairsToPairs() {
     JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
       new PairFlatMapFunction<Tuple2<Integer, String>, String, Integer>() {
         @Override
-        public Iterable<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
-          return Collections.singletonList(item.swap());
+        public Iterator<Tuple2<String, Integer>> call(Tuple2<Integer, String> item) {
+          return Collections.singletonList(item.swap()).iterator();
         }
       });
     swapped.collect();
@@ -951,12 +951,12 @@ public void mapPartitions() {
     JavaRDD<Integer> partitionSums = rdd.mapPartitions(
       new FlatMapFunction<Iterator<Integer>, Integer>() {
         @Override
-        public Iterable<Integer> call(Iterator<Integer> iter) {
+        public Iterator<Integer> call(Iterator<Integer> iter) {
           int sum = 0;
           while (iter.hasNext()) {
             sum += iter.next();
           }
-          return Collections.singletonList(sum);
+          return Collections.singletonList(sum).iterator();
         }
       });
     Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
@@ -1367,8 +1367,8 @@ public void zipPartitions() {
     FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
       new FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer>() {
         @Override
-        public Iterable<Integer> call(Iterator<Integer> i, Iterator<String> s) {
-          return Arrays.asList(Iterators.size(i), Iterators.size(s));
+        public Iterator<Integer> call(Iterator<Integer> i, Iterator<String> s) {
+          return Arrays.asList(Iterators.size(i), Iterators.size(s)).iterator();
        }
       };

docs/streaming-programming-guide.md

Lines changed: 2 additions & 2 deletions
@@ -165,8 +165,8 @@ space into words.
 // Split each line into words
 JavaDStream<String> words = lines.flatMap(
   new FlatMapFunction<String, String>() {
-    @Override public Iterable<String> call(String x) {
-      return Arrays.asList(x.split(" "));
+    @Override public Iterator<String> call(String x) {
+      return Arrays.asList(x.split(" ")).iterator();
     }
   });
 {% endhighlight %}
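
Because FlatMapFunction has a single abstract method, the updated guide snippet can also be written as a Java 8 lambda; a minimal sketch, assuming the lines DStream defined earlier in the guide:

    // Split each line into words, returning an Iterator as the new API requires
    JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());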
