@@ -22,20 +22,27 @@ import org.apache.hadoop.hive.conf.HiveConf
2222import org .apache .hadoop .hive .conf .HiveConf .ConfVars
2323import org .apache .hive .service .cli .thrift .{ThriftBinaryCLIService , ThriftHttpCLIService }
2424import org .apache .hive .service .server .{HiveServer2 , ServerOptionsProcessor }
25+ import org .apache .spark .sql .SQLConf
2526
26- import org .apache .spark .Logging
27+ import org .apache .spark .{ SparkContext , SparkConf , Logging }
2728import org .apache .spark .annotation .DeveloperApi
2829import org .apache .spark .sql .hive .HiveContext
2930import org .apache .spark .sql .hive .thriftserver .ReflectionUtils ._
30- import org .apache .spark .scheduler .{SparkListenerApplicationEnd , SparkListener }
31+ import org .apache .spark .scheduler .{SparkListenerJobStart , SparkListenerApplicationEnd , SparkListener }
32+ import org .apache .spark .sql .hive .thriftserver .ui .ThriftServerTab
3133import org .apache .spark .util .Utils
3234
35+ import scala .collection .mutable
36+ import scala .collection .mutable .ArrayBuffer
37+
3338/**
3439 * The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
3540 * `HiveThriftServer2` thrift server.
3641 */
3742object HiveThriftServer2 extends Logging {
3843 var LOG = LogFactory .getLog(classOf [HiveServer2 ])
44+ var uiTab : Option [ThriftServerTab ] = _
45+ var listener : HiveThriftServer2Listener = _
3946
4047 /**
4148 * :: DeveloperApi ::
@@ -46,7 +53,13 @@ object HiveThriftServer2 extends Logging {
4653 val server = new HiveThriftServer2 (sqlContext)
4754 server.init(sqlContext.hiveconf)
4855 server.start()
49- sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener (server))
56+ listener = new HiveThriftServer2Listener (server, sqlContext.conf)
57+ sqlContext.sparkContext.addSparkListener(listener)
58+ uiTab = if (sqlContext.sparkContext.getConf.getBoolean(" spark.ui.enabled" , true )) {
59+ Some (new ThriftServerTab (sqlContext.sparkContext))
60+ } else {
61+ None
62+ }
5063 }
5164
5265 def main (args : Array [String ]) {
@@ -58,30 +71,164 @@ object HiveThriftServer2 extends Logging {
5871 logInfo(" Starting SparkContext" )
5972 SparkSQLEnv .init()
6073
61- Utils .addShutdownHook { () => SparkSQLEnv .stop() }
74+ Utils .addShutdownHook { () =>
75+ SparkSQLEnv .stop()
76+ uiTab.foreach(_.detach())
77+ }
6278
6379 try {
6480 val server = new HiveThriftServer2 (SparkSQLEnv .hiveContext)
6581 server.init(SparkSQLEnv .hiveContext.hiveconf)
6682 server.start()
6783 logInfo(" HiveThriftServer2 started" )
68- SparkSQLEnv .sparkContext.addSparkListener(new HiveThriftServer2Listener (server))
84+ listener = new HiveThriftServer2Listener (server, SparkSQLEnv .hiveContext.conf)
85+ SparkSQLEnv .sparkContext.addSparkListener(listener)
86+ uiTab = if (SparkSQLEnv .sparkContext.getConf.getBoolean(" spark.ui.enabled" , true )) {
87+ Some (new ThriftServerTab (SparkSQLEnv .sparkContext))
88+ } else {
89+ None
90+ }
6991 } catch {
7092 case e : Exception =>
7193 logError(" Error starting HiveThriftServer2" , e)
7294 System .exit(- 1 )
7395 }
7496 }
7597
98+ private [thriftserver] class SessionInfo (
99+ val sessionId : String ,
100+ val startTimestamp : Long ,
101+ val ip : String ,
102+ val userName : String ) {
103+ var finishTimestamp : Long = 0L
104+ var totalExecution : Int = 0
105+ def totalTime : Long = {
106+ if (finishTimestamp == 0L ) {
107+ System .currentTimeMillis - startTimestamp
108+ } else {
109+ finishTimestamp - startTimestamp
110+ }
111+ }
112+ }
113+
114+ private [thriftserver] object ExecutionState extends Enumeration {
115+ val STARTED, COMPILED, FAILED, FINISHED = Value
116+ type ExecutionState = Value
117+ }
118+
119+ private [thriftserver] class ExecutionInfo (
120+ val statement : String ,
121+ val sessionId : String ,
122+ val startTimestamp : Long ,
123+ val userName : String ) {
124+ var finishTimestamp : Long = 0L
125+ var executePlan : String = " "
126+ var detail : String = " "
127+ var state : ExecutionState .Value = ExecutionState .STARTED
128+ val jobId : ArrayBuffer [String ] = ArrayBuffer [String ]()
129+ var groupId : String = " "
130+ def totalTime : Long = {
131+ if (finishTimestamp == 0L ) {
132+ System .currentTimeMillis - startTimestamp
133+ } else {
134+ finishTimestamp - startTimestamp
135+ }
136+ }
137+ }
138+
139+
76140 /**
77141 * A inner sparkListener called in sc.stop to clean up the HiveThriftServer2
78142 */
79- class HiveThriftServer2Listener (val server : HiveServer2 ) extends SparkListener {
143+ private [thriftserver] class HiveThriftServer2Listener (
144+ val server : HiveServer2 ,
145+ val conf : SQLConf ) extends SparkListener {
146+
80147 override def onApplicationEnd (applicationEnd : SparkListenerApplicationEnd ): Unit = {
81148 server.stop()
82149 }
83- }
84150
151+ val sessionList = new mutable.LinkedHashMap [String , SessionInfo ]
152+ val executionList = new mutable.LinkedHashMap [String , ExecutionInfo ]
153+ val retainedStatements =
154+ conf.getConf(SQLConf .THRIFTSERVER_UI_STATEMENT_LIMIT , " 200" ).toInt
155+ val retainedSessions =
156+ conf.getConf(SQLConf .THRIFTSERVER_UI_SESSION_LIMIT , " 200" ).toInt
157+ var totalRunning = 0
158+
159+ override def onJobStart (jobStart : SparkListenerJobStart ): Unit = {
160+ for {
161+ props <- Option (jobStart.properties)
162+ groupId <- Option (props.getProperty(SparkContext .SPARK_JOB_GROUP_ID ))
163+ (_, info) <- executionList if info.groupId == groupId
164+ } {
165+ info.jobId += jobStart.jobId.toString
166+ info.groupId = groupId
167+ }
168+ }
169+
170+ def onSessionCreated (ip : String , sessionId : String , userName : String = " UNKNOWN" ): Unit = {
171+ val info = new SessionInfo (sessionId, System .currentTimeMillis, ip, userName)
172+ sessionList.put(sessionId, info)
173+ trimSessionIfNecessary()
174+ }
175+
176+ def onSessionClosed (sessionId : String ): Unit = {
177+ sessionList(sessionId).finishTimestamp = System .currentTimeMillis
178+ }
179+
180+ def onStatementStart (
181+ id : String ,
182+ sessionId : String ,
183+ statement : String ,
184+ groupId : String ,
185+ userName : String = " UNKNOWN" ): Unit = {
186+ val info = new ExecutionInfo (statement, sessionId, System .currentTimeMillis, userName)
187+ info.state = ExecutionState .STARTED
188+ executionList.put(id, info)
189+ trimExecutionIfNecessary()
190+ sessionList(sessionId).totalExecution += 1
191+ executionList(id).groupId = groupId
192+ totalRunning += 1
193+ }
194+
195+ def onStatementParsed (id : String , executionPlan : String ): Unit = {
196+ executionList(id).executePlan = executionPlan
197+ executionList(id).state = ExecutionState .COMPILED
198+ }
199+
200+ def onStatementError (id : String , errorMessage : String , errorTrace : String ): Unit = {
201+ executionList(id).finishTimestamp = System .currentTimeMillis
202+ executionList(id).detail = errorMessage
203+ executionList(id).state = ExecutionState .FAILED
204+ totalRunning -= 1
205+ }
206+
207+ def onStatementFinish (id : String ): Unit = {
208+ executionList(id).finishTimestamp = System .currentTimeMillis
209+ executionList(id).state = ExecutionState .FINISHED
210+ totalRunning -= 1
211+ }
212+
213+ private def trimExecutionIfNecessary () = synchronized {
214+ if (executionList.size > retainedStatements) {
215+ val toRemove = math.max(retainedStatements / 10 , 1 )
216+ executionList.take(toRemove).foreach { s =>
217+ executionList.remove(s._1)
218+ }
219+ }
220+ }
221+
222+ private def trimSessionIfNecessary () = synchronized {
223+ if (sessionList.size > retainedSessions) {
224+ val toRemove = math.max(retainedSessions / 10 , 1 )
225+ sessionList.take(toRemove).foreach { s =>
226+ sessionList.remove(s._1)
227+ }
228+ }
229+
230+ }
231+ }
85232}
86233
87234private [hive] class HiveThriftServer2 (hiveContext : HiveContext )
0 commit comments