Skip to content

Commit 6bb56fa

Browse files
sryza authored and pwendell committed
SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
Author: Sandy Ryza <[email protected]> Closes #789 from sryza/sandy-spark-1813 and squashes the following commits: 48b05e9 [Sandy Ryza] Simplify b824932 [Sandy Ryza] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time 6a15bb7 [Sandy Ryza] Small fix a2278c0 [Sandy Ryza] Respond to review comments 6ef592e [Sandy Ryza] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy
1 parent 856b081 commit 6bb56fa

File tree

17 files changed

+195
-87
lines changed

17 files changed

+195
-87
lines changed

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
package org.apache.spark
1919

2020
import scala.collection.JavaConverters._
21-
import scala.collection.mutable.HashMap
21+
import scala.collection.mutable.{HashMap, LinkedHashSet}
22+
import org.apache.spark.serializer.KryoSerializer
2223

2324
/**
2425
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
140141
this
141142
}
142143

144+
/**
145+
* Use Kryo serialization and register the given set of classes with Kryo.
146+
* If called multiple times, this will append the classes from all calls together.
147+
*/
148+
def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
  // Start from the class names that are already configured. Entries are trimmed so that a
  // hand-written value such as "a.B, c.D" does not leave stray whitespace in the stored names
  // (which would also defeat de-duplication against Class.getName); blank entries are dropped.
  val allClassNames = new LinkedHashSet[String]()
  allClassNames ++= get("spark.kryo.classesToRegister", "")
    .split(',')
    .map(_.trim)
    .filter(_.nonEmpty)
  // LinkedHashSet keeps first-seen order while ignoring names that are already present.
  allClassNames ++= classes.map(_.getName)

  set("spark.kryo.classesToRegister", allClassNames.mkString(","))
  // Registering classes implies using Kryo, so select the Kryo serializer as well.
  set("spark.serializer", classOf[KryoSerializer].getName)
  this
}
157+
143158
/** Remove a parameter from the configuration */
144159
def remove(key: String): SparkConf = {
145160
settings.remove(key)

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,18 @@ class KryoSerializer(conf: SparkConf)
5353
private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024
5454
private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true)
5555
private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false)
56-
private val registrator = conf.getOption("spark.kryo.registrator")
56+
private val userRegistrator = conf.getOption("spark.kryo.registrator")
57+
// Classes named in the comma-separated spark.kryo.classesToRegister setting, loaded eagerly
// when this serializer is constructed. Blank entries and surrounding whitespace are ignored.
// A class that cannot be loaded raises a SparkException that names the offending entry.
private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
  .split(',')
  .map(_.trim)
  .filter(_.nonEmpty)
  .map { className =>
    try {
      Class.forName(className)
    } catch {
      case e: Exception =>
        // Keep the original message as a prefix and append the class name for diagnosis.
        throw new SparkException(s"Failed to load class to register with Kryo: $className", e)
    }
  }
5768

5869
def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
5970

@@ -80,22 +91,20 @@ class KryoSerializer(conf: SparkConf)
8091
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
8192
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
8293

83-
// Allow the user to register their own classes by setting spark.kryo.registrator
84-
for (regCls <- registrator) {
85-
logDebug("Running user registrator: " + regCls)
86-
try {
87-
val reg = Class.forName(regCls, true, classLoader).newInstance()
88-
.asInstanceOf[KryoRegistrator]
89-
90-
// Use the default classloader when calling the user registrator.
91-
Thread.currentThread.setContextClassLoader(classLoader)
92-
reg.registerClasses(kryo)
93-
} catch {
94-
case e: Exception =>
95-
throw new SparkException(s"Failed to invoke $regCls", e)
96-
} finally {
97-
Thread.currentThread.setContextClassLoader(oldClassLoader)
98-
}
94+
try {
95+
// Use the default classloader when calling the user registrator.
96+
Thread.currentThread.setContextClassLoader(classLoader)
97+
// Register classes given through spark.kryo.classesToRegister.
98+
classesToRegister.foreach { clazz => kryo.register(clazz) }
99+
// Allow the user to register their own classes by setting spark.kryo.registrator.
100+
userRegistrator
101+
.map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator])
102+
.foreach { reg => reg.registerClasses(kryo) }
103+
} catch {
104+
case e: Exception =>
105+
throw new SparkException(s"Failed to register classes with Kryo", e)
106+
} finally {
107+
Thread.currentThread.setContextClassLoader(oldClassLoader)
99108
}
100109

101110
// Register Chill's classes; we do this after our ranges and the user's own classes to let

