diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 6e07df18b0e1..1d6725a75627 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -20,14 +20,19 @@ package org.apache.spark.sql.hive.thriftserver
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.cli.SessionHandle
+import org.apache.hive.service.cli.session.HiveSession
 import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
 import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, SparkContext, Logging}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerApplicationEnd, SparkListener}
+
+import scala.collection.mutable.HashMap
 
 /**
  * The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
@@ -35,7 +40,7 @@ import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
  */
 object HiveThriftServer2 extends Logging {
   var LOG = LogFactory.getLog(classOf[HiveServer2])
-
+  var sqlEventListener: HiveThriftServer2Listener = _
   /**
    * :: DeveloperApi ::
    * Starts a new thrift server with the given context.
@@ -45,7 +50,6 @@ object HiveThriftServer2 extends Logging {
     val server = new HiveThriftServer2(sqlContext)
     server.init(sqlContext.hiveconf)
     server.start()
-    sqlContext.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
   }
 
   def main(args: Array[String]) {
@@ -70,23 +74,12 @@ object HiveThriftServer2 extends Logging {
       server.init(SparkSQLEnv.hiveContext.hiveconf)
       server.start()
       logInfo("HiveThriftServer2 started")
-      SparkSQLEnv.sparkContext.addSparkListener(new HiveThriftServer2Listener(server))
     } catch {
       case e: Exception =>
         logError("Error starting HiveThriftServer2", e)
         System.exit(-1)
     }
   }
-
-  /**
-   * A inner sparkListener called in sc.stop to clean up the HiveThriftServer2
-   */
-  class HiveThriftServer2Listener(val server: HiveServer2) extends SparkListener {
-    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-      server.stop()
-    }
-  }
-
 }
 
 private[hive] class HiveThriftServer2(hiveContext: HiveContext)
@@ -94,6 +87,16 @@ private[hive] class HiveThriftServer2(hiveContext: HiveContext)
   with ReflectedCompositeService {
 
   override def init(hiveConf: HiveConf) {
+    import HiveThriftServer2._
+    sqlEventListener = new HiveThriftServer2Listener(this, SparkSQLEnv.sparkContext.getConf)
+    SparkSQLEnv.sparkContext.addSparkListener(sqlEventListener)
+
+    if (hiveContext.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
+      Some(new ThriftServerTab(hiveContext.sparkContext))
+    } else {
+      None
+    }
+
     val sparkSqlCliService = new SparkSQLCLIService(hiveContext)
     setSuperField(this, "cliService", sparkSqlCliService)
     addService(sparkSqlCliService)
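The stop-only listener removed above is replaced by a single HiveThriftServer2Listener instance, published through the mutable field HiveThriftServer2.sqlEventListener and shared by the Spark listener bus, the Hive session layer, and the UI tab. A minimal, self-contained sketch of that sharing pattern (all names below are hypothetical, not part of the patch):

    // One listener instance, reachable from both the scheduler side and the
    // RPC side, mirroring how sqlEventListener is wired in init() above.
    trait QueryEvents {
      def onStart(id: String): Unit = {}
      def onFinish(id: String): Unit = {}
    }

    class CountingListener extends QueryEvents {
      @volatile var running = 0
      override def onStart(id: String): Unit = running += 1
      override def onFinish(id: String): Unit = running -= 1
    }

    object ServerState {
      var events: QueryEvents = _ // set once during server init
    }

    object Demo extends App {
      ServerState.events = new CountingListener
      ServerState.events.onStart("stmt-1")  // called from the execution path
      ServerState.events.onFinish("stmt-1") // called on completion
    }

One consequence of this design, visible in ThriftServerTab further down, is that anything reading sqlEventListener must be constructed after init() has run, since the field starts out null.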
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala
new file mode 100644
index 000000000000..29e31312131c
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver
+
+import org.apache.hive.service.cli.SessionHandle
+import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.server.HiveServer2
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerApplicationEnd, SparkListener}
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+private[thriftserver] trait HiveThriftServerEventListener {
+  /**
+   * Called when a session is created.
+   */
+  def onSessionCreated(ip: String, session: HiveSession): Unit = {}
+
+  /**
+   * Called when a session is closed.
+   */
+  def onSessionClosed(session: HiveSession): Unit = {}
+
+  /**
+   * Called when a statement starts to run.
+   */
+  def onStatementStart(
+      id: String,
+      session: HiveSession,
+      statement: String,
+      groupId: String): Unit = {}
+
+  /**
+   * Called when a statement finishes compilation.
+   */
+  def onStatementParse(id: String, executePlan: String): Unit = {}
+
+  /**
+   * Called when a statement fails with an error while running.
+   */
+  def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {}
+
+  /**
+   * Called when a statement completes successfully.
+   */
+  def onStatementFinish(id: String): Unit = {}
+}
+
+private[thriftserver] class SessionInfo(
+    val session: HiveSession,
+    val startTimestamp: Long,
+    val ip: String) {
+  val sessionID = session.getSessionHandle.getSessionId.toString
+  val userName = if (session.getUserName == null) "UNKNOWN" else session.getUserName
+  var finishTimestamp = 0L
+  var totalExecute = 0
+
+  def totalTime = {
+    if (finishTimestamp == 0L) {
+      System.currentTimeMillis() - startTimestamp
+    } else {
+      finishTimestamp - startTimestamp
+    }
+  }
+}
+
+private[thriftserver] object ExecutionState extends Enumeration {
+  val STARTED, COMPILED, FAILED, FINISHED = Value
+  type ExecutionState = Value
+}
+
+private[thriftserver] class ExecutionInfo(
+    val statement: String,
+    val session: HiveSession,
+    val startTimestamp: Long) {
+  val userName = if (session.getUserName == null) "UNKNOWN" else session.getUserName
+  var finishTimestamp = 0L
+  var executePlan = ""
+  var detail = ""
+  var state: ExecutionState.Value = ExecutionState.STARTED
+  val jobId = ArrayBuffer[String]()
+  var groupId = ""
+
+  def totalTime = {
+    if (finishTimestamp == 0L) {
+      System.currentTimeMillis() - startTimestamp
+    } else {
+      finishTimestamp - startTimestamp
+    }
+  }
+}
+
+/**
+ * A listener for HiveThriftServer2 that tracks sessions and statements,
+ * backing the "ThriftServer" web UI tab.
+ */
+class HiveThriftServer2Listener(
+    val server: HiveServer2,
+    val conf: SparkConf) extends SparkListener with HiveThriftServerEventListener {
+
+  // Called in sc.stop to clean up the HiveThriftServer2.
+  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+    server.stop()
+  }
+
+  val sessionList = new HashMap[SessionHandle, SessionInfo]
+  val executeList = new HashMap[String, ExecutionInfo]
+  val retainedStatements =
+    conf.getInt("spark.thriftserver.ui.retainedStatements", 1000)
+  val retainedSessions =
+    conf.getInt("spark.thriftserver.ui.retainedSessions", 1000)
+  var totalRunning = 0
+
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    val jobGroup = for (
+      props <- Option(jobStart.properties);
+      groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    ) yield groupId
+
+    // Jobs without a group id are simply ignored; matching only `Some(...)`
+    // here would throw a MatchError for them.
+    jobGroup.foreach { groupId =>
+      executeList.values.find(_.groupId == groupId).foreach { info =>
+        info.jobId += jobStart.jobId.toString
+        info.groupId = groupId
+      }
+    }
+  }
+
+  override def onSessionCreated(ip: String, session: HiveSession): Unit = {
+    val info = new SessionInfo(session, System.currentTimeMillis(), ip)
+    sessionList(session.getSessionHandle) = info
+    trimSessionIfNecessary()
+  }
+
+  override def onSessionClosed(session: HiveSession): Unit = {
+    sessionList(session.getSessionHandle).finishTimestamp = System.currentTimeMillis()
+  }
+
+  override def onStatementStart(
+      id: String,
+      session: HiveSession,
+      statement: String,
+      groupId: String): Unit = {
+    val info = new ExecutionInfo(statement, session, System.currentTimeMillis())
+    info.state = ExecutionState.STARTED
+    executeList(id) = info
+    trimExecutionIfNecessary()
+    sessionList(session.getSessionHandle).totalExecute += 1
+    executeList(id).groupId = groupId
+    totalRunning += 1
+  }
+
+  override def onStatementParse(id: String, executePlan: String): Unit = {
+    executeList(id).executePlan = executePlan
+    executeList(id).state = ExecutionState.COMPILED
+  }
+
+  override def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
+    executeList(id).finishTimestamp = System.currentTimeMillis()
+    executeList(id).detail = errorMessage
+    executeList(id).state = ExecutionState.FAILED
+    totalRunning -= 1
+  }
+
+  override def onStatementFinish(id: String): Unit = {
+    executeList(id).finishTimestamp = System.currentTimeMillis()
+    executeList(id).state = ExecutionState.FINISHED
+    totalRunning -= 1
+  }
+
+  private def trimExecutionIfNecessary() = synchronized {
+    if (executeList.size > retainedStatements) {
+      val toRemove = math.max(retainedStatements / 10, 1)
+      executeList.toList.sortBy(_._2.startTimestamp).take(toRemove).foreach { s =>
+        executeList.remove(s._1)
+      }
+    }
+  }
+
+  private def trimSessionIfNecessary() = synchronized {
+    if (sessionList.size > retainedSessions) {
+      val toRemove = math.max(retainedSessions / 10, 1)
+      sessionList.toList.sortBy(_._2.startTimestamp).take(toRemove).foreach { s =>
+        sessionList.remove(s._1)
+      }
+    }
+  }
+}
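The listener ties each statement to the Spark jobs it spawns through job groups: the shims (Shim12/Shim13 below) call setJobGroup(statementId, statement) before executing, and onJobStart above recovers the id from the job's local properties. A minimal round-trip sketch, assuming a local-mode Spark build from this branch; the literal "spark.jobGroup.id" is the key behind the private[spark] constant SparkContext.SPARK_JOB_GROUP_ID:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    object JobGroupRoundTrip extends App {
      val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("job-group-demo"))

      sc.addSparkListener(new SparkListener {
        override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
          // Same lookup the UI listener performs: the group id travels in
          // the job's local properties under "spark.jobGroup.id".
          val group = Option(jobStart.properties)
            .flatMap(p => Option(p.getProperty("spark.jobGroup.id")))
          println(s"job ${jobStart.jobId} -> group $group")
        }
      })

      sc.setJobGroup("stmt-1234", "SELECT 1") // what the shims do with statementId
      sc.parallelize(1 to 10).count()         // runs a job tagged with that group
      sc.stop()
    }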
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 158c22515972..1160909ed4b4 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -49,7 +49,6 @@ private[hive] object SparkSQLEnv extends Logging {
       sparkContext = new SparkContext(sparkConf)
       sparkContext.addSparkListener(new StatsReportListener())
       hiveContext = new HiveContext(sparkContext)
-
       if (log.isDebugEnabled) {
         hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) =>
           logDebug(s"HiveConf var: $k=$v")
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
deleted file mode 100644
index 89e9ede7261c..000000000000
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.thriftserver
-
-import java.util.concurrent.Executors
-
-import org.apache.commons.logging.Log
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hive.service.cli.session.SessionManager
-
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
-import org.apache.hive.service.cli.SessionHandle
-
-private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
-  extends SessionManager
-  with ReflectedCompositeService {
-
-  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
-
-  override def init(hiveConf: HiveConf) {
-    setSuperField(this, "hiveConf", hiveConf)
-
-    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
-    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
-    getAncestorField[Log](this, 3, "LOG").info(
-      s"HiveServer2: Async execution pool size $backgroundPoolSize")
-
-    setSuperField(this, "operationManager", sparkSqlOperationManager)
-    addService(sparkSqlOperationManager)
-
-    initCompositeService(hiveConf)
-  }
-
-  override def closeSession(sessionHandle: SessionHandle) {
-    super.closeSession(sessionHandle)
-    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
-  }
-}
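The deleted class is not gone for good: it reappears in per-version form inside Shim12 and Shim13 below, because openSession grew an extra TProtocolVersion parameter in Hive 0.13. Both copies write private fields of Hive's SessionManager ancestors through ReflectionUtils; the following simplified stand-in (hypothetical, not the real helper) shows the mechanism behind a call like setAncestorField(this, 1, "hiveConf", hiveConf):

    // Simplified, hypothetical stand-in for ReflectionUtils.setAncestorField:
    // walk `level` superclasses up, then write a private field reflectively.
    object ReflectionSketch {
      def setAncestorField(obj: AnyRef, level: Int, name: String, value: AnyRef): Unit = {
        val ancestor = Iterator.iterate[Class[_]](obj.getClass)(_.getSuperclass).drop(level).next()
        val field = ancestor.getDeclaredField(name)
        field.setAccessible(true)
        field.set(obj, value)
      }
    }

This is what lets a subclass reach SessionManager's private `hiveConf` and `backgroundOperationPool` fields, which Hive offers no setters for.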
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
new file mode 100644
index 000000000000..55eab02d2673
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import java.util.Calendar
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.commons.lang3.StringEscapeUtils
+import org.apache.spark.Logging
+import org.apache.spark.sql.hive.thriftserver.{ExecutionState, ExecutionInfo}
+import org.apache.spark.ui.UIUtils._
+import org.apache.spark.ui._
+
+import scala.xml.Node
+
+/** Page for the Spark Web UI that shows statistics of the thrift server */
+private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging {
+
+  private val listener = parent.listener
+  private val startTime = Calendar.getInstance().getTime()
+  private val emptyCell = "-"
+
+  /** Render the page */
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val content =
+      generateBasicStats() ++
+      <br/> ++
+      <h4>
+        Total {listener.sessionList.size} session online,
+        Total {listener.totalRunning} sql running
+      </h4> ++
+      generateSessionStatsTable() ++
+      generateSQLStatsTable()
+    UIUtils.headerSparkPage("ThriftServer", content, parent, Some(5000))
+  }
+
+  /** Generate basic stats of the thrift server program */
+  private def generateBasicStats(): Seq[Node] = {
+    val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+    <ul class="unstyled">
+      <li>
+        <strong>Started at: </strong> {startTime.toString}
+      </li>
+      <li>
+        <strong>Time since start: </strong>{formatDurationVerbose(timeSinceStart)}
+      </li>
+    </ul>
+  }
+
+  /** Generate stats of batch statements of the thrift server program */
+  private def generateSQLStatsTable(): Seq[Node] = {
+    val numStatement = listener.executeList.size
+    val table = if (numStatement > 0) {
+      val headerRow = Seq("User", "JobID", "GroupID", "Start Time", "Finish Time", "Duration",
+        "Statement", "State", "Detail")
+      val dataRows = listener.executeList.values.toSeq.sortBy(_.startTimestamp).reverse
+
+      def generateDataRow(info: ExecutionInfo): Seq[Node] = {
+        val jobLink = info.jobId.map { id: String =>
+          <a href={"%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), id)}>
+            [{id}]
+          </a>
+        }
+        val detail = if (info.state == ExecutionState.FAILED) info.detail else info.executePlan
+        <tr>
+          <td>{info.userName}</td>
+          <td>
+            {jobLink}
+          </td>
+          <td>{info.groupId}</td>
+          <td>{formatDate(info.startTimestamp)}</td>
+          <td>{formatDate(info.finishTimestamp)}</td>
+          <td>{formatDurationOption(Some(info.totalTime))}</td>
+          <td>{info.statement}</td>
+          <td>{info.state}</td>
+          {errorMessageCell(detail)}
+        </tr>
+      }
+
+      Some(UIUtils.listingTable(headerRow, generateDataRow,
+        dataRows, false, None, Seq(null), false))
+    } else {
+      None
+    }
+
+    val content =
+      <h5>SQL Statistics</h5> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
+
+    content
+  }
+
+  private def errorMessageCell(errorMessage: String): Seq[Node] = {
+    val isMultiline = errorMessage.indexOf('\n') >= 0
+    val errorSummary = StringEscapeUtils.escapeHtml4(
+      if (isMultiline) {
+        errorMessage.substring(0, errorMessage.indexOf('\n'))
+      } else {
+        errorMessage
+      })
+    val details = if (isMultiline) {
+      // scalastyle:off
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        + details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{errorMessage}</pre>
+      </div>
+      // scalastyle:on
+    } else {
+      ""
+    }
+    <td>{errorSummary}{details}</td>
+  }
+
+  /** Generate stats of batch sessions of the thrift server program */
+  private def generateSessionStatsTable(): Seq[Node] = {
+    val numBatches = listener.sessionList.size
+    val table = if (numBatches > 0) {
+      val dataRows =
+        listener.sessionList.values.toSeq.sortBy(_.startTimestamp).reverse.map ( session =>
+          Seq(
+            session.userName,
+            session.ip,
+            session.sessionID,
+            formatDate(session.startTimestamp),
+            formatDate(session.finishTimestamp),
+            formatDurationOption(Some(session.totalTime)),
+            session.totalExecute.toString
+          )
+        ).toSeq
+      val headerRow = Seq("User", "IP", "Session ID", "Start Time", "Finish Time", "Duration",
+        "Total Execute")
+      Some(listingTable(headerRow, dataRows))
+    } else {
+      None
+    }
+
+    val content =
+      <h5>Session Statistics</h5> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
+
+    content
+  }
+
+  /**
+   * Returns a human-readable string representing a duration such as "5 second 35 ms"
+   */
+  private def formatDurationOption(msOption: Option[Long]): String = {
+    msOption.map(formatDurationVerbose).getOrElse(emptyCell)
+  }
+
+  /** Generate an HTML table from string data */
+  private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = {
+    def generateDataRow(data: Seq[String]): Seq[Node] = {
+      <tr> {data.map(d => <td>{d}</td>)} </tr>
+    }
+    UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true)
+  }
+}
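Both tables above follow the same "headers plus row renderer" pattern as UIUtils.listingTable. A toy, dependency-free sketch of the idea (simplified relative to the real helper, which also handles CSS hooks, fixed-width layouts, and custom per-row renderers; it runs on the Scala 2.10 toolchain this branch uses, where scala.xml is bundled):

    import scala.xml.Node

    object ListingTableSketch {
      def listingTable(headers: Seq[String], rows: Seq[Seq[String]]): Seq[Node] = {
        <table class="table table-bordered table-striped table-condensed">
          <thead><tr>{headers.map(h => <th>{h}</th>)}</tr></thead>
          <tbody>{rows.map(row => <tr>{row.map(cell => <td>{cell}</td>)}</tr>)}</tbody>
        </table>
      }

      def main(args: Array[String]): Unit = {
        println(listingTable(Seq("User", "State"), Seq(Seq("alice", "FINISHED"))))
      }
    }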
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
new file mode 100644
index 000000000000..8a1e65ec6467
--- /dev/null
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import org.apache.spark.sql.hive.thriftserver.{HiveThriftServer2, SparkSQLEnv}
+import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+import org.apache.spark.{SparkContext, Logging, SparkException}
+
+/**
+ * Spark Web UI tab that shows statistics of the thrift server.
+ * This assumes the given SparkContext has enabled its SparkUI.
+ */
+private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
+  extends SparkUITab(getSparkUI(sparkContext), "ThriftServer") with Logging {
+
+  val parent = getSparkUI(sparkContext)
+  val listener = HiveThriftServer2.sqlEventListener
+
+  attachPage(new ThriftServerPage(this))
+  parent.attachTab(this)
+}
+
+private[thriftserver] object ThriftServerTab {
+  def getSparkUI(sparkContext: SparkContext): SparkUI = {
+    sparkContext.ui.getOrElse {
+      throw new SparkException("Parent SparkUI to attach this tab to not found!")
+    }
+  }
+}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index b52a51d11e4a..10a0c52f16a6 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -22,10 +22,13 @@ import java.net.ServerSocket
 import java.sql.{Date, DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
+import org.scalatest.concurrent.Eventually._
+
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise}
+import scala.io.Source
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try
 
@@ -61,22 +64,22 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
     val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
   }
 
-  def randomListeningPort = {
+  def randomListeningPorts(n: Int) = {
     // Let the system choose a random available port to avoid collision with other parallel
     // builds.
-    val socket = new ServerSocket(0)
-    val port = socket.getLocalPort
-    socket.close()
-    port
+    val sockets = Array.fill(n)(new ServerSocket(0))
+    val ports = sockets.map(_.getLocalPort)
+    sockets.foreach(_.close())
+    ports
   }
 
   def withJdbcStatement(
       serverStartTimeout: FiniteDuration = 1.minute,
       httpMode: Boolean = false)(
-      f: Statement => Unit) {
-    val port = randomListeningPort
+      f: (Int, Int, Statement) => Unit) {
+    val Array(port, uiPort) = randomListeningPorts(2)
 
-    startThriftServer(port, serverStartTimeout, httpMode) {
+    startThriftServer(port, uiPort, serverStartTimeout, httpMode) {
       val jdbcUri = if (httpMode) {
         s"jdbc:hive2://${"localhost"}:$port/" +
         "default?hive.server2.transport.mode=http;hive.server2.thrift.http.path=cliservice"
@@ -89,7 +92,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
     val statement = connection.createStatement()
 
     try {
-      f(statement)
+      f(port, uiPort, statement)
     } finally {
       statement.close()
       connection.close()
@@ -100,9 +103,9 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   def withCLIServiceClient(
       serverStartTimeout: FiniteDuration = 1.minute)(
       f: ThriftCLIServiceClient => Unit) {
-    val port = randomListeningPort
+    val Array(port, uiPort) = randomListeningPorts(2)
 
-    startThriftServer(port) {
+    startThriftServer(port, uiPort) {
       // Transport creation logic below mimics HiveConnection.createBinaryTransport
       val rawTransport = new TSocket("localhost", port)
       val user = System.getProperty("user.name")
@@ -122,9 +125,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
 
   def startThriftServer(
       port: Int,
+      uiPort: Int,
       serverStartTimeout: FiniteDuration = 1.minute,
       httpMode: Boolean = false)(
-      f: => Unit) {
+      f: => Unit): Unit = {
     val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
     val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
 
@@ -143,7 +147,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
          | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=http
          | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT}=$port
          | --driver-class-path ${sys.props("java.class.path")}
-         | --conf spark.ui.enabled=false
+         | --conf spark.ui.port=$uiPort
        """.stripMargin.split("\\s+").toSeq
     } else {
       s"""$startScript
@@ -154,7 +158,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
          | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
          | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$port
          | --driver-class-path ${sys.props("java.class.path")}
-         | --conf spark.ui.enabled=false
+         | --conf spark.ui.port=$uiPort
        """.stripMargin.split("\\s+").toSeq
     }
 
@@ -228,7 +232,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("Test JDBC query execution") {
-    withJdbcStatement() { statement =>
+    withJdbcStatement() { (port, uiPort, statement) =>
       val queries = Seq(
         "SET spark.sql.shuffle.partitions=3",
         "DROP TABLE IF EXISTS test",
@@ -247,7 +251,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("Test JDBC query execution in Http Mode") {
-    withJdbcStatement(httpMode = true) { statement =>
+    withJdbcStatement(httpMode = true) { (port, uiPort, statement) =>
      val queries = Seq(
        "SET spark.sql.shuffle.partitions=3",
        "DROP TABLE IF EXISTS test",
@@ -266,7 +270,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("SPARK-3004 regression: result set containing NULL") {
-    withJdbcStatement() { statement =>
+    withJdbcStatement() { (port, uiPort, statement) =>
       val queries = Seq(
         "DROP TABLE IF EXISTS test_null",
         "CREATE TABLE test_null(key INT, val STRING)",
@@ -308,7 +312,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("Checks Hive version") {
-    withJdbcStatement() { statement =>
+    withJdbcStatement() { (port, uiPort, statement) =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
@@ -316,7 +320,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("Checks Hive version in Http Mode") {
-    withJdbcStatement(httpMode = true) { statement =>
+    withJdbcStatement(httpMode = true) { (port, uiPort, statement) =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
       assert(resultSet.getString(1) === s"spark.sql.hive.version=${HiveShim.version}")
@@ -324,7 +328,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("SPARK-4292 regression: result set iterator issue") {
-    withJdbcStatement() { statement =>
+    withJdbcStatement() { (port, uiPort, statement) =>
       val queries = Seq(
         "DROP TABLE IF EXISTS test_4292",
         "CREATE TABLE test_4292(key INT, val STRING)",
@@ -344,7 +348,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("SPARK-4309 regression: Date type support") {
-    withJdbcStatement() { statement =>
+    withJdbcStatement() { (port, uiPort, statement) =>
       val queries = Seq(
         "DROP TABLE IF EXISTS test_date",
         "CREATE TABLE test_date(key INT, value STRING)",
@@ -362,7 +366,7 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
   }
 
   test("SPARK-4407 regression: Complex type support") {
-    withJdbcStatement() { statement =>
+    withJdbcStatement() { (port, uiPort, statement) =>
       val queries = Seq(
         "DROP TABLE IF EXISTS test_map",
         "CREATE TABLE test_map(key INT, value STRING)",
@@ -384,4 +388,32 @@
       }
     }
   }
+
+  test("SPARK-5100 monitor page") {
+    withJdbcStatement() { (port, uiPort, statement) =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_map",
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")
+
+      queries.foreach(statement.execute)
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        val html = Source.fromURL(s"http://localhost:$uiPort").mkString
+
+        // check whether the new page exists
+        assert(html.toLowerCase.contains("thriftserver"))
+      }
+
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        val html = Source.fromURL(s"http://localhost:$uiPort/ThriftServer/").mkString
+        assert(!html.contains("random data that should not be present"))
+
+        // check whether the executed statements show up
+        queries.foreach { line =>
+          assert(html.toLowerCase.contains(line.toLowerCase))
+        }
+      }
+    }
+  }
 }
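The two shims that follow both wrap Hive's SessionManager so that session open/close flow into sqlEventListener; they are duplicated per Hive version because 0.13's openSession takes an extra TProtocolVersion argument. The hook pattern itself, reduced to a self-contained toy (all types below are stand-ins), is "delegate to super and notify"; note that closeSession reports before delegating, while the session can still be resolved:

    // Toy illustration of the openSession/closeSession hooks in the shims.
    class SessionManagerBase {
      def openSession(user: String): Long = 42L // pretend session handle
      def closeSession(handle: Long): Unit = ()
    }

    trait Events {
      def created(user: String): Unit
      def closed(handle: Long): Unit
    }

    class InstrumentedSessionManager(events: Events) extends SessionManagerBase {
      override def openSession(user: String): Long = {
        val handle = super.openSession(user)
        events.created(user) // notify after the session exists
        handle
      }
      override def closeSession(handle: Long): Unit = {
        events.closed(handle) // notify while the session is still alive
        super.closeSession(handle)
      }
    }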
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 166c56b9dfe2..dcf88b564e46 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -18,7 +18,13 @@ package org.apache.spark.sql.hive.thriftserver
 import java.sql.{Date, Timestamp}
-import java.util.{ArrayList => JArrayList, Map => JMap}
+import java.util.concurrent.Executors
+import java.util.{ArrayList => JArrayList, Map => JMap, UUID}
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
@@ -29,9 +35,9 @@ import org.apache.hadoop.hive.shims.ShimLoader
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
-import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.cli.session.{SessionManager, HiveSession}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.sql.{SQLConf, SchemaRDD, Row => SparkRow}
 import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
@@ -63,6 +69,46 @@ private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveC
   }
 }
 
+private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
+  extends SessionManager with ReflectedCompositeService {
+
+  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
+  override def init(hiveConf: HiveConf) {
+    setAncestorField(this, 1, "hiveConf", hiveConf)
+
+    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+    setAncestorField(this, 1, "backgroundOperationPool",
+      Executors.newFixedThreadPool(backgroundPoolSize))
+    getAncestorField[Log](this, 3, "LOG").info(
+      s"HiveServer2: Async execution pool size $backgroundPoolSize")
+
+    setAncestorField(this, 1, "operationManager", sparkSqlOperationManager)
+    addService(sparkSqlOperationManager)
+
+    initCompositeService(hiveConf)
+  }
+
+  override def closeSession(sessionHandle: SessionHandle) {
+    HiveThriftServer2.sqlEventListener.onSessionClosed(super.getSession(sessionHandle))
+    super.closeSession(sessionHandle)
+    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+  }
+
+  @throws(classOf[HiveSQLException])
+  override def openSession(
+      username: java.lang.String,
+      password: java.lang.String,
+      sessionConf: java.util.Map[java.lang.String, java.lang.String],
+      withImpersonation: Boolean,
+      delegationToken: java.lang.String): SessionHandle = {
+    val ret = super.openSession(
+      username, password, sessionConf, withImpersonation, delegationToken)
+    HiveThriftServer2.sqlEventListener.onSessionCreated("UNKNOWN", super.getSession(ret))
+    ret
+  }
+}
+
 private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
     statement: String,
@@ -183,8 +228,12 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def run(): Unit = {
+    val statementId = UUID.randomUUID().toString
     logInfo(s"Running query '$statement'")
     setState(OperationState.RUNNING)
+    hiveContext.sparkContext.setJobGroup(statementId, statement)
+    HiveThriftServer2.sqlEventListener
+      .onStatementStart(statementId, parentSession, statement, statementId)
     try {
       result = hiveContext.sql(statement)
       logDebug(result.queryExecution.toString())
@@ -194,10 +243,11 @@ private[hive] class SparkExecuteStatementOperation(
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
       }
-      hiveContext.sparkContext.setJobDescription(statement)
       sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
         hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       }
+      HiveThriftServer2.sqlEventListener
+        .onStatementParse(statementId, result.queryExecution.toString())
       iter = {
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
@@ -214,9 +264,12 @@ private[hive] class SparkExecuteStatementOperation(
       // HiveServer will silently swallow them.
       case e: Throwable =>
         setState(OperationState.ERROR)
+        HiveThriftServer2.sqlEventListener
+          .onStatementError(statementId, e.getMessage, e.getStackTraceString)
         logError("Error executing query:", e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
+    HiveThriftServer2.sqlEventListener.onStatementFinish(statementId)
   }
 }
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index eaf7a1ddd499..55d1ff22b5dc 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -18,7 +18,15 @@ package org.apache.spark.sql.hive.thriftserver
 import java.sql.{Date, Timestamp}
-import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
+import java.util.concurrent.Executors
+import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, UUID}
+
+import org.apache.commons.logging.Log
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hive.service.auth.TSetIpAddressProcessor
+import org.apache.hive.service.cli.thrift.TProtocolVersion
+import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, Map => SMap}
@@ -27,9 +35,9 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
-import org.apache.hive.service.cli.session.HiveSession
+import org.apache.hive.service.cli.session._
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD}
 import org.apache.spark.sql.execution.SetCommand
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
@@ -62,6 +70,48 @@ private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveC
   }
 }
 
+private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
+  extends SessionManager with ReflectedCompositeService {
+
+  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
+  override def init(hiveConf: HiveConf) {
+    setAncestorField(this, 1, "hiveConf", hiveConf)
+
+    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
+    setAncestorField(this, 1, "backgroundOperationPool",
+      Executors.newFixedThreadPool(backgroundPoolSize))
+    getAncestorField[Log](this, 3, "LOG").info(
+      s"HiveServer2: Async execution pool size $backgroundPoolSize")
+
+    setAncestorField(this, 1, "operationManager", sparkSqlOperationManager)
+    addService(sparkSqlOperationManager)
+
+    initCompositeService(hiveConf)
+  }
+
+  override def closeSession(sessionHandle: SessionHandle) {
+    HiveThriftServer2.sqlEventListener.onSessionClosed(super.getSession(sessionHandle))
+    super.closeSession(sessionHandle)
+    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+  }
+
+  @throws(classOf[HiveSQLException])
+  override def openSession(
+      protocol: TProtocolVersion,
+      username: java.lang.String,
+      password: java.lang.String,
+      sessionConf: java.util.Map[java.lang.String, java.lang.String],
+      withImpersonation: Boolean,
+      delegationToken: java.lang.String): SessionHandle = {
+    val ret = super.openSession(protocol, username, password,
+      sessionConf, withImpersonation, delegationToken)
+    val session = super.getSession(ret)
+    HiveThriftServer2.sqlEventListener.onSessionCreated("UNKNOWN", session)
+    ret
+  }
+}
+
 private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
     statement: String,
@@ -154,8 +204,12 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def run(): Unit = {
+    val statementId = UUID.randomUUID().toString
     logInfo(s"Running query '$statement'")
     setState(OperationState.RUNNING)
+    hiveContext.sparkContext.setJobGroup(statementId, statement)
+    HiveThriftServer2.sqlEventListener
+      .onStatementStart(statementId, parentSession, statement, statementId)
     try {
       result = hiveContext.sql(statement)
       logDebug(result.queryExecution.toString())
@@ -165,10 +219,11 @@ private[hive] class SparkExecuteStatementOperation(
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
       }
-      hiveContext.sparkContext.setJobDescription(statement)
       sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
         hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       }
+      HiveThriftServer2.sqlEventListener
+        .onStatementParse(statementId, result.queryExecution.toString())
       iter = {
         val useIncrementalCollect =
           hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
@@ -185,9 +239,12 @@ private[hive] class SparkExecuteStatementOperation(
       // HiveServer will silently swallow them.
       case e: Throwable =>
         setState(OperationState.ERROR)
+        HiveThriftServer2.sqlEventListener
+          .onStatementError(statementId, e.getMessage, e.getStackTraceString)
         logError("Error executing query:", e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
+    HiveThriftServer2.sqlEventListener.onStatementFinish(statementId)
   }
 }
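Taken together, the two shims give every statement the same lifecycle: generate a UUID, report onStatementStart, report onStatementParse once the plan is known, then onStatementFinish on success or onStatementError on failure. A compact, self-contained restatement of that sequence (signatures simplified; the real trait also carries the HiveSession, and the plan string here is a stand-in for queryExecution.toString):

    trait StatementEvents {
      def onStatementStart(id: String, statement: String): Unit = {}
      def onStatementParse(id: String, plan: String): Unit = {}
      def onStatementError(id: String, msg: String, trace: String): Unit = {}
      def onStatementFinish(id: String): Unit = {}
    }

    def runStatement(listener: StatementEvents, statement: String): Unit = {
      val statementId = java.util.UUID.randomUUID().toString
      listener.onStatementStart(statementId, statement)
      try {
        val plan = s"== Logical Plan ==\n$statement" // stand-in for the real plan
        listener.onStatementParse(statementId, plan)
        // ... execute and buffer results here ...
        listener.onStatementFinish(statementId)
      } catch {
        case e: Throwable =>
          listener.onStatementError(statementId, e.getMessage,
            e.getStackTrace.mkString("\n"))
          throw e
      }
    }

Because the UUID doubles as the job group id passed to setJobGroup, the UI can later join this lifecycle with the Spark jobs recorded by onJobStart.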