|
21 | 21 | import java.util.*; |
22 | 22 |
|
23 | 23 | import scala.Tuple2; |
| 24 | +import scala.Tuple3; |
| 25 | +import scala.Tuple4; |
| 26 | + |
24 | 27 |
|
25 | 28 | import com.google.common.collect.Iterables; |
26 | 29 | import com.google.common.collect.Iterators; |
@@ -304,6 +307,66 @@ public void cogroup() { |
304 | 307 | cogrouped.collect(); |
305 | 308 | } |
306 | 309 |
|
| 310 | + @SuppressWarnings("unchecked") |
| 311 | + @Test |
| 312 | + public void cogroup3() { |
| 313 | + JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( |
| 314 | + new Tuple2<String, String>("Apples", "Fruit"), |
| 315 | + new Tuple2<String, String>("Oranges", "Fruit"), |
| 316 | + new Tuple2<String, String>("Oranges", "Citrus") |
| 317 | + )); |
| 318 | + JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( |
| 319 | + new Tuple2<String, Integer>("Oranges", 2), |
| 320 | + new Tuple2<String, Integer>("Apples", 3) |
| 321 | + )); |
| 322 | + JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList( |
| 323 | + new Tuple2<String, Integer>("Oranges", 21), |
| 324 | + new Tuple2<String, Integer>("Apples", 42) |
| 325 | + )); |
| 326 | + |
| 327 | + JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped = |
| 328 | + categories.cogroup(prices, quantities); |
| 329 | + Assert.assertEquals("[Fruit, Citrus]", |
| 330 | + Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); |
| 331 | + Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); |
| 332 | + Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3())); |
| 333 | + |
| 334 | + |
| 335 | + cogrouped.collect(); |
| 336 | + } |
| 337 | + |
| 338 | + @SuppressWarnings("unchecked") |
| 339 | + @Test |
| 340 | + public void cogroup4() { |
| 341 | + JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList( |
| 342 | + new Tuple2<String, String>("Apples", "Fruit"), |
| 343 | + new Tuple2<String, String>("Oranges", "Fruit"), |
| 344 | + new Tuple2<String, String>("Oranges", "Citrus") |
| 345 | + )); |
| 346 | + JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList( |
| 347 | + new Tuple2<String, Integer>("Oranges", 2), |
| 348 | + new Tuple2<String, Integer>("Apples", 3) |
| 349 | + )); |
| 350 | + JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList( |
| 351 | + new Tuple2<String, Integer>("Oranges", 21), |
| 352 | + new Tuple2<String, Integer>("Apples", 42) |
| 353 | + )); |
| 354 | + JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList( |
| 355 | + new Tuple2<String, String>("Oranges", "BR"), |
| 356 | + new Tuple2<String, String>("Apples", "US") |
| 357 | + )); |
| 358 | + |
| 359 | + JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>, Iterable<String>>> cogrouped = |
| 360 | + categories.cogroup(prices, quantities, countries); |
| 361 | + Assert.assertEquals("[Fruit, Citrus]", |
| 362 | + Iterables.toString(cogrouped.lookup("Oranges").get(0)._1())); |
| 363 | + Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2())); |
| 364 | + Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3())); |
| 365 | + Assert.assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4())); |
| 366 | + |
| 367 | + cogrouped.collect(); |
| 368 | + } |
| 369 | + |
307 | 370 | @SuppressWarnings("unchecked") |
308 | 371 | @Test |
309 | 372 | public void leftOuterJoin() { |
|
0 commit comments