core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1418,4 +1418,16 @@ public Optional<Integer> call(Integer i) {
14181418
}
14191419
}
14201420

1421+
// Empty fixture classes; testRegisterKryoClasses passes them to SparkConf.registerKryoClasses.
static class Class1 {}
1422+
static class Class2 {}
1423+
1424+
@Test
1425+
public void testRegisterKryoClasses() {
1426+
SparkConf conf = new SparkConf();
1427+
conf.registerKryoClasses(new Class[]{ Class1.class, Class2.class });
1428+
Assert.assertEquals(
1429+
Class1.class.getName() + "," + Class2.class.getName(),
1430+
conf.get("spark.kryo.classesToRegister"));
1431+
}
1432+
14211433
}

core/src/test/scala/org/apache/spark/SparkConfSuite.scala

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.spark
1919

2020
import org.scalatest.FunSuite
21+
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
22+
import com.esotericsoftware.kryo.Kryo
2123

2224
class SparkConfSuite extends FunSuite with LocalSparkContext {
2325
test("loading from system properties") {
@@ -133,4 +135,64 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
133135
System.clearProperty("spark.test.a.b.c")
134136
}
135137
}
138+
139+
test("register kryo classes through registerKryoClasses") {
  val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")
  val name1 = classOf[Class1].getName
  val name2 = classOf[Class2].getName
  val name3 = classOf[Class3].getName

  // The first call stores the names in the order given.
  conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2]))
  assert(conf.get("spark.kryo.classesToRegister") === name1 + "," + name2)

  // A later call appends to the names that are already registered.
  conf.registerKryoClasses(Array(classOf[Class3]))
  assert(conf.get("spark.kryo.classesToRegister") === name1 + "," + name2 + "," + name3)

  // Registering a class a second time leaves the list unchanged.
  conf.registerKryoClasses(Array(classOf[Class2]))
  assert(conf.get("spark.kryo.classesToRegister") === name1 + "," + name2 + "," + name3)

  // Kryo doesn't expose a way to discover registered classes, but at least make sure this
  // doesn't blow up.
  val kryoSerializer = new KryoSerializer(conf)
  kryoSerializer.newInstance().serialize(new Class1())
  kryoSerializer.newInstance().serialize(new Class2())
  kryoSerializer.newInstance().serialize(new Class3())
}
161+
162+
test("register kryo classes through registerKryoClasses and custom registrator") {
  val conf = new SparkConf().set("spark.kryo.registrationRequired", "true")

  // Class1 is registered through the conf helper...
  conf.registerKryoClasses(Array(classOf[Class1]))
  val registeredNames = conf.get("spark.kryo.classesToRegister")
  assert(registeredNames === classOf[Class1].getName)

  // ...while Class2 is registered by the custom registrator configured here.
  conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName)

  // Kryo doesn't expose a way to discover registered classes, but at least make sure this
  // doesn't blow up.
  val kryoSerializer = new KryoSerializer(conf)
  kryoSerializer.newInstance().serialize(new Class1())
  kryoSerializer.newInstance().serialize(new Class2())
}
176+
177+
test("register kryo classes through conf") {
  // Configure everything through plain key/value settings instead of registerKryoClasses.
  val conf = new SparkConf()
    .set("spark.kryo.registrationRequired", "true")
    .set("spark.kryo.classesToRegister", "java.lang.StringBuffer")
    .set("spark.serializer", classOf[KryoSerializer].getName)

  // Kryo doesn't expose a way to discover registered classes, but at least make sure this
  // doesn't blow up.
  val kryoSerializer = new KryoSerializer(conf)
  kryoSerializer.newInstance().serialize(new StringBuffer())
}
187+
188+
}
189+
190+
// Empty fixture classes; the SparkConfSuite tests register them with Kryo and serialize instances.
class Class1 {}
191+
class Class2 {}
192+
class Class3 {}
193+
194+
/**
 * Test registrator that registers Class2 with Kryo. The suite selects it by class name through
 * the spark.kryo.registrator setting.
 */
class CustomRegistrator extends KryoRegistrator {
  // Explicit ": Unit =" replaces the deprecated procedure syntax.
  def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Class2])
  }
}

core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,13 +210,13 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext {
210210
}
211211

212212
test("kryo with nonexistent custom registrator should fail") {
213-
import org.apache.spark.{SparkConf, SparkException}
213+
import org.apache.spark.SparkException
214214

215215
val conf = new SparkConf(false)
216216
conf.set("spark.kryo.registrator", "this.class.does.not.exist")
217-
217+
218218
val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance())
219-
assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist"))
219+
assert(thrown.getMessage.contains("Failed to register classes with Kryo"))
220220
}
221221

