2 changes: 1 addition & 1 deletion bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -78,7 +78,7 @@ object Bagel extends Logging {
val startTime = System.currentTimeMillis

val aggregated = agg(verts, aggregator)
val combinedMsgs = msgs.combineByKey(
val combinedMsgs = msgs.combineByKeyWithClassTag(
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,6 +17,8 @@

package org.apache.spark

import scala.reflect.ClassTag

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
@@ -65,7 +67,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](

Contributor: FYI, this is a binary-incompatible change to a DeveloperAPI.

Contributor Author: According to the source for DeveloperApi, a Developer API is unstable and can change between minor releases.

@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
@@ -76,6 +78,13 @@ class ShuffleDependency[K, V, C](

override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)

val shuffleId: Int = _rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
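
For context, a minimal sketch of how a shuffle-side consumer might use the new fields. The helper below is hypothetical (not part of this change) and is placed in the org.apache.spark package only because the fields are private[spark]:

package org.apache.spark

// Hypothetical helper: uses the class names recorded on a ShuffleDependency to check
// whether every type crossing the shuffle is a known primitive. Note that
// classOf[Int].getName is "int", matching classTag[Int].runtimeClass.getName above.
private[spark] object ShuffleTypeInfo {
  private val primitiveNames: Set[String] =
    Set[Class[_]](classOf[Int], classOf[Long], classOf[Double]).map(_.getName)

  def allTypesKnownPrimitive(dep: ShuffleDependency[_, _, _]): Boolean = {
    primitiveNames.contains(dep.keyClassName) &&
    primitiveNames.contains(dep.valueClassName) &&
    // combinerClassName is None when the classtag-less combineByKey overloads are used,
    // so require it to be both present and primitive.
    dep.combinerClassName.exists(primitiveNames.contains)
  }
}
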
core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -239,7 +239,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
mapSideCombine: Boolean,
serializer: Serializer): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
fromRDD(rdd.combineByKey(
fromRDD(rdd.combineByKeyWithClassTag(
createCombiner,
mergeValue,
mergeCombiners,
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,6 +22,7 @@ import scala.language.existentials
import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
@@ -74,7 +75,9 @@ private[spark] class CoGroupPartition(
* @param part partitioner used to partition the shuffle output
*/
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
class CoGroupedRDD[K: ClassTag](
@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
part: Partitioner)
extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

// For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
90 changes: 76 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -57,7 +57,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
with SparkHadoopMapReduceUtil
with Serializable
{

/**
* :: Experimental ::
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
* Note that V and C can be different -- for example, one might group an RDD of type
@@ -70,12 +72,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
def combineByKey[C](createCombiner: V => C,
@Experimental
def combineByKeyWithClassTag[C](

Contributor: Actually, why call this something else? Does it not compile if you just called this combineByKey as well?

Contributor Author: Because PairRDDFunctions is a stable API, we can't change the method signature of combineByKey. Adding the ClassTag would add an implicit argument. If we keep the old combineByKey methods and add new combineByKey methods with ClassTags, we get compiler errors: the combineByKey symbol can no longer be resolved at call sites.

If you know a way of doing this more cleanly, I would be happy to make that change.
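
A minimal sketch of the overload clash described above; the class and method names below are stand-ins, not Spark code:

import scala.reflect.ClassTag

class Pairs[K, V] {
  // Stand-in for the existing, binary-stable signature.
  def combine[C](createCombiner: V => C, mergeValue: (C, V) => C): String = "old"

  // Hypothetical overload that only adds an implicit ClassTag. The two definitions
  // themselves compile (their erased signatures differ), but an existing call site such as
  //   new Pairs[Int, Int].combine((v: Int) => v, (c: Int, v: Int) => c + v)
  // now fails to compile because the compiler can no longer resolve which combine
  // overload is meant -- the problem described above, and the reason the new Spark
  // method was given the distinct name combineByKeyWithClassTag.
  def combine[C](createCombiner: V => C, mergeValue: (C, V) => C)
      (implicit ct: ClassTag[C]): String = "new"
}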

createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
@@ -103,13 +107,50 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* Simplified version of combineByKey that hash-partitions the output RDD.
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](createCombiner: V => C,
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}

/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
* This method is here for backward compatibility. It does not provide combiner
* classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}

/**
* :: Experimental ::
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
*/
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
new HashPartitioner(numPartitions))
}

/**
@@ -133,7 +174,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

// We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, combOp, partitioner)
}

/**
@@ -182,7 +224,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

val cleanedFunc = self.context.clean(func)
combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
cleanedFunc, cleanedFunc, partitioner)
}

/**
@@ -268,7 +311,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKey[V]((v: V) => v, func, func, partitioner)
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

/**
@@ -392,7 +435,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
h1
}

combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
combineByKeyWithClassTag(createHLL, mergeValueHLL, mergeHLL, partitioner)
.mapValues(_.cardinality())
}

/**
@@ -466,7 +510,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
@@ -565,12 +609,30 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level. This method is here for backward compatibility. It
* does not provide combiner classtag information to the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

/**
* :: Experimental ::
* Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
: RDD[(K, C)] = self.withScope {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}

/**
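
A short usage sketch contrasting the two entry points (a standalone example, not part of this PR; it sits in the org.apache.spark package only so that the private[spark] fields on ShuffleDependency can be read, just as the new test suite below does):

package org.apache.spark

object CombineByKeyTagExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("tag-example"))
    val pairs = sc.parallelize(1 to 100).map(i => (i % 10, i.toDouble))

    // Classtag-aware overload: the combiner's runtime class is recorded on the dependency.
    val withTag = pairs.combineByKeyWithClassTag[Double](
      (v: Double) => v, (c: Double, v: Double) => c + v, (c1: Double, c2: Double) => c1 + c2)
    val tagDep = withTag.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
    println(tagDep.combinerClassName)    // Some(double)

    // Legacy overload: a null ClassTag is passed through, so no combiner class is recorded.
    val noTag = pairs.combineByKey(
      (v: Double) => v, (c: Double, v: Double) => c + v, (c1: Double, c2: Double) => c1 + c2)
    val legacyDep = noTag.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
    println(legacyDep.combinerClassName) // None

    sc.stop()
  }
}
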
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,6 +17,8 @@

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
@@ -37,7 +39,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
*/
// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
@DeveloperApi
class ShuffledRDD[K, V, C](
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -63,15 +63,17 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
}

override def getDependencies: Seq[Dependency[_]] = {
Seq(rdd1, rdd2).map { rdd =>
def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
: Dependency[_] = {
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency(rdd, part, serializer)
new ShuffleDependency[T1, T2, Any](rdd, part, serializer)
}
}
Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
}

override def getPartitions: Array[Partition] = {
@@ -105,7 +107,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
seq
}
}
def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
dependencies(depNum) match {
case oneToOneDependency: OneToOneDependency[_] =>
val dependencyPartition = partition.narrowDeps(depNum).get.split
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -483,7 +483,7 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
def cogroup[K: ClassTag, V: ClassTag](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
: RDD[(K, Array[Iterable[V]])] = {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
67 changes: 67 additions & 0 deletions core/src/test/scala/org/apache/spark/shuffle/ShuffleDependencySuite.scala
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle

import org.apache.spark._

case class KeyClass()

case class ValueClass()

case class CombinerClass()

class ShuffleDependencySuite extends SparkFunSuite with LocalSparkContext {

val conf = new SparkConf(loadDefaults = false)

test("key, value, and combiner classes correct in shuffle dependency without aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 5, 4)
.map(key => (KeyClass(), ValueClass()))
.groupByKey()
val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
assert(!dep.mapSideCombine, "Test requires that no map-side aggregator is defined")
assert(dep.keyClassName == classOf[KeyClass].getName)
assert(dep.valueClassName == classOf[ValueClass].getName)
}

test("key, value, and combiner classes available in shuffle dependency with aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 5, 4)
.map(key => (KeyClass(), ValueClass()))
.aggregateByKey(CombinerClass())({ case (a, b) => a }, { case (a, b) => a })
val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
assert(dep.mapSideCombine && dep.aggregator.isDefined, "Test requires map-side aggregation")
assert(dep.keyClassName == classOf[KeyClass].getName)
assert(dep.valueClassName == classOf[ValueClass].getName)
assert(dep.combinerClassName == Some(classOf[CombinerClass].getName))
}

test("combineByKey null combiner class tag handled correctly") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 5, 4)
.map(key => (KeyClass(), ValueClass()))
.combineByKey((v: ValueClass) => v,
(c: AnyRef, v: ValueClass) => c,
(c1: AnyRef, c2: AnyRef) => c1)
val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
assert(dep.keyClassName == classOf[KeyClass].getName)
assert(dep.valueClassName == classOf[ValueClass].getName)
assert(dep.combinerClassName == None)
}

}