4 changes: 2 additions & 2 deletions pom.xml
@@ -127,7 +127,7 @@
<hbase.version>0.94.6</hbase.version>
<flume.version>1.4.0</flume.version>
<zookeeper.version>3.4.5</zookeeper.version>
<hive.version>0.12.0</hive.version>
<hive.version>0.13.1</hive.version>
<parquet.version>1.4.3</parquet.version>
<jblas.version>1.2.3</jblas.version>
<jetty.version>8.1.14.v20131031</jetty.version>
@@ -428,7 +428,7 @@
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.4.2.0</version>
<version>10.10.1.1</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
6 changes: 3 additions & 3 deletions sql/hive-thriftserver/pom.xml
@@ -42,17 +42,17 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<groupId>org.apache.hive</groupId>
<artifactId>hive-cli</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<groupId>org.apache.hive</groupId>
<artifactId>hive-beeline</artifactId>
<version>${hive.version}</version>
</dependency>
@@ -116,7 +116,7 @@ private[hive] object SparkSQLCLIDriver {
}
}

if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
if (!sessionState.isRemoteMode) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
@@ -258,7 +258,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
} else {
var ret = 0
val hconf = conf.asInstanceOf[HiveConf]
val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
val proc: CommandProcessor = CommandProcessorFactory.get(Array(tokens(0)), hconf)

if (proc != null) {
if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) {
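The change above tracks an API difference between Hive 0.12 and 0.13: `CommandProcessorFactory.get` now takes the command tokens as an array rather than a single token. A minimal standalone sketch of the new call shape, under the assumption that the factory lives in `org.apache.hadoop.hive.ql.processors` as in released Hive; `ProcessorLookupSketch` and `lookupProcessor` are illustrative names, not part of the patch:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}

object ProcessorLookupSketch {
  // Hive 0.13 passes the command tokens as an array; the factory can still return
  // null for unrecognized commands, which the CLI driver above checks for.
  def lookupProcessor(command: String, hconf: HiveConf): Option[CommandProcessor] = {
    val tokens = command.trim.split("""\s+""")
    Option(CommandProcessorFactory.get(Array(tokens(0)), hconf))
  }
}
```

Passing only the first token, as the patch does, keeps the old behavior of selecting the processor by command name and leaving the full statement to the processor's own run method.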
@@ -44,12 +44,12 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
val authentication = "hive.server2.authentication"

try {
HiveAuthFactory.loginFromKeytab(hiveConf)
val serverUserName = ShimLoader.getHadoopShims
.getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
setSuperField(this, "serverUserName", serverUserName)
if(hiveConf.get(authentication)!="NONE") {
HiveAuthFactory.loginFromKeytab(hiveConf)
}
} catch {
case e @ (_: IOException | _: LoginException) =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
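The change above makes the Kerberos keytab login conditional on the configured authentication mode. A minimal sketch of that guard, assuming the static `HiveAuthFactory.loginFromKeytab` from Hive's `org.apache.hive.service.auth` package; `KeytabLoginSketch` and `loginIfSecured` are illustrative names:

```scala
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hive.service.auth.HiveAuthFactory

object KeytabLoginSketch {
  // Only attempt the Kerberos keytab login when hive.server2.authentication is
  // something other than NONE, so an unsecured server can start without a
  // principal/keytab being configured.
  def loginIfSecured(hiveConf: HiveConf): Unit = {
    if (hiveConf.get("hive.server2.authentication") != "NONE") {
      HiveAuthFactory.loginFromKeytab(hiveConf)
    }
  }
}
```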
@@ -74,11 +74,11 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo

override def getSchema: Schema = tableSchema

override def getResults(res: JArrayList[String]): Boolean = {
override def getResults(res: java.util.List[_]): Boolean = {
if (hiveResponse == null) {
false
} else {
res.addAll(hiveResponse)
res.asInstanceOf[java.util.ArrayList[String]].addAll(hiveResponse)
hiveResponse = null
true
}
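The `getResults` change above follows the Hive 0.13 driver API, where the result parameter is no longer a concrete `ArrayList[String]`; with a wildcard element type the Scala override has to cast back before it can `addAll` the buffered response lines. A standalone sketch of just that mechanic; `GetResultsSketch`, `copyResponse`, and its arguments are illustrative:

```scala
import java.util.{ArrayList => JArrayList, List => JList}

import scala.collection.JavaConversions._

object GetResultsSketch {
  // The wildcard element type of `res` rules out calling addAll directly, so the
  // override casts to the concrete ArrayList[String] the thrift layer passes in.
  def copyResponse(hiveResponse: Seq[String], res: JList[_]): Boolean = {
    if (hiveResponse == null) {
      false
    } else {
      res.asInstanceOf[JArrayList[String]].addAll(hiveResponse)
      true
    }
  }
}
```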
@@ -17,9 +17,17 @@

package org.apache.spark.sql.hive.thriftserver.server

import java.security.PrivilegedExceptionAction
import java.sql.Timestamp
import java.util.concurrent.Future
import java.util.{Map => JMap}

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hadoop.security.UserGroupInformation

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, Map}
import scala.math.{random, round}
@@ -54,102 +62,95 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {

val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) {
val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay, true) {
private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _

private def runInternal(cmd: String) = {
try {
result = hiveContext.sql(cmd)
logDebug(result.queryExecution.toString())
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
iter = {
val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
resultRdd.toLocalIterator
} else {
resultRdd.collect().iterator
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
}

def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logDebug("CLOSING")
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
if (!iter.hasNext) {
new RowSet()
reultRowSet
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
var curRow = 0
var rowSet = new ArrayBuffer[Row](maxRows.min(1024))

while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
val row = new Row()
val row = ArrayBuffer[Any]()
var curCol = 0

while (curCol < sparkRow.length) {
if (sparkRow.isNullAt(curCol)) {
addNullColumnValue(sparkRow, row, curCol)
} else {
addNonNullColumnValue(sparkRow, row, curCol)
dataTypes(curCol) match {
case StringType =>
row += sparkRow.get(curCol).asInstanceOf[String]
case IntegerType =>
row += sparkRow.getInt(curCol)
case BooleanType =>
row += sparkRow.getBoolean(curCol)
case DoubleType =>
row += sparkRow.getDouble(curCol)
case FloatType =>
row += sparkRow.getFloat(curCol)
case DecimalType =>
row += sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal
case LongType =>
row += sparkRow.getLong(curCol)
case ByteType =>
row += sparkRow.getByte(curCol)
case ShortType =>
row += sparkRow.getShort(curCol)
case TimestampType =>
row += sparkRow.get(curCol).asInstanceOf[Timestamp]
case BinaryType =>
row += sparkRow.get(curCol).asInstanceOf[String]
case _: ArrayType =>
row += sparkRow.get(curCol).asInstanceOf[String]
case _: StructType =>
row += sparkRow.get(curCol).asInstanceOf[String]
case _: MapType =>
row += sparkRow.get(curCol).asInstanceOf[String]
}
curCol += 1
}
rowSet += row
reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
row.clear()
curRow += 1
}
new RowSet(rowSet, 0)
}
}

def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(from(ordinal).asInstanceOf[String])
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal)))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal)))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal)))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal)))
case DecimalType =>
val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
case LongType =>
to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal)))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
case ShortType =>
to.addColumnValue(ColumnValue.intValue(from.getShort(ordinal)))
case TimestampType =>
to.addColumnValue(
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
val hiveString = result
.queryExecution
.asInstanceOf[HiveContext#QueryExecution]
.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to.addColumnValue(ColumnValue.stringValue(hiveString))
}
}

def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to.addString(null)
case IntegerType =>
to.addColumnValue(ColumnValue.intValue(null))
case BooleanType =>
to.addColumnValue(ColumnValue.booleanValue(null))
case DoubleType =>
to.addColumnValue(ColumnValue.doubleValue(null))
case FloatType =>
to.addColumnValue(ColumnValue.floatValue(null))
case DecimalType =>
to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal))
case LongType =>
to.addColumnValue(ColumnValue.longValue(null))
case ByteType =>
to.addColumnValue(ColumnValue.byteValue(null))
case ShortType =>
to.addColumnValue(ColumnValue.intValue(null))
case TimestampType =>
to.addColumnValue(ColumnValue.timestampValue(null))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
to.addColumnValue(ColumnValue.stringValue(null: String))
reultRowSet
}
}

@@ -165,44 +166,83 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
}
}

private def getConfigForOperation: HiveConf = {
var sqlOperationConf: HiveConf = getParentSession.getHiveConf
if (!getConfOverlay.isEmpty || shouldRunAsync) {
sqlOperationConf = new HiveConf(sqlOperationConf)
import scala.collection.JavaConversions._
for (confEntry <- getConfOverlay.entrySet) {
try {
sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue)
}
catch {
case e: IllegalArgumentException => {
throw new HiveSQLException("Error applying statement specific settings", e)
}
}
}
}
return sqlOperationConf
}

def run(): Unit = {
logInfo(s"Running query '$statement'")
val opConfig: HiveConf = getConfigForOperation
setState(OperationState.RUNNING)
try {
result = hiveContext.sql(statement)
logDebug(result.queryExecution.toString())
result.queryExecution.logical match {
case SetCommand(Some(key), Some(value)) if (key == SQLConf.THRIFTSERVER_POOL) =>
sessionToActivePool(parentSession) = value
logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
case _ =>
}
setHasResultSet(true)

val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
sessionToActivePool.get(parentSession).foreach { pool =>
hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
}
iter = {
val resultRdd = result.queryExecution.toRdd
val useIncrementalCollect =
hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
if (useIncrementalCollect) {
resultRdd.toLocalIterator
} else {
resultRdd.collect().iterator
if (!shouldRunAsync) {
runInternal(statement)
setState(OperationState.FINISHED)
} else {
val parentSessionState = SessionState.get
val sessionHive: Hive = Hive.get
val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig)

val backgroundOperation: Runnable = new Runnable {
def run {
val doAsAction: PrivilegedExceptionAction[AnyRef] =
new PrivilegedExceptionAction[AnyRef] {
def run: AnyRef = {
Hive.set(sessionHive)
SessionState.setCurrentSessionState(parentSessionState)
try {
runInternal(statement)
}
catch {
case e: HiveSQLException => {
setOperationException(e)
logError("Error running hive query: ", e)
}
}
return null
}
}
try {
ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction)
}
catch {
case e: Exception => {
setOperationException(new HiveSQLException(e))
logError("Error running hive query as user : " + currentUGI.getShortUserName, e)
}
}
setState(OperationState.FINISHED)
}
}
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)

try {
val backgroundHandle: Future[_] = getParentSession.getSessionManager.
submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
// Actually do need to catch Throwable as some failures don't inherit from Exception and
// HiveServer will silently swallow them.
case e: Throwable =>
logError("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
}
setState(OperationState.FINISHED)
}
}

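The new asynchronous `run()` path above hands the statement to the session manager's background pool and executes it under the session user via a `PrivilegedExceptionAction`. A minimal standalone sketch of that shape, assuming plain `UserGroupInformation.doAs` rather than the Hadoop shims the patch goes through; `BackgroundRunSketch`, `submitAs`, and the single-thread executor are illustrative:

```scala
import java.security.PrivilegedExceptionAction
import java.util.concurrent.{Executors, Future}

import org.apache.hadoop.security.UserGroupInformation

object BackgroundRunSketch {
  private val pool = Executors.newSingleThreadExecutor()

  // Submit `body` to a background thread and run it as `ugi`, mirroring the
  // doAs + submitBackgroundOperation structure of the asynchronous path above.
  def submitAs(ugi: UserGroupInformation)(body: => Unit): Future[_] = {
    pool.submit(new Runnable {
      override def run(): Unit = {
        ugi.doAs(new PrivilegedExceptionAction[Unit] {
          override def run(): Unit = body
        })
      }
    })
  }
}
```

A caller would pass the UGI resolved for the operation's HiveConf (as the patch does through the shims) or `UserGroupInformation.getCurrentUser`, and keep the returned `Future` as the operation's background handle so it can later be cancelled or awaited.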