From acbaf8b65629344d760360b768e89f1712af8942 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 19 Sep 2017 10:19:43 -0500 Subject: [PATCH 1/2] [SPARK-21928][CORE] Set classloader on SerializerManager private kryo We have to make sure thatthat SerializerManager's private instance of kryo also uses the right classloader, regardless of the current thread classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads. --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 +++ .../scala/org/apache/spark/serializer/SerializerManager.scala | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 6237a4df0be58..3c738edf0b29a 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -130,6 +130,9 @@ private[spark] class Executor( // Set the classloader for serializer env.serializer.setDefaultClassLoader(replClassLoader) + // SPARK-21928. SerializerManager's internal instance of Kryo might get used in netty threads + // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too. + env.serializerManager.setDefaultClassLoader(replClassLoader) // Max size of direct result. If task result is bigger than this, we use the block manager // to send the result back. diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index bb7ed8709ba8a..311383e7ea2bd 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -41,6 +41,10 @@ private[spark] class SerializerManager( private[this] val kryoSerializer = new KryoSerializer(conf) + def setDefaultClassLoader(classLoader: ClassLoader): Unit = { + kryoSerializer.setDefaultClassLoader(classLoader) + } + private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]] private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = { val primitiveClassTags = Set[ClassTag[_]]( From 20e3585eac16ef3bfe403ec23f57a2705ff47ecb Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 19 Sep 2017 15:49:18 -0500 Subject: [PATCH 2/2] fix tests --- .../test/scala/org/apache/spark/executor/ExecutorSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala index 884a2750e621d..105a178f2d94e 100644 --- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.rdd.RDD import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription} -import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.UninterruptibleThread @@ -234,6 +234,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug val mockMemoryManager = mock[MemoryManager] when(mockEnv.conf).thenReturn(conf) when(mockEnv.serializer).thenReturn(serializer) + when(mockEnv.serializerManager).thenReturn(mock[SerializerManager]) when(mockEnv.rpcEnv).thenReturn(mockRpcEnv) when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem) when(mockEnv.memoryManager).thenReturn(mockMemoryManager)