@@ -543,6 +543,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
543543 partitioner : Partitioner ): JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ])] =
544544 fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
545545
546+ /**
547+ * For each key k in `this` or `other1` or `other2` or `other3`,
548+ * return a resulting RDD that contains a tuple with the list of values
549+ * for that key in `this`, `other1`, `other2` and `other3`.
550+ */
551+ def cogroup [W1 , W2 , W3 ](other1 : JavaPairRDD [K , W1 ],
552+ other2 : JavaPairRDD [K , W2 ],
553+ other3 : JavaPairRDD [K , W3 ],
554+ partitioner : Partitioner )
555+ : JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ], JIterable [W3 ])] =
556+ fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, partitioner)))
557+
546558 /**
547559 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
548560 * list of values for that key in `this` as well as `other`.
@@ -558,6 +570,17 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
558570 : JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ])] =
559571 fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
560572
573+ /**
574+ * For each key k in `this` or `other1` or `other2` or `other3`,
575+ * return a resulting RDD that contains a tuple with the list of values
576+ * for that key in `this`, `other1`, `other2` and `other3`.
577+ */
578+ def cogroup [W1 , W2 , W3 ](other1 : JavaPairRDD [K , W1 ],
579+ other2 : JavaPairRDD [K , W2 ],
580+ other3 : JavaPairRDD [K , W3 ])
581+ : JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ], JIterable [W3 ])] =
582+ fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3)))
583+
561584 /**
562585 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
563586 * list of values for that key in `this` as well as `other`.
@@ -574,6 +597,18 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
574597 : JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ])] =
575598 fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
576599
600+ /**
601+ * For each key k in `this` or `other1` or `other2` or `other3`,
602+ * return a resulting RDD that contains a tuple with the list of values
603+ * for that key in `this`, `other1`, `other2` and `other3`.
604+ */
605+ def cogroup [W1 , W2 , W3 ](other1 : JavaPairRDD [K , W1 ],
606+ other2 : JavaPairRDD [K , W2 ],
607+ other3 : JavaPairRDD [K , W3 ],
608+ numPartitions : Int )
609+ : JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ], JIterable [W3 ])] =
610+ fromRDD(cogroupResult3ToJava(rdd.cogroup(other1, other2, other3, numPartitions)))
611+
577612 /** Alias for cogroup. */
578613 def groupWith [W ](other : JavaPairRDD [K , W ]): JavaPairRDD [K , (JIterable [V ], JIterable [W ])] =
579614 fromRDD(cogroupResultToJava(rdd.groupWith(other)))
@@ -583,6 +618,13 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
583618 : JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ])] =
584619 fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
585620
621+ /** Alias for cogroup. */
622+ def groupWith [W1 , W2 , W3 ](other1 : JavaPairRDD [K , W1 ],
623+ other2 : JavaPairRDD [K , W2 ],
624+ other3 : JavaPairRDD [K , W3 ])
625+ : JavaPairRDD [K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ], JIterable [W3 ])] =
626+ fromRDD(cogroupResult3ToJava(rdd.groupWith(other1, other2, other3)))
627+
586628 /**
587629 * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
588630 * RDD has a known partitioner by only searching the partition that the key maps to.
@@ -786,6 +828,15 @@ object JavaPairRDD {
786828 .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
787829 }
788830
831+ private [spark]
832+ def cogroupResult3ToJava [K : ClassTag , V , W1 , W2 , W3 ](
833+ rdd : RDD [(K , (Iterable [V ], Iterable [W1 ], Iterable [W2 ], Iterable [W3 ]))])
834+ : RDD [(K , (JIterable [V ], JIterable [W1 ], JIterable [W2 ], JIterable [W3 ]))] = {
835+ rddToPairRDDFunctions(rdd)
836+ .mapValues(x =>
837+ (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4)))
838+ }
839+
789840 def fromRDD [K : ClassTag , V : ClassTag ](rdd : RDD [(K , V )]): JavaPairRDD [K , V ] = {
790841 new JavaPairRDD [K , V ](rdd)
791842 }
0 commit comments