222222
test("default class loader can be set by a different thread") {

docs/configuration.md

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,23 @@ of the most common options to set are:
124124
<code>org.apache.spark.Serializer</code></a>.
125125
</td>
126126
</tr>
127+
<tr>
128+
<td><code>spark.kryo.classesToRegister</code></td>
129+
<td>(none)</td>
130+
<td>
131+
If you use Kryo serialization, give a comma-separated list of custom class names to register
132+
with Kryo.
133+
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
134+
</td>
135+
</tr>
127136
<tr>
128137
<td><code>spark.kryo.registrator</code></td>
129138
<td>(none)</td>
130139
<td>
131-
If you use Kryo serialization, set this class to register your custom classes with Kryo.
132-
It should be set to a class that extends
140+
If you use Kryo serialization, set this class to register your custom classes with Kryo. This
141+
property is useful if you need to register your classes in a custom way, e.g. to specify a custom
142+
field serializer. Otherwise <code>spark.kryo.classesToRegister</code> is simpler. It should be
143+
set to a class that extends
133144
<a href="api/scala/index.html#org.apache.spark.serializer.KryoRegistrator">
134145
<code>KryoRegistrator</code></a>.
135146
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.

docs/tuning.md

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -47,24 +47,11 @@ registration requirement, but we recommend trying it in any network-intensive ap
4747
Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered
4848
in the AllScalaRegistrar from the [Twitter chill](https://github.com/twitter/chill) library.
4949

50-
To register your own custom classes with Kryo, create a public class that extends
51-
[`org.apache.spark.serializer.KryoRegistrator`](api/scala/index.html#org.apache.spark.serializer.KryoRegistrator) and set the
52-
`spark.kryo.registrator` config property to point to it, as follows:
50+
To register your own custom classes with Kryo, use the `registerKryoClasses` method.
5351

5452
{% highlight scala %}
55-
import com.esotericsoftware.kryo.Kryo
56-
import org.apache.spark.serializer.KryoRegistrator
57-
58-
class MyRegistrator extends KryoRegistrator {
59-
override def registerClasses(kryo: Kryo) {
60-
kryo.register(classOf[MyClass1])
61-
kryo.register(classOf[MyClass2])
62-
}
63-
}
64-
6553
val conf = new SparkConf().setMaster(...).setAppName(...)
66-
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
67-
conf.set("spark.kryo.registrator", "mypackage.MyRegistrator")
54+
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
6855
val sc = new SparkContext(conf)
6956
{% endhighlight %}
7057

examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,7 @@
1818
package org.apache.spark.examples.bagel
1919

2020
import org.apache.spark._
21-
import org.apache.spark.SparkContext._
22-
import org.apache.spark.serializer.KryoRegistrator
23-
2421
import org.apache.spark.bagel._
25-
import org.apache.spark.bagel.Bagel._
26-
27-
import scala.collection.mutable.ArrayBuffer
28-
29-
import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream}
30-
31-
import com.esotericsoftware.kryo._
3222

3323
class PageRankUtils extends Serializable {
3424
def computeWithCombiner(numVertices: Long, epsilon: Double)(
@@ -99,13 +89,6 @@ class PRMessage() extends Message[String] with Serializable {
9989
}
10090
}
10191

102-
class PRKryoRegistrator extends KryoRegistrator {
103-
def registerClasses(kryo: Kryo) {
104-
kryo.register(classOf[PRVertex])
105-
kryo.register(classOf[PRMessage])
106-
}
107-
}
108-
10992
class CustomPartitioner(partitions: Int) extends Partitioner {
11093
def numPartitions = partitions
11194

examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ object WikipediaPageRank {
3838
}
3939
val sparkConf = new SparkConf()
4040
sparkConf.setAppName("WikipediaPageRank")
41-
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
42-
sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
41+
sparkConf.registerKryoClasses(Array(classOf[PRVertex], classOf[PRMessage]))
4342

4443
val inputFile = args(0)
4544
val threshold = args(1).toDouble

examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ object Analytics extends Logging {
4646
}
4747
val options = mutable.Map(optionsList: _*)
4848

49-
val conf = new SparkConf()
50-
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
51-
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
52-
.set("spark.locality.wait", "100000")
49+
val conf = new SparkConf().set("spark.locality.wait", "100000")
50+
GraphXUtils.registerKryoClasses(conf)
5351

5452
val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
5553
println("Set the number of edge partitions using --numEPart.")

0 commit comments

Comments
 (0)