From 6ef592ee5a4359261266d7dbeab842c53a83f402 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 14 May 2014 00:23:49 -0700 Subject: [PATCH 1/5] SPARK-1813. Add a utility to SparkConf that makes using Kryo really easy --- .../scala/org/apache/spark/SparkConf.scala | 18 +++++++- .../spark/serializer/KryoSerializer.scala | 46 ++++++++++++------- .../org/apache/spark/SparkConfSuite.scala | 35 ++++++++++++++ .../serializer/KryoSerializerSuite.scala | 6 +-- docs/tuning.md | 17 +------ .../spark/examples/bagel/PageRankUtils.scala | 7 --- .../examples/bagel/WikipediaPageRank.scala | 3 +- .../spark/examples/graphx/Analytics.scala | 6 +-- .../examples/graphx/SynthBenchmark.scala | 5 +- .../spark/examples/mllib/MovieLensALS.scala | 10 +--- .../spark/graphx/GraphKryoRegistrator.scala | 4 +- .../org/apache/spark/graphx/GraphXUtils.scala | 45 ++++++++++++++++++ .../spark/graphx/LocalSparkContext.scala | 6 +-- .../graphx/impl/EdgePartitionSuite.scala | 6 +-- .../graphx/impl/VertexPartitionSuite.scala | 6 +-- 15 files changed, 148 insertions(+), 72 deletions(-) create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 605df0e929faa..fccf820d234d3 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -18,7 +18,9 @@ package org.apache.spark import scala.collection.JavaConverters._ -import scala.collection.mutable.HashMap +import scala.collection.mutable.{HashMap, LinkedHashSet} +import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} +import com.esotericsoftware.kryo.Kryo /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. @@ -140,6 +142,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { this } + /** + * Use Kryo serialization and register the given set of classes with Kryo. + * If called multiple times, this will append the classes from all calls together. 
+ */ + def registerKryoClasses(classes: Seq[Class[_ <: Any]]): SparkConf = { + val allClassNames = new LinkedHashSet[String]() + allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty) + allClassNames ++= classes.map(_.getName) + + set("spark.kryo.classesToRegister", allClassNames.mkString(",")) + set("spark.serializer", classOf[KryoSerializer].getName) + this + } + /** Remove a parameter from the configuration */ def remove(key: String): SparkConf = { settings.remove(key) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index d6386f8c06fff..b94ad9ff59dad 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -53,7 +53,6 @@ class KryoSerializer(conf: SparkConf) private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val registrator = conf.getOption("spark.kryo.registrator") def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -81,21 +80,20 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) // Allow the user to register their own classes by setting spark.kryo.registrator - for (regCls <- registrator) { - logDebug("Running user registrator: " + regCls) - try { - val reg = Class.forName(regCls, true, classLoader).newInstance() - .asInstanceOf[KryoRegistrator] - - // Use the default classloader when calling the user registrator. - Thread.currentThread.setContextClassLoader(classLoader) - reg.registerClasses(kryo) - } catch { - case e: Exception => - throw new SparkException(s"Failed to invoke $regCls", e) - } finally { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } + try { + val reg = conf.getOption("spark.kryo.registrator").map( + Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]).getOrElse( + new DefaultKryoRegistrator(conf)) + + // Use the default classloader when calling the user registrator. + Thread.currentThread.setContextClassLoader(classLoader) + reg.registerClasses(kryo) + } catch { + case e: Exception => + throw new SparkException(s"Failed to invoke registrator " + + conf.get("spark.kryo.registrator", ""), e) + } finally { + Thread.currentThread.setContextClassLoader(oldClassLoader) } // Register Chill's classes; we do this after our ranges and the user's own classes to let @@ -237,6 +235,22 @@ private class JavaIterableWrapperSerializer } } +private class DefaultKryoRegistrator(conf: SparkConf) extends KryoRegistrator { + override def registerClasses(kryo: Kryo) { + conf.getOption("spark.kryo.classesToRegister").foreach { classNames => + for (className <- classNames.split(',')) { + try { + val clazz = Class.forName(className) + kryo.register(clazz) + } catch { + case e: Exception => + throw new SparkException("Failed to load class to register with Kryo", e) + } + } + } + } +} + private object JavaIterableWrapperSerializer extends Logging { // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper). 
val wrapperClass = diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 87e9012622456..3aec79dcd2d53 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark import org.scalatest.FunSuite +import org.apache.spark.serializer.KryoSerializer class SparkConfSuite extends FunSuite with LocalSparkContext { test("loading from system properties") { @@ -133,4 +134,38 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { System.clearProperty("spark.test.a.b.c") } } + + test("register kryo classes through registerKryoClasses") { + val conf = new SparkConf() + class Class1 {} + class Class2 {} + class Class3 {} + + conf.registerKryoClasses(Seq(classOf[Class1], classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") == + classOf[Class1].getName + "," + classOf[Class2].getName) + + conf.registerKryoClasses(Seq(classOf[Class3])) + assert(conf.get("spark.kryo.classesToRegister") == + classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + + conf.registerKryoClasses(Seq(classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") == + classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. + new KryoSerializer(conf) + } + + test("register kryo classes through conf") { + val conf = new SparkConf() + conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer") + conf.set("spark.serializer", classOf[KryoSerializer].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. + new KryoSerializer(conf) + } + } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index e1e35b688d581..e1f5b35abf823 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -210,13 +210,13 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { } test("kryo with nonexistent custom registrator should fail") { - import org.apache.spark.{SparkConf, SparkException} + import org.apache.spark.SparkException val conf = new SparkConf(false) conf.set("spark.kryo.registrator", "this.class.does.not.exist") - + val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance()) - assert(thrown.getMessage.contains("Failed to invoke this.class.does.not.exist")) + assert(thrown.getMessage.contains("Failed to invoke registrator this.class.does.not.exist")) } test("default class loader can be set by a different thread") { diff --git a/docs/tuning.md b/docs/tuning.md index 8fb2a0433b1a8..9b5c9adac6a4f 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -47,24 +47,11 @@ registration requirement, but we recommend trying it in any network-intensive ap Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered in the AllScalaRegistrar from the [Twitter chill](https://github.com/twitter/chill) library. 
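+
+To find classes that are not yet registered, you can set `spark.kryo.registrationRequired` to
+`true`: Kryo will then throw an error when it meets an unregistered class, instead of silently
+falling back to writing the full class name with each object. A minimal sketch of enabling the
+check (both properties appear elsewhere in this patch):
+
+{% highlight scala %}
+val conf = new SparkConf()
+conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+conf.set("spark.kryo.registrationRequired", "true")
+{% endhighlight %}
+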
-To register your own custom classes with Kryo, create a public class that extends -[`org.apache.spark.serializer.KryoRegistrator`](api/scala/index.html#org.apache.spark.serializer.KryoRegistrator) and set the -`spark.kryo.registrator` config property to point to it, as follows: +To register your own custom classes with Kryo, use the `registerKryoClasses` method. {% highlight scala %} -import com.esotericsoftware.kryo.Kryo -import org.apache.spark.serializer.KryoRegistrator - -class MyRegistrator extends KryoRegistrator { - override def registerClasses(kryo: Kryo) { - kryo.register(classOf[MyClass1]) - kryo.register(classOf[MyClass2]) - } -} - val conf = new SparkConf().setMaster(...).setAppName(...) -conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") -conf.set("spark.kryo.registrator", "mypackage.MyRegistrator") +conf.registerKryoClasses(Seq(classOf[MyClass1], classOf[MyClass2])) val sc = new SparkContext(conf) {% endhighlight %} diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index e06f4dcd54442..1a3aeea2abbb6 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -99,13 +99,6 @@ class PRMessage() extends Message[String] with Serializable { } } -class PRKryoRegistrator extends KryoRegistrator { - def registerClasses(kryo: Kryo) { - kryo.register(classOf[PRVertex]) - kryo.register(classOf[PRMessage]) - } -} - class CustomPartitioner(partitions: Int) extends Partitioner { def numPartitions = partitions diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index e4db3ec51313d..ce70a2cc37c69 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -38,8 +38,7 @@ object WikipediaPageRank { } val sparkConf = new SparkConf() sparkConf.setAppName("WikipediaPageRank") - sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - sparkConf.set("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) + sparkConf.registerKryoClasses(Seq(classOf[PRVertex], classOf[PRMessage])) val inputFile = args(0) val threshold = args(1).toDouble diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala index 45527d9382fd0..d70d93608a57c 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -46,10 +46,8 @@ object Analytics extends Logging { } val options = mutable.Map(optionsList: _*) - val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") - .set("spark.locality.wait", "100000") + val conf = new SparkConf().set("spark.locality.wait", "100000") + GraphXUtils.registerKryoClasses(conf) val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { println("Set the number of edge partitions using --numEPart.") diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala 
b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala index 5f35a5836462e..05676021718d9 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -18,7 +18,7 @@ package org.apache.spark.examples.graphx import org.apache.spark.SparkContext._ -import org.apache.spark.graphx.PartitionStrategy +import org.apache.spark.graphx.{GraphXUtils, PartitionStrategy} import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.graphx.util.GraphGenerators import java.io.{PrintWriter, FileOutputStream} @@ -80,8 +80,7 @@ object SynthBenchmark { val conf = new SparkConf() .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)") - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + GraphXUtils.registerKryoClasses(conf) val sc = new SparkContext(conf) diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index fc6678013b932..2030f21cb1a52 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -40,13 +40,6 @@ import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} */ object MovieLensALS { - class ALSRegistrator extends KryoRegistrator { - override def registerClasses(kryo: Kryo) { - kryo.register(classOf[Rating]) - kryo.register(classOf[mutable.BitSet]) - } - } - case class Params( input: String = null, kryo: Boolean = false, @@ -108,8 +101,7 @@ object MovieLensALS { def run(params: Params) { val conf = new SparkConf().setAppName(s"MovieLensALS with $params") if (params.kryo) { - conf.set("spark.serializer", classOf[KryoSerializer].getName) - .set("spark.kryo.registrator", classOf[ALSRegistrator].getName) + conf.registerKryoClasses(Seq(classOf[mutable.BitSet], classOf[Rating])) .set("spark.kryoserializer.buffer.mb", "8") } val sc = new SparkContext(conf) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index 1948c978c30bf..dd4be074b20c3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -30,8 +30,10 @@ import org.apache.spark.util.collection.OpenHashSet /** * Registers GraphX classes with Kryo for improved performance. + * + * This is deprecated in favor of using `GraphXUtils.registerKryoClasses(conf)` */ -class GraphKryoRegistrator extends KryoRegistrator { +@deprecated class GraphKryoRegistrator extends KryoRegistrator { def registerClasses(kryo: Kryo) { kryo.register(classOf[Edge[Object]]) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala new file mode 100644 index 0000000000000..5c8e58cd48454 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx + +import org.apache.spark.SparkConf +import org.apache.spark.graphx.impl._ +import org.apache.spark.util.collection.{OpenHashSet, BitSet} +import org.apache.spark.util.BoundedPriorityQueue +import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap + +object GraphXUtils { + /** + * Registers classes that GraphX uses with Kryo. + */ + def registerKryoClasses(conf: SparkConf) { + conf.registerKryoClasses(Seq( + classOf[Edge[Object]], + classOf[(VertexId, Object)], + classOf[EdgePartition[Object, Object]], + classOf[BitSet], + classOf[VertexIdToIndexMap], + classOf[VertexAttributeBlock[Object]], + classOf[PartitionStrategy], + classOf[BoundedPriorityQueue[Object]], + classOf[EdgeDirection], + classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]], + classOf[OpenHashSet[Int]], + classOf[OpenHashSet[Long]])) + } +} diff --git a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala index 47594a800a3b1..a3e28efc75a98 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/LocalSparkContext.scala @@ -17,9 +17,6 @@ package org.apache.spark.graphx -import org.scalatest.Suite -import org.scalatest.BeforeAndAfterEach - import org.apache.spark.SparkConf import org.apache.spark.SparkContext @@ -31,8 +28,7 @@ trait LocalSparkContext { /** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. 
*/ def withSpark[T](f: SparkContext => T) = { val conf = new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + GraphXUtils.registerKryoClasses(conf) val sc = new SparkContext("local", "test", conf) try { f(sc) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index 9d00f76327e4c..db1dac6160080 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -129,9 +129,9 @@ class EdgePartitionSuite extends FunSuite { val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) val a: EdgePartition[Int, Int] = makeEdgePartition(aList) val javaSer = new JavaSerializer(new SparkConf()) - val kryoSer = new KryoSerializer(new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")) + val conf = new SparkConf() + GraphXUtils.registerKryoClasses(conf) + val kryoSer = new KryoSerializer(conf) for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) { val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala index f9e771a900013..fe8304c1cdc32 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala @@ -125,9 +125,9 @@ class VertexPartitionSuite extends FunSuite { val verts = Set((0L, 1), (1L, 1), (2L, 1)) val vp = VertexPartition(verts.iterator) val javaSer = new JavaSerializer(new SparkConf()) - val kryoSer = new KryoSerializer(new SparkConf() - .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")) + val conf = new SparkConf() + GraphXUtils.registerKryoClasses(conf) + val kryoSer = new KryoSerializer(conf) for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) { val vpSer: VertexPartition[Int] = s.deserialize(s.serialize(vp)) From a2278c095dde0cc9c9d34f1fcc17f70686942a39 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 25 Sep 2014 14:51:36 -0700 Subject: [PATCH 2/5] Respond to review comments --- .../scala/org/apache/spark/SparkConf.scala | 5 ++- .../spark/serializer/KryoSerializer.scala | 35 ++++++++++--------- .../java/org/apache/spark/JavaAPISuite.java | 12 +++++++ .../org/apache/spark/SparkConfSuite.scala | 31 +++++++++------- docs/configuration.md | 16 +++++++-- .../spark/examples/bagel/PageRankUtils.scala | 10 ------ .../examples/bagel/WikipediaPageRank.scala | 2 +- .../spark/examples/mllib/MovieLensALS.scala | 4 +-- .../spark/graphx/GraphKryoRegistrator.scala | 6 ++-- .../org/apache/spark/graphx/GraphXUtils.scala | 6 ++-- 10 files changed, 73 insertions(+), 54 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index fccf820d234d3..90827ca3a2592 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -19,8 +19,7 @@ package org.apache.spark import scala.collection.JavaConverters._ 
import scala.collection.mutable.{HashMap, LinkedHashSet} -import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} -import com.esotericsoftware.kryo.Kryo +import org.apache.spark.serializer.KryoSerializer /** * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs. @@ -146,7 +145,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * Use Kryo serialization and register the given set of classes with Kryo. * If called multiple times, this will append the classes from all calls together. */ - def registerKryoClasses(classes: Seq[Class[_ <: Any]]): SparkConf = { + def registerKryoClasses(classes: Array[Class[_ <: Any]]): SparkConf = { val allClassNames = new LinkedHashSet[String]() allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty) allClassNames ++= classes.map(_.getName) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index b94ad9ff59dad..299184db74258 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -53,6 +53,18 @@ class KryoSerializer(conf: SparkConf) private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) + private val userRegistrator = conf.getOption("spark.kryo.registrator") + private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") + .split(',') + .filter(!_.isEmpty) + .map { className => + try { + Class.forName(className) + } catch { + case e: Exception => + throw new SparkException("Failed to load class to register with Kryo", e) + } + } def newKryoOutput() = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize)) @@ -81,9 +93,10 @@ class KryoSerializer(conf: SparkConf) // Allow the user to register their own classes by setting spark.kryo.registrator try { - val reg = conf.getOption("spark.kryo.registrator").map( - Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]).getOrElse( - new DefaultKryoRegistrator(conf)) + val reg = userRegistrator + .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]) + .getOrElse(new DefaultKryoRegistrator(classesToRegister)) + logDebug("Running Kryo registrator: " + reg.getClass.getName) // Use the default classloader when calling the user registrator. 
Thread.currentThread.setContextClassLoader(classLoader) @@ -91,7 +104,7 @@ class KryoSerializer(conf: SparkConf) } catch { case e: Exception => throw new SparkException(s"Failed to invoke registrator " + - conf.get("spark.kryo.registrator", ""), e) + userRegistrator.getOrElse(""), e) } finally { Thread.currentThread.setContextClassLoader(oldClassLoader) } @@ -235,19 +248,9 @@ private class JavaIterableWrapperSerializer } } -private class DefaultKryoRegistrator(conf: SparkConf) extends KryoRegistrator { +private class DefaultKryoRegistrator(classes: Seq[Class[_ <: Any]]) extends KryoRegistrator { override def registerClasses(kryo: Kryo) { - conf.getOption("spark.kryo.classesToRegister").foreach { classNames => - for (className <- classNames.split(',')) { - try { - val clazz = Class.forName(className) - kryo.register(clazz) - } catch { - case e: Exception => - throw new SparkException("Failed to load class to register with Kryo", e) - } - } - } + classes.foreach { clazz => kryo.register(clazz) } } } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 4a078435447e5..f0a41bd1c5b9a 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1333,4 +1333,16 @@ public Optional call(Integer i) { } } + static class Class1 {} + static class Class2 {} + + @Test + public void testRegisterKryoClasses() { + SparkConf conf = new SparkConf(); + conf.registerKryoClasses(new Class[]{ Class1.class, Class2.class }); + Assert.assertEquals( + Class1.class.getName() + "," + Class2.class.getName(), + conf.get("spark.kryo.classesToRegister")); + } + } diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 3aec79dcd2d53..437a852e7b2b1 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -136,36 +136,41 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { } test("register kryo classes through registerKryoClasses") { - val conf = new SparkConf() - class Class1 {} - class Class2 {} - class Class3 {} + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") - conf.registerKryoClasses(Seq(classOf[Class1], classOf[Class2])) - assert(conf.get("spark.kryo.classesToRegister") == + conf.registerKryoClasses(Array(classOf[Class1], classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName + "," + classOf[Class2].getName) - conf.registerKryoClasses(Seq(classOf[Class3])) - assert(conf.get("spark.kryo.classesToRegister") == + conf.registerKryoClasses(Array(classOf[Class3])) + assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) - conf.registerKryoClasses(Seq(classOf[Class2])) - assert(conf.get("spark.kryo.classesToRegister") == + conf.registerKryoClasses(Array(classOf[Class2])) + assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName + "," + classOf[Class2].getName + "," + classOf[Class3].getName) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. 
- new KryoSerializer(conf) + val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new Class1()) + serializer.newInstance().serialize(new Class2()) + serializer.newInstance().serialize(new Class3()) } test("register kryo classes through conf") { - val conf = new SparkConf() + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer") conf.set("spark.serializer", classOf[KryoSerializer].getName) // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't // blow up. - new KryoSerializer(conf) + val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new StringBuffer()) } } + +class Class1 {} +class Class2 {} +class Class3 {} diff --git a/docs/configuration.md b/docs/configuration.md index f311f0d2a6206..25db2c8be0edf 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -124,12 +124,24 @@ of the most common options to set are: org.apache.spark.Serializer. + + spark.kryo.classesToRegister + (none) + + If you use Kryo serialization, give a comma-separated list of custom class names to register + with Kryo. If a custom registrator is given through spark.kryo.registrator it + overrides any classes specified through this property. + See the tuning guide for more details. + + spark.kryo.registrator (none) - If you use Kryo serialization, set this class to register your custom classes with Kryo. - It should be set to a class that extends + If you use Kryo serialization, set this class to register your custom classes with Kryo. This + property is useful if you need to register your classes in a custom way, e.g. to specify a custom + field serializer. Otherwise spark.kryo.classesToRegister is simpler. It should be + set to a class that extends KryoRegistrator. See the tuning guide for more details. 
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala index 1a3aeea2abbb6..e322d4ce5a745 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/PageRankUtils.scala @@ -18,17 +18,7 @@ package org.apache.spark.examples.bagel import org.apache.spark._ -import org.apache.spark.SparkContext._ -import org.apache.spark.serializer.KryoRegistrator - import org.apache.spark.bagel._ -import org.apache.spark.bagel.Bagel._ - -import scala.collection.mutable.ArrayBuffer - -import java.io.{InputStream, OutputStream, DataInputStream, DataOutputStream} - -import com.esotericsoftware.kryo._ class PageRankUtils extends Serializable { def computeWithCombiner(numVertices: Long, epsilon: Double)( diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala index ce70a2cc37c69..859abedf2a55e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala +++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRank.scala @@ -38,7 +38,7 @@ object WikipediaPageRank { } val sparkConf = new SparkConf() sparkConf.setAppName("WikipediaPageRank") - sparkConf.registerKryoClasses(Seq(classOf[PRVertex], classOf[PRMessage])) + sparkConf.registerKryoClasses(Array(classOf[PRVertex], classOf[PRMessage])) val inputFile = args(0) val threshold = args(1).toDouble diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala index 2030f21cb1a52..8796c28db8a66 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/MovieLensALS.scala @@ -19,7 +19,6 @@ package org.apache.spark.examples.mllib import scala.collection.mutable -import com.esotericsoftware.kryo.Kryo import org.apache.log4j.{Level, Logger} import scopt.OptionParser @@ -27,7 +26,6 @@ import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating} import org.apache.spark.rdd.RDD -import org.apache.spark.serializer.{KryoSerializer, KryoRegistrator} /** * An example app for ALS on MovieLens data (http://grouplens.org/datasets/movielens/). 
@@ -101,7 +99,7 @@ object MovieLensALS { def run(params: Params) { val conf = new SparkConf().setAppName(s"MovieLensALS with $params") if (params.kryo) { - conf.registerKryoClasses(Seq(classOf[mutable.BitSet], classOf[Rating])) + conf.registerKryoClasses(Array(classOf[mutable.BitSet], classOf[Rating])) .set("spark.kryoserializer.buffer.mb", "8") } val sc = new SparkContext(conf) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index dd4be074b20c3..563c948957ecf 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -27,13 +27,11 @@ import org.apache.spark.graphx.impl._ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap import org.apache.spark.util.collection.OpenHashSet - /** * Registers GraphX classes with Kryo for improved performance. - * - * This is deprecated in favor of using `GraphXUtils.registerKryoClasses(conf)` */ -@deprecated class GraphKryoRegistrator extends KryoRegistrator { +@deprecated("Register GraphX classes with Kryo using GraphXUtils.registerKryoClasses", "1.2.0") +class GraphKryoRegistrator extends KryoRegistrator { def registerClasses(kryo: Kryo) { kryo.register(classOf[Edge[Object]]) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala index 5c8e58cd48454..2cb07937eaa2a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphXUtils.scala @@ -18,17 +18,19 @@ package org.apache.spark.graphx import org.apache.spark.SparkConf + import org.apache.spark.graphx.impl._ +import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap + import org.apache.spark.util.collection.{OpenHashSet, BitSet} import org.apache.spark.util.BoundedPriorityQueue -import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap object GraphXUtils { /** * Registers classes that GraphX uses with Kryo. */ def registerKryoClasses(conf: SparkConf) { - conf.registerKryoClasses(Seq( + conf.registerKryoClasses(Array( classOf[Edge[Object]], classOf[(VertexId, Object)], classOf[EdgePartition[Object, Object]], From 6a15bb78f34138b0b1ce8ecd6c1a5c08f64ecad6 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 29 Sep 2014 18:40:03 -0700 Subject: [PATCH 3/5] Small fix --- core/src/main/scala/org/apache/spark/SparkConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 90827ca3a2592..dbbcc23305c50 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -145,7 +145,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { * Use Kryo serialization and register the given set of classes with Kryo. * If called multiple times, this will append the classes from all calls together. 
*/ - def registerKryoClasses(classes: Array[Class[_ <: Any]]): SparkConf = { + def registerKryoClasses(classes: Array[Class[_]]): SparkConf = { val allClassNames = new LinkedHashSet[String]() allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty) allClassNames ++= classes.map(_.getName) From b824932c3b576f2b156fe8187e1d3a95e7305df4 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 1 Oct 2014 22:11:39 -0700 Subject: [PATCH 4/5] Allow both spark.kryo.classesToRegister and spark.kryo.registrator at the same time --- .../spark/serializer/KryoSerializer.scala | 46 ++++++++++++------- .../org/apache/spark/SparkConfSuite.scala | 24 +++++++++- .../serializer/KryoSerializerSuite.scala | 2 +- docs/configuration.md | 3 +- 4 files changed, 55 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 299184db74258..905efcd994786 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -53,7 +53,7 @@ class KryoSerializer(conf: SparkConf) private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val userRegistrator = conf.getOption("spark.kryo.registrator") + private val userRegistratorName = conf.getOption("spark.kryo.registrator") private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") .split(',') .filter(!_.isEmpty) @@ -91,20 +91,26 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) - // Allow the user to register their own classes by setting spark.kryo.registrator - try { - val reg = userRegistrator - .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]) - .getOrElse(new DefaultKryoRegistrator(classesToRegister)) - logDebug("Running Kryo registrator: " + reg.getClass.getName) + // Allow the user to register their own classes by setting spark.kryo.registrator. + val userRegistrator = userRegistratorName.map( reg => try { + Class.forName(reg, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] + } catch { + case e: Exception => { + throw new SparkException(s"Failed to load user registrator " + + userRegistratorName.getOrElse(""), e) + } + }) + try { // Use the default classloader when calling the user registrator. Thread.currentThread.setContextClassLoader(classLoader) - reg.registerClasses(kryo) + // Register classes given through spark.kryo.classesToRegister. + classesToRegister.foreach { clazz => kryo.register(clazz) } + // Call user registrator. 
+ userRegistrator.foreach { reg => reg.registerClasses(kryo) } } catch { case e: Exception => - throw new SparkException(s"Failed to invoke registrator " + - userRegistrator.getOrElse(""), e) + throw new SparkException(s"Failed to register classes with Kryo", e) } finally { Thread.currentThread.setContextClassLoader(oldClassLoader) } @@ -117,6 +123,20 @@ class KryoSerializer(conf: SparkConf) kryo } + private def registerClasses(registrator: KryoRegistrator, classLoader: ClassLoader, kryo: Kryo) { + val oldClassLoader = Thread.currentThread.getContextClassLoader + try { + // Use the default classloader when calling the user registrator. + Thread.currentThread.setContextClassLoader(classLoader) + registrator.registerClasses(kryo) + } catch { + case e: Exception => + throw new SparkException(s"Failed to invoke registrator " + registrator.getClass.getName, e) + } finally { + Thread.currentThread.setContextClassLoader(oldClassLoader) + } + } + override def newInstance(): SerializerInstance = { new KryoSerializerInstance(this) } @@ -248,12 +268,6 @@ private class JavaIterableWrapperSerializer } } -private class DefaultKryoRegistrator(classes: Seq[Class[_ <: Any]]) extends KryoRegistrator { - override def registerClasses(kryo: Kryo) { - classes.foreach { clazz => kryo.register(clazz) } - } -} - private object JavaIterableWrapperSerializer extends Logging { // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper). val wrapperClass = diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 437a852e7b2b1..5d018ea9868a7 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark import org.scalatest.FunSuite -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer} +import com.esotericsoftware.kryo.Kryo class SparkConfSuite extends FunSuite with LocalSparkContext { test("loading from system properties") { @@ -158,6 +159,21 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { serializer.newInstance().serialize(new Class3()) } + test("register kryo classes through registerKryoClasses and custom registrator") { + val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") + + conf.registerKryoClasses(Array(classOf[Class1])) + assert(conf.get("spark.kryo.classesToRegister") === classOf[Class1].getName) + + conf.set("spark.kryo.registrator", classOf[CustomRegistrator].getName) + + // Kryo doesn't expose a way to discover registered classes, but at least make sure this doesn't + // blow up. 
+ val serializer = new KryoSerializer(conf) + serializer.newInstance().serialize(new Class1()) + serializer.newInstance().serialize(new Class2()) + } + test("register kryo classes through conf") { val conf = new SparkConf().set("spark.kryo.registrationRequired", "true") conf.set("spark.kryo.classesToRegister", "java.lang.StringBuffer") @@ -174,3 +190,9 @@ class SparkConfSuite extends FunSuite with LocalSparkContext { class Class1 {} class Class2 {} class Class3 {} + +class CustomRegistrator extends KryoRegistrator { + def registerClasses(kryo: Kryo) { + kryo.register(classOf[Class2]) + } +} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index e1f5b35abf823..58367c9930437 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -216,7 +216,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { conf.set("spark.kryo.registrator", "this.class.does.not.exist") val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance()) - assert(thrown.getMessage.contains("Failed to invoke registrator this.class.does.not.exist")) + assert(thrown.getMessage.contains("Failed to load user registrator this.class.does.not.exist")) } test("default class loader can be set by a different thread") { diff --git a/docs/configuration.md b/docs/configuration.md index 25db2c8be0edf..9e4cef66ba36c 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -129,8 +129,7 @@ of the most common options to set are: (none) If you use Kryo serialization, give a comma-separated list of custom class names to register - with Kryo. If a custom registrator is given through spark.kryo.registrator it - overrides any classes specified through this property. + with Kryo. See the tuning guide for more details. From 48b05e9c45d6c216d9d4ae2c43f85debe3af73c9 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Wed, 1 Oct 2014 23:32:50 -0700 Subject: [PATCH 5/5] Simplify --- .../spark/serializer/KryoSerializer.scala | 32 +++---------------- .../serializer/KryoSerializerSuite.scala | 2 +- 2 files changed, 6 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 905efcd994786..621a951c27d07 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -53,7 +53,7 @@ class KryoSerializer(conf: SparkConf) private val maxBufferSize = conf.getInt("spark.kryoserializer.buffer.max.mb", 64) * 1024 * 1024 private val referenceTracking = conf.getBoolean("spark.kryo.referenceTracking", true) private val registrationRequired = conf.getBoolean("spark.kryo.registrationRequired", false) - private val userRegistratorName = conf.getOption("spark.kryo.registrator") + private val userRegistrator = conf.getOption("spark.kryo.registrator") private val classesToRegister = conf.get("spark.kryo.classesToRegister", "") .split(',') .filter(!_.isEmpty) @@ -91,23 +91,15 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) - // Allow the user to register their own classes by setting spark.kryo.registrator. 
- val userRegistrator = userRegistratorName.map( reg => try { - Class.forName(reg, true, classLoader).newInstance().asInstanceOf[KryoRegistrator] - } catch { - case e: Exception => { - throw new SparkException(s"Failed to load user registrator " + - userRegistratorName.getOrElse(""), e) - } - }) - try { // Use the default classloader when calling the user registrator. Thread.currentThread.setContextClassLoader(classLoader) // Register classes given through spark.kryo.classesToRegister. classesToRegister.foreach { clazz => kryo.register(clazz) } - // Call user registrator. - userRegistrator.foreach { reg => reg.registerClasses(kryo) } + // Allow the user to register their own classes by setting spark.kryo.registrator. + userRegistrator + .map(Class.forName(_, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]) + .foreach { reg => reg.registerClasses(kryo) } } catch { case e: Exception => throw new SparkException(s"Failed to register classes with Kryo", e) @@ -123,20 +115,6 @@ class KryoSerializer(conf: SparkConf) kryo } - private def registerClasses(registrator: KryoRegistrator, classLoader: ClassLoader, kryo: Kryo) { - val oldClassLoader = Thread.currentThread.getContextClassLoader - try { - // Use the default classloader when calling the user registrator. - Thread.currentThread.setContextClassLoader(classLoader) - registrator.registerClasses(kryo) - } catch { - case e: Exception => - throw new SparkException(s"Failed to invoke registrator " + registrator.getClass.getName, e) - } finally { - Thread.currentThread.setContextClassLoader(oldClassLoader) - } - } - override def newInstance(): SerializerInstance = { new KryoSerializerInstance(this) } diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 58367c9930437..64ac6d2d920d2 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -216,7 +216,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { conf.set("spark.kryo.registrator", "this.class.does.not.exist") val thrown = intercept[SparkException](new KryoSerializer(conf).newInstance()) - assert(thrown.getMessage.contains("Failed to load user registrator this.class.does.not.exist")) + assert(thrown.getMessage.contains("Failed to register classes with Kryo")) } test("default class loader can be set by a different thread") {
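
Taken together, the series leaves `SparkConf` with a `registerKryoClasses(classes: Array[Class[_]])`
method that both switches `spark.serializer` to `KryoSerializer` and appends the given class names to
`spark.kryo.classesToRegister` (repeated calls accumulate). A minimal sketch of the final API —
`MyClass1`/`MyClass2` and the master and app name are placeholders:

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

class MyClass1  // placeholder user class
class MyClass2  // placeholder user class

val conf = new SparkConf().setMaster("local[2]").setAppName("KryoExample")
// One call registers the classes and enables Kryo serialization.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
{% endhighlight %}

Note that the second and third patches narrow the parameter type from `Seq[Class[_ <: Any]]` to
`Array[Class[_]]`, while the `docs/tuning.md` example introduced in the first patch still passes a
`Seq`; against the final signature it needs to be `Array(...)`, as above.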
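As of the fourth patch, `spark.kryo.classesToRegister` and a custom `spark.kryo.registrator` may be
used together: the serializer registers the listed classes first and then invokes the user
registrator. A sketch mirroring the `CustomRegistrator` test above, reusing the placeholder classes
from the previous example (`MyRegistrator` is likewise a placeholder name):

{% highlight scala %}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// A registrator for classes that need custom handling, e.g. a custom field serializer.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[MyClass2])
  }
}

val conf = new SparkConf()
conf.registerKryoClasses(Array(classOf[MyClass1]))                  // plain registration
conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName)  // applied after the list above
{% endhighlight %}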
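For GraphX applications, the new `GraphXUtils.registerKryoClasses(conf)` helper replaces the
now-deprecated `GraphKryoRegistrator`, as the example and test diffs above show:

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.graphx.GraphXUtils

val conf = new SparkConf().setAppName("GraphXKryoExample")  // placeholder app name
// Registers GraphX's internal classes and switches on Kryo serialization.
GraphXUtils.registerKryoClasses(conf)
{% endhighlight %}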