SparkExecuteStatementOperation.scala
@@ -18,26 +18,22 @@
package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.sql.{Date, Timestamp}
import java.util.{Arrays, Map => JMap, UUID}
import java.util.{Arrays, Map => JMap}
import java.util.concurrent.RejectedExecutionException

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.shims.Utils
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.ExecuteStatementOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
import org.apache.spark.sql.execution.HiveResult.{getTimeFormatters, toHiveString, TimeFormatters}
import org.apache.spark.sql.execution.command.SetCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
@@ -315,16 +311,11 @@ private[hive] class SparkExecuteStatementOperation(
} else {
logError(s"Error executing query with $statementId, currentState $currentState, ", e)
setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
Review comment (Contributor): now the onStatementError is never called?

statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error running query: " + root.toString, root)
case _: HiveSQLException => throw e
case _ => throw new HiveSQLException("Error running query: " + e.toString, e)
}
}
} finally {
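Note on the behavior change in the hunk above: the removed branch unwrapped non-Hive exceptions with Commons Lang's `ExceptionUtils.getRootCause` and reported the innermost cause, while the new handler reports the exception exactly as caught, re-throwing `HiveSQLException` as-is and wrapping anything else. A minimal standalone sketch of the difference (not PR code; the exception messages are illustrative):

```scala
import org.apache.commons.lang3.exception.ExceptionUtils

object RootCauseDemo {
  def main(args: Array[String]): Unit = {
    val root    = new IllegalStateException("metastore unavailable") // illustrative
    val wrapped = new RuntimeException("query failed", root)

    // Removed branch: unwrap and report the innermost cause.
    println("old: " + ExceptionUtils.getRootCause(wrapped))
    // prints old: java.lang.IllegalStateException: metastore unavailable

    // New handler: report the exception exactly as caught.
    println("new: " + wrapped)
    // prints new: java.lang.RuntimeException: query failed
  }
}
```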
SparkGetCatalogsOperation.scala
@@ -17,17 +17,13 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.OperationState
import org.apache.hive.service.cli.operation.GetCatalogsOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own GetCatalogsOperation
@@ -62,22 +58,8 @@ private[hive] class SparkGetCatalogsOperation(
authorizeMetaGets(HiveOperationType.GET_CATALOGS, null)
}
setState(OperationState.FINISHED)
} catch {
case e: Throwable =>
logError(s"Error executing get catalogs operation with $statementId", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting catalogs: " + root.toString, root)
}
}
} catch onError()

HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
SparkGetColumnsOperation.scala
@@ -35,7 +35,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils.toJavaSQLType
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own SparkGetColumnsOperation
@@ -122,22 +121,8 @@ private[hive] class SparkGetColumnsOperation(
}
}
setState(OperationState.FINISHED)
} catch {
case e: Throwable =>
logError(s"Error executing get columns operation with $statementId", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting columns: " + root.toString, root)
}
}
} catch onError()

HiveThriftServer2.eventManager.onStatementFinish(statementId)
}

SparkGetFunctionsOperation.scala
@@ -98,22 +98,8 @@ private[hive] class SparkGetFunctionsOperation(
}
}
setState(OperationState.FINISHED)
} catch {
case e: Throwable =>
logError(s"Error executing get functions operation with $statementId", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting functions: " + root.toString, root)
}
}
} catch onError()

HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
SparkGetSchemasOperation.scala
@@ -81,22 +81,8 @@ private[hive] class SparkGetSchemasOperation(
rowSet.addRow(Array[AnyRef](globalTempViewDb, DEFAULT_HIVE_CATALOG))
}
setState(OperationState.FINISHED)
} catch {
case e: Throwable =>
logError(s"Error executing get schemas operation with $statementId", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting schemas: " + root.toString, root)
}
}
} catch onError()

HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
SparkGetTableTypesOperation.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTableTypesOperation
@@ -28,7 +27,6 @@ import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own GetTableTypesOperation
@@ -69,22 +67,8 @@ private[hive] class SparkGetTableTypesOperation(
rowSet.addRow(Array[AnyRef](tableType))
}
setState(OperationState.FINISHED)
} catch {
case e: Throwable =>
logError(s"Error executing get table types operation with $statementId", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting table types: " + root.toString, root)
}
}
} catch onError()

HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
SparkGetTablesOperation.scala
@@ -17,14 +17,12 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.{List => JList, UUID}
import java.util.{List => JList}
import java.util.regex.Pattern

import scala.collection.JavaConverters._

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.{HiveOperationType, HivePrivilegeObjectUtils}
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTablesOperation
import org.apache.hive.service.cli.session.HiveSession
@@ -33,7 +31,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own GetTablesOperation
@@ -111,22 +108,8 @@ private[hive] class SparkGetTablesOperation(
}
}
setState(OperationState.FINISHED)
} catch {
case e: Throwable =>
logError(s"Error executing get tables operation with $statementId", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting tables: " + root.toString, root)
}
}
} catch onError()

HiveThriftServer2.eventManager.onStatementFinish(statementId)
}

SparkGetTypeInfoOperation.scala
@@ -19,15 +19,13 @@ package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.OperationState
import org.apache.hive.service.cli.operation.GetTypeInfoOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own GetTypeInfoOperation
@@ -87,22 +85,8 @@ private[hive] class SparkGetTypeInfoOperation(
rowSet.addRow(rowData)
})
setState(OperationState.FINISHED)
} catch {
case e: Throwable =>
logError(s"Error executing get type info with $statementId", e)
setState(OperationState.ERROR)
e match {
case hiveException: HiveSQLException =>
HiveThriftServer2.eventManager.onStatementError(
statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
throw hiveException
case _ =>
val root = ExceptionUtils.getRootCause(e)
HiveThriftServer2.eventManager.onStatementError(
statementId, root.getMessage, SparkUtils.exceptionString(root))
throw new HiveSQLException("Error getting type info: " + root.toString, root)
}
}
} catch onError()

HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
}
SparkOperation.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import org.apache.hive.service.cli.OperationState
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
import org.apache.hive.service.cli.operation.Operation

import org.apache.spark.SparkContext
@@ -93,4 +93,16 @@ private[hive] trait SparkOperation extends Operation with Logging {
case t =>
throw new IllegalArgumentException(s"Unknown table type is found: $t")
}

protected def onError(): PartialFunction[Throwable, Unit] = {
case e: Throwable =>
logError(s"Error executing get catalogs operation with $statementId", e)
Review comment (Contributor): This error message still refers to "get catalogs" but is logged for every type of operation.
Reply (Member, author): nice catch! thanks!

super.setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, e.getMessage, Utils.exceptionString(e))
e match {
case _: HiveSQLException => throw e
case _ => throw new HiveSQLException(s"Error operating $getType ${e.getMessage}", e)
}
}
}
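The refactor relies on the fact that a Scala `catch` clause accepts any expression of type `PartialFunction[Throwable, Unit]`, not only a literal block of `case` clauses; that is what lets each operation above close with `} catch onError()`. A minimal self-contained sketch of the pattern (not PR code; the names are stand-ins, and `RuntimeException` plays the role of `HiveSQLException`):

```scala
object OnErrorPattern {
  // Stand-in for SparkOperation.onError(): one shared handler that logs,
  // then rethrows everything wrapped in a single exception type.
  def onError(opType: String): PartialFunction[Throwable, Unit] = {
    case e: Throwable =>
      System.err.println(s"Error operating $opType: ${e.getMessage}")
      throw new RuntimeException(s"Error operating $opType", e)
  }

  def getTables(): Unit = try {
    throw new IllegalArgumentException("boom") // simulate a failing operation
  } catch onError("GET_TABLES") // the shared handler replaces a per-method catch block

  def main(args: Array[String]): Unit =
    try getTables() catch {
      case e: RuntimeException =>
        println(s"caught: ${e.getMessage}, cause: ${e.getCause.getMessage}")
    }
}
```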
SharedThriftServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.sql.{DriverManager, Statement}
import java.util

import scala.collection.JavaConverters._
import scala.concurrent.duration._
@@ -27,7 +28,12 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftCLIService
import org.apache.hive.jdbc.HttpBasicAuthInterceptor
import org.apache.hive.service.auth.PlainSaslHelper
import org.apache.hive.service.cli.thrift.{ThriftCLIService, ThriftCLIServiceClient}
import org.apache.http.impl.client.HttpClientBuilder
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{THttpClient, TSocket}

import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.Utils
@@ -76,8 +82,9 @@ trait SharedThriftServer extends SharedSparkSession {
s"jdbc:hive2://localhost:$serverPort/"
}

protected def user: String = System.getProperty("user.name")

protected def withJdbcStatement(fs: (Statement => Unit)*): Unit = {
val user = System.getProperty("user.name")
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
val connections =
fs.map { _ => DriverManager.getConnection(jdbcUri, user, "") }
@@ -91,6 +98,29 @@ trait SharedThriftServer extends SharedSparkSession {
}
}

protected def withCLIServiceClient(f: ThriftCLIServiceClient => Unit): Unit = {
require(serverPort != 0, "Failed to bind an actual port for HiveThriftServer2")
val transport = mode match {
case ServerMode.binary =>
val rawTransport = new TSocket("localhost", serverPort)
PlainSaslHelper.getPlainTransport(user, "anonymous", rawTransport)
case ServerMode.http =>
val interceptor = new HttpBasicAuthInterceptor(
user,
"anonymous",
null, null, true, new util.HashMap[String, String]())
new THttpClient(
s"http://localhost:$serverPort/cliservice",
HttpClientBuilder.create.addInterceptorFirst(interceptor).build())
}

val protocol = new TBinaryProtocol(transport)
val client = new ThriftCLIServiceClient(new ThriftserverShimUtils.Client(protocol))

transport.open()
try f(client) finally transport.close()
}
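A hypothetical usage sketch of the new helper, as it might appear in a suite mixing in `SharedThriftServer`. The client calls (`openSession`, `executeStatement`, `getOperationStatus`, `closeOperation`, `closeSession`) follow Hive's `CLIServiceClient` API; the test title and query are illustrative only, not taken from the PR:

```scala
test("operation status is reachable through the CLI service client") {
  withCLIServiceClient { client =>
    // `user` is the protected member defined above in SharedThriftServer.
    val sessionHandle = client.openSession(user, "")
    try {
      val confOverlay = new java.util.HashMap[String, String]()
      val opHandle = client.executeStatement(sessionHandle, "SELECT 1", confOverlay)
      // The same client can then poll the operation's state.
      val status = client.getOperationStatus(opHandle)
      assert(status.getState != null)
      client.closeOperation(opHandle)
    } finally {
      client.closeSession(sessionHandle)
    }
  }
}
```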

private def startThriftServer(attempt: Int): Unit = {
logInfo(s"Trying to start HiveThriftServer2: mode=$mode, attempt=$attempt")
val sqlContext = spark.newSession().sqlContext