diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 1c13a6819fe5..a7bd2ef67eec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -127,7 +127,7 @@ class SparkSession private(
   @Unstable
   @transient
   lazy val sharedState: SharedState = {
-    existingSharedState.getOrElse(new SharedState(sparkContext))
+    existingSharedState.getOrElse(new SharedState(sparkContext, initialSessionOptions))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5b6160e2b408..3dd2ae61e8cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -39,8 +39,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
+ *
+ * @param sparkContext The Spark context associated with this SharedState
+ * @param initialConfigs The configs from the first created SparkSession
  */
-private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
+private[sql] class SharedState(
+    val sparkContext: SparkContext,
+    initialConfigs: scala.collection.Map[String, String])
+  extends Logging {
 
   // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
   // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
@@ -77,6 +83,27 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   }
   logInfo(s"Warehouse path is '$warehousePath'.")
 
+  // These two variables must be initialized after `warehousePath`, because we first need to load
+  // hive-site.xml into hadoopConf and determine the warehouse path, which is then set into both
+  // the Spark conf and the Hadoop conf so it cannot be overridden by SparkSession-level options.
+  private val (conf, hadoopConf) = {
+    val confClone = sparkContext.conf.clone()
+    val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
+    // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
+    // `SharedState`, all `SparkSession`-level configurations take higher priority when generating
+    // the `SharedState` instance. This is done only once and then shared across `SparkSession`s.
+    initialConfigs.foreach {
+      case (k, _) if k == "hive.metastore.warehouse.dir" || k == WAREHOUSE_PATH.key =>
+        logWarning(s"${WAREHOUSE_PATH.key} and hive.metastore.warehouse.dir cannot be set via " +
+          s"SparkSession options; they must be set statically so they apply across sessions")
+      case (k, v) =>
+        logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+        confClone.set(k, v)
+        hadoopConfClone.set(k, v)
+
+    }
+    (confClone, hadoopConfClone)
+  }
 
   /**
    * Class for caching query results reused in future executions.
@@ -89,7 +116,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   val statusStore: SQLAppStatusStore = {
     val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
-    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
+    val listener = new SQLAppStatusListener(conf, kvStore, live = true)
     sparkContext.listenerBus.addToStatusQueue(listener)
     val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
     sparkContext.ui.foreach(new SQLTab(statusStore, _))
@@ -101,9 +128,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
    */
   lazy val externalCatalog: ExternalCatalogWithListener = {
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
+      SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
@@ -137,7 +162,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
     // System preserved database should not exists in metastore. However it's hard to guarantee it
     // for every session, because case-sensitivity differs. Here we always lowercase it to make our
     // life easier.
-    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
+    val globalTempDB = conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
     if (externalCatalog.databaseExists(globalTempDB)) {
       throw new SparkException(
         s"$globalTempDB is a system preserved database, please rename your existing database " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1db57b76ac24..c45f3e706e9e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog(
 private[hive] class TestHiveSharedState(
     sc: SparkContext,
     hiveClient: Option[HiveClient] = None)
-  extends SharedState(sc) {
+  extends SharedState(sc, initialConfigs = Map.empty[String, String]) {
 
   override lazy val externalCatalog: ExternalCatalogWithListener = {
     new ExternalCatalogWithListener(new TestHiveExternalCatalog(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
new file mode 100644
index 000000000000..6e2dcfc04d49
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.sql.internal.SharedState
+import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.util.Utils
+
+class HiveSharedStateSuite extends SparkFunSuite {
+
+  test("initial configs should be passed to SharedState but not SparkContext") {
+    val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
+    val sc = SparkContext.getOrCreate(conf)
+    val invalidPath = "invalid/path"
+    val metastorePath = Utils.createTempDir()
+    val tmpDb = "tmp_db"
+
+    // The initial configs used to generate SharedState; none of them should affect the globally
+    // shared SparkContext's configurations. In particular, all of these configs except the
+    // metastore warehouse dir are passed on to the cloned confs inside SharedState.
+    val initialConfigs = Map("spark.foo" -> "bar",
+      WAREHOUSE_PATH.key -> invalidPath,
+      ConfVars.METASTOREWAREHOUSE.varname -> invalidPath,
+      CATALOG_IMPLEMENTATION.key -> "hive",
+      ConfVars.METASTORECONNECTURLKEY.varname ->
+        s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
+      GLOBAL_TEMP_DATABASE.key -> tmpDb)
+
+    val state = new SharedState(sc, initialConfigs)
+    assert(state.warehousePath !== invalidPath, "warehouse path can't be determined by session options")
+    assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
+      "warehouse conf in session options can't affect the application-wide Spark conf")
+    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
+      "warehouse conf in session options can't affect the application-wide Hadoop conf")
+
+    assert(!state.sparkContext.conf.contains("spark.foo"),
+      "static Spark conf should not be affected by session options")
+    assert(state.externalCatalog.unwrapped.isInstanceOf[HiveExternalCatalog],
+      "initial SparkSession options should determine the catalog implementation")
+    val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
+    assert(client.getConf("spark.foo", "") === "bar",
+      "session-level conf should be passed to the catalog")
+    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, invalidPath) !== invalidPath,
+      "session-level conf should be passed to the catalog, except the warehouse dir")
+
+    assert(state.globalTempViewManager.database === tmpDb)
+  }
+}
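
For context, a minimal usage sketch of the behavior this patch enables is shown below. It is not part of the diff, and the app name, master, and paths are purely illustrative. The idea: when a `SparkContext` already exists and no `SharedState` has been created yet, the options of the first `SparkSession` are applied to the cloned conf and Hadoop conf inside `SharedState` (so a session-level `spark.sql.catalogImplementation` can select the catalog), while the warehouse dir option is rejected with a warning and the already-running `SparkContext` is left untouched.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

// A SparkContext created first, e.g. by surrounding framework code.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local").setAppName("shared-state-demo"))

// The first SparkSession built on top of it: its options are forwarded to the
// SharedState's cloned confs, but not to the already-running SparkContext, and
// the warehouse dir option is ignored with a warning.
val spark = SparkSession.builder()
  .config("spark.sql.catalogImplementation", "hive")   // honored by SharedState
  .config("spark.sql.warehouse.dir", "/ignored/path")  // warned about and not applied
  .getOrCreate()

// Later sessions reuse the same SharedState, so they see the same catalog.
val another = spark.newSession()
```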