From 4266ee0e3295f3692ccb9f5e36cda14c7b0a14d3 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Wed, 11 Sep 2019 16:33:57 +0900
Subject: [PATCH 1/2] [SPARK-29046] Fix NPE in SQLConf.get when active SparkContext is stopping

---
 .../apache/spark/sql/internal/SQLConf.scala   |  3 ++-
 .../spark/sql/internal/SQLConfSuite.scala     | 19 +++++++++++++++++++
 2 files changed, 21 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d9b0a72618c7e..e1d41191a52ed 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -139,7 +139,8 @@ object SQLConf {
       }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
-        .map(_.dagScheduler.eventProcessLoop.eventThread)
+        .flatMap(sc => Option(sc.dagScheduler))
+        .map(_.eventProcessLoop.eventThread)
         .exists(_.getId == Thread.currentThread().getId)
       if (isSchedulerEventLoopThread) {
         // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 1dfbca64f5778..e698ac032cd82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext}
@@ -320,4 +321,22 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(e2.getMessage.contains("spark.sql.shuffle.partitions"))
   }
 
+  test("SPARK-29046: SQLConf.get shouldn't throw NPE when active SparkContext is stopping") {
+    // Logically, there's only one case SQLConf.get throws NPE: there's active SparkContext,
+    // but SparkContext is stopping - especially it sets dagScheduler as null.
+
+    val oldSparkContext = SparkContext.getActive
+    Utils.tryWithSafeFinally {
+      // this is necessary to set new SparkContext as active: it cleans up active SparkContext
+      oldSparkContext.foreach(_ => SparkContext.clearActiveContext())
+
+      val conf = new SparkConf().setAppName("test").setMaster("local")
+      LocalSparkContext.withSpark(new SparkContext(conf)) { sc =>
+        sc.dagScheduler = null
+        SQLConf.get
+      }
+    } {
+      oldSparkContext.orElse(Some(null)).foreach(SparkContext.setActiveContext)
+    }
+  }
 }

From a1472e83f893c7a171b3b00a75814538246c0720 Mon Sep 17 00:00:00 2001
From: "Jungtaek Lim (HeartSaVioR)"
Date: Thu, 12 Sep 2019 05:35:03 +0900
Subject: [PATCH 2/2] Reflect review comment

---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e1d41191a52ed..4f3e39ad49afe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -139,7 +139,7 @@ object SQLConf {
       }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
-        .flatMap(sc => Option(sc.dagScheduler))
+        .flatMap { sc => Option(sc.dagScheduler) }
         .map(_.eventProcessLoop.eventThread)
         .exists(_.getId == Thread.currentThread().getId)
       if (isSchedulerEventLoopThread) {
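
Note (not part of the patches above): the essence of the fix is treating SparkContext.dagScheduler as a nullable field while the context is stopping, and folding that nullability into the Option chain via Option(...) and flatMap. Below is a minimal, self-contained Scala sketch of that pattern; the Fake* types and NullSafeLookupSketch are invented stand-ins for illustration only and do not exist in Spark.

// Illustration only: Fake* types are stand-ins for the Spark internals the patch touches
// (SparkContext, DAGScheduler, and its event process loop / event thread).
final class FakeEventThread(val id: Long)
final class FakeEventLoop(val eventThread: FakeEventThread)
final class FakeDagScheduler(val eventProcessLoop: FakeEventLoop)
final class FakeSparkContext {
  // Like SparkContext.dagScheduler, this field is null while the context is stopping.
  @volatile var dagScheduler: FakeDagScheduler = _
}

object NullSafeLookupSketch {
  // Shape of the original code: dereferencing dagScheduler directly inside map
  // throws NullPointerException once stop() has nulled the field.
  def unsafeIsEventLoopThread(active: Option[FakeSparkContext]): Boolean =
    active
      .map(_.dagScheduler.eventProcessLoop.eventThread)
      .exists(_.id == Thread.currentThread().getId)

  // Shape of the fixed code: wrap the nullable field in Option and flatMap over it,
  // so a stopping context simply contributes None instead of throwing.
  def safeIsEventLoopThread(active: Option[FakeSparkContext]): Boolean =
    active
      .flatMap { sc => Option(sc.dagScheduler) }
      .map(_.eventProcessLoop.eventThread)
      .exists(_.id == Thread.currentThread().getId)

  def main(args: Array[String]): Unit = {
    val stopping = new FakeSparkContext // dagScheduler is still null, as during stop()
    println(safeIsEventLoopThread(Some(stopping))) // prints false, no NPE
  }
}

The second patch only changes the flatMap argument from parentheses to braces to match the surrounding code style; behavior is unchanged.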