Commit 4266ee0

[SPARK-29046] Fix NPE in SQLConf.get when active SparkContext is stopping
1 parent 86fc890 commit 4266ee0

2 files changed: +21 -1 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 2 additions & 1 deletion
@@ -139,7 +139,8 @@ object SQLConf {
       }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
-        .map(_.dagScheduler.eventProcessLoop.eventThread)
+        .flatMap(sc => Option(sc.dagScheduler))
+        .map(_.eventProcessLoop.eventThread)
         .exists(_.getId == Thread.currentThread().getId)
       if (isSchedulerEventLoopThread) {
         // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`

sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext}
@@ -320,4 +321,22 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(e2.getMessage.contains("spark.sql.shuffle.partitions"))
   }
 
+  test("SPARK-29046: SQLConf.get shouldn't throw NPE when active SparkContext is stopping") {
+    // Logically, there's only one case where SQLConf.get throws an NPE: there is an active
+    // SparkContext, but it is stopping - in particular, it has set dagScheduler to null.
+
+    val oldSparkContext = SparkContext.getActive
+    Utils.tryWithSafeFinally {
+      // this is necessary to set the new SparkContext as active: it clears the current active SparkContext
+      oldSparkContext.foreach(_ => SparkContext.clearActiveContext())
+
+      val conf = new SparkConf().setAppName("test").setMaster("local")
+      LocalSparkContext.withSpark(new SparkContext(conf)) { sc =>
+        sc.dagScheduler = null
+        SQLConf.get
+      }
+    } {
+      oldSparkContext.orElse(Some(null)).foreach(SparkContext.setActiveContext)
+    }
+  }
 }
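A note on the test's structure: it temporarily replaces whatever SparkContext is active with a throwaway local one, nulls its dagScheduler to simulate the stopping window, calls SQLConf.get, and then restores the previously active context in the finally block so the rest of the shared-session suite is unaffected. A minimal sketch of that save/swap/restore shape, with a hypothetical withTemporarilyCleared helper (not a Spark API) standing in for the combination of SparkContext.clearActiveContext(), LocalSparkContext.withSpark, and Utils.tryWithSafeFinally used above:

// Hypothetical helper illustrating the pattern: remember the current value, clear it,
// run the body against a temporary replacement, and always restore the original.
object SaveRestoreSketch {
  def withTemporarilyCleared[A, B](read: => Option[A])(clear: () => Unit)(restore: A => Unit)(body: => B): B = {
    val saved = read     // remember whatever was active before the test
    clear()              // make room for the temporary replacement
    try {
      body               // exercise the code under test
    } finally {
      saved.foreach(restore)   // put the original back even if body throws
    }
  }
}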
