From 2a4944ffe5836408b80f9aa06e9b28e57aa16649 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Fri, 27 Apr 2018 15:03:14 -0500
Subject: [PATCH 1/2] [SPARK-23894][CORE][SQL] Defensively clear ActiveSession in Executors

Because SparkSession.getActiveSession uses an InheritableThreadLocal,
the ThreadPool in executors might end up inheriting the SparkSession
from a driver thread. This leads to test failures as executors should
never have an active SparkSession. So in local mode, defensively clear
the active session.
---
Reviewer notes (free-form area after the separator; not part of the commit
message):

* Executor gains a lazily initialized clearActiveSparkSessionMethod. When
  Utils.isLocalMaster(conf) is true it loads
  org.apache.spark.sql.SparkSession by name and looks up its no-argument
  clearActiveSession method, yielding Some(method). It yields None when the
  class cannot be found or when Utils.isLocalMaster(conf) is false.
* TaskRunner invokes that method reflectively with a null receiver
  (invoke(null)) right after the "Running ..." log line and before
  execBackend.statusUpdate(taskId, TaskState.RUNNING, ...).
* Only ClassNotFoundException is caught around the lookup.
  NOTE(review): a NoSuchMethodException from getMethod, or an exception
  raised by invoke, would propagate out of the task thread -- confirm that
  this is the intended failure mode.
* NOTE(review): leading whitespace inside the hunks below was reconstructed
  from the Scala nesting; verify against the target tree before applying.

 .../org/apache/spark/executor/Executor.scala | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index c325222b764b..04e78076a26a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,6 +229,23 @@ private[spark] class Executor(
     ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum
   }
 
+  /**
+   * Only in local mode, we have to prevent the driver from setting the active SparkSession
+   * in the executor threads. See SPARK-23894.
+   */
+  lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) {
+    try {
+      val cls = Utils.classForName("org.apache.spark.sql.SparkSession")
+      Some(cls.getMethod("clearActiveSession"))
+    } catch {
+      case _: ClassNotFoundException =>
+        // sql not on the classpath, no problem, we don't need to worry about clearing anything
+        None
+    }
+  } else {
+    None
+  }
+
   class TaskRunner(
       execBackend: ExecutorBackend,
       private val taskDescription: TaskDescription)
@@ -299,6 +316,9 @@ private[spark] class Executor(
       Thread.currentThread.setContextClassLoader(replClassLoader)
       val ser = env.closureSerializer.newInstance()
       logInfo(s"Running $taskName (TID $taskId)")
+      // When running in local mode, we might end up with the active session from the driver set on
+      // this thread, though we never should, so we defensively clear it. See SPARK-23894.
+      clearActiveSparkSessionMethod.foreach(_.invoke(null))
       execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
       var taskStart: Long = 0
       var taskStartCpu: Long = 0

From 49783c5e3a6fa835fced52fb9cfb478065c3c044 Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 30 Apr 2018 08:57:44 -0500
Subject: [PATCH 2/2] review feedback

---
Reviewer notes (free-form area after the separator; not part of the commit
message):

* The only change is adding the `private` modifier to
  clearActiveSparkSessionMethod; the initializer expression is untouched.

 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 04e78076a26a..76654406373b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -233,7 +233,7 @@ private[spark] class Executor(
    * Only in local mode, we have to prevent the driver from setting the active SparkSession
    * in the executor threads. See SPARK-23894.
    */
-  lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) {
+  private lazy val clearActiveSparkSessionMethod = if (Utils.isLocalMaster(conf)) {
     try {
       val cls = Utils.classForName("org.apache.spark.sql.SparkSession")
       Some(cls.getMethod("clearActiveSession"))