
Commit ed1afac

[SPARK-9043] Serialize key, value and combiner classes in ShuffleDependency
ShuffleManager implementations are currently not given type information for the key, value and combiner classes. Serialization of shuffle objects relies on the objects being Java-serializable, with methods defined for reading and writing the object, or alternatively on Kryo, which uses reflection. Serialization systems like Avro, Thrift and Protobuf generate classes with zero-argument constructors and explicit schema information (e.g. IndexedRecords in Avro have get, put and getSchema methods). By serializing the key, value and combiner class names in ShuffleDependency, shuffle implementations will have access to schema information when registerShuffle() is called.
1 parent 2ddeb63 commit ed1afac
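
The schema point is easiest to see with Avro: a generated record class has a zero-argument constructor and a getSchema method, so a class name alone is enough to recover the schema. A minimal sketch (not part of this commit) of what a shuffle implementation could do with one of the recorded class names, assuming the class is an Avro-generated IndexedRecord available on the classpath:

import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord

// Hypothetical helper: instantiate the generated class via its zero-argument
// constructor and read the schema it carries.
def schemaForClassName(className: String): Schema =
  Class.forName(className).newInstance().asInstanceOf[IndexedRecord].getSchema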

File tree: 9 files changed (+164 -23 lines)

bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ object Bagel extends Logging {
       val startTime = System.currentTimeMillis
 
       val aggregated = agg(verts, aggregator)
-      val combinedMsgs = msgs.combineByKey(
+      val combinedMsgs = msgs.combineByKeyWithClassTag(
        combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
       val grouped = combinedMsgs.groupWith(verts)
       val superstep_ = superstep // Create a read-only copy of superstep for capture in closure

core/src/main/scala/org/apache/spark/Dependency.scala

Lines changed: 12 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import scala.reflect.ClassTag
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
@@ -65,7 +67,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
  */
 @DeveloperApi
-class ShuffleDependency[K, V, C](
+class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient private val _rdd: RDD[_ <: Product2[K, V]],
     val partitioner: Partitioner,
     val serializer: Option[Serializer] = None,
@@ -76,6 +78,15 @@ class ShuffleDependency[K, V, C](
 
   override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
 
+  /**
+   * The key, value and combiner classes are serialized so that shuffle manager
+   * implementation can use the information to build
+   */
+  val keyClassName: String = reflect.classTag[K].runtimeClass.getName
+  val valueClassName: String = reflect.classTag[V].runtimeClass.getName
+  // Note: It's possible that the combiner class tag is null, if the combineByKey
+  // methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
+  val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName)
   val shuffleId: Int = _rdd.context.newShuffleId()
 
   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
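
With these fields in place, a ShuffleManager can inspect the key, value and combiner types of a shuffle from the dependency it receives in registerShuffle(). A small illustrative sketch (not part of this commit) that only reads the new fields:

import org.apache.spark.ShuffleDependency

// Sketch: summarize the type information a shuffle manager now sees.
// combinerClassName is None when no combiner ClassTag was supplied
// (the backward-compatible combineByKey overloads pass a null ClassTag).
def describeShuffle(dep: ShuffleDependency[_, _, _]): String = {
  val combiner = dep.combinerClassName.getOrElse("<unknown>")
  s"shuffle ${dep.shuffleId}: key=${dep.keyClassName}, " +
    s"value=${dep.valueClassName}, combiner=$combiner"
}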

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -239,7 +239,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       mapSideCombine: Boolean,
       serializer: Serializer): JavaPairRDD[K, C] = {
     implicit val ctag: ClassTag[C] = fakeClassTag
-    fromRDD(rdd.combineByKey(
+    fromRDD(rdd.combineByKeyWithClassTag(
       createCombiner,
       mergeValue,
       mergeCombiners,

core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala

Lines changed: 4 additions & 1 deletion
@@ -22,6 +22,7 @@ import scala.language.existentials
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
@@ -74,7 +75,9 @@ private[spark] class CoGroupPartition(
  * @param part partitioner used to partition the shuffle output
  */
 @DeveloperApi
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
+class CoGroupedRDD[K: ClassTag](
+    @transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
+    part: Partitioner)
   extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
 
   // For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).

core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala

Lines changed: 70 additions & 14 deletions
@@ -57,6 +57,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   with SparkHadoopMapReduceUtil
   with Serializable
 {
+
   /**
    * Generic function to combine the elements for each key using a custom set of aggregation
    * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
@@ -70,12 +71,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * In addition, users can control the partitioning of the output RDD, and whether to perform
    * map-side aggregation (if a mapper can produce multiple items with the same key).
    */
-  def combineByKey[C](createCombiner: V => C,
+  def combineByKeyWithClassTag[C](
+      createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
       partitioner: Partitioner,
       mapSideCombine: Boolean = true,
-      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
+      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
     require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
     if (keyClass.isArray) {
       if (mapSideCombine) {
@@ -103,13 +105,48 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Simplified version of combineByKey that hash-partitions the output RDD.
+   * This method is here for backward compatibility. It
+   * does not provide combiner classtag information to
+   * the shuffle.
+   *
+   * @see [[combineByKeyWithClassTag]]
+   */
+  def combineByKey[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true,
+      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
+      partitioner, mapSideCombine, serializer)(null)
+  }
+
+  /**
+   * This method is here for backward compatibility. It
+   * does not provide combiner classtag information to
+   * the shuffle.
+   *
+   * @see [[combineByKeyWithClassTag]]
    */
-  def combineByKey[C](createCombiner: V => C,
+  def combineByKey[C](
+      createCombiner: V => C,
       mergeValue: (C, V) => C,
       mergeCombiners: (C, C) => C,
       numPartitions: Int): RDD[(K, C)] = self.withScope {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
+  }
+
+  /**
+   * Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
+   */
+  def combineByKeyWithClassTag[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
+      new HashPartitioner(numPartitions))
   }
 
   /**
@@ -133,7 +170,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     // We will clean the combiner closure later in `combineByKey`
     val cleanedSeqOp = self.context.clean(seqOp)
-    combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
+    combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
+      cleanedSeqOp, combOp, partitioner)
   }
 
   /**
@@ -182,7 +220,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
 
     val cleanedFunc = self.context.clean(func)
-    combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
+    combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
+      cleanedFunc, cleanedFunc, partitioner)
   }
 
   /**
@@ -268,7 +307,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * "combiner" in MapReduce.
    */
   def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
-    combineByKey[V]((v: V) => v, func, func, partitioner)
+    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
   }
 
   /**
@@ -392,7 +431,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       h1
     }
 
-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
+    combineByKeyWithClassTag(createHLL, mergeValueHLL, mergeHLL, partitioner)
+      .mapValues(_.cardinality())
   }
 
   /**
@@ -466,7 +506,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     val createCombiner = (v: V) => CompactBuffer(v)
     val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
     val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
-    val bufs = combineByKey[CompactBuffer[V]](
+    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
     bufs.asInstanceOf[RDD[(K, Iterable[V])]]
   }
@@ -565,12 +605,28 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
   }
 
   /**
-   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+   * This method is here for backward compatibility. It
+   * does not provide combiner classtag information to
+   * the shuffle.
+   *
+   * @see [[combineByKeyWithClassTag]]
+   */
+  def combineByKey[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
+  }
+
+  /**
+   * Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
    * existing partitioner/parallelism level.
    */
-  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
-    : RDD[(K, C)] = self.withScope {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
+  def combineByKeyWithClassTag[C](
+      createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
+    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
   }
 
   /**
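
The practical difference between the two entry points: combineByKeyWithClassTag captures an implicit ClassTag for the combiner type, so the resulting ShuffleDependency records the combiner class name, while the backward-compatible combineByKey overloads forward a null ClassTag and the name is lost. A usage sketch (not part of this commit), assuming an existing, not-yet-partitioned pairs: RDD[(String, Int)] so that a shuffle dependency is created:

import org.apache.spark.ShuffleDependency
import org.apache.spark.rdd.RDD

def compareCombinerInfo(pairs: RDD[(String, Int)]): Unit = {
  // New API: ClassTag[List[Int]] is resolved implicitly and recorded.
  val tagged = pairs.combineByKeyWithClassTag(
    (v: Int) => List(v),
    (c: List[Int], v: Int) => v :: c,
    (c1: List[Int], c2: List[Int]) => c1 ::: c2)

  // Old API: kept for compatibility; no combiner ClassTag is supplied.
  val untagged = pairs.combineByKey(
    (v: Int) => List(v),
    (c: List[Int], v: Int) => v :: c,
    (c1: List[Int], c2: List[Int]) => c1 ::: c2)

  def combinerOf(rdd: RDD[_]): Option[String] =
    rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].combinerClassName

  println(combinerOf(tagged))   // expected per this change: Some(<List class name>)
  println(combinerOf(untagged)) // expected per this change: None
}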

core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala

Lines changed: 3 additions & 1 deletion
@@ -17,6 +17,8 @@
 
 package org.apache.spark.rdd
 
+import scala.reflect.ClassTag
+
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.serializer.Serializer
@@ -37,7 +39,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  */
 // TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
 @DeveloperApi
-class ShuffledRDD[K, V, C](
+class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
     @transient var prev: RDD[_ <: Product2[K, V]],
     part: Partitioner)
   extends RDD[(K, C)](prev.context, Nil) {

core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala

Lines changed: 5 additions & 3 deletions
@@ -63,15 +63,17 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
   }
 
   override def getDependencies: Seq[Dependency[_]] = {
-    Seq(rdd1, rdd2).map { rdd =>
+    def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
+      : Dependency[_] = {
       if (rdd.partitioner == Some(part)) {
         logDebug("Adding one-to-one dependency with " + rdd)
         new OneToOneDependency(rdd)
       } else {
         logDebug("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency(rdd, part, serializer)
+        new ShuffleDependency[T1, T2, Any](rdd, part, serializer)
       }
     }
+    Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
   }
 
   override def getPartitions: Array[Partition] = {
@@ -105,7 +107,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
        seq
      }
    }
-    def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
+    def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
      dependencies(depNum) match {
        case oneToOneDependency: OneToOneDependency[_] =>
          val dependencyPartition = partition.narrowDeps(depNum).get.split

core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Lines changed: 1 addition & 1 deletion
@@ -483,7 +483,7 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
 object CheckpointSuite {
   // This is a custom cogroup function that does not use mapValues like
   // the PairRDDFunctions.cogroup()
-  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
+  def cogroup[K: ClassTag, V: ClassTag](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
     : RDD[(K, Array[Iterable[V]])] = {
     new CoGroupedRDD[K](
       Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
ShuffleDependencySuite.scala (new file, package org.apache.spark.shuffle)

Lines changed: 67 additions & 0 deletions

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.shuffle
+
+import org.apache.spark._
+
+case class KeyClass()
+
+case class ValueClass()
+
+case class CombinerClass()
+
+class ShuffleDependencySuite extends SparkFunSuite with LocalSparkContext {
+
+  val conf = new SparkConf(loadDefaults = false)
+
+  test("key, value, and combiner classes correct in shuffle dependency without aggregation") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 5, 4)
+      .map(key => (KeyClass(), ValueClass()))
+      .groupByKey()
+    val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+    assert(!dep.mapSideCombine, "Test requires that no map-side aggregator is defined")
+    assert(dep.keyClassName == classOf[KeyClass].getName)
+    assert(dep.valueClassName == classOf[ValueClass].getName)
+  }
+
+  test("key, value, and combiner classes available in shuffle dependency with aggregation") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 5, 4)
+      .map(key => (KeyClass(), ValueClass()))
+      .aggregateByKey(CombinerClass())({ case (a, b) => a }, { case (a, b) => a })
+    val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+    assert(dep.mapSideCombine && dep.aggregator.isDefined, "Test requires map-side aggregation")
+    assert(dep.keyClassName == classOf[KeyClass].getName)
+    assert(dep.valueClassName == classOf[ValueClass].getName)
+    assert(dep.combinerClassName == Some(classOf[CombinerClass].getName))
+  }
+
+  test("combineByKey null combiner class tag handled correctly") {
+    sc = new SparkContext("local", "test", conf.clone())
+    val rdd = sc.parallelize(1 to 5, 4)
+      .map(key => (KeyClass(), ValueClass()))
+      .combineByKey((v: ValueClass) => v,
+        (c: AnyRef, v: ValueClass) => c,
+        (c1: AnyRef, c2: AnyRef) => c1)
+    val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
+    assert(dep.keyClassName == classOf[KeyClass].getName)
+    assert(dep.valueClassName == classOf[ValueClass].getName)
+    assert(dep.combinerClassName == None)
+  }
+
+}
