From 1c8ba5a0d480f816a0c217618b40bb615474963d Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 19 Mar 2014 19:28:26 +0900 Subject: [PATCH 1/2] Add sort-merge cogroup/joins. --- .../BlockStoreSortMergeShuffleFetcher.scala | 109 ++++++ .../spark/SortMergeShuffleFetcher.scala | 36 ++ .../scala/org/apache/spark/SparkEnv.scala | 6 + .../spark/rdd/OrderedRDDFunctions.scala | 337 +++++++++++++++++- .../spark/rdd/SortMergeCoGroupedRDD.scala | 83 +++++ .../spark/rdd/OrderedRDDFunctionsSuite.scala | 184 ++++++++++ 6 files changed, 754 insertions(+), 1 deletion(-) create mode 100644 core/src/main/scala/org/apache/spark/BlockStoreSortMergeShuffleFetcher.scala create mode 100644 core/src/main/scala/org/apache/spark/SortMergeShuffleFetcher.scala create mode 100644 core/src/main/scala/org/apache/spark/rdd/SortMergeCoGroupedRDD.scala create mode 100644 core/src/test/scala/org/apache/spark/rdd/OrderedRDDFunctionsSuite.scala diff --git a/core/src/main/scala/org/apache/spark/BlockStoreSortMergeShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/BlockStoreSortMergeShuffleFetcher.scala new file mode 100644 index 0000000000000..7a15f2d06e888 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/BlockStoreSortMergeShuffleFetcher.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import scala.collection.mutable + +import org.apache.spark.executor.ShuffleReadMetrics +import org.apache.spark.serializer.Serializer +import org.apache.spark.storage.BlockId +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.storage.ShuffleBlockId +import org.apache.spark.util.CompletionIterator + +private[spark] class BlockStoreSortMergeShuffleFetcher + extends SortMergeShuffleFetcher with Logging { + + def fetch[K: Ordering]( + shuffleId: Int, + reduceId: Int, + context: TaskContext, + serializer: Serializer) + : Iterator[Product2[K, Any]] = + { + + logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) + val blockManager = SparkEnv.get.blockManager + + val startTime = System.currentTimeMillis + val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId) + logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format( + shuffleId, reduceId, System.currentTimeMillis - startTime)) + + val splitsByAddress = new mutable.HashMap[BlockManagerId, mutable.ArrayBuffer[(Int, Long)]] + for (((address, size), index) <- statuses.zipWithIndex) { + splitsByAddress.getOrElseUpdate(address, mutable.ArrayBuffer()) += ((index, size)) + } + + val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = splitsByAddress.toSeq.map { + case (address, splits) => + (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2))) + } + + def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])): Iterator[Product2[K, Any]] = { + val blockId = blockPair._1 + val blockOption = blockPair._2 + blockOption match { + case Some(block) => { + block.asInstanceOf[Iterator[Product2[K, Any]]] + } + case None => { + blockId match { + case ShuffleBlockId(shufId, mapId, _) => + val address = statuses(mapId.toInt)._1 + throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null) + case _ => + throw new SparkException( + "Failed to get block " + blockId + ", which is not a shuffle block") + } + } + } + } + + val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) + val itrs = blockFetcherItr.map(unpackBlock).toArray.map(_.buffered) + val itr = Iterator.continually { + ((None: Option[K]) /: itrs) { + case (opt, itr) if (itr.hasNext) => + opt.map(key => implicitly[Ordering[K]].min(key, itr.head._1)).orElse(Some(itr.head._1)) + case (opt, _) => opt + } + }.takeWhile(_.isDefined).map(_.get).flatMap { key => + itrs.flatMap { itr => + Iterator.continually { + if (itr.hasNext && implicitly[Ordering[K]].equiv(key, itr.head._1)) { + Some(itr.next) + } else None + }.takeWhile(_.isDefined).map(_.get) + } + } + + val completionIter = CompletionIterator[Product2[K, Any], Iterator[Product2[K, Any]]](itr, { + val shuffleMetrics = new ShuffleReadMetrics + shuffleMetrics.shuffleFinishTime = System.currentTimeMillis + shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime + shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead + shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks + shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks + shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks + context.taskMetrics.shuffleReadMetrics = Some(shuffleMetrics) + }) + + new InterruptibleIterator[Product2[K, Any]](context, completionIter) + } +} diff --git a/core/src/main/scala/org/apache/spark/SortMergeShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/SortMergeShuffleFetcher.scala new file mode 
100644 index 0000000000000..80c7f3430554c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SortMergeShuffleFetcher.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import org.apache.spark.serializer.Serializer + +private[spark] abstract class SortMergeShuffleFetcher { + + /** + * Fetch the shuffle outputs for a given ShuffleDependency. + * @return An iterator over the elements of the fetched shuffle outputs. + */ + def fetch[K: Ordering]( + shuffleId: Int, + reduceId: Int, + context: TaskContext, + serializer: Serializer = SparkEnv.get.serializer): Iterator[Product2[K, Any]] + + /** Stop the fetcher */ + def stop() {} +} diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 5ceac28fe7afb..2a1b42870d06e 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -49,6 +49,7 @@ class SparkEnv private[spark] ( val cacheManager: CacheManager, val mapOutputTracker: MapOutputTracker, val shuffleFetcher: ShuffleFetcher, + val sortMergeShuffleFetcher: SortMergeShuffleFetcher, val broadcastManager: BroadcastManager, val blockManager: BlockManager, val connectionManager: ConnectionManager, @@ -73,6 +74,7 @@ class SparkEnv private[spark] ( httpFileServer.stop() mapOutputTracker.stop() shuffleFetcher.stop() + sortMergeShuffleFetcher.stop() broadcastManager.stop() blockManager.stop() blockManager.master.stop() @@ -207,6 +209,9 @@ object SparkEnv extends Logging { val shuffleFetcher = instantiateClass[ShuffleFetcher]( "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher") + val sortMergeShuffleFetcher = instantiateClass[SortMergeShuffleFetcher]( + "spark.sortmerge.shuffle.fetcher", "org.apache.spark.BlockStoreSortMergeShuffleFetcher") + val httpFileServer = new HttpFileServer(securityManager) httpFileServer.initialize() conf.set("spark.fileserver.uri", httpFileServer.serverUri) @@ -241,6 +246,7 @@ object SparkEnv extends Logging { cacheManager, mapOutputTracker, shuffleFetcher, + sortMergeShuffleFetcher, broadcastManager, blockManager, connectionManager, diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala index d5691f2267bfa..92b01e6e15f5a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala @@ -17,9 +17,14 @@ package org.apache.spark.rdd -import scala.reflect.ClassTag +import scala.reflect.{ClassTag,classTag} import org.apache.spark.{Logging, RangePartitioner} +import org.apache.spark.SparkContext._ +import org.apache.spark.SparkException +import 
org.apache.spark.HashPartitioner +import org.apache.spark.Partitioner +import org.apache.spark.Partitioner.defaultPartitioner /** * Extra functions available on RDDs of (key, value) pairs where the key is sortable through @@ -51,4 +56,334 @@ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag, } }, preservesPartitioning = true) } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + */ + def mergeJoin[W]( + other: RDD[(K, W)], + partitioner: Partitioner, + ordered: Boolean = false): RDD[(K, (V, W))] = { + mergeCogroup(other, partitioner, ordered).flatMapValues { + case (vs, ws) => + for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + } + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeLeftOuterJoin[W]( + other: RDD[(K, W)], + partitioner: Partitioner, + ordered: Boolean = false): RDD[(K, (V, Option[W]))] = { + mergeCogroup(other, partitioner, ordered).flatMapValues { + case (vs, ws) => + if (ws.isEmpty) { + vs.iterator.map((_, None)) + } else { + for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) + } + } + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeRightOuterJoin[W]( + other: RDD[(K, W)], + partitioner: Partitioner, + ordered: Boolean = false): RDD[(K, (Option[V], W))] = { + mergeCogroup(other, partitioner, ordered).flatMapValues { + case (vs, ws) => + if (vs.isEmpty) { + ws.iterator.map((None, _)) + } else { + for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) + } + } + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + mergeJoin(other, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W](other: RDD[(K, W)], ordered: Boolean): RDD[(K, (V, W))] = { + mergeJoin(other, defaultPartitioner(self, other), ordered) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = { + mergeJoin(other, numPartitions, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. 
Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W](other: RDD[(K, W)], numPartitions: Int, ordered: Boolean): RDD[(K, (V, W))] = { + mergeJoin(other, new HashPartitioner(numPartitions), ordered) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the existing partitioner/parallelism level. + */ + def mergeLeftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + mergeLeftOuterJoin(other, false) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the existing partitioner/parallelism level. + */ + def mergeLeftOuterJoin[W](other: RDD[(K, W)], ordered: Boolean): RDD[(K, (V, Option[W]))] = { + mergeLeftOuterJoin(other, defaultPartitioner(self, other), ordered) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numPartitions` partitions. + */ + def mergeLeftOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (V, Option[W]))] = { + mergeLeftOuterJoin(other, numPartitions, false) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numPartitions` partitions. + */ + def mergeLeftOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int, + ordered: Boolean): RDD[(K, (V, Option[W]))] = { + mergeLeftOuterJoin(other, new HashPartitioner(numPartitions), ordered) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the existing partitioner/parallelism level. + */ + def mergeRightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + mergeRightOuterJoin(other, false) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the existing partitioner/parallelism level. + */ + def mergeRightOuterJoin[W](other: RDD[(K, W)], ordered: Boolean): RDD[(K, (Option[V], W))] = { + mergeRightOuterJoin(other, defaultPartitioner(self, other), ordered) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. 
Hash-partitions the resulting + * RDD into the given number of partitions. + */ + def mergeRightOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Option[V], W))] = { + mergeRightOuterJoin(other, numPartitions, false) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD into the given number of partitions. + */ + def mergeRightOuterJoin[W]( + other: RDD[(K, W)], + numPartitions: Int, + ordered: Boolean): RDD[(K, (Option[V], W))] = { + mergeRightOuterJoin(other, new HashPartitioner(numPartitions), ordered) + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + def mergeCogroup[W]( + other: RDD[(K, W)], + partitioner: Partitioner, + ordered: Boolean = false): RDD[(K, (Seq[V], Seq[W]))] = { + if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { + throw new SparkException("Default partitioner cannot partition array keys.") + } + val cg = new SortMergeCoGroupedRDD( + Seq( + if (ordered) { + self + } else { + self.mapPartitions(_.toArray.sortBy(_._1).iterator, preservesPartitioning = true) + }, + if (ordered) { + other + } else { + other.mapPartitions(_.toArray.sortBy(_._1).iterator, preservesPartitioning = true) + }), + partitioner) + cg.mapValues { + case Seq(vs, ws) => + (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) + } + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ + def mergeCogroup[W1, W2]( + other1: RDD[(K, W1)], + other2: RDD[(K, W2)], + partitioner: Partitioner, + ordered: Boolean = false): RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { + throw new SparkException("Default partitioner cannot partition array keys.") + } + val cg = new SortMergeCoGroupedRDD( + Seq( + if (ordered) { + self + } else { + self.mapPartitions(_.toArray.sortBy(_._1).iterator, preservesPartitioning = true) + }, + if (ordered) { + other1 + } else { + other1.mapPartitions(_.toArray.sortBy(_._1).iterator, preservesPartitioning = true) + }, + if (ordered) { + other2 + } else { + other2.mapPartitions(_.toArray.sortBy(_._1).iterator, preservesPartitioning = true) + }), + partitioner) + cg.mapValues { + case Seq(vs, w1s, w2s) => + (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) + } + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + def mergeCogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + mergeCogroup(other, false) + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + def mergeCogroup[W](other: RDD[(K, W)], ordered: Boolean): RDD[(K, (Seq[V], Seq[W]))] = { + mergeCogroup(other, defaultPartitioner(self, other), ordered) + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. 
+ */ + def mergeCogroup[W1, W2]( + other1: RDD[(K, W1)], + other2: RDD[(K, W2)]): RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + mergeCogroup(other1, other2, false) + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ + def mergeCogroup[W1, W2]( + other1: RDD[(K, W1)], + other2: RDD[(K, W2)], + ordered: Boolean): RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + mergeCogroup(other1, other2, defaultPartitioner(self, other1, other2), ordered) + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + def mergeCogroup[W]( + other: RDD[(K, W)], + numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + mergeCogroup(other, numPartitions, false) + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + def mergeCogroup[W]( + other: RDD[(K, W)], + numPartitions: Int, + ordered: Boolean): RDD[(K, (Seq[V], Seq[W]))] = { + mergeCogroup(other, new HashPartitioner(numPartitions), ordered) + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ + def mergeCogroup[W1, W2]( + other1: RDD[(K, W1)], + other2: RDD[(K, W2)], + numPartitions: Int): RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + mergeCogroup(other1, other2, numPartitions, false) + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ + def mergeCogroup[W1, W2]( + other1: RDD[(K, W1)], + other2: RDD[(K, W2)], + numPartitions: Int, + ordered: Boolean): RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + mergeCogroup(other1, other2, new HashPartitioner(numPartitions), ordered) + } + + private def getKeyClass() = classTag[K].runtimeClass } diff --git a/core/src/main/scala/org/apache/spark/rdd/SortMergeCoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SortMergeCoGroupedRDD.scala new file mode 100644 index 0000000000000..32c972fb9ed01 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/SortMergeCoGroupedRDD.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +import scala.collection.mutable + +import org.apache.spark.InterruptibleIterator +import org.apache.spark.Partition +import org.apache.spark.Partitioner +import org.apache.spark.SparkEnv +import org.apache.spark.TaskContext +import org.apache.spark.serializer.Serializer + +class SortMergeCoGroupedRDD[K: Ordering]( + @transient rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) + extends CoGroupedRDD(rdds, part) { + + private type CoGroup = mutable.ArrayBuffer[Any] + private type CoGroupCombiner = Seq[CoGroup] + + private var serializer: Serializer = null + + override def setSerializer(serializer: Serializer): CoGroupedRDD[K] = { + super.setSerializer(serializer) + this.serializer = serializer + this + } + + override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = { + + val split = s.asInstanceOf[CoGroupPartition] + + val itrs = for (dep <- split.deps) yield { + dep match { + case NarrowCoGroupSplitDep(rdd, _, itsSplit) => { + // Read them from the parent + rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]] + } + case ShuffleCoGroupSplitDep(shuffleId) => { + // Read map outputs of shuffle + val fetcher = SparkEnv.get.sortMergeShuffleFetcher + val ser = Serializer.getSerializer(serializer) + fetcher.fetch(shuffleId, split.index, context, ser) + } + } + }.buffered + + new InterruptibleIterator( + context, + Iterator.continually { + ((None: Option[K]) /: itrs) { + case (opt, itr) if itr.hasNext => + opt.map { key => + implicitly[Ordering[K]].min(key, itr.head._1) + }.orElse(Some(itr.head._1)) + case (opt, _) => opt + } + }.takeWhile(_.isDefined).map(_.get).map { key => + (key -> itrs.map { itr => + Iterator.continually { + if (itr.hasNext && implicitly[Ordering[K]].equiv(itr.head._1, key)) { + Some(itr.next._2) + } else None + }.takeWhile(_.isDefined).map(_.get).to[mutable.ArrayBuffer] + }) + }) + } +} diff --git a/core/src/test/scala/org/apache/spark/rdd/OrderedRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/OrderedRDDFunctionsSuite.scala new file mode 100644 index 0000000000000..0a87ae99568f9 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/rdd/OrderedRDDFunctionsSuite.scala @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashSet +import scala.util.Random + +import org.scalatest.FunSuite +import com.google.common.io.Files +import org.apache.hadoop.mapreduce._ +import org.apache.hadoop.conf.{Configuration, Configurable} + +import org.apache.spark.SparkContext._ +import org.apache.spark.{Partitioner, SharedSparkContext} + +class OrderedRDDFunctionsSuite extends FunSuite with SharedSparkContext { + test("mergeJoin") { + val rdd1 = sc.parallelize(Array((3, 1), (1, 2), (2, 1), (1, 1))) + val rdd2 = sc.parallelize(Array((2, 'z'), (1, 'x'), (4, 'w'), (2, 'y'))) + val joined = rdd1.mergeJoin(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSeq === Seq( + (1, (2, 'x')), + (1, (1, 'x')), + (2, (1, 'z')), + (2, (1, 'y')) + )) + } + + test("mergeJoin pre-ordered") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.mergeJoin(rdd2, true).collect() + assert(joined.size === 4) + assert(joined.toSeq === Seq( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("mergeJoin all-to-all") { + val rdd1 = sc.parallelize(Array((1, 2), (1, 1), (1, 3))) + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) + val joined = rdd1.mergeJoin(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSeq === Seq( + (1, (2, 'x')), + (1, (2, 'y')), + (1, (1, 'x')), + (1, (1, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("mergeJoin all-to-all pre-ordered") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))) + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) + val joined = rdd1.mergeJoin(rdd2, true).collect() + assert(joined.size === 6) + assert(joined.toSeq === Seq( + (1, (1, 'x')), + (1, (1, 'y')), + (1, (2, 'x')), + (1, (2, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("mergeLeftOuterJoin") { + val rdd1 = sc.parallelize(Array((3, 1), (1, 2), (2, 1), (1, 1))) + val rdd2 = sc.parallelize(Array((2, 'z'), (1, 'x'), (4, 'w'), (2, 'y'))) + val joined = rdd1.mergeLeftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSeq === Seq( + (1, (2, Some('x'))), + (1, (1, Some('x'))), + (2, (1, Some('z'))), + (2, (1, Some('y'))), + (3, (1, None)) + )) + } + + test("mergeLeftOuterJoin pre-ordered") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.mergeLeftOuterJoin(rdd2, true).collect() + assert(joined.size === 5) + assert(joined.toSeq === Seq( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("mergeRightOuterJoin") { + val rdd1 = sc.parallelize(Array((3, 1), (1, 2), (2, 1), (1, 1))) + val rdd2 = sc.parallelize(Array((2, 'z'), (1, 'x'), (4, 'w'), (2, 'y'))) + val joined = rdd1.mergeRightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSeq === Seq( + (1, (Some(2), 'x')), + (1, (Some(1), 'x')), + (2, (Some(1), 'z')), + (2, (Some(1), 'y')), + (4, (None, 'w')) + )) + } + + test("mergeRightOuterJoin pre-ordered") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.mergeRightOuterJoin(rdd2, true).collect() + assert(joined.size === 5) + assert(joined.toSeq === Seq( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + 
(2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + test("mergeJoin with no matches") { + val rdd1 = sc.parallelize(Array((3, 1), (1, 2), (2, 1), (1, 1))) + val rdd2 = sc.parallelize(Array((5, 'z'), (4, 'x'), (6, 'w'), (5, 'y'))) + val joined = rdd1.mergeJoin(rdd2).collect() + assert(joined.size === 0) + } + + test("mergeJoin with no matches pre-ordered") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) + val joined = rdd1.mergeJoin(rdd2, true).collect() + assert(joined.size === 0) + } + + test("mergeJoin with many output partitions") { + val rdd1 = sc.parallelize(Array((3, 1), (1, 2), (2, 1), (1, 1))) + val rdd2 = sc.parallelize(Array((2, 'z'), (1, 'x'), (4, 'w'), (2, 'y'))) + val joined = rdd1.mergeJoin(rdd2, 10).collect() + assert(joined.size === 4) + assert(joined.toSeq === Seq( + (1, (2, 'x')), + (1, (1, 'x')), + (2, (1, 'z')), + (2, (1, 'y')) + )) + } + + test("mergeJoin with many output partitions pre-ordered") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.mergeJoin(rdd2, 10, true).collect() + assert(joined.size === 4) + assert(joined.toSeq === Seq( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } +} From 99751661fcc7632a0f82816bbaca07bf822d3663 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 25 Mar 2014 19:15:09 +0900 Subject: [PATCH 2/2] Add Java APIs for sort-merge cogroup/joins. --- .../apache/spark/api/java/JavaPairRDD.scala | 803 ++++++++++++++++++ .../java/org/apache/spark/JavaAPISuite.java | 49 ++ 2 files changed, 852 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 9596dbaf75488..d3af41d9a5843 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -640,6 +640,809 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending, numPartitions)) } + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + */ + def mergeJoin[W]( + other: JavaPairRDD[K, W], + partitioner: Partitioner): JavaPairRDD[K, (V, W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeJoin(comp, other, partitioner, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + */ + def mergeJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + partitioner: Partitioner): JavaPairRDD[K, (V, W)] = { + mergeJoin(comp, other, partitioner, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. 
+ */ + def mergeJoin[W]( + other: JavaPairRDD[K, W], + partitioner: Partitioner, + ordered: Boolean): JavaPairRDD[K, (V, W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeJoin(comp, other, partitioner, ordered) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + */ + def mergeJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + partitioner: Partitioner, + ordered: Boolean): JavaPairRDD[K, (V, W)] = { + implicit val c = comp + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeJoin(other.rdd, partitioner, ordered)) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeLeftOuterJoin[W]( + other: JavaPairRDD[K, W], + partitioner: Partitioner): JavaPairRDD[K, (V, Optional[W])] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeLeftOuterJoin(comp, other, partitioner, false) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeLeftOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + partitioner: Partitioner): JavaPairRDD[K, (V, Optional[W])] = { + mergeLeftOuterJoin(comp, other, partitioner, false) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeLeftOuterJoin[W]( + other: JavaPairRDD[K, W], + partitioner: Partitioner, + ordered: Boolean): JavaPairRDD[K, (V, Optional[W])] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeLeftOuterJoin(comp, other, partitioner, ordered) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeLeftOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + partitioner: Partitioner, + ordered: Boolean): JavaPairRDD[K, (V, Optional[W])] = { + implicit val c = comp + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeLeftOuterJoin( + other.rdd, + partitioner, + ordered).mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. 
+ */ + def mergeRightOuterJoin[W]( + other: JavaPairRDD[K, W], + partitioner: Partitioner): JavaPairRDD[K, (Optional[V], W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeRightOuterJoin(comp, other, partitioner, false) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeRightOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + partitioner: Partitioner): JavaPairRDD[K, (Optional[V], W)] = { + mergeRightOuterJoin(comp, other, partitioner, false) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeRightOuterJoin[W]( + other: JavaPairRDD[K, W], + partitioner: Partitioner, + ordered: Boolean): JavaPairRDD[K, (Optional[V], W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeRightOuterJoin(comp, other, partitioner, ordered) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def mergeRightOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + partitioner: Partitioner, + ordered: Boolean): JavaPairRDD[K, (Optional[V], W)] = { + implicit val c = comp + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeRightOuterJoin( + other.rdd, + partitioner, + ordered).mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeJoin(comp, other, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W](comp: Comparator[K], other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = { + mergeJoin(comp, other, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. 
+ */ + def mergeJoin[W]( + other: JavaPairRDD[K, W], + ordered: Boolean): JavaPairRDD[K, (V, W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeJoin(comp, other, ordered) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + ordered: Boolean): JavaPairRDD[K, (V, W)] = { + implicit val c = comp + fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeJoin(other.rdd, ordered)) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W]( + other: JavaPairRDD[K, W], + numPartitions: Int): JavaPairRDD[K, (V, W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeJoin(comp, other, numPartitions, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + numPartitions: Int): JavaPairRDD[K, (V, W)] = { + mergeJoin(comp, other, numPartitions, false) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W]( + other: JavaPairRDD[K, W], + numPartitions: Int, + ordered: Boolean): JavaPairRDD[K, (V, W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeJoin(comp, other, numPartitions, ordered) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a merge join across the cluster. + */ + def mergeJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + numPartitions: Int, + ordered: Boolean): JavaPairRDD[K, (V, W)] = { + implicit val c = comp + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeJoin( + other.rdd, + numPartitions, + ordered)) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the existing partitioner/parallelism level. + */ + def mergeLeftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeLeftOuterJoin(comp, other, false) + } + + /** + * Perform a left outer join of `this` and `other`. 
For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the existing partitioner/parallelism level. + */ + def mergeLeftOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = { + mergeLeftOuterJoin(comp, other, false) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the existing partitioner/parallelism level. + */ + def mergeLeftOuterJoin[W]( + other: JavaPairRDD[K, W], + ordered: Boolean): JavaPairRDD[K, (V, Optional[W])] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeLeftOuterJoin(comp, other, ordered) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the existing partitioner/parallelism level. + */ + def mergeLeftOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + ordered: Boolean): JavaPairRDD[K, (V, Optional[W])] = { + implicit val c = comp + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeLeftOuterJoin( + other.rdd, + ordered).mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numPartitions` partitions. + */ + def mergeLeftOuterJoin[W]( + other: JavaPairRDD[K, W], + numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeLeftOuterJoin(comp, other, numPartitions, false) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numPartitions` partitions. + */ + def mergeLeftOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = { + mergeLeftOuterJoin(comp, other, numPartitions, false) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numPartitions` partitions. + */ + def mergeLeftOuterJoin[W]( + other: JavaPairRDD[K, W], + numPartitions: Int, + ordered: Boolean): JavaPairRDD[K, (V, Optional[W])] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeLeftOuterJoin(comp, other, numPartitions, ordered) + } + + /** + * Perform a left outer join of `this` and `other`. 
For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numPartitions` partitions. + */ + def mergeLeftOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + numPartitions: Int, + ordered: Boolean): JavaPairRDD[K, (V, Optional[W])] = { + implicit val c = comp + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeLeftOuterJoin( + other.rdd, + numPartitions, + ordered).mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))}) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the existing partitioner/parallelism level. + */ + def mergeRightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeRightOuterJoin(comp, other, false) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the existing partitioner/parallelism level. + */ + def mergeRightOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = { + mergeRightOuterJoin(comp, other, false) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the existing partitioner/parallelism level. + */ + def mergeRightOuterJoin[W]( + other: JavaPairRDD[K, W], + ordered: Boolean): JavaPairRDD[K, (Optional[V], W)] = { + val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] + mergeRightOuterJoin(comp, other, ordered) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the existing partitioner/parallelism level. + */ + def mergeRightOuterJoin[W]( + comp: Comparator[K], + other: JavaPairRDD[K, W], + ordered: Boolean): JavaPairRDD[K, (Optional[V], W)] = { + implicit val c = comp + fromRDD( + new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeRightOuterJoin( + other.rdd, + ordered).mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)}) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD into the given number of partitions. 
+   */
+  def mergeRightOuterJoin[W](
+      other: JavaPairRDD[K, W],
+      numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeRightOuterJoin(comp, other, numPartitions, false)
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def mergeRightOuterJoin[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W],
+      numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+    mergeRightOuterJoin(comp, other, numPartitions, false)
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def mergeRightOuterJoin[W](
+      other: JavaPairRDD[K, W],
+      numPartitions: Int,
+      ordered: Boolean): JavaPairRDD[K, (Optional[V], W)] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeRightOuterJoin(comp, other, numPartitions, ordered)
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def mergeRightOuterJoin[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W],
+      numPartitions: Int,
+      ordered: Boolean): JavaPairRDD[K, (Optional[V], W)] = {
+    implicit val c = comp
+    fromRDD(
+      new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeRightOuterJoin(
+        other.rdd,
+        numPartitions,
+        ordered).mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      other: JavaPairRDD[K, W],
+      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other, partitioner, false)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W],
+      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W])] = {
+    mergeCogroup(comp, other, partitioner, false)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      other: JavaPairRDD[K, W],
+      partitioner: Partitioner,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other, partitioner, ordered)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W],
+      partitioner: Partitioner,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W])] = {
+    implicit val c = comp
+    fromRDD(cogroupResultToJava(
+      new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeCogroup(
+        other.rdd,
+        partitioner,
+        ordered)))
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other1, other2, partitioner, false)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      comp: Comparator[K],
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    mergeCogroup(comp, other1, other2, partitioner, false)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      partitioner: Partitioner,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other1, other2, partitioner, ordered)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      comp: Comparator[K],
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      partitioner: Partitioner,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    implicit val c = comp
+    fromRDD(cogroupResult2ToJava(
+      new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeCogroup(
+        other1.rdd,
+        other2.rdd,
+        partitioner,
+        ordered)))
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other, false)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = {
+    mergeCogroup(comp, other, false)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      other: JavaPairRDD[K, W],
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other, ordered)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W],
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W])] = {
+    implicit val c = comp
+    fromRDD(cogroupResultToJava(
+      new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeCogroup(
+        other.rdd,
+        ordered)))
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2]): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other1, other2, false)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      comp: Comparator[K],
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2]): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    mergeCogroup(comp, other1, other2, false)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other1, other2, ordered)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      comp: Comparator[K],
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    implicit val c = comp
+    fromRDD(cogroupResult2ToJava(
+      new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeCogroup(
+        other1.rdd,
+        other2.rdd,
+        ordered)))
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      other: JavaPairRDD[K, W],
+      numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other, numPartitions, false)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W],
+      numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W])] = {
+    mergeCogroup(comp, other, numPartitions, false)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      other: JavaPairRDD[K, W],
+      numPartitions: Int,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other, numPartitions, ordered)
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def mergeCogroup[W](
+      comp: Comparator[K],
+      other: JavaPairRDD[K, W],
+      numPartitions: Int,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W])] = {
+    implicit val c = comp
+    fromRDD(cogroupResultToJava(
+      new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeCogroup(
+        other.rdd,
+        numPartitions,
+        ordered)))
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other1, other2, numPartitions, false)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      comp: Comparator[K],
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      numPartitions: Int): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    mergeCogroup(comp, other1, other2, numPartitions, false)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      numPartitions: Int,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
+    mergeCogroup(comp, other1, other2, numPartitions, ordered)
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def mergeCogroup[W1, W2](
+      comp: Comparator[K],
+      other1: JavaPairRDD[K, W1],
+      other2: JavaPairRDD[K, W2],
+      numPartitions: Int,
+      ordered: Boolean): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = {
+    implicit val c = comp
+    fromRDD(cogroupResult2ToJava(
+      new OrderedRDDFunctions[K, V, (K, V)](rdd).mergeCogroup(
+        other1.rdd,
+        other2.rdd,
+        numPartitions,
+        ordered)))
+  }
+
   /**
    * Return an RDD with the keys of each tuple.
   */
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index c6b65c7348ae0..8a2bfe0926f0a 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1025,4 +1025,53 @@ public Tuple2<Integer, int[]> call(Integer x) throws Exception {
     pairRDD.collect(); // Works fine
     pairRDD.collectAsMap(); // Used to crash with ClassCastException
   }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void mergeCogroup() {
+    JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<String, String>("Apples", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Fruit"),
+      new Tuple2<String, String>("Oranges", "Citrus")
+    ));
+    JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<String, Integer>("Oranges", 2),
+      new Tuple2<String, Integer>("Apples", 3)
+    ));
+    JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.mergeCogroup(prices);
+    Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
+    Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
+
+    cogrouped.collect();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void mergeLeftOuterJoin() {
+    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<Integer, Integer>(1, 2),
+      new Tuple2<Integer, Integer>(2, 1),
+      new Tuple2<Integer, Integer>(3, 1)
+    ));
+    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+      new Tuple2<Integer, Character>(1, 'x'),
+      new Tuple2<Integer, Character>(2, 'y'),
+      new Tuple2<Integer, Character>(2, 'z'),
+      new Tuple2<Integer, Character>(4, 'w')
+    ));
+    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
+      rdd1.mergeLeftOuterJoin(rdd2).collect();
+    Assert.assertEquals(5, joined.size());
+    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
+      rdd1.mergeLeftOuterJoin(rdd2).filter(
+        new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+          @Override
+          public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
+              throws Exception {
+            return !tup._2()._2().isPresent();
+          }
+        }).first();
+    Assert.assertEquals(3, firstUnmatched._1().intValue());
+  }
 }
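
---
Reviewer note (not part of the patch): a minimal driver-side sketch of the new
Java API, to make the overload matrix above concrete. It assumes an existing
JavaSparkContext `sc` and the same imports as JavaAPISuite (java.util.Arrays,
java.util.List, java.util.Comparator, java.util.Collections, scala.Tuple2,
com.google.common.base.Optional). The comment on `ordered` is an inference from
the parameter name; the Javadoc above does not define its semantics.

    JavaPairRDD<String, Integer> left = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("a", 1),
        new Tuple2<String, Integer>("b", 2)));
    JavaPairRDD<String, Integer> right = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("b", 20),
        new Tuple2<String, Integer>("c", 30)));

    // Natural key ordering, 4 reduce partitions. The convenience overloads
    // pass ordered = false (presumably: inputs are not assumed pre-sorted).
    JavaPairRDD<String, Tuple2<Optional<Integer>, Integer>> joined =
        left.mergeRightOuterJoin(right, 4);
    // joined contains ("b", (Optional.of(2), 20)) and ("c", (absent, 30)).

    // The Comparator overloads replace the natural ordering; reverseOrder()
    // is only an illustrative choice.
    Comparator<String> reverse = Collections.reverseOrder();
    JavaPairRDD<String, Tuple2<List<Integer>, List<Integer>>> cogrouped =
        left.mergeCogroup(reverse, right, 4, false);
    // cogrouped maps "a" -> ([1], []), "b" -> ([2], [20]), "c" -> ([], [30]).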