@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.conf.HiveConf
2222import org .apache .hadoop .hive .conf .HiveConf .ConfVars
2323import org .apache .hive .service .cli .thrift .{ThriftBinaryCLIService , ThriftHttpCLIService }
2424import org .apache .hive .service .server .{HiveServer2 , ServerOptionsProcessor }
25+ import org .apache .spark .sql .SQLConf
2526
2627import org .apache .spark .{SparkContext , SparkConf , Logging }
2728import org .apache .spark .annotation .DeveloperApi
@@ -52,7 +53,7 @@ object HiveThriftServer2 extends Logging {
5253 val server = new HiveThriftServer2 (sqlContext)
5354 server.init(sqlContext.hiveconf)
5455 server.start()
55- listener = new HiveThriftServer2Listener (server, sqlContext.sparkContext. conf)
56+ listener = new HiveThriftServer2Listener (server, sqlContext.conf)
5657 sqlContext.sparkContext.addSparkListener(listener)
5758 uiTab = if (sqlContext.sparkContext.getConf.getBoolean(" spark.ui.enabled" , true )) {
5859 Some (new ThriftServerTab (sqlContext.sparkContext))
@@ -80,7 +81,7 @@ object HiveThriftServer2 extends Logging {
8081 server.init(SparkSQLEnv .hiveContext.hiveconf)
8182 server.start()
8283 logInfo(" HiveThriftServer2 started" )
83- listener = new HiveThriftServer2Listener (server, SparkSQLEnv .sparkContext .conf)
84+ listener = new HiveThriftServer2Listener (server, SparkSQLEnv .hiveContext .conf)
8485 SparkSQLEnv .sparkContext.addSparkListener(listener)
8586 uiTab = if (SparkSQLEnv .sparkContext.getConf.getBoolean(" spark.ui.enabled" , true )) {
8687 Some (new ThriftServerTab (SparkSQLEnv .sparkContext))
@@ -100,10 +101,10 @@ object HiveThriftServer2 extends Logging {
100101 val ip : String ,
101102 val userName : String ) {
102103 var finishTimestamp : Long = 0L
103- var totalExecute : Int = 0
104+ var totalExecution : Int = 0
104105 def totalTime : Long = {
105106 if (finishTimestamp == 0L ) {
106- System .currentTimeMillis() - startTimestamp
107+ System .currentTimeMillis - startTimestamp
107108 } else {
108109 finishTimestamp - startTimestamp
109110 }
@@ -139,43 +140,37 @@ object HiveThriftServer2 extends Logging {
139140 /**
140141 * A inner sparkListener called in sc.stop to clean up the HiveThriftServer2
141142 */
142- class HiveThriftServer2Listener (
143+ private [thriftserver] class HiveThriftServer2Listener (
143144 val server : HiveServer2 ,
144- val conf : SparkConf ) extends SparkListener {
145+ val conf : SQLConf ) extends SparkListener {
145146
146147 override def onApplicationEnd (applicationEnd : SparkListenerApplicationEnd ): Unit = {
147148 server.stop()
148149 }
149150
150- val sessionList = new mutable.HashMap [String , SessionInfo ]
151- val executeList = new mutable.HashMap [String , ExecutionInfo ]
151+ val sessionList = new mutable.LinkedHashMap [String , SessionInfo ]
152+ val executionList = new mutable.LinkedHashMap [String , ExecutionInfo ]
152153 val retainedStatements =
153- conf.getInt( " spark.thriftserver.ui.retainedStatements " , 200 )
154+ conf.getConf( SQLConf . THRIFTSERVER_UI_STATEMENT_LIMIT , " 200" ).toInt
154155 val retainedSessions =
155- conf.getInt( " spark.thriftserver.ui.retainedSessions " , 200 )
156+ conf.getConf( SQLConf . THRIFTSERVER_UI_SESSION_LIMIT , " 200" ).toInt
156157 var totalRunning = 0
157158
158159 override def onJobStart (jobStart : SparkListenerJobStart ): Unit = {
159- val jobGroup = for (
160- props <- Option (jobStart.properties);
161- statement <- Option (props.getProperty(SparkContext .SPARK_JOB_GROUP_ID ))
162- ) yield statement
163-
164- jobGroup.map( groupId => {
165- val ret = executeList.find( _ match {
166- case (id : String , info : ExecutionInfo ) => info.groupId == groupId
167- })
168- if (ret.isDefined) {
169- ret.get._2.jobId += jobStart.jobId.toString
170- ret.get._2.groupId = groupId
171- }
172- })
160+ for {
161+ props <- Option (jobStart.properties)
162+ groupId <- Option (props.getProperty(SparkContext .SPARK_JOB_GROUP_ID ))
163+ (_, info) <- executionList if info.groupId == groupId
164+ } {
165+ info.jobId += jobStart.jobId.toString
166+ info.groupId = groupId
167+ }
173168 }
174169
175170 def onSessionCreated (ip : String , sessionId : String , userName : String = " UNKNOWN" ): Unit = {
176171 val info = new SessionInfo (sessionId, System .currentTimeMillis, ip, userName)
177- sessionList(sessionId) = info
178- trimSessionIfNecessary
172+ sessionList.put (sessionId, info)
173+ trimSessionIfNecessary()
179174 }
180175
181176 def onSessionClosed (sessionId : String ): Unit = {
@@ -190,44 +185,44 @@ object HiveThriftServer2 extends Logging {
190185 userName : String = " UNKNOWN" ): Unit = {
191186 val info = new ExecutionInfo (statement, sessionId, System .currentTimeMillis, userName)
192187 info.state = ExecutionState .STARTED
193- executeList (id) = info
194- trimExecutionIfNecessary
195- sessionList(sessionId).totalExecute += 1
196- executeList (id).groupId = groupId
188+ executionList.put (id, info)
189+ trimExecutionIfNecessary()
190+ sessionList(sessionId).totalExecution += 1
191+ executionList (id).groupId = groupId
197192 totalRunning += 1
198193 }
199194
200- def onStatementParse (id : String , executePlan : String ): Unit = {
201- executeList (id).executePlan = executePlan
202- executeList (id).state = ExecutionState .COMPILED
195+ def onStatementParsed (id : String , executionPlan : String ): Unit = {
196+ executionList (id).executePlan = executionPlan
197+ executionList (id).state = ExecutionState .COMPILED
203198 }
204199
205200 def onStatementError (id : String , errorMessage : String , errorTrace : String ): Unit = {
206- executeList (id).finishTimestamp = System .currentTimeMillis
207- executeList (id).detail = errorMessage
208- executeList (id).state = ExecutionState .FAILED
201+ executionList (id).finishTimestamp = System .currentTimeMillis
202+ executionList (id).detail = errorMessage
203+ executionList (id).state = ExecutionState .FAILED
209204 totalRunning -= 1
210205 }
211206
212207 def onStatementFinish (id : String ): Unit = {
213- executeList (id).finishTimestamp = System .currentTimeMillis
214- executeList (id).state = ExecutionState .FINISHED
208+ executionList (id).finishTimestamp = System .currentTimeMillis
209+ executionList (id).state = ExecutionState .FINISHED
215210 totalRunning -= 1
216211 }
217212
218- private def trimExecutionIfNecessary = synchronized {
219- if (executeList .size > retainedStatements) {
213+ private def trimExecutionIfNecessary () = synchronized {
214+ if (executionList .size > retainedStatements) {
220215 val toRemove = math.max(retainedStatements / 10 , 1 )
221- executeList.toList.sortBy(_._2.startTimestamp) .take(toRemove).foreach { s =>
222- executeList .remove(s._1)
216+ executionList .take(toRemove).foreach { s =>
217+ executionList .remove(s._1)
223218 }
224219 }
225220 }
226221
227- private def trimSessionIfNecessary = synchronized {
222+ private def trimSessionIfNecessary () = synchronized {
228223 if (sessionList.size > retainedSessions) {
229224 val toRemove = math.max(retainedSessions / 10 , 1 )
230- sessionList.toList.sortBy(_._2.startTimestamp). take(toRemove).foreach { s =>
225+ sessionList.take(toRemove).foreach { s =>
231226 sessionList.remove(s._1)
232227 }
233228 }
0 commit comments