diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index de4e9c62b57a4..586403a7d0501 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -74,7 +74,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, hiveContext: val ctx = if (hiveContext.hiveThriftServerSingleSession) { hiveContext } else { - hiveContext.newSession() + hiveContext.newSession(username) } ctx.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) sparkSqlOperationManager.sessionToContexts += sessionHandle -> ctx diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 05863ae18350d..b33cf3401513c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -100,12 +100,16 @@ class HiveContext private[hive]( * and Hive client (both of execution and metadata) with existing HiveContext. */ override def newSession(): HiveContext = { + newSession() + } + + def newSession(userName: String = null): HiveContext = { new HiveContext( sc = sc, cacheManager = cacheManager, listener = listener, - execHive = executionHive.newSession(), - metaHive = metadataHive.newSession(), + execHive = executionHive.newSession(userName), + metaHive = metadataHive.newSession(userName), isRootContext = false) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index f681cc67041a1..be2719810521f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -185,7 +185,7 @@ private[hive] trait HiveClient { def addJar(path: String): Unit /** Return a [[HiveClient]] as new session, that will share the class loader and Hive client */ - def newSession(): HiveClient + def newSession(userName: String = null): HiveClient /** Run a function within Hive state (SessionState, HiveConf, Hive client and class loader) */ def withHiveState[A](f: => A): A diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index cf1ff55c96fc9..52c5285217060 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -58,6 +58,7 @@ import org.apache.spark.util.{CircularBuffer, Utils} * this [[HiveClientImpl]]. */ private[hive] class HiveClientImpl( + val userName: String = null, override val version: HiveVersion, config: Map[String, String], initClassLoader: ClassLoader, @@ -119,13 +120,14 @@ private[hive] class HiveClientImpl( } initialConf.set(k, v) } - val state = new SessionState(initialConf) + val state = new SessionState(initialConf, userName) if (clientLoader.cachedHive != null) { Hive.set(clientLoader.cachedHive.asInstanceOf[Hive]) } SessionState.start(state) state.out = new PrintStream(outputBuffer, true, "UTF-8") state.err = new PrintStream(outputBuffer, true, "UTF-8") + state.setIsHiveServerQuery(true) state } finally { Thread.currentThread().setContextClassLoader(original) @@ -412,13 +414,15 @@ private[hive] class HiveClientImpl( */ protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = withHiveState { logDebug(s"Running hiveql '$cmd'") - if (cmd.toLowerCase.startsWith("set")) { logDebug(s"Changing config: $cmd") } + if (cmd.toLowerCase.startsWith("set") && !cmd.toLowerCase.startsWith("set role ")) { + logDebug(s"Changing config: $cmd") + } try { val cmd_trimmed: String = cmd.trim() val tokens: Array[String] = cmd_trimmed.split("\\s+") // The remainder of the command. val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() - val proc = shim.getCommandProcessor(tokens(0), conf) + val proc = shim.getCommandProcessor(tokens, conf) proc match { case driver: Driver => val response: CommandProcessorResponse = driver.run(cmd) @@ -521,8 +525,8 @@ private[hive] class HiveClientImpl( runSqlHive(s"ADD JAR $path") } - def newSession(): HiveClientImpl = { - clientLoader.createClient().asInstanceOf[HiveClientImpl] + def newSession(userName: String = null): HiveClientImpl = { + clientLoader.createClient(userName).asInstanceOf[HiveClientImpl] } def reset(): Unit = withHiveState { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 70c10be25be9f..e5be0085c3561 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -67,7 +67,7 @@ private[client] sealed abstract class Shim { def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition] - def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor + def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor def getDriverResults(driver: Driver): Seq[String] @@ -213,8 +213,8 @@ private[client] class Shim_v0_12 extends Shim with Logging { getAllPartitions(hive, table) } - override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = - getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor] + override def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor = + getCommandProcessorMethod.invoke(null, token(0), conf).asInstanceOf[CommandProcessor] override def getDriverResults(driver: Driver): Seq[String] = { val res = new JArrayList[String]() @@ -357,8 +357,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { partitions.asScala.toSeq } - override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor = - getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor] + override def getCommandProcessor(token: Array[String], conf: HiveConf): CommandProcessor = + getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor] override def getDriverResults(driver: Driver): Seq[String] = { val res = new JArrayList[Object]() diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala index dca7396ee1ab4..8333276b73a6e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala @@ -233,9 +233,9 @@ private[hive] class IsolatedClientLoader( } /** The isolated client interface to Hive. */ - private[hive] def createClient(): HiveClient = { + private[hive] def createClient(userName: String = null): HiveClient = { if (!isolationOn) { - return new HiveClientImpl(version, config, baseClassLoader, this) + return new HiveClientImpl(userName, version, config, baseClassLoader, this) } // Pre-reflective instantiation setup. logDebug("Initializing the logger to avoid disaster...") @@ -246,7 +246,7 @@ private[hive] class IsolatedClientLoader( classLoader .loadClass(classOf[HiveClientImpl].getName) .getConstructors.head - .newInstance(version, config, classLoader, this) + .newInstance(userName, version, config, classLoader, this) .asInstanceOf[HiveClient] } catch { case e: InvocationTargetException =>