Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ object SparkSession extends Logging {
* SparkSession exists, the method creates a new SparkSession and assigns the
* newly created SparkSession as the global default.
*
* In case an existing SparkSession is returned, the config options specified in
* In case an existing SparkSession is returned, the non-static config options specified in
* this builder will be applied to the existing SparkSession.
*
* @since 2.0.0
Expand All @@ -905,10 +905,7 @@ object SparkSession extends Logging {
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
applyModifiableSettings(session)
return session
}

Expand All @@ -917,10 +914,7 @@ object SparkSession extends Logging {
// If the current thread does not have an active session, get it from the global session.
session = defaultSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
if (options.nonEmpty) {
logWarning("Using an existing SparkSession; some configuration may not take effect.")
}
applyModifiableSettings(session)
return session
}

Expand Down Expand Up @@ -959,6 +953,22 @@ object SparkSession extends Logging {

return session
}

/**
 * Applies this builder's options to an already-running `session`.
 *
 * Static SQL configurations cannot change once a session exists, so they are only
 * reported via a warning; every remaining option is written into the session's
 * runtime SQL conf. The non-static options may still contain spark core settings
 * that the existing SparkContext will ignore, hence the second warning.
 */
private def applyModifiableSettings(session: SparkSession): Unit = {
  // Split the requested options into static SQL configs (immutable on a live
  // session) and everything else.
  val (staticOpts, modifiableOpts) = options.partition {
    case (key, _) => SQLConf.staticConfKeys.contains(key)
  }

  for ((key, value) <- modifiableOpts) {
    session.sessionState.conf.setConfString(key, value)
  }

  if (staticOpts.nonEmpty) {
    logWarning(
      "Using an existing SparkSession; the static sql configurations will not take effect.")
  }
  if (modifiableOpts.nonEmpty) {
    logWarning(
      "Using an existing SparkSession; some spark core configurations may not take effect.")
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
import org.apache.spark.sql.internal.StaticSQLConf._

/**
* Test cases for the builder pattern of [[SparkSession]].
Expand Down Expand Up @@ -168,4 +168,51 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(session.sessionState.conf.getConfString("spark.app.name") === "test-app-SPARK-31234")
assert(session.sessionState.conf.getConf(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31234")
}

test("SPARK-31532: should not propagate static sql configs to the existing" +
  " active/default SparkSession") {
  // Bootstrap a session; its static configs are fixed from this point on.
  val original = SparkSession.builder()
    .master("local")
    .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532")
    .config("spark.app.name", "test-app-SPARK-31532")
    .getOrCreate()

  // Static sql configs must not be propagated to the existing active session.
  val fromActive = SparkSession
    .builder()
    .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-1")
    .getOrCreate()
  assert(original.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
  assert(fromActive.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")

  // Static sql configs must not be propagated to the existing default session
  // either, once no active session is set for this thread.
  SparkSession.clearActiveSession()
  val fromDefault = SparkSession
    .builder()
    .config(WAREHOUSE_PATH.key, "SPARK-31532-db")
    .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-2")
    .getOrCreate()

  assert(!original.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db"))
  assert(original.conf.get(WAREHOUSE_PATH) === fromDefault.conf.get(WAREHOUSE_PATH))
  assert(fromDefault.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532")
}

test("SPARK-31532: propagate static sql configs if no existing SparkSession") {
  // Pre-create only a SparkContext: static sql configs remain changeable until
  // the first SparkSession is actually built on top of it.
  val sparkConf = new SparkConf()
    .setMaster("local")
    .setAppName("test-app-SPARK-31532-2")
    .set(GLOBAL_TEMP_DATABASE.key, "globaltempdb-spark-31532")
    .set(WAREHOUSE_PATH.key, "SPARK-31532-db")
  SparkContext.getOrCreate(sparkConf)

  // With no existing session, the builder's static sql configs must win over
  // the values carried by the SparkContext's conf.
  val session = SparkSession
    .builder()
    .config(GLOBAL_TEMP_DATABASE.key, "globalTempDB-SPARK-31532-2")
    .config(WAREHOUSE_PATH.key, "SPARK-31532-db-2")
    .getOrCreate()
  assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
  assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2")
  assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
}
}