@@ -249,6 +249,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
249249 ))
250250 }
251251
252+ test(" groupWith3" ) {
253+ val rdd1 = sc.parallelize(Array ((1 , 1 ), (1 , 2 ), (2 , 1 ), (3 , 1 )))
254+ val rdd2 = sc.parallelize(Array ((1 , 'x' ), (2 , 'y' ), (2 , 'z' ), (4 , 'w' )))
255+ val rdd3 = sc.parallelize(Array ((1 , 'a' ), (3 , 'b' ), (4 , 'c' ), (4 , 'd' )))
256+ val joined = rdd1.groupWith(rdd2, rdd3).collect()
257+ assert(joined.size === 4 )
258+ val joinedSet = joined.map(x => (x._1,
259+ (x._2._1.toList, x._2._2.toList, x._2._3.toList))).toSet
260+ assert(joinedSet === Set (
261+ (1 , (List (1 , 2 ), List ('x' ), List ('a' ))),
262+ (2 , (List (1 ), List ('y' , 'z' ), List ())),
263+ (3 , (List (1 ), List (), List ('b' ))),
264+ (4 , (List (), List ('w' ), List ('c' , 'd' )))
265+ ))
266+ }
267+
268+ test(" groupWith4" ) {
269+ val rdd1 = sc.parallelize(Array ((1 , 1 ), (1 , 2 ), (2 , 1 ), (3 , 1 )))
270+ val rdd2 = sc.parallelize(Array ((1 , 'x' ), (2 , 'y' ), (2 , 'z' ), (4 , 'w' )))
271+ val rdd3 = sc.parallelize(Array ((1 , 'a' ), (3 , 'b' ), (4 , 'c' ), (4 , 'd' )))
272+ val rdd4 = sc.parallelize(Array ((2 , '@' )))
273+ val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect()
274+ assert(joined.size === 4 )
275+ val joinedSet = joined.map(x => (x._1,
276+ (x._2._1.toList, x._2._2.toList, x._2._3.toList, x._2._4.toList))).toSet
277+ assert(joinedSet === Set (
278+ (1 , (List (1 , 2 ), List ('x' ), List ('a' ), List ())),
279+ (2 , (List (1 ), List ('y' , 'z' ), List (), List ('@' ))),
280+ (3 , (List (1 ), List (), List ('b' ), List ())),
281+ (4 , (List (), List ('w' ), List ('c' , 'd' ), List ()))
282+ ))
283+ }
284+
252285 test(" zero-partition RDD" ) {
253286 val emptyDir = Files .createTempDir()
254287 emptyDir.deleteOnExit()
0 commit comments