@@ -53,7 +53,7 @@ class KryoSerializer(conf: SparkConf)
5353 private val maxBufferSize = conf.getInt(" spark.kryoserializer.buffer.max.mb" , 64 ) * 1024 * 1024
5454 private val referenceTracking = conf.getBoolean(" spark.kryo.referenceTracking" , true )
5555 private val registrationRequired = conf.getBoolean(" spark.kryo.registrationRequired" , false )
56- private val userRegistratorName = conf.getOption(" spark.kryo.registrator" )
56+ private val userRegistrator = conf.getOption(" spark.kryo.registrator" )
5757 private val classesToRegister = conf.get(" spark.kryo.classesToRegister" , " " )
5858 .split(',' )
5959 .filter(! _.isEmpty)
@@ -91,23 +91,15 @@ class KryoSerializer(conf: SparkConf)
9191 kryo.register(classOf [SerializableWritable [_]], new KryoJavaSerializer ())
9292 kryo.register(classOf [HttpBroadcast [_]], new KryoJavaSerializer ())
9393
94- // Allow the user to register their own classes by setting spark.kryo.registrator.
95- val userRegistrator = userRegistratorName.map( reg => try {
96- Class .forName(reg, true , classLoader).newInstance().asInstanceOf [KryoRegistrator ]
97- } catch {
98- case e : Exception => {
99- throw new SparkException (s " Failed to load user registrator " +
100- userRegistratorName.getOrElse(" " ), e)
101- }
102- })
103-
10494 try {
10595 // Use the default classloader when calling the user registrator.
10696 Thread .currentThread.setContextClassLoader(classLoader)
10797 // Register classes given through spark.kryo.classesToRegister.
10898 classesToRegister.foreach { clazz => kryo.register(clazz) }
109- // Call user registrator.
110- userRegistrator.foreach { reg => reg.registerClasses(kryo) }
99+ // Allow the user to register their own classes by setting spark.kryo.registrator.
100+ userRegistrator
101+ .map(Class .forName(_, true , classLoader).newInstance().asInstanceOf [KryoRegistrator ])
102+ .foreach { reg => reg.registerClasses(kryo) }
111103 } catch {
112104 case e : Exception =>
113105 throw new SparkException (s " Failed to register classes with Kryo " , e)
@@ -123,20 +115,6 @@ class KryoSerializer(conf: SparkConf)
123115 kryo
124116 }
125117
126- private def registerClasses (registrator : KryoRegistrator , classLoader : ClassLoader , kryo : Kryo ) {
127- val oldClassLoader = Thread .currentThread.getContextClassLoader
128- try {
129- // Use the default classloader when calling the user registrator.
130- Thread .currentThread.setContextClassLoader(classLoader)
131- registrator.registerClasses(kryo)
132- } catch {
133- case e : Exception =>
134- throw new SparkException (s " Failed to invoke registrator " + registrator.getClass.getName, e)
135- } finally {
136- Thread .currentThread.setContextClassLoader(oldClassLoader)
137- }
138- }
139-
140118 override def newInstance (): SerializerInstance = {
141119 new KryoSerializerInstance (this )
142120 }
0 commit comments