Commit 850833f

HeartSaVioR authored and HyukjinKwon committed
[SPARK-29046][SQL] Fix NPE in SQLConf.get when active SparkContext is stopping
### What changes were proposed in this pull request?

This patch fixes an NPE in SQLConf.get that can occur when `SparkContext._dagScheduler` is null because the active SparkContext is stopping. The existing logic didn't account for an active SparkContext being in the process of stopping. Note that this isn't easy to hit, since `SparkContext.stop()` blocks the main thread, but there are many cases where SQLConf.get is accessed concurrently while `SparkContext.stop()` is executing - for example, users running other threads, or a listener accessing SQLConf.get after dagScheduler has been set to null (the case I encountered).

### Why are the changes needed?

The bug causes an NPE.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a new UT to verify that no NPE occurs. Without the patch, the test fails by throwing an NPE.

Closes #25753 from HeartSaVioR/SPARK-29046.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent 8f632d7 commit 850833f

File tree

2 files changed: +21 −1 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -139,7 +139,8 @@ object SQLConf {
       }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
-        .map(_.dagScheduler.eventProcessLoop.eventThread)
+        .flatMap { sc => Option(sc.dagScheduler) }
+        .map(_.eventProcessLoop.eventThread)
         .exists(_.getId == Thread.currentThread().getId)
       if (isSchedulerEventLoopThread) {
         // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
```
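The change above replaces a direct `_.dagScheduler` dereference with a null-safe `Option(...)` chain, so a context whose scheduler has already been nulled out yields `None` instead of throwing. A minimal standalone sketch of that pattern (plain Scala, no Spark; `Scheduler`, `Context`, and `isSchedulerThread` are illustrative names, not Spark APIs):

```scala
// Illustrative stand-ins for DAGScheduler / SparkContext (not Spark classes).
final class Scheduler { val threadId: Long = 42L }
final class Context(var scheduler: Scheduler) // may be nulled while stopping

// Null-safe variant: Option(...) turns a null scheduler into None,
// so the rest of the chain is skipped instead of throwing an NPE.
def isSchedulerThread(active: Option[Context], currentId: Long): Boolean =
  active
    .flatMap(ctx => Option(ctx.scheduler))
    .map(_.threadId)
    .exists(_ == currentId)

val ctx = new Context(new Scheduler)
assert(isSchedulerThread(Some(ctx), 42L))
ctx.scheduler = null // simulate SparkContext.stop() in progress
assert(!isSchedulerThread(Some(ctx), 42L)) // no NPE, just false
```

The original `.map(_.dagScheduler.eventProcessLoop.eventThread)` would dereference the nulled field directly, which is exactly the NPE the patch removes.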

sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala

Lines changed: 19 additions & 0 deletions
```diff
@@ -19,6 +19,7 @@ package org.apache.spark.sql.internal
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext}
 import org.apache.spark.sql._
 import org.apache.spark.sql.internal.StaticSQLConf._
 import org.apache.spark.sql.test.{SharedSparkSession, TestSQLContext}
@@ -320,4 +321,22 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     assert(e2.getMessage.contains("spark.sql.shuffle.partitions"))
   }
 
+  test("SPARK-29046: SQLConf.get shouldn't throw NPE when active SparkContext is stopping") {
+    // Logically, there's only one case SQLConf.get throws NPE: there's active SparkContext,
+    // but SparkContext is stopping - especially it sets dagScheduler as null.
+
+    val oldSparkContext = SparkContext.getActive
+    Utils.tryWithSafeFinally {
+      // this is necessary to set new SparkContext as active: it cleans up active SparkContext
+      oldSparkContext.foreach(_ => SparkContext.clearActiveContext())
+
+      val conf = new SparkConf().setAppName("test").setMaster("local")
+      LocalSparkContext.withSpark(new SparkContext(conf)) { sc =>
+        sc.dagScheduler = null
+        SQLConf.get
+      }
+    } {
+      oldSparkContext.orElse(Some(null)).foreach(SparkContext.setActiveContext)
+    }
+  }
 }
```
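The test saves the currently active SparkContext, clears it so the test can install its own, and restores the original in a finally-style block; the `orElse(Some(null))` ensures that if nothing was active before, null is restored explicitly. A rough sketch of that save/clear/restore pattern with a plain try/finally (illustrative `Registry` object, not a Spark API; Spark's `Utils.tryWithSafeFinally` additionally guards against exceptions thrown by the finally body):

```scala
// Illustrative process-global registry (not a Spark class).
object Registry {
  private var active: String = null            // null means "nothing active"
  def getActive: Option[String] = Option(active)
  def setActive(v: String): Unit = { active = v }
  def clearActive(): Unit = { active = null }
}

val saved = Registry.getActive                  // remember the previous value
try {
  saved.foreach(_ => Registry.clearActive())    // make room for a fresh one
  Registry.setActive("test-instance")
  assert(Registry.getActive.contains("test-instance"))
} finally {
  // Restore; if nothing was active before, this restores null explicitly.
  saved.orElse(Some(null: String)).foreach(Registry.setActive)
}
assert(Registry.getActive == saved)             // back to the original state
```

This shape keeps a test that mutates global singleton state from leaking that state into later tests, regardless of whether the body succeeds or throws.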
