[SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession #28316
Conversation
cc: @cloud-fan @dongjoon-hyun @maropu @HyukjinKwon, many thanks.
Test build #121688 has finished for PR 28316 at commit
```diff
  var session = activeThreadSession.get()
  if ((session ne null) && !session.sparkContext.isStopped) {
-   options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
+   for ((k, v) <- options if !SQLConf.staticConfKeys.contains(k)) {
```
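For reference, a quick illustration of the registry the new filter consults (a sketch assuming a Spark 3.x classpath; the exact key set varies by version):

```scala
import org.apache.spark.sql.internal.SQLConf

// Static SQL configs are session-immutable and registered in staticConfKeys,
// so the filtered loop above skips them when reusing an existing session.
assert(SQLConf.staticConfKeys.contains("spark.sql.warehouse.dir"))
// Ordinary runtime SQL configs are not in the static set and still propagate.
assert(!SQLConf.staticConfKeys.contains("spark.sql.shuffle.partitions"))
```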
Could we have a clearer warning message than "configuration may not take effect" if `options` contains any static config?
+1
+1 too. We can warn about static SQL configs separately, but do we still need a general warning message for core configurations? e.g.
```scala
for ((k, v) <- options) {
  if (SQLConf.staticConfKeys.contains(k)) {
    logWarning(s"Using an existing SparkSession, the static configuration $k is ignored.")
  } else {
    session.sessionState.conf.setConfString(k, v)
  }
}
if ((options -- SQLConf.staticConfKeys.asScala).nonEmpty) {
  logWarning("Using an existing SparkSession; some spark core configurations may not take" +
    " effect.")
}
```
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 too. we can warn static SQL separately. But do we still need a general warning messages for core configurations?
e.g.
for ((k, v) <- options) {
if (SQLConf.staticConfKeys.contains(k)) {
logWarning(s"Using an existing SparkSession, the static configuration $k is ignored.")
} else {
session.sessionState.conf.setConfString(k, v)
}
}
if ((options -- SQLConf.staticConfKeys.asScala).nonEmpty) {
logWarning("Using an existing SparkSession; some spark core configurations may not take" +
" effect.")
}| assert(!session.conf.get(WAREHOUSE_PATH).contains("SPARK-31532-db")) | ||
| assert(session.conf.get(WAREHOUSE_PATH) === session2.conf.get(WAREHOUSE_PATH)) | ||
| assert(session2.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532") | ||
|
|
nit: remove this blank
Nice catch! Looks fine.
| test("SPARK-31532: should not propagate static sql configs to the existing" + | ||
| " active/default SparkSession") { | ||
| val session = SparkSession.builder() |
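A hedged sketch of how this truncated test body might continue, reconstructed from the assertions quoted earlier in this review (string keys used for self-containment; the suite itself uses the `WAREHOUSE_PATH` and `GLOBAL_TEMP_DATABASE` config entries):

```scala
import org.apache.spark.sql.SparkSession

// The first session pins the static configs at creation time.
val session = SparkSession.builder()
  .master("local")
  .config("spark.sql.globalTempDatabase", "globaltempdb-spark-31532")
  .getOrCreate()

// A second builder tries to override a static config; it should get the
// same session back, with the static configs left untouched.
val session2 = SparkSession.builder()
  .config("spark.sql.warehouse.dir", "SPARK-31532-db")
  .getOrCreate()

assert(session eq session2)
assert(!session.conf.get("spark.sql.warehouse.dir").contains("SPARK-31532-db"))
assert(session2.conf.get("spark.sql.globalTempDatabase") == "globaltempdb-spark-31532")
```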
Just in case, could we add a test for the case where static configs should be set on the SparkSession (i.e., when no active/default SparkSession exists)? Or do we already have such a test?
I added a new test for such a case: if a SparkContext instance exists but no SparkSession exists, the static configs remain changeable until the SparkSession is finally created. See the sketch below.
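A minimal sketch of that scenario (app name and values illustrative): a SparkContext is already running, but since no SparkSession exists yet, the builder's static config still applies to the session it creates.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// A SparkContext exists, but no SparkSession has been created yet.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local").setAppName("SPARK-31532-sketch"))

// With no active/default session to reuse, the static config set on the
// builder takes effect in the newly created session.
val session = SparkSession.builder()
  .config("spark.sql.globalTempDatabase", "globaltempdb-spark-31532-2")
  .getOrCreate()
assert(session.conf.get("spark.sql.globalTempDatabase") == "globaltempdb-spark-31532-2")
```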
```scala
private def applyModifiableSettings(session: SparkSession): Unit = {
  for ((k, v) <- options) {
```
How about (warning texts sketched from the messages discussed above):
```scala
val (staticConfs, otherConfs) =
  options.partition(kv => SQLConf.staticConfKeys.contains(kv._1))
if (staticConfs.nonEmpty) logWarning("Using an existing SparkSession; the static sql configurations will not take effect.")
if (otherConfs.nonEmpty) logWarning("Using an existing SparkSession; some spark core configurations may not take effect.")
```
Test build #121714 has finished for PR 28316 at commit
Test build #121712 has finished for PR 28316 at commit
retest this please
Test build #121730 has finished for PR 28316 at commit
retest this please
Test build #121761 has finished for PR 28316 at commit
[SPARK-31532][SQL] Builder should not propagate static sql configs to the existing active or default SparkSession
### What changes were proposed in this pull request?
`SparkSessionBuilder` should not propagate static SQL configurations to the existing active/default SparkSession.
This seems to be a long-standing bug:
```scala
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+--------------------+
| key| value|
+--------------------+--------------------+
|spark.sql.warehou...|file:/Users/kenty...|
+--------------------+--------------------+
scala> spark.sql("set spark.sql.warehouse.dir=2");
org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.sql.warehouse.dir;
at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:154)
at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42)
at org.apache.spark.sql.execution.command.SetCommand.$anonfun$x$7$6(SetCommand.scala:100)
at org.apache.spark.sql.execution.command.SetCommand.run(SetCommand.scala:156)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
... 47 elided
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").get
getClass getOrCreate
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
20/04/23 23:49:13 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
res7: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession6403d574
scala> spark.sql("set spark.sql.warehouse.dir").show
+--------------------+-----+
| key|value|
+--------------------+-----+
|spark.sql.warehou...| xyz|
+--------------------+-----+
scala>
```
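For contrast, a sketch of the expected behavior after this fix (transcript illustrative; the exact warning wording follows the patch discussed in the review above):

```scala
scala> SparkSession.builder.config("spark.sql.warehouse.dir", "xyz").getOrCreate
WARN SparkSession$Builder: Using an existing SparkSession; the static sql configurations will not take effect.
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...

scala> spark.sql("set spark.sql.warehouse.dir").show
// still prints the original warehouse path, not xyz
```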
### Why are the changes needed?
Bugfix, as shown in the previous section.
### Does this PR introduce any user-facing change?
Yes. Static SQL configurations set via `SparkSession.builder.config` no longer propagate to the existing active/default SparkSession.
### How was this patch tested?
Added new unit tests.
Closes #28316 from yaooqinn/SPARK-31532.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
(cherry picked from commit 8424f55)
Signed-off-by: Takeshi Yamamuro <[email protected]>
Thanks, @yaooqinn @cloud-fan! Merged to master/3.0/2.4 according to SPARK-31532.
Thank you for pinging me, @maropu.
We need SPARK-28179 (#24979) to fix the failure, but it will be a behavior change in branch-2.4. cc @gatorsmile, @HyukjinKwon, @srowen
Oops, I forgot Xiao's comment: #28262 (comment) ...
In
I took a look. It looks okay to change the conf itself.
Could you make a follow-up PR with the following?
Yea, ok. I'll do it now.
Technically, the difference is the stored value in
Ping the related people, including me, on the follow-up PR. Thanks!
Ah.. There is an old test case using
Please update them together. And, let's see the test result.
…BAL_TEMP_DATABASE config in SparkSessionBuilderSuite

### What changes were proposed in this pull request?
This PR intends to fix the test code to use lowercase values for the `GLOBAL_TEMP_DATABASE` config in `SparkSessionBuilderSuite`. The handling of this config differs between branch-3.0+ and branch-2.4: in branch-3.0+, Spark always lowercases the value, so the test had better always use lowercase values for it. This comes from the dongjoon-hyun comment: #28316 (comment)

### Why are the changes needed?
To fix the test failure in branch-2.4.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Fixed the test.

Closes #28339 from maropu/SPARK-31532.
Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
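A small sketch of the branch-3.0+ behavior described above (values illustrative): the stored value of this config is lowercased, so test assertions should compare against the lowercase form.

```scala
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder()
  .master("local")
  .config("spark.sql.globalTempDatabase", "globalTempDB-SPARK-31532")
  .getOrCreate()

// Branch-3.0+ lowercases the stored value, so mixed-case input reads back
// lowercased; branch-2.4 preserved the original casing, hence the test fix.
assert(session.conf.get("spark.sql.globalTempDatabase") == "globaltempdb-spark-31532")
```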