@@ -34,6 +34,8 @@ trait CatalystConf {
 
   def runSQLonFile: Boolean
 
+  def warehousePath: String
+
   /**
    * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
    * identifiers are equal.
@@ -52,5 +54,6 @@ case class SimpleCatalystConf(
     optimizerMaxIterations: Int = 100,
     optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20,
-    runSQLonFile: Boolean = true)
+    runSQLonFile: Boolean = true,
+    warehousePath: String = "/user/hive/warehouse")
   extends CatalystConf
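
Note: with warehousePath now a defaulted constructor parameter, a Catalyst-only test can point the warehouse at a scratch directory without any Hive machinery. A minimal sketch, assuming caseSensitiveAnalysis is still the only required argument of SimpleCatalystConf and the usual import path:

import org.apache.spark.sql.catalyst.SimpleCatalystConf

// Hypothetical test setup: override only the warehouse location, keep the other defaults.
val conf = SimpleCatalystConf(
  caseSensitiveAnalysis = true,
  warehousePath = "/tmp/spark-test-warehouse")
assert(conf.warehousePath == "/tmp/spark-test-warehouse")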
@@ -125,7 +125,7 @@ class SessionCatalog(
   }
 
   def getDefaultDBPath(db: String): String = {
-    System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
+    new Path(new Path(conf.warehousePath), db + ".db").toString
   }
 
   // ----------------------------------------------------------------------------
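
Note: Hadoop's Path(parent, child) constructor joins the two with a single separator, so the default database location now lands under the configured warehouse instead of java.io.tmpdir. A small illustration (the warehouse value below is just the built-in default, not something SessionCatalog hard-codes):

import org.apache.hadoop.fs.Path

// conf.warehousePath defaults to "/user/hive/warehouse" unless overridden.
val dbPath = new Path(new Path("/user/hive/warehouse"), "sales" + ".db").toString
// dbPath == "/user/hive/warehouse/sales.db"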
@@ -645,6 +645,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)
 
+  def warehousePath: String = getConfString("hive.metastore.warehouse.dir", "/user/hive/warehouse")
+
   override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
 
   override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
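
Note: since warehousePath goes through getConfString with a default, it returns /user/hive/warehouse until the Hive key is set and then simply mirrors it. A rough sketch of that behavior, using setConfString, which SQLConf already exposes and which the test session below also relies on:

val conf = new SQLConf
// Nothing set yet: the hard-coded fallback is returned.
assert(conf.warehousePath == "/user/hive/warehouse")
// Once the Hive key is set, warehousePath reflects it.
conf.setConfString("hive.metastore.warehouse.dir", "/data/warehouse")
assert(conf.warehousePath == "/data/warehouse")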
@@ -34,7 +34,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveSessionState
+import org.apache.spark.sql.hive.HiveSharedState
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
 import org.apache.spark.sql.internal.SQLConf
@@ -56,7 +56,7 @@ object HiveThriftServer2 extends Logging {
   @DeveloperApi
   def startWithContext(sqlContext: SQLContext): Unit = {
     val server = new HiveThriftServer2(sqlContext)
-    server.init(sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+    server.init(SparkSQLEnv.sqlContext.sharedState.asInstanceOf[HiveSharedState].executionHive.conf)
     server.start()
     listener = new HiveThriftServer2Listener(server, sqlContext.conf)
     sqlContext.sparkContext.addSparkListener(listener)
@@ -84,7 +84,8 @@ object HiveThriftServer2 extends Logging {
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
-      server.init(SparkSQLEnv.sqlContext.sessionState.asInstanceOf[HiveSessionState].hiveconf)
+      server.init(
+        SparkSQLEnv.sqlContext.sharedState.asInstanceOf[HiveSharedState].executionHive.conf)
       server.start()
       logInfo("HiveThriftServer2 started")
       listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
@@ -60,13 +60,7 @@ private[hive] object SparkSQLEnv extends Logging {
       sessionState.metadataHive.setOut(new PrintStream(System.out, true, "UTF-8"))
       sessionState.metadataHive.setInfo(new PrintStream(System.err, true, "UTF-8"))
       sessionState.metadataHive.setError(new PrintStream(System.err, true, "UTF-8"))
 
-      sqlContext.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
-
-      if (log.isDebugEnabled) {
-        sessionState.hiveconf.getAllProperties.asScala.toSeq.sorted
-          .foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") }
-      }
     }
   }
 
@@ -20,8 +20,6 @@ package org.apache.spark.sql.hive
 import scala.util.{Failure, Success, Try}
 import scala.util.control.NonFatal
 
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
@@ -46,8 +44,7 @@ private[sql] class HiveSessionCatalog(
     sparkSession: SparkSession,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
-    conf: SQLConf,
-    hiveconf: HiveConf)
+    conf: SQLConf)
   extends SessionCatalog(externalCatalog, functionResourceLoader, functionRegistry, conf) {
 
   override def setCurrentDatabase(db: String): Unit = {
@@ -73,11 +70,6 @@ private[sql] class HiveSessionCatalog(
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
 
-  override def getDefaultDBPath(db: String): String = {
-    val defaultPath = hiveconf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)
-    new Path(new Path(defaultPath), db + ".db").toString
-  }
-
   // Catalog for handling data source tables. TODO: This really doesn't belong here since it is
   // essentially a cache for metastore tables. However, it relies on a lot of session-specific
   // things so it would be a lot of work to split its functionality between HiveSessionCatalog
@@ -78,8 +78,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       sparkSession,
       functionResourceLoader,
       functionRegistry,
-      conf,
-      hiveconf)
+      conf)
   }
 
   /**
@@ -136,7 +136,8 @@ private[hive] class TestHiveSparkSession(
   }
 
   @transient
-  override lazy val sessionState: TestHiveSessionState = new TestHiveSessionState(self)
+  override lazy val sessionState: TestHiveSessionState =
+    new TestHiveSessionState(self, warehousePath)
 
   override def newSession(): TestHiveSparkSession = {
     new TestHiveSparkSession(
@@ -156,19 +157,8 @@
 
   sessionState.hiveconf.set("hive.plan.serialization.format", "javaXML")
 
-  // A snapshot of the entries in the starting SQLConf
-  // We save this because tests can mutate this singleton object if they want
-  // This snapshot is saved when we create this TestHiveSparkSession.
-  val initialSQLConf: SQLConf = {
-    val snapshot = new SQLConf
-    sessionState.conf.getAllConfs.foreach { case (k, v) => snapshot.setConfString(k, v) }
-    snapshot
-  }
-
-  val testTempDir = Utils.createTempDir()
-
   // For some hive test case which contain ${system:test.tmp.dir}
-  System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath)
+  System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
 
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
@@ -526,18 +516,19 @@ private[hive] class TestHiveSharedState(
 }
 
 
-private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
-  extends HiveSessionState(sparkSession) {
+private[hive] class TestHiveSessionState(
+    sparkSession: TestHiveSparkSession,
+    warehousePath: File)
+  extends HiveSessionState(sparkSession) { self =>
 
   override lazy val conf: SQLConf = {
     new SQLConf {
       clear()
       override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
       override def clear(): Unit = {
         super.clear()
-        TestHiveContext.overrideConfs.map {
-          case (key, value) => setConfString(key, value)
-        }
+        TestHiveContext.overrideConfs.foreach { case (k, v) => setConfString(k, v) }
+        setConfString("hive.metastore.warehouse.dir", self.warehousePath.toURI.toString)
       }
     }
   }
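
Note: the intended effect is that every clear() on the test session's SQLConf re-applies both the test overrides and the per-session warehouse directory, so the warehouse location survives config resets. A hypothetical check, with names following the diff above and getConfString(key) assumed to be the single-argument SQLConf accessor:

// Inside a TestHiveSparkSession-based test.
val conf = sparkSession.sessionState.conf
conf.clear()  // drops user settings...
// ...but the clear() override re-applies the warehouse directory:
assert(conf.getConfString("hive.metastore.warehouse.dir") == warehousePath.toURI.toString)
assert(conf.warehousePath == warehousePath.toURI.toString)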

This file was deleted.

This file was deleted.

@@ -1134,51 +1134,6 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assert(getConf(testKey, "0") == "")
 
-
-  test("SET commands semantics for a HiveContext") {
-    // Adapted from its SQL counterpart.
-    val testKey = "spark.sql.key.usedfortestonly"
-    val testVal = "test.val.0"
-    val nonexistentKey = "nonexistent"
-    def collectResults(df: DataFrame): Set[Any] =
-      df.collect().map {
-        case Row(key: String, value: String) => key -> value
-        case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc)
-      }.toSet
-    conf.clear()
-
-    val expectedConfs = conf.getAllDefinedConfs.toSet
-    assertResult(expectedConfs)(collectResults(sql("SET -v")))
-
-    // "SET" itself returns all config variables currently specified in SQLConf.
-    // TODO: Should we be listing the default here always? probably...
-    assert(sql("SET").collect().size === TestHiveContext.overrideConfs.size)
-
-    val defaults = collectResults(sql("SET"))
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey=$testVal"))
-    }
-
-    assert(sessionState.hiveconf.get(testKey, "") === testVal)
-    assertResult(defaults ++ Set(testKey -> testVal))(collectResults(sql("SET")))
-
-    sql(s"SET ${testKey + testKey}=${testVal + testVal}")
-    assert(sessionState.hiveconf.get(testKey + testKey, "") == testVal + testVal)
-    assertResult(defaults ++ Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
-      collectResults(sql("SET"))
-    }
-
-    // "SET key"
-    assertResult(Set(testKey -> testVal)) {
-      collectResults(sql(s"SET $testKey"))
-    }
-
-    assertResult(Set(nonexistentKey -> "<undefined>")) {
-      collectResults(sql(s"SET $nonexistentKey"))
-    }
-
-    conf.clear()
-  }
 
   test("current_database with multiple sessions") {
     sql("create database a")
     sql("use a")