diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 179dab11a2b5..4df100c2a830 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -34,6 +34,8 @@ trait CatalystConf {
 
   def runSQLonFile: Boolean
 
+  def warehousePath: String
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
@@ -52,5 +54,6 @@ case class SimpleCatalystConf(
     optimizerMaxIterations: Int = 100,
     optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20,
-    runSQLonFile: Boolean = true)
+    runSQLonFile: Boolean = true,
+    warehousePath: String = "/user/hive/warehouse")
   extends CatalystConf
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d7fd54308af5..b06f24bc4866 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -125,7 +125,7 @@ class SessionCatalog(
   }
 
   def getDefaultDBPath(db: String): String = {
-    System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
+    new Path(new Path(conf.warehousePath), db + ".db").toString
   }
 
   // ----------------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6fbf32676f5a..115317effc76 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -645,6 +645,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
 
+  def warehousePath: String = getConfString("hive.metastore.warehouse.dir", "/user/hive/warehouse")
+
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 24a25023a6e3..3622220ad313 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveSessionState
+import org.apache.spark.sql.hive.HiveSharedState
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
 import org.apache.spark.sql.internal.SQLConf
@@ -56,7 +56,7 @@ object HiveThriftServer2 extends Logging {
   @DeveloperApi
   def startWithContext(sqlContext: SQLContext): Unit = {
     val server = new HiveThriftServer2(sqlContext)
-    server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+    server.init(SparkSQLEnv.sqlContext.sharedState.asInstanceOf[HiveSharedState].executionHive.conf)
     server.start()
     listener = new HiveThriftServer2Listener(server, sqlContext.conf)
     sqlContext.sparkContext.addSparkListener(listener)
@@ -84,7 +84,8 @@ object HiveThriftServer2 extends Logging {
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
-      server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+      server.init(
+        SparkSQLEnv.sqlContext.sharedState.asInstanceOf[HiveSharedState].executionHive.conf)
       server.start()
       logInfo("HiveThriftServer2 started")
       listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 268ba2f0bca7..665a44e51a0c 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -60,13 +60,7 @@ private[hive] object SparkSQLEnv extends Logging {
       sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
       sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
       sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
-      sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
-
-      if (log.isDebugEnabled) {
-        sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
-          .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
-      }
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index f70131ec8666..456587e0e081 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -46,8 +44,7 @@ private[sql] class HiveSessionCatalog(
     sparkSession: SparkSession,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
-    conf: SQLConf,
-    hiveconf: HiveConf)
+    conf: SQLConf)
   extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
 
   override def setCurrentDatabase(db: String): Unit = {
@@ -73,11 +70,6 @@ private[sql] class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
-  override def getDefaultDBPath(db: String): String = {
-    val defaultPath = hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
-    new Path(new Path(defaultPath), db + ".db").toString
-  }
-
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
   // essentially a cache for metastore tables. However, it relies on a lot of session-specific
   // things so it would be a lot of work to split its functionality between HiveSessionCatalog
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index f071df75816e..712eecf802b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -78,8 +78,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       sparkSession,
       functionResourceLoader,
       functionRegistry,
-      conf,
-      hiveconf)
+      conf)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1d8f24cb27dc..37113b66f563 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -136,7 +136,8 @@ private[hive] class TestHiveSparkSession(
   }
 
   @transient
-  override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self)
+  override lazy val sessionState: TestHiveSessionState =
+    new TestHiveSessionState(self, warehousePath)
 
   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(
@@ -156,19 +157,8 @@ private[hive] class TestHiveSparkSession(
 
   sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML")
 
-  // A snapshot of the entries in the starting SQLConf
-  // We save this because tests can mutate this singleton object if they want
-  // This snapshot is saved when we create this TestHiveSparkSession.
-  val initialSQLConf: SQLConf = {
-    val snapshot = new SQLConf
-    sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
-    snapshot
-  }
-
-  val testTempDir = Utils.createTempDir()
-
   // For some hive test case which contain ${system:test.tmp.dir}
-  System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
+  System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
 
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
@@ -526,8 +516,10 @@ private[hive] class TestHiveSharedState(
 
 }
 
-private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
-  extends HiveSessionState(sparkSession) {
+private[hive] class TestHiveSessionState(
+    sparkSession: TestHiveSparkSession,
+    warehousePath: File)
+  extends HiveSessionState(sparkSession) { self =>
 
   override lazy val conf: SQLConf = {
     new SQLConf {
@@ -535,9 +527,8 @@ private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
       override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
       override def clear(): Unit = {
         super.clear()
-        TestHiveContext.overrideConfs.map {
-          case (key, value) => setConfString(key, value)
-        }
+        TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
+        setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString)
       }
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
deleted file mode 100644
index b2c0f7e0e57b..000000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveContextSuite.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.hive.test.TestHive
-
-
-class HiveContextSuite extends SparkFunSuite {
-
-  test("HiveContext can access `spark.sql.*` configs") {
-    // Avoid creating another SparkContext in the same JVM
-    val sc = TestHive.sparkContext
-    require(sc.conf.get("spark.sql.hive.metastore.barrierPrefixes") ==
-      "org.apache.spark.sql.hive.execution.PairSerDe")
-    assert(TestHive.sparkSession.initialSQLConf.getConfString(
-      "spark.sql.hive.metastore.barrierPrefixes") ==
-      "org.apache.spark.sql.hive.execution.PairSerDe")
-    // This setting should be also set in the hiveconf of the current session.
-    assert(TestHive.sessionState.hiveconf.get(
-      "spark.sql.hive.metastore.barrierPrefixes", "") ==
-      "org.apache.spark.sql.hive.execution.PairSerDe")
-  }
-
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
deleted file mode 100644
index ac3a65032fb0..000000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SerializationSuite.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.serializer.JavaSerializer
-
-class SerializationSuite extends SparkFunSuite {
-
-  test("[SPARK-5840] HiveContext should be serializable") {
-    val hiveContext = org.apache.spark.sql.hive.test.TestHive
-    hiveContext.sessionState.hiveconf
-    val serializer = new JavaSerializer(new SparkConf()).newInstance()
-    val bytes = serializer.serialize(hiveContext)
-    val deSer = serializer.deserialize[AnyRef](bytes)
-  }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index d7f6d18b5ebc..bec75f28a2f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1134,51 +1134,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assert(getConf(testKey, "0") == "")
   }
 
-  test("SET commands semantics for a HiveContext") {
-    // Adapted from its SQL counterpart.
-    val testKey = "spark.sql.key.usedfortestonly"
-    val testVal = "test.val.0"
-    val nonexistentKey = "nonexistent"
-    def collectResults(df: DataFrame): Set[Any] =
-      df.collect().map {
-        case Row(key: String, value: String) => key -> value
-        case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc)
-      }.toSet
-    conf.clear()
-
-    val expectedConfs = conf.getAllDefinedConfs.toSet
-    assertResult(expectedConfs)(collectResults(sql("SET -v")))
-
-    // "SET" itself returns all config variables currently specified in SQLConf.
-    // TODO: Should we be listing the default here always? probably...
-    assert(sql("SET").collect().size === TestHiveContext.overrideConfs.size)
-
-    val defaults = collectResults(sql("SET"))
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey=$testVal"))
-    }
-
-    assert(sessionState.hiveconf.get(testKey, "") === testVal)
-    assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET")))
-
-    sql(s"SET ${testKey + testKey}=${testVal + testVal}")
-    assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal)
-    assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
-      collectResults(sql("SET"))
-    }
-
-    // "SET key"
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey"))
-    }
-
-    assertResult(Set(nonexistentKey -> "")) {
-      collectResults(sql(s"SET $nonexistentKey"))
-    }
-
-    conf.clear()
-  }
-
   test("current_database with multiple sessions") {
     sql("create database a")
     sql("use a")