@@ -17,38 +17,35 @@

 package org.apache.spark.sql.hive.thriftserver

-import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
-import java.util.concurrent.Future
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 import scala.math._

-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession

 import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.plans.logical.SetCommand
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
-import org.apache.spark.sql.{SchemaRDD, Row => SparkRow}
+import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}

 /**
  * A compatibility layer for interacting with Hive version 0.13.1.
  */
 private[thriftserver] object HiveThriftServerShim {
   val version = "0.13.1"

-  def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService: SparkSQLCLIService) = {
+  def setServerUserName(
+      sparkServiceUGI: UserGroupInformation,
+      sparkCliService: SparkSQLCLIService) = {
     setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)
   }
 }
@@ -72,39 +69,14 @@ private[hive] class SparkExecuteStatementOperation(
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)(
     hiveContext: HiveContext,
-    sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation(
-  parentSession, statement, confOverlay, runInBackground) with Logging {
+    sessionToActivePool: SMap[HiveSession, String])
+  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {

   private var result: SchemaRDD = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _

-  private def runInternal(cmd: String) = {
-    try {
-      result = hiveContext.sql(cmd)
-      logDebug(result.queryExecution.toString())
-      val groupId = round(random * 1000000).toString
-      hiveContext.sparkContext.setJobGroup(groupId, statement)
-      iter = {
-        val useIncrementalCollect =
-          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
-        if (useIncrementalCollect) {
-          result.toLocalIterator
-        } else {
-          result.collect().iterator
-        }
-      }
-      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-    } catch {
-      // Actually do need to catch Throwable as some failures don't inherit from Exception and
-      // HiveServer will silently swallow them.
-      case e: Throwable =>
-        setState(OperationState.ERROR)
-        logError("Error executing query:", e)
-        throw new HiveSQLException(e.toString)
-    }
-  }
-
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
     logDebug("CLOSING")
@@ -182,76 +154,43 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }

-  private def getConfigForOperation: HiveConf = {
-    var sqlOperationConf: HiveConf = getParentSession.getHiveConf
-    if (!getConfOverlay.isEmpty || shouldRunAsync) {
-      sqlOperationConf = new HiveConf(sqlOperationConf)
-      import scala.collection.JavaConversions._
-      for (confEntry <- getConfOverlay.entrySet) {
-        try {
-          sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
-        }
-        catch { case e: IllegalArgumentException =>
-          throw new HiveSQLException("Error applying statement specific settings", e)
-        }
-      }
-    }
-    sqlOperationConf
-  }
-
   def run(): Unit = {
     logInfo(s"Running query '$statement'")
-    val opConfig: HiveConf = getConfigForOperation
     setState(OperationState.RUNNING)
-    setHasResultSet(true)
-
-    if (!shouldRunAsync) {
-      runInternal(statement)
-      setState(OperationState.FINISHED)
-    } else {
-      val parentSessionState = SessionState.get
-      val sessionHive: Hive = Hive.get
-      val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)
-
-      val backgroundOperation: Runnable = new Runnable {
-        def run() {
-          val doAsAction: PrivilegedExceptionAction[AnyRef] =
-            new PrivilegedExceptionAction[AnyRef] {
-              def run: AnyRef = {
-                Hive.set(sessionHive)
-                SessionState.setCurrentSessionState(parentSessionState)
-                try {
-                  runInternal(statement)
-                }
-                catch { case e: HiveSQLException =>
-                  setOperationException(e)
-                  logError("Error running hive query: ", e)
-                }
-                null
-              }
-            }
-          try {
-            ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
-          }
-          catch { case e: Exception =>
-            setOperationException(new HiveSQLException(e))
-            logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
-          }
-          setState(OperationState.FINISHED)
-        }
+    try {
+      result = hiveContext.sql(statement)
+      logDebug(result.queryExecution.toString())
+      result.queryExecution.logical match {
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
+          sessionToActivePool(parentSession) = value
+          logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+        case _ =>
       }

-      try {
-        val backgroundHandle: Future[_] = getParentSession.getSessionManager.
-          submitBackgroundOperation(backgroundOperation)
-        setBackgroundHandle(backgroundHandle)
-      } catch {
-        // Actually do need to catch Throwable as some failures don't inherit from Exception and
-        // HiveServer will silently swallow them.
-        case e: Throwable =>
-          logError("Error executing query:", e)
-          throw new HiveSQLException(e.toString)
+      val groupId = round(random * 1000000).toString
+      hiveContext.sparkContext.setJobGroup(groupId, statement)
+      sessionToActivePool.get(parentSession).foreach { pool =>
+        hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+      }
+      iter = {
+        val useIncrementalCollect =
+          hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
+        if (useIncrementalCollect) {
+          result.toLocalIterator
+        } else {
+          result.collect().iterator
+        }
       }
+      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
+      setHasResultSet(true)
+    } catch {
+      // Actually do need to catch Throwable as some failures don't inherit from Exception and
+      // HiveServer will silently swallow them.
+      case e: Throwable =>
+        setState(OperationState.ERROR)
+        logError("Error executing query:", e)
+        throw new HiveSQLException(e.toString)
     }
+    setState(OperationState.FINISHED)
   }
 }