Skip to content

Commit 743a889

Browse files
SaldanhaJoshRosen
authored andcommitted
[SPARK-4459] Change groupBy type parameter from K to U
Please see https://issues.apache.org/jira/browse/SPARK-4459 Author: Saldanha <[email protected]> Closes apache#3327 from alokito/master and squashes the following commits: 54b1095 [Saldanha] [SPARK-4459] changed type parameter for keyBy from K to U d5f73c3 [Saldanha] [SPARK-4459] added keyBy test 316ad77 [Saldanha] SPARK-4459 changed type parameter for groupBy from K to U. 62ddd4b [Saldanha] SPARK-4459 added failing unit test
1 parent 794f3ae commit 743a889

File tree

2 files changed

+51
-7
lines changed

2 files changed

+51
-7
lines changed

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,8 +211,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
211211
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
212212
* mapping to that key.
213213
*/
214-
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
215-
implicit val ctagK: ClassTag[K] = fakeClassTag
214+
def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
215+
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
216+
implicit val ctagK: ClassTag[U] = fakeClassTag
216217
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
217218
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
218219
}
@@ -221,10 +222,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
221222
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
222223
* mapping to that key.
223224
*/
224-
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
225-
implicit val ctagK: ClassTag[K] = fakeClassTag
225+
def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
226+
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
227+
implicit val ctagK: ClassTag[U] = fakeClassTag
226228
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
227-
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
229+
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
228230
}
229231

230232
/**
@@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
458460
/**
459461
* Creates tuples of the elements in this RDD by applying `f`.
460462
*/
461-
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
462-
implicit val ctag: ClassTag[K] = fakeClassTag
463+
def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
464+
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
465+
implicit val ctag: ClassTag[U] = fakeClassTag
463466
JavaPairRDD.fromRDD(rdd.keyBy(f))
464467
}
465468

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,47 @@ public Boolean call(Integer x) {
323323
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
324324
}
325325

326+
@Test
327+
public void groupByOnPairRDD() {
328+
// Regression test for SPARK-4459
329+
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
330+
Function<Tuple2<Integer, Integer>, Boolean> areOdd =
331+
new Function<Tuple2<Integer, Integer>, Boolean>() {
332+
@Override
333+
public Boolean call(Tuple2<Integer, Integer> x) {
334+
return (x._1() % 2 == 0) && (x._2() % 2 == 0);
335+
}
336+
};
337+
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
338+
JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
339+
Assert.assertEquals(2, oddsAndEvens.count());
340+
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
341+
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
342+
343+
oddsAndEvens = pairRDD.groupBy(areOdd, 1);
344+
Assert.assertEquals(2, oddsAndEvens.count());
345+
Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
346+
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
347+
}
348+
349+
@SuppressWarnings("unchecked")
350+
@Test
351+
public void keyByOnPairRDD() {
352+
// Regression test for SPARK-4459
353+
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
354+
Function<Tuple2<Integer, Integer>, String> sumToString =
355+
new Function<Tuple2<Integer, Integer>, String>() {
356+
@Override
357+
public String call(Tuple2<Integer, Integer> x) {
358+
return String.valueOf(x._1() + x._2());
359+
}
360+
};
361+
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
362+
JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
363+
Assert.assertEquals(7, keyed.count());
364+
Assert.assertEquals(1, (long) keyed.lookup("2").get(0)._1());
365+
}
366+
326367
@SuppressWarnings("unchecked")
327368
@Test
328369
public void cogroup() {

0 commit comments

Comments
 (0)