
Commit ac9c053

yaooqinn authored and cloud-fan committed
[SPARK-26794][SQL] SparkSession enableHiveSupport does not point to hive but in-memory while the SparkContext exists
## What changes were proposed in this pull request?

```java
public class SqlDemo {
    public static void main(final String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("spark-sql-demo");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SparkSession ss = SparkSession.builder().enableHiveSupport().getOrCreate();
        ss.sql("show databases").show();
    }
}
```

Before https://issues.apache.org/jira/browse/SPARK-20946, the demo above pointed to the right Hive metastore when hive-site.xml was present. Now it can only point to the default in-memory catalog.

The catalog is a variable shared across SparkSessions and is instantiated with the SparkContext's conf. After https://issues.apache.org/jira/browse/SPARK-20946, session-level configs are no longer passed to the SparkContext's conf, so the enableHiveSupport API has no effect on the catalog instance. You can set spark.sql.catalogImplementation=hive application-wide to work around the problem, or avoid creating a SparkContext before calling SparkSession.builder().enableHiveSupport().getOrCreate().

Here we respect the SparkSession-level configuration the first time the catalog is generated within SharedState.

## How was this patch tested?

1. Added a unit test.
2. Manually:

```scala
test("enableHiveSupport has right to determine the catalog while using an existing sc") {
  val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
  val sc = SparkContext.getOrCreate(conf)
  val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
  assert(ss.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
    "The catalog should be hive")
  val ss2 = SparkSession.builder().getOrCreate()
  assert(ss2.sharedState.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
    "The catalog should be shared across sessions")
}
```

Without this fix, the above test fails. You can add it to `org.apache.spark.sql.hive.HiveSharedStateSuite` and run

```sbt
./build/sbt -Phadoop-2.7 -Phive "hive/testOnly org.apache.spark.sql.hive.HiveSharedStateSuite"
```

to verify.

Closes #23709 from yaooqinn/SPARK-26794.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
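As a side note, the application-wide workaround mentioned in the description can be sketched as follows. This is an illustrative example only (the object name `CatalogWorkaroundDemo` is made up), not part of the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// Sketch of the workaround described above: set the catalog implementation on the
// application-wide SparkConf so the shared catalog is Hive even though a SparkContext
// is created before SparkSession.builder().enableHiveSupport().getOrCreate().
object CatalogWorkaroundDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark-sql-demo")
      .set("spark.sql.catalogImplementation", "hive")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sql("show databases").show()
  }
}
```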
1 parent f34b872 commit ac9c053

File tree

4 files changed: +99 −8 lines


sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala (1 addition, 1 deletion)

```diff
@@ -127,7 +127,7 @@ class SparkSession private(
   @Unstable
   @transient
   lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(new SharedState(sparkContext))
+    existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))
   }

   /**
```
sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala (31 additions, 6 deletions)

```diff
@@ -39,8 +39,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}

 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
+ *
+ * @param sparkContext The Spark context associated with this SharedState
+ * @param initialConfigs The configs from the very first created SparkSession
  */
-private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
+private[sql] class SharedState(
+    val sparkContext: SparkContext,
+    initialConfigs: scala.collection.Map[String, String])
+  extends Logging {

   // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
   // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
@@ -77,6 +83,27 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   }
   logInfo(s"Warehouse path is '$warehousePath'.")

+  // These 2 variables should be initiated after `warehousePath`, because in the first place we need
+  // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
+  // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
+  private val (conf, hadoopConf) = {
+    val confClone = sparkContext.conf.clone()
+    val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
+    // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
+    // `SharedState`, all `SparkSession` level configurations have higher priority to generate a
+    // `SharedState` instance. This will be done only once then shared across `SparkSession`s.
+    initialConfigs.foreach {
+      case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key =>
+        logWarning(s"Not allowing to set ${WAREHOUSE_PATH.key} or hive.metastore.warehouse.dir " +
+          s"in SparkSession's options, it should be set statically for cross-session usages")
+      case (k, v) =>
+        logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+        confClone.set(k, v)
+        hadoopConfClone.set(k, v)
+
+    }
+    (confClone, hadoopConfClone)
+  }

   /**
    * Class for caching query results reused in future executions.
@@ -89,7 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   val statusStore: SQLAppStatusStore = {
     val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
-    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
+    val listener = new SQLAppStatusListener(conf, kvStore, live = true)
     sparkContext.listenerBus.addToStatusQueue(listener)
     val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
     sparkContext.ui.foreach(new SQLTab(statusStore, _))
@@ -101,9 +128,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   lazy val externalCatalog: ExternalCatalogWithListener = {
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
+      SharedState.externalCatalogClassName(conf), conf, hadoopConf)

     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
@@ -137,7 +162,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   // System preserved database should not exists in metastore. However it's hard to guarantee it
   // for every session, because case-sensitivity differs. Here we always lowercase it to make our
   // life easier.
-  val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
+  val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
   if (externalCatalog.databaseExists(globalTempDB)) {
     throw new SparkException(
       s"$globalTempDB is a system preserved database, please rename your existing database " +
```

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala (1 addition, 1 deletion)

```diff
@@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog(
 private[hive] class TestHiveSharedState(
     sc: SparkContext,
     hiveClient: Option[HiveClient] = None)
-  extends SharedState(sc) {
+  extends SharedState(sc, initialConfigs = Map.empty[String, String]) {

   override lazy val externalCatalog: ExternalCatalogWithListener = {
     new ExternalCatalogWithListener(new TestHiveExternalCatalog(
```
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala (new file, 66 additions)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf.ConfVars

import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.sql.internal.SharedState
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.util.Utils

class HiveSharedStateSuite extends SparkFunSuite {

  test("initial configs should be passed to SharedState but not SparkContext") {
    val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
    val sc = SparkContext.getOrCreate(conf)
    val invalidPath = "invalid/path"
    val metastorePath = Utils.createTempDir()
    val tmpDb = "tmp_db"

    // The initial configs used to generate SharedState, none of these should affect the global
    // shared SparkContext's configurations. Especially, all these configs are passed to the cloned
    // confs inside SharedState except metastore warehouse dir.
    val initialConfigs = Map("spark.foo" -> "bar",
      WAREHOUSE_PATH.key -> invalidPath,
      ConfVars.METASTOREWAREHOUSE.varname -> invalidPath,
      CATALOG_IMPLEMENTATION.key -> "hive",
      ConfVars.METASTORECONNECTURLKEY.varname ->
        s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
      GLOBAL_TEMP_DATABASE.key -> tmpDb)

    val state = new SharedState(sc, initialConfigs)
    assert(state.warehousePath !== invalidPath, "warehouse path can't determine by session options")
    assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
      "warehouse conf in session options can't affect application wide spark conf")
    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
      "warehouse conf in session options can't affect application wide hadoop conf")

    assert(!state.sparkContext.conf.contains("spark.foo"),
      "static spark conf should not be affected by session")
    assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
      "Initial SparkSession options can determine the catalog")
    val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
    assert(client.getConf("spark.foo", "") === "bar",
      "session level conf should be passed to catalog")
    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, invalidPath) !== invalidPath,
      "session level conf should be passed to catalog except warehouse dir")

    assert(state.globalTempViewManager.database === tmpDb)
  }
}
```
