Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext:
val ctx = if (hiveContext.hiveThriftServerSingleSession) {
hiveContext
} else {
hiveContext.newSession()
hiveContext.newSession(username)
}
ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion)
sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,16 @@ class HiveContext private[hive](
* and Hive client (both of execution and metadata) with existing HiveContext.
*/
override def newSession(): HiveContext = {
newSession()
}

def newSession(userName: String = null): HiveContext = {
new HiveContext(
sc = sc,
cacheManager = cacheManager,
listener = listener,
execHive = executionHive.newSession(),
metaHive = metadataHive.newSession(),
execHive = executionHive.newSession(userName),
metaHive = metadataHive.newSession(userName),
isRootContext = false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private[hive] trait HiveClient {
def addJar(path: String): Unit

/** Return a [[HiveClient]] as new session, that will share the class loader and Hive client */
def newSession(): HiveClient
def newSession(userName: String = null): HiveClient

/** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */
def withHiveState[A](f: => A): A
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.apache.spark.util.{CircularBuffer, Utils}
* this [[HiveClientImpl]].
*/
private[hive] class HiveClientImpl(
val userName: String = null,
override val version: HiveVersion,
config: Map[String, String],
initClassLoader: ClassLoader,
Expand Down Expand Up @@ -119,13 +120,14 @@ private[hive] class HiveClientImpl(
}
initialConf.set(k, v)
}
val state = new SessionState(initialConf)
val state = new SessionState(initialConf, userName)
if (clientLoader.cachedHive != null) {
Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
}
SessionState.start(state)
state.out = new PrintStream(outputBuffer, true, "UTF-8")
state.err = new PrintStream(outputBuffer, true, "UTF-8")
state.setIsHiveServerQuery(true)
state
} finally {
Thread.currentThread().setContextClassLoader(original)
Expand Down Expand Up @@ -412,13 +414,15 @@ private[hive] class HiveClientImpl(
*/
protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState {
logDebug(s"Running hiveql '$cmd'")
if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") }
if (cmd.toLowerCase.startsWith("set") && !cmd.toLowerCase.startsWith("set role ")) {
logDebug(s"Changing config: $cmd")
}
try {
val cmd_trimmed: String = cmd.trim()
val tokens: Array[String] = cmd_trimmed.split("\\s+")
// The remainder of the command.
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
val proc = shim.getCommandProcessor(tokens(0), conf)
val proc = shim.getCommandProcessor(tokens, conf)
proc match {
case driver: Driver =>
val response: CommandProcessorResponse = driver.run(cmd)
Expand Down Expand Up @@ -521,8 +525,8 @@ private[hive] class HiveClientImpl(
runSqlHive(s"ADD JAR $path")
}

def newSession(): HiveClientImpl = {
clientLoader.createClient().asInstanceOf[HiveClientImpl]
def newSession(userName: String = null): HiveClientImpl = {
clientLoader.createClient(userName).asInstanceOf[HiveClientImpl]
}

def reset(): Unit = withHiveState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private[client] sealed abstract class Shim {

def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]

def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor

def getDriverResults(driver: Driver): Seq[String]

Expand Down Expand Up @@ -213,8 +213,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
getAllPartitions(hive, table)
}

override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
override def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, token(0), conf).asInstanceOf[CommandProcessor]

override def getDriverResults(driver: Driver): Seq[String] = {
val res = new JArrayList[String]()
Expand Down Expand Up @@ -357,8 +357,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
partitions.asScala.toSeq
}

override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
override def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]

override def getDriverResults(driver: Driver): Seq[String] = {
val res = new JArrayList[Object]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader(
}

/** The isolated client interface to Hive. */
private[hive] def createClient(): HiveClient = {
private[hive] def createClient(userName: String = null): HiveClient = {
if (!isolationOn) {
return new HiveClientImpl(version, config, baseClassLoader, this)
return new HiveClientImpl(userName, version, config, baseClassLoader, this)
}
// Pre-reflective instantiation setup.
logDebug("Initializing the logger to avoid disaster...")
Expand All @@ -246,7 +246,7 @@ private[hive] class IsolatedClientLoader(
classLoader
.loadClass(classOf[HiveClientImpl].getName)
.getConstructors.head
.newInstance(version, config, classLoader, this)
.newInstance(userName, version, config, classLoader, this)
.asInstanceOf[HiveClient]
} catch {
case e: InvocationTargetException =>
Expand Down