From f1aedebf908fbbca7adde4458fccfd29da255b01 Mon Sep 17 00:00:00 2001 From: tianyi Date: Wed, 7 Jan 2015 13:48:37 +0800 Subject: [PATCH 1/7] add thriftserver-ui support --- .../hive/thriftserver/HiveThriftServer2.scala | 8 + .../sql/hive/thriftserver/SparkSQLEnv.scala | 6 +- .../thriftserver/SparkSQLSessionManager.scala | 4 +- .../thriftserver/ui/ThriftServerPage.scala | 179 +++++++++++++++++ .../thriftserver/ui/ThriftServerTab.scala | 45 +++++ .../ui/ThriftServerUIEventListener.scala | 182 ++++++++++++++++++ .../thriftserver/HiveThriftServer2Suite.scala | 73 ++++++- .../spark/sql/hive/thriftserver/Shim12.scala | 7 +- .../spark/sql/hive/thriftserver/Shim13.scala | 7 +- 9 files changed, 505 insertions(+), 6 deletions(-) create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6e07df18b0e1..dd419e1df4a6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} +import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab /** * The main entry point for the Spark SQL port of HiveServer2. 
Starts up a `SparkSQLContext` and a @@ -93,6 +94,13 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) extends HiveServer2 with ReflectedCompositeService { + private[hive] val uiTab: Option[ThriftServerTab] = + if (hiveContext.hiveconf.getBoolean("spark.ui.enabled", true)) { + Some(new ThriftServerTab()) + } else { + None + } + override def init(hiveConf: HiveConf) { val sparkSqlCliService = new SparkSQLCLIService(hiveContext) setSuperField(this, "cliService", sparkSqlCliService) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 158c22515972..2bba1df099ee 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive.thriftserver +import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerUIEventListener + import scala.collection.JavaConversions._ import org.apache.spark.scheduler.StatsReportListener @@ -29,6 +31,7 @@ private[hive] object SparkSQLEnv extends Logging { var hiveContext: HiveContext = _ var sparkContext: SparkContext = _ + var sqlEventListener: ThriftServerUIEventListener = _ def init() { if (hiveContext == null) { @@ -49,7 +52,8 @@ private[hive] object SparkSQLEnv extends Logging { sparkContext = new SparkContext(sparkConf) sparkContext.addSparkListener(new StatsReportListener()) hiveContext = new HiveContext(sparkContext) - + sqlEventListener = new ThriftServerUIEventListener(sparkConf) + sparkContext.addSparkListener(sqlEventListener) if (log.isDebugEnabled) { hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 89e9ede7261c..e41713a78c7f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -23,11 +23,12 @@ import org.apache.commons.logging.Log import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.session.SessionManager +import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -import org.apache.hive.service.cli.SessionHandle +import org.apache.hive.service.cli.{HiveSQLException, SessionHandle} private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) extends SessionManager @@ -50,6 +51,7 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) } override def closeSession(sessionHandle: SessionHandle) { + SparkSQLEnv.sqlEventListener.onDisconnected(super.getSession(sessionHandle)) super.closeSession(sessionHandle) sparkSqlOperationManager.sessionToActivePool -= sessionHandle } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala new file mode 100644 index 000000000000..fa3419eef784 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver.ui + +import java.util.Calendar +import javax.servlet.http.HttpServletRequest + +import org.apache.commons.lang3.StringEscapeUtils +import org.apache.spark.Logging +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.ui._ +import org.apache.spark.util.Distribution + +import scala.xml.Node + +/** Page for Spark Web UI that shows statistics of a streaming job */ +private[ui] class ThriftServerPage(parent: ThriftServerTab) + extends WebUIPage("") with Logging { + + private val listener = parent.listener + private val startTime = Calendar.getInstance().getTime() + private val emptyCell = "-" + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val content = + generateBasicStats() ++
+      <br/> ++
+      <h4>
+      Total {listener.sessionList.size} session online,
+      Total {listener.totalRunning} sql running
+      </h4> ++
+      generateSessionStatsTable() ++ generateSQLStatsTable()
+    UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
+  }
+
+  /** Generate basic stats of the streaming program */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class="unstyled">
+      <li>
+        <strong>Started at: </strong> {startTime.toString}
+      </li>
+      <li>
+        <strong>Time since start: </strong> {formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+  }
+
+  /** Generate stats of batch jobs of the streaming program */
+  private def generateSQLStatsTable(): Seq[Node] = {
+    val numBatches = listener.executeList.size
+    val table = if (numBatches > 0) {
+      val headerRow = Seq("User", "JobID", "Start Time", "Finish Time", "Duration",
+        "Statement", "State", "Detail")
+      val dataRows = listener.executeList.values.toSeq.sortBy(_.startTimestamp).reverse
+
+      def generateDataRow(info: ExecutionInfo): Seq[Node] = {
+        val detailUrl = "%s/jobs/job?id=%s"
+          .format(UIUtils.prependBaseUri(parent.basePath), info.jobId)
+        val detail = if(info.state == ExecutionState.FAILED) info.detail else info.executePlan
+        <tr>
+          <td>{info.session.getUsername}</td>
+          <td>
+            <a href={detailUrl}>{info.jobId}</a>
+          </td>
+          <td>{formatDate(info.startTimestamp)}</td>
+          <td>{formatDate(info.finishTimestamp)}</td>
+          <td>{formatDurationOption(Some(info.totalTime))}</td>
+          <td>{info.statement}</td>
+          <td>{info.state}</td>
+          {errorMessageCell(detail)}
+        </tr>
+      }
+
+      Some(UIUtils.listingTable(headerRow, generateDataRow,
+        dataRows, false, None, Seq(null), false))
+    } else {
+      None
+    }
+
+    val content =
+      <h4>SQL Statistics</h4> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
+
+    content
+  }
+
+  private def errorMessageCell(errorMessage: String): Seq[Node] = {
+    val isMultiline = errorMessage.indexOf('\n') >= 0
+    val errorSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        errorMessage.substring(0, errorMessage.indexOf('\n'))
+      } else {
+        errorMessage
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{errorMessage}</pre>
+      </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td>{errorSummary}{details}</td>
+  }
+
+  /** Generate stats of batch jobs of the streaming program */
+  private def generateSessionStatsTable(): Seq[Node] = {
+    val numBatches = listener.sessionList.size
+    val table = if (numBatches > 0) {
+      val dataRows =
+        listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map(session =>{
+          Seq(
+            session.session.getUsername,
+            session.session.getIpAddress,
+            session.sessionID,
+            formatDate(session.startTimestamp),
+            formatDate(session.finishTimestamp),
+            formatDurationOption(Some(session.totalTime)),
+            session.totalExecute.toString
+          )
+        }).toSeq
+      val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
+        "Total Execute")
+      Some(listingTable(headerRow, dataRows))
+    } else {
+      None
+    }
+
+    val content =
+      <h4>Session Statistics</h4> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
+
+    content
+  }
+
+
+  /**
+   * Returns a human-readable string representing a duration such as "5 second 35 ms"
+   */
+  private def formatDurationOption(msOption: Option[Long]): String = {
+    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+  }
+
+  /** Generate HTML table from string data */
+  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+    def generateDataRow(data: Seq[String]): Seq[Node] = {
+      <tr> {data.map(d => <td>{d}</td>)} </tr>
+    }
+    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+  }
+}
+
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
new file mode 100644
index 000000000000..7dbf66b435cf
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.{Logging, SparkException}
+
+/**
+ * Spark Web UI tab that shows statistics of a streaming job.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
+private[hive] class ThriftServerTab()
+  extends SparkUITab(getSparkUI(), "ThriftServer") with Logging {
+
+  val parent = getSparkUI()
+  val listener = SparkSQLEnv.sqlEventListener
+
+  attachPage(new ThriftServerPage(this))
+  parent.attachTab(this)
+}
+
+private object ThriftServerTab {
+  def getSparkUI(): SparkUI = {
+    SparkSQLEnv.sparkContext.ui.getOrElse {
+      throw new SparkException("Parent SparkUI to attach this tab to not found!")
+    }
+  }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala
new file mode 100644
index 000000000000..6ec44074f731
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver.ui + +import org.apache.hive.service.cli.SessionHandle +import org.apache.hive.service.cli.session.HiveSession +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener} + +import scala.collection.mutable.HashMap + +trait ThriftServerEventListener { + + def onConnected(session: HiveSession) { } + def onDisconnected(session: HiveSession) { } + + def onStart(id: String, session: HiveSession, statement: String) { } + def onParse(id: String, executePlan: String, groupId: String) { } + def onError(id: String, errorMessage: String, errorTrace: String) { } + def onFinish(id: String) { } +} + +class SessionInfo(val session: HiveSession, val startTimestamp: Long) { + val sessionID = session.getSessionHandle.getSessionId.toString + var finishTimestamp = 0L + var totalExecute = 0 + + def totalTime = { + if (finishTimestamp == 0L) { + System.currentTimeMillis() - startTimestamp + } else { + finishTimestamp - startTimestamp + } + } +} + +object ExecutionState extends Enumeration { + val STARTED, COMPILED, FAILED, FINISHED = Value + type ExecutionState = Value +} + +class ExecutionInfo(val statement: String, val session: HiveSession, val startTimestamp: Long) { + var finishTimestamp = 0L + var executePlan = "" + var detail = "" + var state: ExecutionState.Value = ExecutionState.STARTED + var groupId = "" + var jobId = "" + def totalTime = { + if (finishTimestamp == 0L) { + System.currentTimeMillis() - startTimestamp + } else { + finishTimestamp - startTimestamp + } + } +} + +class ThriftServerUIEventListener(val conf: SparkConf) + extends ThriftServerEventListener with SparkListener { + + import ThriftServerUIEventListener._ + + var sessionList = new HashMap[SessionHandle, SessionInfo] + var executeList = new HashMap[String, ExecutionInfo] + val retainedStatements = + conf.getInt("spark.thriftserver.ui.retainedStatements", DEFAULT_RETAINED_STATEMENTS) + val retainedSessions = + conf.getInt("spark.thriftserver.ui.retainedSessions", DEFAULT_RETAINED_SESSIONS) + var totalRunning = 0 + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val jobGroup = for ( + props <- Option(jobStart.properties); + group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) + ) yield group + + executeList.foreach { + case (id: String, info: ExecutionInfo) if info.groupId == jobGroup.get => { + executeList(id).jobId = jobStart.jobId.toString + } + } + } + + override def onConnected(session: HiveSession): Unit = { + val info = new SessionInfo(session, System.currentTimeMillis()) + sessionList(session.getSessionHandle) = info + trimSessionIfNecessary() + } + + override def onDisconnected(session: HiveSession): Unit = { + if(!sessionList.contains(session.getSessionHandle)) { + onConnected(session) + } + sessionList(session.getSessionHandle).finishTimestamp = System.currentTimeMillis() + + } + + override def onStart(id: String, session: HiveSession, statement: String): Unit = { + // TODO: Due to the incompatible interface between different hive version, + // we can't get 
the session start event. + // So we have to update session information from here. + if(!sessionList.contains(session.getSessionHandle)) { + onConnected(session) + } + val info = new ExecutionInfo(statement, session, System.currentTimeMillis()) + info.state = ExecutionState.STARTED + executeList(id) = info + trimExecutionIfNecessary() + sessionList(session.getSessionHandle).totalExecute += 1 + totalRunning += 1 + } + + override def onParse(id: String, executePlan: String, groupId: String): Unit = { + executeList(id).executePlan = executePlan + executeList(id).groupId = groupId + executeList(id).state = ExecutionState.COMPILED + } + + override def onError(id: String, errorMessage: String, errorTrace: String): Unit = { + executeList(id).finishTimestamp = System.currentTimeMillis() + executeList(id).detail = errorMessage + //+ "
<br/>
" + errorTrace + executeList(id).state = ExecutionState.FAILED + totalRunning -= 1 + } + + override def onFinish(id: String): Unit = { + executeList(id).finishTimestamp = System.currentTimeMillis() + executeList(id).state = ExecutionState.FINISHED + totalRunning -= 1 + } + + private def trimExecutionIfNecessary() = synchronized { + if (executeList.size > retainedStatements) { + val toRemove = math.max(retainedStatements / 10, 1) + executeList.toList.sortWith(compareExecutionDesc).take(toRemove).foreach { s => + executeList.remove(s._1) + } + } + } + + private def compareExecutionDesc( + l:(String, ExecutionInfo), + r:(String, ExecutionInfo)): Boolean = { + l._2.startTimestamp < r._2.startTimestamp + } + + private def compareSessionDesc( + l:(SessionHandle, SessionInfo), + r:(SessionHandle, SessionInfo)): Boolean = { + l._2.startTimestamp < r._2.startTimestamp + } + + private def trimSessionIfNecessary() = synchronized { + if (sessionList.size > retainedSessions) { + val toRemove = math.max(retainedSessions / 10, 1) + sessionList.toList.sortWith(compareSessionDesc).take(toRemove).foreach { s => + sessionList.remove(s._1) + } + } + } +} + +private object ThriftServerUIEventListener { + val DEFAULT_RETAINED_SESSIONS = 1000 + val DEFAULT_RETAINED_STATEMENTS = 1000 +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index b52a51d11e4a..07a11014c555 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -22,10 +22,13 @@ import java.net.ServerSocket import java.sql.{Date, DriverManager, Statement} import java.util.concurrent.TimeoutException +import org.scalatest.concurrent.Eventually._ + import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} +import scala.io.Source import scala.sys.process.{Process, ProcessLogger} import scala.util.Try @@ -70,6 +73,34 @@ class HiveThriftServer2Suite extends FunSuite with Logging { port } + def withThriftUIAndJDBC( + uiPort: Int, + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( + f: Statement => Unit) { + val port = randomListeningPort + + startThriftServerWithUIPort(port, uiPort, serverStartTimeout, httpMode) { + val jdbcUri = if (httpMode) { + s"jdbc:hive2://${"localhost"}:$port/" + + "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice" + } else { + s"jdbc:hive2://${"localhost"}:$port/" + } + + val user = System.getProperty("user.name") + val connection = DriverManager.getConnection(jdbcUri, user, "") + val statement = connection.createStatement() + + try { + f(statement) + } finally { + statement.close() + connection.close() + } + } + } + def withJdbcStatement( serverStartTimeout: FiniteDuration = 1.minute, httpMode: Boolean = false)( @@ -121,7 +152,16 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } def startThriftServer( + thriftPort: Int, + serverStartTimeout: FiniteDuration = 1.minute, + httpMode: Boolean = false)( + f: => Unit): Unit = { + startThriftServerWithUIPort(thriftPort, randomListeningPort, serverStartTimeout, httpMode)(f) + } + + def startThriftServerWithUIPort( port: Int, + uiPort: Int, serverStartTimeout: 
FiniteDuration = 1.minute, httpMode: Boolean = false)( f: => Unit) { @@ -143,7 +183,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=http | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}=$port | --driver-class-path ${sys.props("java.class.path")} - | --conf spark.ui.enabled=false + | --conf spark.ui.port=$uiPort """.stripMargin.split("\\s+").toSeq } else { s"""$startScript @@ -154,7 +194,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port | --driver-class-path ${sys.props("java.class.path")} - | --conf spark.ui.enabled=false + | --conf spark.ui.port=$uiPort """.stripMargin.split("\\s+").toSeq } @@ -384,4 +424,33 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } } } + + test("SPARK-5100 monitor page") { + val uiPort = randomListeningPort + withThriftUIAndJDBC(uiPort) { statement => + val queries = Seq( + "DROP TABLE IF EXISTS test_map", + "CREATE TABLE test_map(key INT, value STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") + + queries.foreach(statement.execute) + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(s"http://localhost:$uiPort").mkString + + // check whether new page exists + assert(html.toLowerCase.contains("thriftserver")) + } + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(s"http://localhost:$uiPort/ThriftServer/").mkString + assert(!html.contains("random data that should not be present")) + + // check whether statements exists + queries.foreach{ line => + assert(html.toLowerCase.contains(line.toLowerCase)) + } + } + } + } } diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index 166c56b9dfe2..34c27fcd84a2 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{Date, Timestamp} -import java.util.{ArrayList => JArrayList, Map => JMap} +import java.util.{ArrayList => JArrayList, Map => JMap, UUID} import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} @@ -183,8 +183,10 @@ private[hive] class SparkExecuteStatementOperation( } def run(): Unit = { + val sid = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) + SparkSQLEnv.sqlEventListener.onStart(sid, parentSession, statement) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -198,6 +200,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } + SparkSQLEnv.sqlEventListener.onParse(sid, result.queryExecution.toString(), groupId) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean @@ -214,9 +217,11 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. 
case e: Throwable => setState(OperationState.ERROR) + SparkSQLEnv.sqlEventListener.onError(sid, e.getMessage, e.getStackTraceString) logError("Error executing query:",e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) + SparkSQLEnv.sqlEventListener.onFinish(sid) } } diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index eaf7a1ddd499..d57c0b7b6ce8 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{Date, Timestamp} -import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} +import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID} import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} @@ -154,8 +154,10 @@ private[hive] class SparkExecuteStatementOperation( } def run(): Unit = { + val sid = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) + SparkSQLEnv.sqlEventListener.onStart(sid, parentSession, statement) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -169,6 +171,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } + SparkSQLEnv.sqlEventListener.onParse(sid, result.queryExecution.toString(), groupId) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean @@ -185,9 +188,11 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. 
case e: Throwable => setState(OperationState.ERROR) + SparkSQLEnv.sqlEventListener.onError(sid, e.getMessage, e.getStackTraceString) logError("Error executing query:", e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) + SparkSQLEnv.sqlEventListener.onFinish(sid) } } From 3cca2fb8d1c15f6561cc583fb5b96e22609454dd Mon Sep 17 00:00:00 2001 From: tianyi Date: Tue, 13 Jan 2015 18:23:03 +0800 Subject: [PATCH 2/7] fix bugs, remove IP trace due to hive-0.12 not support --- .../hive/thriftserver/HiveThriftServer2.scala | 4 +- .../thriftserver/SparkSQLCLIService.scala | 8 +- .../sql/hive/thriftserver/SparkSQLEnv.scala | 3 +- .../thriftserver/SparkSQLSessionManager.scala | 16 ++-- .../thriftserver/ui/ThriftServerPage.scala | 13 +-- .../thriftserver/ui/ThriftServerTab.scala | 4 +- .../ui/ThriftServerUIEventListener.scala | 93 +++++++++++-------- .../thriftserver/HiveThriftServer2Suite.scala | 81 +++++----------- .../spark/sql/hive/thriftserver/Shim12.scala | 28 +++++- .../spark/sql/hive/thriftserver/Shim13.scala | 31 ++++++- 10 files changed, 149 insertions(+), 132 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index dd419e1df4a6..1c0a6b3a7603 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -27,8 +27,8 @@ import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab +import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} /** * The main entry point for the Spark SQL port of HiveServer2. 
Starts up a `SparkSQLContext` and a @@ -95,7 +95,7 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) with ReflectedCompositeService { private[hive] val uiTab: Option[ThriftServerTab] = - if (hiveContext.hiveconf.getBoolean("spark.ui.enabled", true)) { + if (hiveContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) { Some(new ThriftServerTab()) } else { None diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 499e077d7294..2bb7591f23d5 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -73,15 +73,15 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext) } private[thriftserver] trait ReflectedCompositeService { this: AbstractService => - def initCompositeService(hiveConf: HiveConf) { + def initCompositeService(hiveConf: HiveConf, level: Int = 2) { // Emulating `CompositeService.init(hiveConf)` - val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") + val serviceList = getAncestorField[JList[Service]](this, level, "serviceList") serviceList.foreach(_.init(hiveConf)) // Emulating `AbstractService.init(hiveConf)` invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED) - setAncestorField(this, 3, "hiveConf", hiveConf) + setAncestorField(this, level + 1, "hiveConf", hiveConf) invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED) - getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.") + getAncestorField[Log](this, level + 1, "LOG").info(s"Service: $getName is inited.") } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 2bba1df099ee..d77b928ef0cf 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.hive.thriftserver -import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerUIEventListener - import scala.collection.JavaConversions._ import org.apache.spark.scheduler.StatsReportListener import org.apache.spark.sql.hive.{HiveShim, HiveContext} +import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerUIEventListener import org.apache.spark.{Logging, SparkConf, SparkContext} /** A singleton object for the master program. The slaves should not access this. 
*/ diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index e41713a78c7f..ae7c6a82f1ee 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -31,27 +31,29 @@ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager import org.apache.hive.service.cli.{HiveSQLException, SessionHandle} private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) - extends SessionManager + extends SparkSQLSessionManagerShim(hiveContext) with ReflectedCompositeService { private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) override def init(hiveConf: HiveConf) { - setSuperField(this, "hiveConf", hiveConf) + + setAncestorField(this, 2, "hiveConf", hiveConf) val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) - setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize)) - getAncestorField[Log](this, 3, "LOG").info( + setAncestorField(this, 2, "backgroundOperationPool", + Executors.newFixedThreadPool(backgroundPoolSize)) + getAncestorField[Log](this, 4, "LOG").info( s"HiveServer2: Async execution pool size $backgroundPoolSize") - setSuperField(this, "operationManager", sparkSqlOperationManager) + setAncestorField(this, 2, "operationManager", sparkSqlOperationManager) addService(sparkSqlOperationManager) - initCompositeService(hiveConf) + initCompositeService(hiveConf, 3) } override def closeSession(sessionHandle: SessionHandle) { - SparkSQLEnv.sqlEventListener.onDisconnected(super.getSession(sessionHandle)) + SparkSQLEnv.sqlEventListener.onSessionClosed(super.getSession(sessionHandle)) super.closeSession(sessionHandle) sparkSqlOperationManager.sessionToActivePool -= sessionHandle } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index fa3419eef784..a96375e7daa8 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -24,13 +24,11 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.Logging import org.apache.spark.ui.UIUtils._ import org.apache.spark.ui._ -import org.apache.spark.util.Distribution import scala.xml.Node /** Page for Spark Web UI that shows statistics of a streaming job */ -private[ui] class ThriftServerPage(parent: ThriftServerTab) - extends WebUIPage("") with Logging { +private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging { private val listener = parent.listener private val startTime = Calendar.getInstance().getTime() @@ -59,7 +57,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) } - /** Generate stats of batch jobs of the streaming program */ + /** Generate stats of batch statements of the thrift server program */ private def generateSQLStatsTable(): Seq[Node] = { val numBatches = listener.executeList.size val table = if (numBatches > 0) { @@ -81,7 +79,7 @@ private[ui] class ThriftServerPage(parent: 
ThriftServerTab) {formatDurationOption(Some(info.totalTime))} {info.statement} {info.state} - {errorMessageCell(detail)} + {errorMessageCell(detail)} } @@ -126,7 +124,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) {errorSummary}{details} } - /** Generate stats of batch jobs of the streaming program */ + /** Generate stats of batch sessions of the thrift server program */ private def generateSessionStatsTable(): Seq[Node] = { val numBatches = listener.sessionList.size val table = if (numBatches > 0) { @@ -134,7 +132,6 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map(session =>{ Seq( session.session.getUsername, - session.session.getIpAddress, session.sessionID, formatDate(session.startTimestamp), formatDate(session.finishTimestamp), @@ -142,7 +139,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) session.totalExecute.toString ) }).toSeq - val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration", + val headerRow = Seq("User", "Session ID", "Start Time", "Finish Time", "Duration", "Total Execute") Some(listingTable(headerRow, dataRows)) } else { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala index 7dbf66b435cf..b0d5f102836c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala @@ -26,7 +26,7 @@ import org.apache.spark.{Logging, SparkException} * Spark Web UI tab that shows statistics of a streaming job. * This assumes the given SparkContext has enabled its SparkUI. */ -private[hive] class ThriftServerTab() +private[thriftserver] class ThriftServerTab() extends SparkUITab(getSparkUI(), "ThriftServer") with Logging { val parent = getSparkUI() @@ -36,7 +36,7 @@ private[hive] class ThriftServerTab() parent.attachTab(this) } -private object ThriftServerTab { +private[thriftserver] object ThriftServerTab { def getSparkUI(): SparkUI = { SparkSQLEnv.sparkContext.ui.getOrElse { throw new SparkException("Parent SparkUI to attach this tab to not found!") diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala index 6ec44074f731..e3ffec363f56 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala @@ -24,18 +24,36 @@ import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener} import scala.collection.mutable.HashMap -trait ThriftServerEventListener { - - def onConnected(session: HiveSession) { } - def onDisconnected(session: HiveSession) { } - - def onStart(id: String, session: HiveSession, statement: String) { } - def onParse(id: String, executePlan: String, groupId: String) { } - def onError(id: String, errorMessage: String, errorTrace: String) { } - def onFinish(id: String) { } +private[thriftserver] trait ThriftServerEventListener { + /** + * Called when a session created. 
+ */ + def onSessionCreated(session: HiveSession) { } + + /** + * Called when a session closed. + */ + def onSessionClosed(session: HiveSession) { } + + /** + * Called when a statement started to run. + */ + def onStatementStart(id: String, session: HiveSession, statement: String) { } + /** + * Called when a statement completed compilation. + */ + def onStatementParse(id: String, executePlan: String, groupId: String) { } + /** + * Called when a statement got a error during running. + */ + def onStatementError(id: String, errorMessage: String, errorTrace: String) { } + /** + * Called when a statement ran success. + */ + def onStatementFinish(id: String) { } } -class SessionInfo(val session: HiveSession, val startTimestamp: Long) { +private[thriftserver] class SessionInfo(val session: HiveSession, val startTimestamp: Long) { val sessionID = session.getSessionHandle.getSessionId.toString var finishTimestamp = 0L var totalExecute = 0 @@ -49,12 +67,15 @@ class SessionInfo(val session: HiveSession, val startTimestamp: Long) { } } -object ExecutionState extends Enumeration { +private[thriftserver] object ExecutionState extends Enumeration { val STARTED, COMPILED, FAILED, FINISHED = Value type ExecutionState = Value } -class ExecutionInfo(val statement: String, val session: HiveSession, val startTimestamp: Long) { +private[thriftserver] class ExecutionInfo( + val statement: String, + val session: HiveSession, + val startTimestamp: Long) { var finishTimestamp = 0L var executePlan = "" var detail = "" @@ -70,7 +91,7 @@ class ExecutionInfo(val statement: String, val session: HiveSession, val startTi } } -class ThriftServerUIEventListener(val conf: SparkConf) +private[sql] class ThriftServerUIEventListener(val conf: SparkConf) extends ThriftServerEventListener with SparkListener { import ThriftServerUIEventListener._ @@ -89,34 +110,31 @@ class ThriftServerUIEventListener(val conf: SparkConf) group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) ) yield group - executeList.foreach { - case (id: String, info: ExecutionInfo) if info.groupId == jobGroup.get => { - executeList(id).jobId = jobStart.jobId.toString + jobGroup match { + case Some(groupId: String) => { + val ret = executeList.find( _ match { + case (id: String, info: ExecutionInfo) => { + info.groupId == groupId + } + }) + if(ret.isDefined) { + ret.get._2.jobId = jobStart.jobId.toString + } } } } - override def onConnected(session: HiveSession): Unit = { + override def onSessionCreated(session: HiveSession): Unit = { val info = new SessionInfo(session, System.currentTimeMillis()) sessionList(session.getSessionHandle) = info trimSessionIfNecessary() } - override def onDisconnected(session: HiveSession): Unit = { - if(!sessionList.contains(session.getSessionHandle)) { - onConnected(session) - } + override def onSessionClosed(session: HiveSession): Unit = { sessionList(session.getSessionHandle).finishTimestamp = System.currentTimeMillis() - } - override def onStart(id: String, session: HiveSession, statement: String): Unit = { - // TODO: Due to the incompatible interface between different hive version, - // we can't get the session start event. - // So we have to update session information from here. 
- if(!sessionList.contains(session.getSessionHandle)) { - onConnected(session) - } + override def onStatementStart(id: String, session: HiveSession, statement: String): Unit = { val info = new ExecutionInfo(statement, session, System.currentTimeMillis()) info.state = ExecutionState.STARTED executeList(id) = info @@ -125,21 +143,20 @@ class ThriftServerUIEventListener(val conf: SparkConf) totalRunning += 1 } - override def onParse(id: String, executePlan: String, groupId: String): Unit = { + override def onStatementParse(id: String, executePlan: String, groupId: String): Unit = { executeList(id).executePlan = executePlan executeList(id).groupId = groupId executeList(id).state = ExecutionState.COMPILED } - override def onError(id: String, errorMessage: String, errorTrace: String): Unit = { + override def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = { executeList(id).finishTimestamp = System.currentTimeMillis() executeList(id).detail = errorMessage - //+ "
<br/>
" + errorTrace executeList(id).state = ExecutionState.FAILED totalRunning -= 1 } - override def onFinish(id: String): Unit = { + override def onStatementFinish(id: String): Unit = { executeList(id).finishTimestamp = System.currentTimeMillis() executeList(id).state = ExecutionState.FINISHED totalRunning -= 1 @@ -155,14 +172,14 @@ class ThriftServerUIEventListener(val conf: SparkConf) } private def compareExecutionDesc( - l:(String, ExecutionInfo), - r:(String, ExecutionInfo)): Boolean = { + l: (String, ExecutionInfo), + r: (String, ExecutionInfo)): Boolean = { l._2.startTimestamp < r._2.startTimestamp } private def compareSessionDesc( - l:(SessionHandle, SessionInfo), - r:(SessionHandle, SessionInfo)): Boolean = { + l: (SessionHandle, SessionInfo), + r: (SessionHandle, SessionInfo)): Boolean = { l._2.startTimestamp < r._2.startTimestamp } @@ -176,7 +193,7 @@ class ThriftServerUIEventListener(val conf: SparkConf) } } -private object ThriftServerUIEventListener { +private[thriftserver] object ThriftServerUIEventListener { val DEFAULT_RETAINED_SESSIONS = 1000 val DEFAULT_RETAINED_STATEMENTS = 1000 } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala index 07a11014c555..10a0c52f16a6 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala @@ -64,50 +64,22 @@ class HiveThriftServer2Suite extends FunSuite with Logging { val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") } - def randomListeningPort = { + def randomListeningPorts(n: Int) = { // Let the system to choose a random available port to avoid collision with other parallel // builds. 
- val socket = new ServerSocket(0) - val port = socket.getLocalPort - socket.close() - port - } - - def withThriftUIAndJDBC( - uiPort: Int, - serverStartTimeout: FiniteDuration = 1.minute, - httpMode: Boolean = false)( - f: Statement => Unit) { - val port = randomListeningPort - - startThriftServerWithUIPort(port, uiPort, serverStartTimeout, httpMode) { - val jdbcUri = if (httpMode) { - s"jdbc:hive2://${"localhost"}:$port/" + - "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice" - } else { - s"jdbc:hive2://${"localhost"}:$port/" - } - - val user = System.getProperty("user.name") - val connection = DriverManager.getConnection(jdbcUri, user, "") - val statement = connection.createStatement() - - try { - f(statement) - } finally { - statement.close() - connection.close() - } - } + val sockets = Array.fill(n)(new ServerSocket(0)) + val ports = sockets.map(_.getLocalPort) + sockets.foreach(_.close()) + ports } def withJdbcStatement( serverStartTimeout: FiniteDuration = 1.minute, httpMode: Boolean = false)( - f: Statement => Unit) { - val port = randomListeningPort + f: (Int, Int, Statement) => Unit) { + val Array(port, uiPort) = randomListeningPorts(2) - startThriftServer(port, serverStartTimeout, httpMode) { + startThriftServer(port, uiPort, serverStartTimeout, httpMode) { val jdbcUri = if (httpMode) { s"jdbc:hive2://${"localhost"}:$port/" + "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice" @@ -120,7 +92,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { val statement = connection.createStatement() try { - f(statement) + f(port, uiPort, statement) } finally { statement.close() connection.close() @@ -131,9 +103,9 @@ class HiveThriftServer2Suite extends FunSuite with Logging { def withCLIServiceClient( serverStartTimeout: FiniteDuration = 1.minute)( f: ThriftCLIServiceClient => Unit) { - val port = randomListeningPort + val Array(port, uiPort) = randomListeningPorts(2) - startThriftServer(port) { + startThriftServer(port, uiPort) { // Transport creation logics below mimics HiveConnection.createBinaryTransport val rawTransport = new TSocket("localhost", port) val user = System.getProperty("user.name") @@ -152,19 +124,11 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } def startThriftServer( - thriftPort: Int, - serverStartTimeout: FiniteDuration = 1.minute, - httpMode: Boolean = false)( - f: => Unit): Unit = { - startThriftServerWithUIPort(thriftPort, randomListeningPort, serverStartTimeout, httpMode)(f) - } - - def startThriftServerWithUIPort( port: Int, uiPort: Int, serverStartTimeout: FiniteDuration = 1.minute, httpMode: Boolean = false)( - f: => Unit) { + f: => Unit): Unit = { val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator) val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator) @@ -268,7 +232,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("Test JDBC query execution") { - withJdbcStatement() { statement => + withJdbcStatement() { (port, uiPort, statement) => val queries = Seq( "SET spark.sql.shuffle.partitions=3", "DROP TABLE IF EXISTS test", @@ -287,7 +251,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("Test JDBC query execution in Http Mode") { - withJdbcStatement(httpMode = true) { statement => + withJdbcStatement(httpMode = true) { (port, uiPort, statement) => val queries = Seq( "SET spark.sql.shuffle.partitions=3", "DROP TABLE IF EXISTS test", @@ -306,7 +270,7 @@ class 
HiveThriftServer2Suite extends FunSuite with Logging { } test("SPARK-3004 regression: result set containing NULL") { - withJdbcStatement() { statement => + withJdbcStatement() { (port, uiPort, statement) => val queries = Seq( "DROP TABLE IF EXISTS test_null", "CREATE TABLE test_null(key INT, val STRING)", @@ -348,7 +312,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("Checks Hive version") { - withJdbcStatement() { statement => + withJdbcStatement() { (port, uiPort, statement) => val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}") @@ -356,7 +320,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("Checks Hive version in Http Mode") { - withJdbcStatement(httpMode = true) { statement => + withJdbcStatement(httpMode = true) { (port, uiPort, statement) => val resultSet = statement.executeQuery("SET spark.sql.hive.version") resultSet.next() assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}") @@ -364,7 +328,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("SPARK-4292 regression: result set iterator issue") { - withJdbcStatement() { statement => + withJdbcStatement() { (port, uiPort, statement) => val queries = Seq( "DROP TABLE IF EXISTS test_4292", "CREATE TABLE test_4292(key INT, val STRING)", @@ -384,7 +348,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("SPARK-4309 regression: Date type support") { - withJdbcStatement() { statement => + withJdbcStatement() { (port, uiPort, statement) => val queries = Seq( "DROP TABLE IF EXISTS test_date", "CREATE TABLE test_date(key INT, value STRING)", @@ -402,7 +366,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("SPARK-4407 regression: Complex type support") { - withJdbcStatement() { statement => + withJdbcStatement() { (port, uiPort, statement) => val queries = Seq( "DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", @@ -426,8 +390,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { } test("SPARK-5100 monitor page") { - val uiPort = randomListeningPort - withThriftUIAndJDBC(uiPort) { statement => + withJdbcStatement() { (port, uiPort, statement) => val queries = Seq( "DROP TABLE IF EXISTS test_map", "CREATE TABLE test_map(key INT, value STRING)", @@ -447,7 +410,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging { assert(!html.contains("random data that should not be present")) // check whether statements exists - queries.foreach{ line => + queries.foreach { line => assert(html.toLowerCase.contains(line.toLowerCase)) } } diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index 34c27fcd84a2..cd4d798b6158 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{Date, Timestamp} import java.util.{ArrayList => JArrayList, Map => JMap, UUID} +import org.apache.hive.service.cli.thrift.TProtocolVersion + import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} @@ -29,7 +31,7 @@ import 
org.apache.hadoop.hive.shims.ShimLoader import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation -import org.apache.hive.service.cli.session.HiveSession +import org.apache.hive.service.cli.session.{SessionManager, HiveSession} import org.apache.spark.Logging import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow} @@ -63,6 +65,22 @@ private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveC } } +private[hive] class SparkSQLSessionManagerShim(hiveContext: HiveContext) extends SessionManager { + + @throws(classOf[HiveSQLException]) + override def openSession( + username: java.lang.String, + password: java.lang.String, + sessionConf: java.util.Map[java.lang.String, java.lang.String], + withImpersonation: Boolean, + delegationToken: java.lang.String): SessionHandle = { + val ret = super.openSession(username, password, + sessionConf, withImpersonation, delegationToken) + SparkSQLEnv.sqlEventListener.onSessionCreated(super.getSession(ret)) + ret + } +} + private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, statement: String, @@ -186,7 +204,7 @@ private[hive] class SparkExecuteStatementOperation( val sid = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) - SparkSQLEnv.sqlEventListener.onStart(sid, parentSession, statement) + SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -200,7 +218,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } - SparkSQLEnv.sqlEventListener.onParse(sid, result.queryExecution.toString(), groupId) + SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString(), groupId) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean @@ -217,11 +235,11 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. 
case e: Throwable => setState(OperationState.ERROR) - SparkSQLEnv.sqlEventListener.onError(sid, e.getMessage, e.getStackTraceString) + SparkSQLEnv.sqlEventListener.onStatementError(sid, e.getMessage, e.getStackTraceString) logError("Error executing query:",e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) - SparkSQLEnv.sqlEventListener.onFinish(sid) + SparkSQLEnv.sqlEventListener.onStatementFinish(sid) } } diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index d57c0b7b6ce8..a84b49f2d894 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{Date, Timestamp} import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID} +import org.apache.hive.service.auth.TSetIpAddressProcessor +import org.apache.hive.service.cli.thrift.TProtocolVersion + import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} @@ -27,7 +30,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation -import org.apache.hive.service.cli.session.HiveSession +import org.apache.hive.service.cli.session._ import org.apache.spark.Logging import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} @@ -62,6 +65,24 @@ private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveC } } +private[hive] class SparkSQLSessionManagerShim( + hiveContext: HiveContext) extends SessionManager { + + @throws(classOf[HiveSQLException]) + override def openSession( + protocol: TProtocolVersion, + username: java.lang.String, + password: java.lang.String, + sessionConf: java.util.Map[java.lang.String, java.lang.String], + withImpersonation: Boolean, + delegationToken: java.lang.String): SessionHandle = { + val ret = super.openSession(protocol, username, password, + sessionConf, withImpersonation, delegationToken) + SparkSQLEnv.sqlEventListener.onSessionCreated(super.getSession(ret)) + ret + } +} + private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, statement: String, @@ -157,7 +178,7 @@ private[hive] class SparkExecuteStatementOperation( val sid = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) - SparkSQLEnv.sqlEventListener.onStart(sid, parentSession, statement) + SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -171,7 +192,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } - SparkSQLEnv.sqlEventListener.onParse(sid, result.queryExecution.toString(), groupId) + SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString(), groupId) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean @@ -188,11 +209,11 @@ private[hive] class SparkExecuteStatementOperation( 
// HiveServer will silently swallow them. case e: Throwable => setState(OperationState.ERROR) - SparkSQLEnv.sqlEventListener.onError(sid, e.getMessage, e.getStackTraceString) + SparkSQLEnv.sqlEventListener.onStatementError(sid, e.getMessage, e.getStackTraceString) logError("Error executing query:", e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) - SparkSQLEnv.sqlEventListener.onFinish(sid) + SparkSQLEnv.sqlEventListener.onStatementFinish(sid) } } From 703e3316a92b9a39c61edbd8a34c64d82d653e04 Mon Sep 17 00:00:00 2001 From: tianyi Date: Wed, 14 Jan 2015 15:20:25 +0800 Subject: [PATCH 3/7] fix some bug for hive-0.12 --- .../thriftserver/ui/ThriftServerPage.scala | 10 ++++--- .../ui/ThriftServerUIEventListener.scala | 29 +++++++++++-------- .../spark/sql/hive/thriftserver/Shim12.scala | 6 ++-- .../spark/sql/hive/thriftserver/Shim13.scala | 5 ++-- 4 files changed, 28 insertions(+), 22 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index a96375e7daa8..ba46d19358d2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -61,7 +61,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" private def generateSQLStatsTable(): Seq[Node] = { val numBatches = listener.executeList.size val table = if (numBatches > 0) { - val headerRow = Seq("User", "JobID", "Start Time", "Finish Time", "Duration", + val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration", "Statement", "State", "Detail") val dataRows = listener.executeList.values.toSeq.sortBy(_.startTimestamp).reverse @@ -70,10 +70,11 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" .format(UIUtils.prependBaseUri(parent.basePath), info.jobId) val detail = if(info.state == ExecutionState.FAILED) info.detail else info.executePlan - {info.session.getUsername} + {info.userName} {info.jobId} + {info.groupId} {formatDate(info.startTimestamp)} {formatDate(info.finishTimestamp)} {formatDurationOption(Some(info.totalTime))} @@ -131,7 +132,8 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" val dataRows = listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map(session =>{ Seq( - session.session.getUsername, + session.userName, + session.ip, session.sessionID, formatDate(session.startTimestamp), formatDate(session.finishTimestamp), @@ -139,7 +141,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" session.totalExecute.toString ) }).toSeq - val headerRow = Seq("User", "Session ID", "Start Time", "Finish Time", "Duration", + val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration", "Total Execute") Some(listingTable(headerRow, dataRows)) } else { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala index e3ffec363f56..655be8cee563 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala +++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala @@ -28,7 +28,7 @@ private[thriftserver] trait ThriftServerEventListener { /** * Called when a session created. */ - def onSessionCreated(session: HiveSession) { } + def onSessionCreated(ip: String, session: HiveSession) { } /** * Called when a session closed. @@ -42,7 +42,7 @@ private[thriftserver] trait ThriftServerEventListener { /** * Called when a statement completed compilation. */ - def onStatementParse(id: String, executePlan: String, groupId: String) { } + def onStatementParse(id: String, executePlan: String) { } /** * Called when a statement got a error during running. */ @@ -53,8 +53,12 @@ private[thriftserver] trait ThriftServerEventListener { def onStatementFinish(id: String) { } } -private[thriftserver] class SessionInfo(val session: HiveSession, val startTimestamp: Long) { +private[thriftserver] class SessionInfo( + val session: HiveSession, + val startTimestamp: Long, + val ip: String) { val sessionID = session.getSessionHandle.getSessionId.toString + val userName = if(session.getUserName == null) "UNKNOWN" else session.getUserName var finishTimestamp = 0L var totalExecute = 0 @@ -76,12 +80,13 @@ private[thriftserver] class ExecutionInfo( val statement: String, val session: HiveSession, val startTimestamp: Long) { + val userName = if(session.getUserName == null) "UNKNOWN" else session.getUserName var finishTimestamp = 0L var executePlan = "" var detail = "" var state: ExecutionState.Value = ExecutionState.STARTED - var groupId = "" var jobId = "" + var groupId = "" def totalTime = { if (finishTimestamp == 0L) { System.currentTimeMillis() - startTimestamp @@ -107,25 +112,26 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) override def onJobStart(jobStart: SparkListenerJobStart): Unit = { val jobGroup = for ( props <- Option(jobStart.properties); - group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) - ) yield group + statement <- Option(props.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) + ) yield statement jobGroup match { - case Some(groupId: String) => { + case Some(statement: String) => { val ret = executeList.find( _ match { case (id: String, info: ExecutionInfo) => { - info.groupId == groupId + info.statement == statement } }) if(ret.isDefined) { ret.get._2.jobId = jobStart.jobId.toString + ret.get._2.groupId = jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID,"") } } } } - override def onSessionCreated(session: HiveSession): Unit = { - val info = new SessionInfo(session, System.currentTimeMillis()) + override def onSessionCreated(ip: String, session: HiveSession): Unit = { + val info = new SessionInfo(session, System.currentTimeMillis(), ip) sessionList(session.getSessionHandle) = info trimSessionIfNecessary() } @@ -143,9 +149,8 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) totalRunning += 1 } - override def onStatementParse(id: String, executePlan: String, groupId: String): Unit = { + override def onStatementParse(id: String, executePlan: String): Unit = { executeList(id).executePlan = executePlan - executeList(id).groupId = groupId executeList(id).state = ExecutionState.COMPILED } diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index cd4d798b6158..7f97ca05e6ae 100644 --- 
a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -20,8 +20,6 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{Date, Timestamp} import java.util.{ArrayList => JArrayList, Map => JMap, UUID} -import org.apache.hive.service.cli.thrift.TProtocolVersion - import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} @@ -76,7 +74,7 @@ private[hive] class SparkSQLSessionManagerShim(hiveContext: HiveContext) extends delegationToken: java.lang.String): SessionHandle = { val ret = super.openSession(username, password, sessionConf, withImpersonation, delegationToken) - SparkSQLEnv.sqlEventListener.onSessionCreated(super.getSession(ret)) + SparkSQLEnv.sqlEventListener.onSessionCreated("UNKNOWN", super.getSession(ret)) ret } } @@ -218,7 +216,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } - SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString(), groupId) + SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString()) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index a84b49f2d894..e48076e44ed1 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -78,7 +78,8 @@ private[hive] class SparkSQLSessionManagerShim( delegationToken: java.lang.String): SessionHandle = { val ret = super.openSession(protocol, username, password, sessionConf, withImpersonation, delegationToken) - SparkSQLEnv.sqlEventListener.onSessionCreated(super.getSession(ret)) + val session = super.getSession(ret) + SparkSQLEnv.sqlEventListener.onSessionCreated("UNKNOWN", session) ret } } @@ -192,7 +193,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } - SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString(), groupId) + SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString()) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean From daed3d126a5112d9e4e94fac7592ff804775ec05 Mon Sep 17 00:00:00 2001 From: tianyi Date: Mon, 19 Jan 2015 12:14:35 +0800 Subject: [PATCH 4/7] fix some issues --- .../hive/thriftserver/HiveThriftServer2.scala | 2 +- .../thriftserver/SparkSQLSessionManager.scala | 4 +- .../thriftserver/ui/ThriftServerPage.scala | 38 ++++++++++--------- .../thriftserver/ui/ThriftServerTab.scala | 12 +++--- .../ui/ThriftServerUIEventListener.scala | 35 +++++++---------- .../spark/sql/hive/thriftserver/Shim12.scala | 13 +++++-- .../spark/sql/hive/thriftserver/Shim13.scala | 12 ++++-- 7 files changed, 62 insertions(+), 54 deletions(-) diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 1c0a6b3a7603..6011550675c7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -96,7 +96,7 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext) private[hive] val uiTab: Option[ThriftServerTab] = if (hiveContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) { - Some(new ThriftServerTab()) + Some(new ThriftServerTab(hiveContext.sparkContext)) } else { None } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index ae7c6a82f1ee..556f68a20ad6 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -22,13 +22,11 @@ import java.util.concurrent.Executors import org.apache.commons.logging.Log import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars -import org.apache.hive.service.cli.session.SessionManager -import org.apache.hive.service.cli.thrift.TProtocolVersion import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -import org.apache.hive.service.cli.{HiveSQLException, SessionHandle} +import org.apache.hive.service.cli.SessionHandle private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) extends SparkSQLSessionManagerShim(hiveContext) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index ba46d19358d2..2db9e9adf401 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -37,10 +37,14 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" /** Render the page */ def render(request: HttpServletRequest): Seq[Node] = { val content = - generateBasicStats() ++

-      <br/> ++
-      <h4>
-      Total {listener.sessionList.size} session online,
-      Total {listener.totalRunning} sql running
-      </h4> ++
-      generateSessionStatsTable() ++ generateSQLStatsTable()
+      generateBasicStats() ++
+      <br/> ++
+      <h4>
+      Total {listener.sessionList.size} session online,
+      Total {listener.totalRunning} sql running
+      </h4> ++
+      generateSessionStatsTable() ++
+      generateSQLStatsTable()
     UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
   }
@@ -59,8 +63,8 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
   /** Generate stats of batch statements of the thrift server program */
   private def generateSQLStatsTable(): Seq[Node] = {
-    val numBatches = listener.executeList.size
-    val table = if (numBatches > 0) {
+    val numStatement = listener.executeList.size
+    val table = if (numStatement > 0) {
       val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
         "Statement", "State", "Detail")
       val dataRows = listener.executeList.values.toSeq.sortBy(_.startTimestamp).reverse
@@ -113,11 +117,11 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
       // scalastyle:off
-      <span class="expand-details">+details</span> ++
-      <div class="stacktrace-details collapsed"><pre>{detail}</pre></div>
+      <span class="expand-details">
+        + details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{detail}</pre>
+      </div>
       // scalastyle:on
     } else {
       ""
@@ -130,7 +134,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
     val numBatches = listener.sessionList.size
     val table = if (numBatches > 0) {
       val dataRows =
-        listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map(session =>{
+        listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map ( session =>
           Seq(
             session.userName,
             session.ip,
             session.sessionID,
             formatDate(session.startTimestamp),
             formatDate(session.finishTimestamp),
             formatDurationOption(Some(session.totalTime)),
             session.totalExecute.toString
           )
-        }).toSeq
+        ).toSeq
       val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
         "Total Execute")
       Some(listingTable(headerRow, dataRows))
@@ -150,11 +154,11 @@
     val content =
       <h4>Session Statistics</h4> ++
-    <div>
-      <ul class="unstyled">
-        {table.getOrElse("No statistics have been generated yet.")}
-      </ul>
-    </div>
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
content } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala index b0d5f102836c..356efedd6bd7 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala @@ -20,16 +20,16 @@ package org.apache.spark.sql.hive.thriftserver.ui import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._ import org.apache.spark.ui.{SparkUI, SparkUITab} -import org.apache.spark.{Logging, SparkException} +import org.apache.spark.{SparkContext, Logging, SparkException} /** * Spark Web UI tab that shows statistics of a streaming job. * This assumes the given SparkContext has enabled its SparkUI. */ -private[thriftserver] class ThriftServerTab() - extends SparkUITab(getSparkUI(), "ThriftServer") with Logging { +private[thriftserver] class ThriftServerTab(sparkContext: SparkContext) + extends SparkUITab(getSparkUI(sparkContext), "ThriftServer") with Logging { - val parent = getSparkUI() + val parent = getSparkUI(sparkContext) val listener = SparkSQLEnv.sqlEventListener attachPage(new ThriftServerPage(this)) @@ -37,8 +37,8 @@ private[thriftserver] class ThriftServerTab() } private[thriftserver] object ThriftServerTab { - def getSparkUI(): SparkUI = { - SparkSQLEnv.sparkContext.ui.getOrElse { + def getSparkUI(sparkContext: SparkContext): SparkUI = { + sparkContext.ui.getOrElse { throw new SparkException("Parent SparkUI to attach this tab to not found!") } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala index 655be8cee563..39d776e5062e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala @@ -38,15 +38,18 @@ private[thriftserver] trait ThriftServerEventListener { /** * Called when a statement started to run. */ - def onStatementStart(id: String, session: HiveSession, statement: String) { } + def onStatementStart(id: String, session: HiveSession, statement: String, groupId: String) { } + /** * Called when a statement completed compilation. */ def onStatementParse(id: String, executePlan: String) { } + /** * Called when a statement got a error during running. */ def onStatementError(id: String, errorMessage: String, errorTrace: String) { } + /** * Called when a statement ran success. 
*/ @@ -58,7 +61,7 @@ private[thriftserver] class SessionInfo( val startTimestamp: Long, val ip: String) { val sessionID = session.getSessionHandle.getSessionId.toString - val userName = if(session.getUserName == null) "UNKNOWN" else session.getUserName + val userName = if (session.getUserName == null) "UNKNOWN" else session.getUserName var finishTimestamp = 0L var totalExecute = 0 @@ -112,19 +115,19 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) override def onJobStart(jobStart: SparkListenerJobStart): Unit = { val jobGroup = for ( props <- Option(jobStart.properties); - statement <- Option(props.getProperty(SparkContext.SPARK_JOB_DESCRIPTION)) + statement <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) ) yield statement jobGroup match { - case Some(statement: String) => { + case Some(groupId: String) => { val ret = executeList.find( _ match { case (id: String, info: ExecutionInfo) => { - info.statement == statement + info.jobId == "" && info.groupId == groupId } }) if(ret.isDefined) { ret.get._2.jobId = jobStart.jobId.toString - ret.get._2.groupId = jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID,"") + ret.get._2.groupId = groupId } } } @@ -140,12 +143,14 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) sessionList(session.getSessionHandle).finishTimestamp = System.currentTimeMillis() } - override def onStatementStart(id: String, session: HiveSession, statement: String): Unit = { + override def onStatementStart(id: String, session: HiveSession, + statement: String, groupId:String): Unit = { val info = new ExecutionInfo(statement, session, System.currentTimeMillis()) info.state = ExecutionState.STARTED executeList(id) = info trimExecutionIfNecessary() sessionList(session.getSessionHandle).totalExecute += 1 + executeList(id).groupId = groupId totalRunning += 1 } @@ -170,28 +175,16 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) private def trimExecutionIfNecessary() = synchronized { if (executeList.size > retainedStatements) { val toRemove = math.max(retainedStatements / 10, 1) - executeList.toList.sortWith(compareExecutionDesc).take(toRemove).foreach { s => + executeList.toList.sortBy(_._2.startTimestamp).take(toRemove).foreach { s => executeList.remove(s._1) } } } - private def compareExecutionDesc( - l: (String, ExecutionInfo), - r: (String, ExecutionInfo)): Boolean = { - l._2.startTimestamp < r._2.startTimestamp - } - - private def compareSessionDesc( - l: (SessionHandle, SessionInfo), - r: (SessionHandle, SessionInfo)): Boolean = { - l._2.startTimestamp < r._2.startTimestamp - } - private def trimSessionIfNecessary() = synchronized { if (sessionList.size > retainedSessions) { val toRemove = math.max(retainedSessions / 10, 1) - sessionList.toList.sortWith(compareSessionDesc).take(toRemove).foreach { s => + sessionList.toList.sortBy(_._2.startTimestamp).take(toRemove).foreach { s => sessionList.remove(s._1) } } diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index 7f97ca05e6ae..49e1d475056c 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -31,8 +31,9 @@ import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import 
org.apache.hive.service.cli.session.{SessionManager, HiveSession} -import org.apache.spark.Logging +import org.apache.spark.{SparkContext, Logging} import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow} +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.SetCommand import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} @@ -202,7 +203,14 @@ private[hive] class SparkExecuteStatementOperation( val sid = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) - SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement) + val group = hiveContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) match { + case groupId: String => + hiveContext.sparkContext.setJobDescription(statement) + groupId + case _ => hiveContext.sparkContext.setJobGroup(sid, statement) + sid + } + SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -212,7 +220,6 @@ private[hive] class SparkExecuteStatementOperation( logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") case _ => } - hiveContext.sparkContext.setJobDescription(statement) sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index e48076e44ed1..7e9423ea262d 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -32,7 +32,7 @@ import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session._ -import org.apache.spark.Logging +import org.apache.spark.{SparkContext, Logging} import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} import org.apache.spark.sql.execution.SetCommand import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ @@ -179,7 +179,14 @@ private[hive] class SparkExecuteStatementOperation( val sid = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) - SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement) + val group = hiveContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) match { + case groupId: String => + hiveContext.sparkContext.setJobDescription(statement) + groupId + case _ => hiveContext.sparkContext.setJobGroup(sid, statement) + sid + } + SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -189,7 +196,6 @@ private[hive] class SparkExecuteStatementOperation( logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") case _ => } - hiveContext.sparkContext.setJobDescription(statement) sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } From fb507df555db0084ea7d91ae8a7167d0164480c0 Mon Sep 17 00:00:00 2001 From: tianyi 
Date: Mon, 19 Jan 2015 13:07:06 +0800 Subject: [PATCH 5/7] fix shim12 rebase issue; make session id as the default groupId --- .../scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala | 4 ++-- .../scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index 49e1d475056c..79ddb53d89a8 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -33,7 +33,6 @@ import org.apache.hive.service.cli.session.{SessionManager, HiveSession} import org.apache.spark.{SparkContext, Logging} import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow} -import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.execution.SetCommand import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} @@ -207,7 +206,8 @@ private[hive] class SparkExecuteStatementOperation( case groupId: String => hiveContext.sparkContext.setJobDescription(statement) groupId - case _ => hiveContext.sparkContext.setJobGroup(sid, statement) + case _ => hiveContext.sparkContext + .setJobGroup(parentSession.getSessionHandle.getSessionId.toString, statement) sid } SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group) diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index 7e9423ea262d..0468358a2a8f 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -183,7 +183,8 @@ private[hive] class SparkExecuteStatementOperation( case groupId: String => hiveContext.sparkContext.setJobDescription(statement) groupId - case _ => hiveContext.sparkContext.setJobGroup(sid, statement) + case _ => hiveContext.sparkContext + .setJobGroup(parentSession.getSessionHandle.getSessionId.toString, statement) sid } SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group) From 32332cb7830fbf108c6697fb66929a28ee914b3a Mon Sep 17 00:00:00 2001 From: tianyi Date: Tue, 20 Jan 2015 12:35:20 +0800 Subject: [PATCH 6/7] move SparkSQLSessionManager to Shim. use statementId as groupId to support multiple jobs for one sql. support multiple jobs for one sql in ThriftServerPage. 
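Note: everything this patch does hinges on SparkContext's job-group property. Below is a minimal, self-contained sketch of that mechanism, illustrative only and not part of the patch; the object name, the hard-coded property key, and the local-mode setup are assumptions for the example. The idea: give each statement a UUID, install it as the job group before running the SQL, then match jobs back to their statement from a SparkListener.

import java.util.UUID

import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

// Illustrative sketch only: tag every job of one SQL statement with a
// shared group id, then match the jobs back to the statement in a listener.
object JobGroupSketch {
  // "spark.jobGroup.id" is the property key behind SparkContext.SPARK_JOB_GROUP_ID,
  // which is private[spark] and therefore spelled out here.
  private val JobGroupKey = "spark.jobGroup.id"

  // statementId -> ids of the jobs it spawned
  private val jobsByStatement = mutable.HashMap.empty[String, mutable.ArrayBuffer[Int]]

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    sc.addSparkListener(new SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // The group id installed via setJobGroup travels to the listener
        // inside the job's properties.
        val group = for {
          props <- Option(jobStart.properties)
          id <- Option(props.getProperty(JobGroupKey))
        } yield id
        group.foreach { id =>
          jobsByStatement.synchronized {
            jobsByStatement.getOrElseUpdate(id, mutable.ArrayBuffer.empty) += jobStart.jobId
          }
        }
      }
    })

    val statementId = UUID.randomUUID().toString
    // One statement may fan out into several jobs; all inherit the group id.
    sc.setJobGroup(statementId, "SELECT ... (shown as the job description in the UI)")
    sc.parallelize(1 to 10).count()
    sc.parallelize(1 to 10).count()

    sc.stop()
    println(s"$statementId -> ${jobsByStatement.synchronized(jobsByStatement.get(statementId))}")
  }
}

Since one statement can fan out into several Spark jobs, collecting job ids per group id is what lets the UI render one row per statement that links out to all of its jobs.
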
--- .../thriftserver/SparkSQLCLIService.scala | 8 +-- .../thriftserver/SparkSQLSessionManager.scala | 58 ------------------- .../thriftserver/ui/ThriftServerPage.scala | 9 ++- .../ui/ThriftServerUIEventListener.scala | 24 ++++---- .../spark/sql/hive/thriftserver/Shim12.scala | 56 +++++++++++++----- .../spark/sql/hive/thriftserver/Shim13.scala | 53 ++++++++++++----- 6 files changed, 102 insertions(+), 106 deletions(-) delete mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 2bb7591f23d5..499e077d7294 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -73,15 +73,15 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext) } private[thriftserver] trait ReflectedCompositeService { this: AbstractService => - def initCompositeService(hiveConf: HiveConf, level: Int = 2) { + def initCompositeService(hiveConf: HiveConf) { // Emulating `CompositeService.init(hiveConf)` - val serviceList = getAncestorField[JList[Service]](this, level, "serviceList") + val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") serviceList.foreach(_.init(hiveConf)) // Emulating `AbstractService.init(hiveConf)` invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED) - setAncestorField(this, level + 1, "hiveConf", hiveConf) + setAncestorField(this, 3, "hiveConf", hiveConf) invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED) - getAncestorField[Log](this, level + 1, "LOG").info(s"Service: $getName is inited.") + getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.") } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala deleted file mode 100644 index 556f68a20ad6..000000000000 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive.thriftserver - -import java.util.concurrent.Executors - -import org.apache.commons.logging.Log -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.conf.HiveConf.ConfVars - -import org.apache.spark.sql.hive.HiveContext -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager -import org.apache.hive.service.cli.SessionHandle - -private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) - extends SparkSQLSessionManagerShim(hiveContext) - with ReflectedCompositeService { - - private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) - - override def init(hiveConf: HiveConf) { - - setAncestorField(this, 2, "hiveConf", hiveConf) - - val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) - setAncestorField(this, 2, "backgroundOperationPool", - Executors.newFixedThreadPool(backgroundPoolSize)) - getAncestorField[Log](this, 4, "LOG").info( - s"HiveServer2: Async execution pool size $backgroundPoolSize") - - setAncestorField(this, 2, "operationManager", sparkSqlOperationManager) - addService(sparkSqlOperationManager) - - initCompositeService(hiveConf, 3) - } - - override def closeSession(sessionHandle: SessionHandle) { - SparkSQLEnv.sqlEventListener.onSessionClosed(super.getSession(sessionHandle)) - super.closeSession(sessionHandle) - sparkSqlOperationManager.sessionToActivePool -= sessionHandle - } -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index 2db9e9adf401..cdefbe2f969a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -70,13 +70,16 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("" val dataRows = listener.executeList.values.toSeq.sortBy(_.startTimestamp).reverse def generateDataRow(info: ExecutionInfo): Seq[Node] = { - val detailUrl = "%s/jobs/job?id=%s" - .format(UIUtils.prependBaseUri(parent.basePath), info.jobId) + val jobLink = info.jobId.map { id: String => + + [{id}] + + } val detail = if(info.state == ExecutionState.FAILED) info.detail else info.executePlan {info.userName} - {info.jobId} + {jobLink} {info.groupId} {formatDate(info.startTimestamp)} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala index 39d776e5062e..08b088ec7737 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala @@ -28,32 +28,36 @@ private[thriftserver] trait ThriftServerEventListener { /** * Called when a session created. */ - def onSessionCreated(ip: String, session: HiveSession) { } + def onSessionCreated(ip: String, session: HiveSession): Unit = {} /** * Called when a session closed. */ - def onSessionClosed(session: HiveSession) { } + def onSessionClosed(session: HiveSession): Unit = {} /** * Called when a statement started to run. 
*/ - def onStatementStart(id: String, session: HiveSession, statement: String, groupId: String) { } + def onStatementStart( + id: String, + session: HiveSession, + statement: String, + groupId: String): Unit = {} /** * Called when a statement completed compilation. */ - def onStatementParse(id: String, executePlan: String) { } + def onStatementParse(id: String, executePlan: String): Unit = {} /** * Called when a statement got a error during running. */ - def onStatementError(id: String, errorMessage: String, errorTrace: String) { } + def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {} /** * Called when a statement ran success. */ - def onStatementFinish(id: String) { } + def onStatementFinish(id: String): Unit = {} } private[thriftserver] class SessionInfo( @@ -88,7 +92,7 @@ private[thriftserver] class ExecutionInfo( var executePlan = "" var detail = "" var state: ExecutionState.Value = ExecutionState.STARTED - var jobId = "" + var jobId = scala.collection.mutable.ArrayBuffer[String]() var groupId = "" def totalTime = { if (finishTimestamp == 0L) { @@ -122,11 +126,11 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) case Some(groupId: String) => { val ret = executeList.find( _ match { case (id: String, info: ExecutionInfo) => { - info.jobId == "" && info.groupId == groupId + info.groupId == groupId } }) if(ret.isDefined) { - ret.get._2.jobId = jobStart.jobId.toString + ret.get._2.jobId += jobStart.jobId.toString ret.get._2.groupId = groupId } } @@ -144,7 +148,7 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) } override def onStatementStart(id: String, session: HiveSession, - statement: String, groupId:String): Unit = { + statement: String, groupId: String): Unit = { val info = new ExecutionInfo(statement, session, System.currentTimeMillis()) info.state = ExecutionState.STARTED executeList(id) = info diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index 79ddb53d89a8..df1cf3644b95 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -18,8 +18,14 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{Date, Timestamp} +import java.util.concurrent.Executors import java.util.{ArrayList => JArrayList, Map => JMap, UUID} +import org.apache.commons.logging.Log +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager + import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} @@ -63,7 +69,31 @@ private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveC } } -private[hive] class SparkSQLSessionManagerShim(hiveContext: HiveContext) extends SessionManager { +private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) + extends SessionManager with ReflectedCompositeService { + + private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) + + override def init(hiveConf: HiveConf) { + setAncestorField(this, 1, "hiveConf", hiveConf) + + val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) + setAncestorField(this, 1, "backgroundOperationPool", + 
Executors.newFixedThreadPool(backgroundPoolSize)) + getAncestorField[Log](this, 3, "LOG").info( + s"HiveServer2: Async execution pool size $backgroundPoolSize") + + setAncestorField(this, 1, "operationManager", sparkSqlOperationManager) + addService(sparkSqlOperationManager) + + initCompositeService(hiveConf) + } + + override def closeSession(sessionHandle: SessionHandle) { + SparkSQLEnv.sqlEventListener.onSessionClosed(super.getSession(sessionHandle)) + super.closeSession(sessionHandle) + sparkSqlOperationManager.sessionToActivePool -= sessionHandle + } @throws(classOf[HiveSQLException]) override def openSession( @@ -72,8 +102,7 @@ private[hive] class SparkSQLSessionManagerShim(hiveContext: HiveContext) extends sessionConf: java.util.Map[java.lang.String, java.lang.String], withImpersonation: Boolean, delegationToken: java.lang.String): SessionHandle = { - val ret = super.openSession(username, password, - sessionConf, withImpersonation, delegationToken) + val ret = super.openSession(username, password, sessionConf, withImpersonation, delegationToken) SparkSQLEnv.sqlEventListener.onSessionCreated("UNKNOWN", super.getSession(ret)) ret } @@ -199,18 +228,12 @@ private[hive] class SparkExecuteStatementOperation( } def run(): Unit = { - val sid = UUID.randomUUID().toString + val statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) - val group = hiveContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) match { - case groupId: String => - hiveContext.sparkContext.setJobDescription(statement) - groupId - case _ => hiveContext.sparkContext - .setJobGroup(parentSession.getSessionHandle.getSessionId.toString, statement) - sid - } - SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group) + hiveContext.sparkContext.setJobGroup(statementId, statement) + SparkSQLEnv.sqlEventListener + .onStatementStart(statementId, parentSession, statement, statementId) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -223,7 +246,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } - SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString()) + SparkSQLEnv.sqlEventListener.onStatementParse(statementId, result.queryExecution.toString()) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean @@ -240,11 +263,12 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. 
case e: Throwable => setState(OperationState.ERROR) - SparkSQLEnv.sqlEventListener.onStatementError(sid, e.getMessage, e.getStackTraceString) + SparkSQLEnv.sqlEventListener + .onStatementError(statementId, e.getMessage, e.getStackTraceString) logError("Error executing query:",e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) - SparkSQLEnv.sqlEventListener.onStatementFinish(sid) + SparkSQLEnv.sqlEventListener.onStatementFinish(statementId) } } diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index 0468358a2a8f..a8a84e41dfd8 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -18,10 +18,15 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.{Date, Timestamp} +import java.util.concurrent.Executors import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID} +import org.apache.commons.logging.Log +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.auth.TSetIpAddressProcessor import org.apache.hive.service.cli.thrift.TProtocolVersion +import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} @@ -65,8 +70,31 @@ private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveC } } -private[hive] class SparkSQLSessionManagerShim( - hiveContext: HiveContext) extends SessionManager { +private[hive] class SparkSQLSessionManager(hiveContext: HiveContext) + extends SessionManager with ReflectedCompositeService { + + private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext) + + override def init(hiveConf: HiveConf) { + setAncestorField(this, 1, "hiveConf", hiveConf) + + val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS) + setAncestorField(this, 1, "backgroundOperationPool", + Executors.newFixedThreadPool(backgroundPoolSize)) + getAncestorField[Log](this, 3, "LOG").info( + s"HiveServer2: Async execution pool size $backgroundPoolSize") + + setAncestorField(this, 1, "operationManager", sparkSqlOperationManager) + addService(sparkSqlOperationManager) + + initCompositeService(hiveConf) + } + + override def closeSession(sessionHandle: SessionHandle) { + SparkSQLEnv.sqlEventListener.onSessionClosed(super.getSession(sessionHandle)) + super.closeSession(sessionHandle) + sparkSqlOperationManager.sessionToActivePool -= sessionHandle + } @throws(classOf[HiveSQLException]) override def openSession( @@ -176,18 +204,12 @@ private[hive] class SparkExecuteStatementOperation( } def run(): Unit = { - val sid = UUID.randomUUID().toString + val statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement'") setState(OperationState.RUNNING) - val group = hiveContext.sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) match { - case groupId: String => - hiveContext.sparkContext.setJobDescription(statement) - groupId - case _ => hiveContext.sparkContext - .setJobGroup(parentSession.getSessionHandle.getSessionId.toString, statement) - sid - } - SparkSQLEnv.sqlEventListener.onStatementStart(sid, parentSession, statement, group) + 
hiveContext.sparkContext.setJobGroup(statementId, statement) + SparkSQLEnv.sqlEventListener + .onStatementStart(statementId, parentSession, statement, statementId) try { result = hiveContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -200,7 +222,7 @@ private[hive] class SparkExecuteStatementOperation( sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool => hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) } - SparkSQLEnv.sqlEventListener.onStatementParse(sid, result.queryExecution.toString()) + SparkSQLEnv.sqlEventListener.onStatementParse(statementId, result.queryExecution.toString()) iter = { val useIncrementalCollect = hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean @@ -217,11 +239,12 @@ private[hive] class SparkExecuteStatementOperation( // HiveServer will silently swallow them. case e: Throwable => setState(OperationState.ERROR) - SparkSQLEnv.sqlEventListener.onStatementError(sid, e.getMessage, e.getStackTraceString) + SparkSQLEnv.sqlEventListener + .onStatementError(statementId, e.getMessage, e.getStackTraceString) logError("Error executing query:", e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) - SparkSQLEnv.sqlEventListener.onStatementFinish(sid) + SparkSQLEnv.sqlEventListener.onStatementFinish(statementId) } } From fd352621c215cf39efb6a3d289d8659366b02c53 Mon Sep 17 00:00:00 2001 From: tianyi Date: Sun, 25 Jan 2015 17:32:13 +0800 Subject: [PATCH 7/7] combine ThriftServerUIEventListener with exists listener in HiveThriftServer2 --- .../hive/thriftserver/HiveThriftServer2.scala | 29 ++++++--------- ....scala => HiveThriftServer2Listener.scala} | 37 ++++++++++--------- .../sql/hive/thriftserver/SparkSQLEnv.scala | 4 -- .../thriftserver/ui/ThriftServerPage.scala | 1 + .../thriftserver/ui/ThriftServerTab.scala | 4 +- .../spark/sql/hive/thriftserver/Shim12.scala | 13 ++++--- .../spark/sql/hive/thriftserver/Shim13.scala | 12 +++--- 7 files changed, 48 insertions(+), 52 deletions(-) rename sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/{ui/ThriftServerUIEventListener.scala => HiveThriftServer2Listener.scala} (86%) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 6011550675c7..1d6725a75627 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -20,15 +20,19 @@ package org.apache.spark.sql.hive.thriftserver import org.apache.commons.logging.LogFactory import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars +import org.apache.hive.service.cli.SessionHandle +import org.apache.hive.service.cli.session.HiveSession import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor} -import org.apache.spark.Logging +import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab -import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} +import 
org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerApplicationEnd, SparkListener} + +import scala.collection.mutable.HashMap /** * The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a @@ -36,7 +40,7 @@ import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener} */ object HiveThriftServer2 extends Logging { var LOG = LogFactory.getLog(classOf[HiveServer2]) - + var sqlEventListener: HiveThriftServer2Listener = _ /** * :: DeveloperApi :: * Starts a new thrift server with the given context. @@ -46,7 +50,6 @@ object HiveThriftServer2 extends Logging { val server = new HiveThriftServer2(sqlContext) server.init(sqlContext.hiveconf) server.start() - sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener(server)) } def main(args: Array[String]) { @@ -71,37 +74,29 @@ object HiveThriftServer2 extends Logging { server.init(SparkSQLEnv.hiveContext.hiveconf) server.start() logInfo("HiveThriftServer2 started") - SparkSQLEnv.sparkContext.addSparkListener(new HiveThriftServer2Listener(server)) } catch { case e: Exception => logError("Error starting HiveThriftServer2", e) System.exit(-1) } } - - /** - * A inner sparkListener called in sc.stop to clean up the HiveThriftServer2 - */ - class HiveThriftServer2Listener(val server: HiveServer2) extends SparkListener { - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - server.stop() - } - } - } private[hive] class HiveThriftServer2(hiveContext: HiveContext) extends HiveServer2 with ReflectedCompositeService { - private[hive] val uiTab: Option[ThriftServerTab] = + override def init(hiveConf: HiveConf) { + import HiveThriftServer2._ + sqlEventListener = new HiveThriftServer2Listener(this, SparkSQLEnv.sparkContext.getConf) + SparkSQLEnv.sparkContext.addSparkListener(sqlEventListener) + if (hiveContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) { Some(new ThriftServerTab(hiveContext.sparkContext)) } else { None } - override def init(hiveConf: HiveConf) { val sparkSqlCliService = new SparkSQLCLIService(hiveContext) setSuperField(this, "cliService", sparkSqlCliService) addService(sparkSqlCliService) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala similarity index 86% rename from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala rename to sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala index 08b088ec7737..29e31312131c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerUIEventListener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala @@ -15,16 +15,17 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.hive.thriftserver.ui +package org.apache.spark.sql.hive.thriftserver import org.apache.hive.service.cli.SessionHandle import org.apache.hive.service.cli.session.HiveSession -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener} +import org.apache.hive.service.server.HiveServer2 +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerApplicationEnd, SparkListener} import scala.collection.mutable.HashMap -private[thriftserver] trait ThriftServerEventListener { +private[thriftserver] trait HiveThriftServerEventListener { /** * Called when a session created. */ @@ -61,9 +62,9 @@ private[thriftserver] trait ThriftServerEventListener { } private[thriftserver] class SessionInfo( - val session: HiveSession, - val startTimestamp: Long, - val ip: String) { + val session: HiveSession, + val startTimestamp: Long, + val ip: String) { val sessionID = session.getSessionHandle.getSessionId.toString val userName = if (session.getUserName == null) "UNKNOWN" else session.getUserName var finishTimestamp = 0L @@ -103,17 +104,24 @@ private[thriftserver] class ExecutionInfo( } } -private[sql] class ThriftServerUIEventListener(val conf: SparkConf) - extends ThriftServerEventListener with SparkListener { +/** + * A listener for HiveThriftServer2 + */ +class HiveThriftServer2Listener( + val server: HiveServer2, + val conf:SparkConf) extends SparkListener with HiveThriftServerEventListener{ - import ThriftServerUIEventListener._ + // called in sc.stop to clean up the HiveThriftServer2 + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + server.stop() + } var sessionList = new HashMap[SessionHandle, SessionInfo] var executeList = new HashMap[String, ExecutionInfo] val retainedStatements = - conf.getInt("spark.thriftserver.ui.retainedStatements", DEFAULT_RETAINED_STATEMENTS) + conf.getInt("spark.thriftserver.ui.retainedStatements", 1000) val retainedSessions = - conf.getInt("spark.thriftserver.ui.retainedSessions", DEFAULT_RETAINED_SESSIONS) + conf.getInt("spark.thriftserver.ui.retainedSessions", 1000) var totalRunning = 0 override def onJobStart(jobStart: SparkListenerJobStart): Unit = { @@ -194,8 +202,3 @@ private[sql] class ThriftServerUIEventListener(val conf: SparkConf) } } } - -private[thriftserver] object ThriftServerUIEventListener { - val DEFAULT_RETAINED_SESSIONS = 1000 - val DEFAULT_RETAINED_STATEMENTS = 1000 -} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index d77b928ef0cf..1160909ed4b4 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -21,7 +21,6 @@ import scala.collection.JavaConversions._ import org.apache.spark.scheduler.StatsReportListener import org.apache.spark.sql.hive.{HiveShim, HiveContext} -import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerUIEventListener import org.apache.spark.{Logging, SparkConf, SparkContext} /** A singleton object for the master program. The slaves should not access this. 
 */
@@ -30,7 +29,6 @@ private[hive] object SparkSQLEnv extends Logging {
 
   var hiveContext: HiveContext = _
   var sparkContext: SparkContext = _
-  var sqlEventListener: ThriftServerUIEventListener = _
 
   def init() {
     if (hiveContext == null) {
@@ -51,8 +49,6 @@ private[hive] object SparkSQLEnv extends Logging {
       sparkContext = new SparkContext(sparkConf)
       sparkContext.addSparkListener(new StatsReportListener())
       hiveContext = new HiveContext(sparkContext)
-      sqlEventListener = new ThriftServerUIEventListener(sparkConf)
-      sparkContext.addSparkListener(sqlEventListener)
       if (log.isDebugEnabled) {
         hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
           logDebug(s"HiveConf var: $k=$v")
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index cdefbe2f969a..55eab02d2673 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.hive.thriftserver.{ExecutionState, ExecutionInfo}
 import org.apache.spark.ui.UIUtils._
 import org.apache.spark.ui._
 
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
index 356efedd6bd7..8a1e65ec6467 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver.ui
 
-import org.apache.spark.sql.hive.thriftserver.SparkSQLEnv
+import org.apache.spark.sql.hive.thriftserver.{HiveThriftServer2, SparkSQLEnv}
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 import org.apache.spark.{SparkContext, Logging, SparkException}
@@ -30,7 +30,7 @@ private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
   extends SparkUITab(getSparkUI(sparkContext), "ThriftServer") with Logging {
 
   val parent = getSparkUI(sparkContext)
-  val listener = SparkSQLEnv.sqlEventListener
+  val listener = HiveThriftServer2.sqlEventListener
 
   attachPage(new ThriftServerPage(this))
   parent.attachTab(this)
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index df1cf3644b95..dcf88b564e46 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -90,7 +90,7 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
   }
 
   override def closeSession(sessionHandle: SessionHandle) {
-    SparkSQLEnv.sqlEventListener.onSessionClosed(super.getSession(sessionHandle))
+    HiveThriftServer2.sqlEventListener.onSessionClosed(super.getSession(sessionHandle))
     super.closeSession(sessionHandle)
     sparkSqlOperationManager.sessionToActivePool -= sessionHandle
   }
@@ -103,7 +103,7 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
       withImpersonation: Boolean,
       delegationToken: java.lang.String): SessionHandle = {
     val ret = super.openSession(username, password, sessionConf, withImpersonation, delegationToken)
-    SparkSQLEnv.sqlEventListener.onSessionCreated("UNKNOWN", super.getSession(ret))
+    HiveThriftServer2.sqlEventListener.onSessionCreated("UNKNOWN", super.getSession(ret))
     ret
   }
 }
@@ -232,7 +232,7 @@ private[hive] class SparkExecuteStatementOperation(
     logInfo(s"Running query '$statement'")
     setState(OperationState.RUNNING)
     hiveContext.sparkContext.setJobGroup(statementId, statement)
-    SparkSQLEnv.sqlEventListener
+    HiveThriftServer2.sqlEventListener
       .onStatementStart(statementId, parentSession, statement, statementId)
     try {
       result = hiveContext.sql(statement)
@@ -246,7 +246,8 @@ private[hive] class SparkExecuteStatementOperation(
       sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
         hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       }
-      SparkSQLEnv.sqlEventListener.onStatementParse(statementId, result.queryExecution.toString())
+      HiveThriftServer2.sqlEventListener
+        .onStatementParse(statementId, result.queryExecution.toString())
       iter = {
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
@@ -263,12 +264,12 @@ private[hive] class SparkExecuteStatementOperation(
       // HiveServer will silently swallow them.
       case e: Throwable =>
         setState(OperationState.ERROR)
-        SparkSQLEnv.sqlEventListener
+        HiveThriftServer2.sqlEventListener
          .onStatementError(statementId, e.getMessage, e.getStackTraceString)
         logError("Error executing query:",e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
-    SparkSQLEnv.sqlEventListener.onStatementFinish(statementId)
+    HiveThriftServer2.sqlEventListener.onStatementFinish(statementId)
   }
 }
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index a8a84e41dfd8..55d1ff22b5dc 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -91,7 +91,7 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
   }
 
   override def closeSession(sessionHandle: SessionHandle) {
-    SparkSQLEnv.sqlEventListener.onSessionClosed(super.getSession(sessionHandle))
+    HiveThriftServer2.sqlEventListener.onSessionClosed(super.getSession(sessionHandle))
     super.closeSession(sessionHandle)
     sparkSqlOperationManager.sessionToActivePool -= sessionHandle
   }
@@ -107,7 +107,7 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
     val ret = super.openSession(protocol, username, password, sessionConf,
       withImpersonation, delegationToken)
     val session = super.getSession(ret)
-    SparkSQLEnv.sqlEventListener.onSessionCreated("UNKNOWN", session)
+    HiveThriftServer2.sqlEventListener.onSessionCreated("UNKNOWN", session)
     ret
   }
 }
@@ -208,7 +208,7 @@ private[hive] class SparkExecuteStatementOperation(
     logInfo(s"Running query '$statement'")
     setState(OperationState.RUNNING)
     hiveContext.sparkContext.setJobGroup(statementId, statement)
-    SparkSQLEnv.sqlEventListener
+    HiveThriftServer2.sqlEventListener
       .onStatementStart(statementId, parentSession, statement, statementId)
     try {
       result = hiveContext.sql(statement)
@@ -222,7 +222,7 @@ private[hive] class SparkExecuteStatementOperation(
       sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
         hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       }
-      SparkSQLEnv.sqlEventListener.onStatementParse(statementId, result.queryExecution.toString())
+      HiveThriftServer2.sqlEventListener.onStatementParse(statementId, result.queryExecution.toString())
       iter = {
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
@@ -239,12 +239,12 @@ private[hive] class SparkExecuteStatementOperation(
       // HiveServer will silently swallow them.
       case e: Throwable =>
         setState(OperationState.ERROR)
-        SparkSQLEnv.sqlEventListener
+        HiveThriftServer2.sqlEventListener
          .onStatementError(statementId, e.getMessage, e.getStackTraceString)
         logError("Error executing query:", e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
-    SparkSQLEnv.sqlEventListener.onStatementFinish(statementId)
+    HiveThriftServer2.sqlEventListener.onStatementFinish(statementId)
   }
 }
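
[Reviewer note, not part of the patch] The hunks above move the shared
sqlEventListener out of SparkSQLEnv and onto HiveThriftServer2, so the
listener only exists when the Thrift server itself is running, and callers
that only need SparkSQLEnv.init() (such as the SQL CLI) no longer create a
UI listener they never use. A minimal sketch of the companion-object shape
this implies -- initListener is an illustrative name, not code from this
patch:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerUIEventListener

    object HiveThriftServer2 {
      // Shared listener: assigned exactly once at server startup, before any
      // session or statement callback can fire; read by ThriftServerTab and
      // by the Shim12/Shim13 session managers.
      var sqlEventListener: ThriftServerUIEventListener = _

      // Hypothetical helper showing the registration order the patch relies
      // on: construct the listener, then register it with the SparkContext.
      def initListener(sc: SparkContext): Unit = {
        sqlEventListener = new ThriftServerUIEventListener(sc.getConf)
        sc.addSparkListener(sqlEventListener)
      }
    }

Because the shims dereference HiveThriftServer2.sqlEventListener
unconditionally in openSession/closeSession and in
SparkExecuteStatementOperation.run(), the field must be assigned before the
CLI service starts accepting connections, or those callbacks would hit a
null listener.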