From ae47489fde5443a4f916dd5407617eec08191ad4 Mon Sep 17 00:00:00 2001 From: scwf Date: Mon, 13 Oct 2014 22:01:49 -0700 Subject: [PATCH 01/18] update and fix conflicts --- pom.xml | 28 +- sql/hive-thriftserver/pom.xml | 150 +++++++++- ...ver.scala => AbstractSparkSQLDriver.scala} | 18 +- .../hive/thriftserver/SparkSQLCLIDriver.scala | 5 +- .../thriftserver/SparkSQLCLIService.scala | 19 +- .../server/SparkSQLOperationManager.scala | 169 +---------- .../spark/sql/hive/thriftserver/Shim.scala | 229 +++++++++++++++ .../spark/sql/hive/thriftserver/Shim.scala | 266 ++++++++++++++++++ 8 files changed, 677 insertions(+), 207 deletions(-) rename sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/{SparkSQLDriver.scala => AbstractSparkSQLDriver.scala} (86%) create mode 100644 sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala create mode 100644 sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala diff --git a/pom.xml b/pom.xml index 288bbf1114bea..04da8c586f124 100644 --- a/pom.xml +++ b/pom.xml @@ -127,7 +127,6 @@ 0.94.6 1.4.0 3.4.5 - 0.12.0-protobuf-2.5 1.4.3 1.2.3 8.1.14.v20131031 @@ -441,7 +440,7 @@ org.apache.derby derby - 10.4.2.0 + ${derby.version} com.codahale.metrics @@ -1272,7 +1271,18 @@ - + + hive-default + + + !hive.version + + + + 0.12.0-protobuf-2.5 + 10.4.2.0 + + hive @@ -1282,6 +1292,16 @@ sql/hive-thriftserver - + + hive-versions + + + hive.version + + + + 10.10.1.1 + + diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 124fc107cb8aa..32721c14b931d 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -41,27 +41,147 @@ spark-hive_${scala.binary.version} ${project.version} - - org.spark-project.hive - hive-cli - ${hive.version} - - - org.spark-project.hive - hive-jdbc - ${hive.version} - - - org.spark-project.hive - hive-beeline - ${hive.version} - org.scalatest 
scalatest_${scala.binary.version} test + + + + hive-default + + + !hive.version + + + + + org.spark-project.hive + hive-cli + ${hive.version} + + + org.spark-project.hive + hive-jdbc + ${hive.version} + + + org.spark-project.hive + hive-beeline + ${hive.version} + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-default-sources + generate-sources + + add-source + + + + v${hive.version}/src/main/scala + + + + + + + + + + hive + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + + + hive-versions + + + hive.version + + + + + org.spark-project.hive + hive-cli + ${hive.version} + + + org.spark-project.hive + hive-jdbc + ${hive.version} + + + org.spark-project.hive + hive-beeline + ${hive.version} + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-v13-sources + generate-sources + + add-source + + + + v${hive.version}/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala similarity index 86% rename from sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala rename to sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala index 7463df1f47d43..bdcc07066ecd1 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/AbstractSparkSQLDriver.scala @@ -29,11 +29,11 @@ import 
org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.Logging import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext) - extends Driver with Logging { +private[hive] abstract class AbstractSparkSQLDriver( + val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver with Logging { - private var tableSchema: Schema = _ - private var hiveResponse: Seq[String] = _ + private[hive] var tableSchema: Schema = _ + private[hive] var hiveResponse: Seq[String] = _ override def init(): Unit = { } @@ -74,16 +74,6 @@ private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveCo override def getSchema: Schema = tableSchema - override def getResults(res: JArrayList[String]): Boolean = { - if (hiveResponse == null) { - false - } else { - res.addAll(hiveResponse) - hiveResponse = null - true - } - } - override def destroy() { super.destroy() hiveResponse = null diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7ba4564602ecd..6d9de63348624 100755 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.shims.ShimLoader import org.apache.thrift.transport.TSocket import org.apache.spark.Logging +import org.apache.spark.sql.hive.thriftserver.HiveShim private[hive] object SparkSQLCLIDriver { private var prompt = "spark-sql" @@ -116,7 +117,7 @@ private[hive] object SparkSQLCLIDriver { } } - if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) { + if (!sessionState.isRemoteMode) { // Hadoop-20 and above - we need to augment classpath using 
hiveconf // components. // See also: code in ExecDriver.java @@ -258,7 +259,7 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { } else { var ret = 0 val hconf = conf.asInstanceOf[HiveConf] - val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf) + val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hconf) if (proc != null) { if (proc.isInstanceOf[Driver] || proc.isInstanceOf[SetProcessor]) { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 42cbf363b274f..f0f7fe8cea122 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -24,6 +24,7 @@ import java.util.{List => JList} import javax.security.auth.login.LoginException import org.apache.commons.logging.Log +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.shims.ShimLoader import org.apache.hive.service.Service.STATE @@ -44,15 +45,17 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext) val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext) setSuperField(this, "sessionManager", sparkSqlSessionManager) addService(sparkSqlSessionManager) + var sparkServiceUGI: UserGroupInformation = null - try { - HiveAuthFactory.loginFromKeytab(hiveConf) - val serverUserName = ShimLoader.getHadoopShims - .getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf)) - setSuperField(this, "serverUserName", serverUserName) - } catch { - case e @ (_: IOException | _: LoginException) => - throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) + if 
(ShimLoader.getHadoopShims().isSecurityEnabled()) { + try { + HiveAuthFactory.loginFromKeytab(hiveConf) + sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf) + HiveShim.setServerUserName(sparkServiceUGI, this) + } catch { + case e @ (_: IOException | _: LoginException) => + throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) + } } initCompositeService(hiveConf) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index accf61576b804..2a4f24132cc5e 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -17,24 +17,15 @@ package org.apache.spark.sql.hive.thriftserver.server -import java.sql.Timestamp import java.util.{Map => JMap} +import scala.collection.mutable.Map -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, Map} -import scala.math.{random, round} - -import org.apache.hadoop.hive.common.`type`.HiveDecimal -import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager} import org.apache.hive.service.cli.session.HiveSession import org.apache.spark.Logging -import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} -import org.apache.spark.sql.catalyst.plans.logical.SetCommand -import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -import org.apache.spark.sql.hive.thriftserver.ReflectionUtils +import org.apache.spark.sql.hive.HiveContext +import 
org.apache.spark.sql.hive.thriftserver.{SparkExecuteStatementOperation, ReflectionUtils} /** * Executes queries using Spark SQL, and maintains a list of handles to active queries. @@ -54,158 +45,8 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) { - private var result: SchemaRDD = _ - private var iter: Iterator[SparkRow] = _ - private var dataTypes: Array[DataType] = _ - - def close(): Unit = { - // RDDs will be cleaned automatically upon garbage collection. - logDebug("CLOSING") - } - - def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { - if (!iter.hasNext) { - new RowSet() - } else { - // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int - val maxRows = maxRowsL.toInt - var curRow = 0 - var rowSet = new ArrayBuffer[Row](maxRows.min(1024)) - - while (curRow < maxRows && iter.hasNext) { - val sparkRow = iter.next() - val row = new Row() - var curCol = 0 - - while (curCol < sparkRow.length) { - if (sparkRow.isNullAt(curCol)) { - addNullColumnValue(sparkRow, row, curCol) - } else { - addNonNullColumnValue(sparkRow, row, curCol) - } - curCol += 1 - } - rowSet += row - curRow += 1 - } - new RowSet(rowSet, 0) - } - } - - def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) { - dataTypes(ordinal) match { - case StringType => - to.addString(from(ordinal).asInstanceOf[String]) - case IntegerType => - to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal))) - case BooleanType => - to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal))) - case DoubleType => - to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal))) - case FloatType => - to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal))) - case DecimalType => - val hiveDecimal = 
from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal - to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal))) - case LongType => - to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal))) - case ByteType => - to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal))) - case ShortType => - to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal))) - case TimestampType => - to.addColumnValue( - ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp])) - case BinaryType | _: ArrayType | _: StructType | _: MapType => - val hiveString = result - .queryExecution - .asInstanceOf[HiveContext#QueryExecution] - .toHiveString((from.get(ordinal), dataTypes(ordinal))) - to.addColumnValue(ColumnValue.stringValue(hiveString)) - } - } - - def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) { - dataTypes(ordinal) match { - case StringType => - to.addString(null) - case IntegerType => - to.addColumnValue(ColumnValue.intValue(null)) - case BooleanType => - to.addColumnValue(ColumnValue.booleanValue(null)) - case DoubleType => - to.addColumnValue(ColumnValue.doubleValue(null)) - case FloatType => - to.addColumnValue(ColumnValue.floatValue(null)) - case DecimalType => - to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal)) - case LongType => - to.addColumnValue(ColumnValue.longValue(null)) - case ByteType => - to.addColumnValue(ColumnValue.byteValue(null)) - case ShortType => - to.addColumnValue(ColumnValue.shortValue(null)) - case TimestampType => - to.addColumnValue(ColumnValue.timestampValue(null)) - case BinaryType | _: ArrayType | _: StructType | _: MapType => - to.addColumnValue(ColumnValue.stringValue(null: String)) - } - } - - def getResultSetSchema: TableSchema = { - logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") - if (result.queryExecution.analyzed.output.size == 0) { - new TableSchema(new FieldSchema("Result", "string", "") :: Nil) - } else { - val schema = 
result.queryExecution.analyzed.output.map { attr => - new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") - } - new TableSchema(schema) - } - } - - def run(): Unit = { - logInfo(s"Running query '$statement'") - setState(OperationState.RUNNING) - try { - result = hiveContext.sql(statement) - logDebug(result.queryExecution.toString()) - result.queryExecution.logical match { - case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) => - sessionToActivePool(parentSession) = value - logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") - case _ => - } - - val groupId = round(random * 1000000).toString - hiveContext.sparkContext.setJobGroup(groupId, statement) - sessionToActivePool.get(parentSession).foreach { pool => - hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) - } - iter = { - val resultRdd = result.queryExecution.toRdd - val useIncrementalCollect = - hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean - if (useIncrementalCollect) { - resultRdd.toLocalIterator - } else { - resultRdd.collect().iterator - } - } - dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray - setHasResultSet(true) - } catch { - // Actually do need to catch Throwable as some failures don't inherit from Exception and - // HiveServer will silently swallow them. 
- case e: Throwable => - logError("Error executing query:",e) - throw new HiveSQLException(e.toString) - } - setState(OperationState.FINISHED) - } - } - + val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)( + hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) operation } diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala new file mode 100644 index 0000000000000..8538b9761009d --- /dev/null +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.thriftserver + +import java.sql.Timestamp +import java.util.{ArrayList => JArrayList, Map => JMap} + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, Map => SMap} +import scala.math._ + +import org.apache.hadoop.hive.common.`type`.HiveDecimal +import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.shims.ShimLoader +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hive.service.cli._ +import org.apache.hive.service.cli.operation.ExecuteStatementOperation +import org.apache.hive.service.cli.session.HiveSession +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.plans.logical.SetCommand +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} +import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext} +import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ + +/** + * A compatibility layer for interacting with Hive version 0.12.0. + */ +private[thriftserver] object HiveShim { + val version = "0.12.0" + + def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = { + val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI) + setSuperField(sparkCliService, "serverUserName", serverUserName)// is this alright? 
+ } + + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + CommandProcessorFactory.get(cmd(0), conf) + } +} + +private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext) + extends AbstractSparkSQLDriver(_context) { + override def getResults(res: JArrayList[String]): Boolean = { + if (hiveResponse == null) { + false + } else { + res.addAll(hiveResponse) + hiveResponse = null + true + } + } +} + +private[hive] class SparkExecuteStatementOperation( + parentSession: HiveSession, + statement: String, + confOverlay: JMap[String, String])( + hiveContext: HiveContext, + sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation( + parentSession, statement, confOverlay) with Logging { + private var result: SchemaRDD = _ + private var iter: Iterator[SparkRow] = _ + private var dataTypes: Array[DataType] = _ + + def close(): Unit = { + // RDDs will be cleaned automatically upon garbage collection. + logDebug("CLOSING") + } + + def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { + if (!iter.hasNext) { + new RowSet() + } else { + // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int + val maxRows = maxRowsL.toInt + var curRow = 0 + var rowSet = new ArrayBuffer[Row](maxRows.min(1024)) + + while (curRow < maxRows && iter.hasNext) { + val sparkRow = iter.next() + val row = new Row() + var curCol = 0 + + while (curCol < sparkRow.length) { + if (sparkRow.isNullAt(curCol)) { + addNullColumnValue(sparkRow, row, curCol) + } else { + addNonNullColumnValue(sparkRow, row, curCol) + } + curCol += 1 + } + rowSet += row + curRow += 1 + } + new RowSet(rowSet, 0) + } + } + + def addNonNullColumnValue(from: SparkRow, to: Row, ordinal: Int) { + dataTypes(ordinal) match { + case StringType => + to.addString(from(ordinal).asInstanceOf[String]) + case IntegerType => + to.addColumnValue(ColumnValue.intValue(from.getInt(ordinal))) + case BooleanType => + 
to.addColumnValue(ColumnValue.booleanValue(from.getBoolean(ordinal))) + case DoubleType => + to.addColumnValue(ColumnValue.doubleValue(from.getDouble(ordinal))) + case FloatType => + to.addColumnValue(ColumnValue.floatValue(from.getFloat(ordinal))) + case DecimalType => + val hiveDecimal = from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal + to.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal))) + case LongType => + to.addColumnValue(ColumnValue.longValue(from.getLong(ordinal))) + case ByteType => + to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal))) + case ShortType => + to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal))) + case TimestampType => + to.addColumnValue( + ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp])) + case BinaryType | _: ArrayType | _: StructType | _: MapType => + val hiveString = result + .queryExecution + .asInstanceOf[HiveContext#QueryExecution] + .toHiveString((from.get(ordinal), dataTypes(ordinal))) + to.addColumnValue(ColumnValue.stringValue(hiveString)) + } + } + + def addNullColumnValue(from: SparkRow, to: Row, ordinal: Int) { + dataTypes(ordinal) match { + case StringType => + to.addString(null) + case IntegerType => + to.addColumnValue(ColumnValue.intValue(null)) + case BooleanType => + to.addColumnValue(ColumnValue.booleanValue(null)) + case DoubleType => + to.addColumnValue(ColumnValue.doubleValue(null)) + case FloatType => + to.addColumnValue(ColumnValue.floatValue(null)) + case DecimalType => + to.addColumnValue(ColumnValue.stringValue(null: HiveDecimal)) + case LongType => + to.addColumnValue(ColumnValue.longValue(null)) + case ByteType => + to.addColumnValue(ColumnValue.byteValue(null)) + case ShortType => + to.addColumnValue(ColumnValue.shortValue(null)) + case TimestampType => + to.addColumnValue(ColumnValue.timestampValue(null)) + case BinaryType | _: ArrayType | _: StructType | _: MapType => + to.addColumnValue(ColumnValue.stringValue(null: String)) + } + } + 
+ def getResultSetSchema: TableSchema = { + logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") + if (result.queryExecution.analyzed.output.size == 0) { + new TableSchema(new FieldSchema("Result", "string", "") :: Nil) + } else { + val schema = result.queryExecution.analyzed.output.map { attr => + new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") + } + new TableSchema(schema) + } + } + + def run(): Unit = { + logInfo(s"Running query '$statement'") + setState(OperationState.RUNNING) + try { + result = hiveContext.sql(statement) + logDebug(result.queryExecution.toString()) + result.queryExecution.logical match { + case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) => + sessionToActivePool(parentSession) = value + logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.") + case _ => + } + + val groupId = round(random * 1000000).toString + hiveContext.sparkContext.setJobGroup(groupId, statement) + sessionToActivePool.get(parentSession).foreach { pool => + hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool) + } + iter = { + val resultRdd = result.queryExecution.toRdd + val useIncrementalCollect = + hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + if (useIncrementalCollect) { + resultRdd.toLocalIterator + } else { + resultRdd.collect().iterator + } + } + dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray + setHasResultSet(true) + } catch { + // Actually do need to catch Throwable as some failures don't inherit from Exception and + // HiveServer will silently swallow them. 
+ case e: Throwable => + logError("Error executing query:",e) + throw new HiveSQLException(e.toString) + } + setState(OperationState.FINISHED) + } +} diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala new file mode 100644 index 0000000000000..7f61970c344c1 --- /dev/null +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.thriftserver + +import java.security.PrivilegedExceptionAction +import java.sql.Timestamp +import java.util.concurrent.Future +import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} + +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory +import org.apache.hadoop.hive.ql.session.SessionState + +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, Map => SMap} +import scala.math._ + +import org.apache.hadoop.hive.common.`type`.HiveDecimal +import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.shims.ShimLoader +import org.apache.hadoop.security.UserGroupInformation +import org.apache.hive.service.cli._ +import org.apache.hive.service.cli.operation.ExecuteStatementOperation +import org.apache.hive.service.cli.session.HiveSession +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.plans.logical.SetCommand +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} +import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext} +import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ + +/** + * A compatibility layer for interacting with Hive version 0.13.1. + */ +private[thriftserver] object HiveShim { + val version = "0.13.1" + + def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = { + setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)// is this alright? 
+ } + + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + CommandProcessorFactory.get(cmd, conf) + } +} + +private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext) + extends AbstractSparkSQLDriver(_context) { + override def getResults(res: JList[_]): Boolean = { + if (hiveResponse == null) { + false + } else { + res.asInstanceOf[JArrayList[String]].addAll(hiveResponse) + hiveResponse = null + true + } + } +} + +private[hive] class SparkExecuteStatementOperation( + parentSession: HiveSession, + statement: String, + confOverlay: JMap[String, String], + runInBackground: Boolean = true)( + hiveContext: HiveContext, + sessionToActivePool: SMap[HiveSession, String]) extends ExecuteStatementOperation( + parentSession, statement, confOverlay, runInBackground) with Logging { + + private var result: SchemaRDD = _ + private var iter: Iterator[SparkRow] = _ + private var dataTypes: Array[DataType] = _ + + private def runInternal(cmd: String) = { + try { + result = hiveContext.sql(cmd) + logDebug(result.queryExecution.toString()) + val groupId = round(random * 1000000).toString + hiveContext.sparkContext.setJobGroup(groupId, statement) + iter = { + val resultRdd = result.queryExecution.toRdd + val useIncrementalCollect = + hiveContext.getConf("spark.sql.thriftServer.incrementalCollect", "false").toBoolean + if (useIncrementalCollect) { + resultRdd.toLocalIterator + } else { + resultRdd.collect().iterator + } + } + dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray + } catch { + // Actually do need to catch Throwable as some failures don't inherit from Exception and + // HiveServer will silently swallow them. + case e: Throwable => + logError("Error executing query:",e) + throw new HiveSQLException(e.toString) + } + } + + def close(): Unit = { + // RDDs will be cleaned automatically upon garbage collection. 
+ logDebug("CLOSING") + } + + def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { + validateDefaultFetchOrientation(order) + assertState(OperationState.FINISHED) + setHasResultSet(true) + val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion) + if (!iter.hasNext) { + reultRowSet + } else { + // maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int + val maxRows = maxRowsL.toInt + var curRow = 0 + while (curRow < maxRows && iter.hasNext) { + val sparkRow = iter.next() + val row = ArrayBuffer[Any]() + var curCol = 0 + while (curCol < sparkRow.length) { + dataTypes(curCol) match { + case StringType => + row += sparkRow.get(curCol).asInstanceOf[String] + case IntegerType => + row += sparkRow.getInt(curCol) + case BooleanType => + row += sparkRow.getBoolean(curCol) + case DoubleType => + row += sparkRow.getDouble(curCol) + case FloatType => + row += sparkRow.getFloat(curCol) + case DecimalType => + row += sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal + case LongType => + row += sparkRow.getLong(curCol) + case ByteType => + row += sparkRow.getByte(curCol) + case ShortType => + row += sparkRow.getShort(curCol) + case TimestampType => + row += sparkRow.get(curCol).asInstanceOf[Timestamp] + case BinaryType => + row += sparkRow.get(curCol).asInstanceOf[String] + case _: ArrayType => + row += sparkRow.get(curCol).asInstanceOf[String] + case _: StructType => + row += sparkRow.get(curCol).asInstanceOf[String] + case _: MapType => + row += sparkRow.get(curCol).asInstanceOf[String] + } + curCol += 1 + } + reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]]) + row.clear() + curRow += 1 + } + reultRowSet + } + } + + def getResultSetSchema: TableSchema = { + logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") + if (result.queryExecution.analyzed.output.size == 0) { + new TableSchema(new FieldSchema("Result", "string", "") :: Nil) + } else { + val schema = 
result.queryExecution.analyzed.output.map { attr => + new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") + } + new TableSchema(schema) + } + } + + private def getConfigForOperation: HiveConf = { + var sqlOperationConf: HiveConf = getParentSession.getHiveConf + if (!getConfOverlay.isEmpty || shouldRunAsync) { + sqlOperationConf = new HiveConf(sqlOperationConf) + import scala.collection.JavaConversions._ + for (confEntry <- getConfOverlay.entrySet) { + try { + sqlOperationConf.verifyAndSet(confEntry.getKey, confEntry.getValue) + } + catch { + case e: IllegalArgumentException => { + throw new HiveSQLException("Error applying statement specific settings", e) + } + } + } + } + return sqlOperationConf + } + + def run(): Unit = { + logInfo(s"Running query '$statement'") + val opConfig: HiveConf = getConfigForOperation + setState(OperationState.RUNNING) + setHasResultSet(true) + + if (!shouldRunAsync) { + runInternal(statement) + setState(OperationState.FINISHED) + } else { + val parentSessionState = SessionState.get + val sessionHive: Hive = Hive.get + val currentUGI: UserGroupInformation = ShimLoader.getHadoopShims.getUGIForConf(opConfig) + + val backgroundOperation: Runnable = new Runnable { + def run { + val doAsAction: PrivilegedExceptionAction[AnyRef] = + new PrivilegedExceptionAction[AnyRef] { + def run: AnyRef = { + Hive.set(sessionHive) + SessionState.setCurrentSessionState(parentSessionState) + try { + runInternal(statement) + } + catch { + case e: HiveSQLException => { + setOperationException(e) + logError("Error running hive query: ", e) + } + } + return null + } + } + try { + ShimLoader.getHadoopShims.doAs(currentUGI, doAsAction) + } + catch { + case e: Exception => { + setOperationException(new HiveSQLException(e)) + logError("Error running hive query as user : " + currentUGI.getShortUserName, e) + } + } + setState(OperationState.FINISHED) + } + } + + try { + val backgroundHandle: Future[_] = getParentSession.getSessionManager. 
+ submitBackgroundOperation(backgroundOperation) + setBackgroundHandle(backgroundHandle) + } catch { + // Actually do need to catch Throwable as some failures don't inherit from Exception and + // HiveServer will silently swallow them. + case e: Throwable => + logError("Error executing query:",e) + throw new HiveSQLException(e.toString) + } + } + } +} From 7c66b8e4e82f9f1ff6857eae19f2494fb05dab67 Mon Sep 17 00:00:00 2001 From: scwf Date: Tue, 14 Oct 2014 00:40:21 -0700 Subject: [PATCH 02/18] update pom according spark-2706 --- assembly/pom.xml | 6 ++ pom.xml | 37 ++++---- sql/hive-thriftserver/pom.xml | 167 +++++++--------------------------- 3 files changed, 57 insertions(+), 153 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 31a01e4d8e1de..bfef95b8deb95 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -197,6 +197,12 @@ spark-hive_${scala.binary.version} ${project.version} + + + + + hive-0.12.0 + org.apache.spark spark-hive-thriftserver_${scala.binary.version} diff --git a/pom.xml b/pom.xml index 04da8c586f124..f2990bc8cbb6b 100644 --- a/pom.xml +++ b/pom.xml @@ -127,6 +127,11 @@ 0.94.6 1.4.0 3.4.5 + + 0.13.1 + + 0.13.1 + 10.10.1.1 1.4.3 1.2.3 8.1.14.v20131031 @@ -887,9 +892,9 @@ by Spark SQL for code generation. 
--> - org.scalamacros - paradise_${scala.version} - ${scala.macros.version} + org.scalamacros + paradise_${scala.version} + ${scala.macros.version} @@ -1272,34 +1277,28 @@ - hive-default + hive-0.12.0 - - !hive.version - + false + + + sql/hive-thriftserver + 0.12.0-protobuf-2.5 + 0.12.0 10.4.2.0 - hive + hive-0.13.1 false - - sql/hive-thriftserver - - - - hive-versions - - - hive.version - - + 0.13.1 + 0.13.1 10.10.1.1 diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 32721c14b931d..389ddbbb70d6d 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -46,142 +46,23 @@ scalatest_${scala.binary.version} test + + org.spark-project.hive + hive-cli + ${hive.version} + + + org.spark-project.hive + hive-jdbc + ${hive.version} + + + org.spark-project.hive + hive-beeline + ${hive.version} + - - - hive-default - - - !hive.version - - - - - org.spark-project.hive - hive-cli - ${hive.version} - - - org.spark-project.hive - hive-jdbc - ${hive.version} - - - org.spark-project.hive - hive-beeline - ${hive.version} - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-default-sources - generate-sources - - add-source - - - - v${hive.version}/src/main/scala - - - - - - - - - - hive - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-scala-test-sources - generate-test-sources - - add-test-source - - - - src/test/scala - - - - - - - - - - hive-versions - - - hive.version - - - - - org.spark-project.hive - hive-cli - ${hive.version} - - - org.spark-project.hive - hive-jdbc - ${hive.version} - - - org.spark-project.hive - hive-beeline - ${hive.version} - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-v13-sources - generate-sources - - add-source - - - - v${hive.version}/src/main/scala - - - - - add-scala-test-sources - generate-test-sources - - add-test-source - - - - src/test/scala - - - - - - - - - - target/scala-${scala.binary.version}/classes 
target/scala-${scala.binary.version}/test-classes @@ -190,6 +71,24 @@ org.scalatest scalatest-maven-plugin + + org.codehaus.mojo + build-helper-maven-plugin + + + add-default-sources + generate-sources + + add-source + + + + v${hive.version.short}/src/main/scala + + + + + org.apache.maven.plugins maven-deploy-plugin From dfd1c63cd63580a19180f0440ff583c23462675b Mon Sep 17 00:00:00 2001 From: scwf Date: Tue, 14 Oct 2014 09:53:43 -0700 Subject: [PATCH 03/18] update run-tests to run hive-0.12.0 default now --- dev/run-tests | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index f47fcf66ff7e7..7d06c86eb4b41 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -140,7 +140,7 @@ CURRENT_BLOCK=$BLOCK_BUILD { # We always build with Hive because the PySpark Spark SQL tests need it. - BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" + BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS" @@ -167,7 +167,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled. # This must be a single argument, as it is. 
if [ -n "$_RUN_SQL_TESTS" ]; then - SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" + SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" fi if [ -n "$_SQL_TESTS_ONLY" ]; then From 0bc53aa4d6b96d17d161e237315dc97ee7770815 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 15 Oct 2014 22:16:31 -0700 Subject: [PATCH 04/18] fixed when result filed is null --- .../spark/sql/hive/thriftserver/Shim.scala | 8 +- .../spark/sql/hive/thriftserver/Shim.scala | 85 ++++++++++--------- 2 files changed, 49 insertions(+), 44 deletions(-) diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala index 8538b9761009d..fcfc5c5541b8e 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala @@ -20,20 +20,20 @@ package org.apache.spark.sql.hive.thriftserver import java.sql.Timestamp import java.util.{ArrayList => JArrayList, Map => JMap} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory - import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.math._ import org.apache.hadoop.hive.common.`type`.HiveDecimal +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory import org.apache.hadoop.hive.shims.ShimLoader import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession + import org.apache.spark.Logging import org.apache.spark.sql.catalyst.plans.logical.SetCommand import 
org.apache.spark.sql.catalyst.types._ @@ -49,7 +49,7 @@ private[thriftserver] object HiveShim { def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = { val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI) - setSuperField(sparkCliService, "serverUserName", serverUserName)// is this alright? + setSuperField(sparkCliService, "serverUserName", serverUserName) } def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala index 7f61970c344c1..223189c98139a 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala @@ -22,27 +22,25 @@ import java.sql.Timestamp import java.util.concurrent.Future import java.util.{ArrayList => JArrayList, List => JList, Map => JMap} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory -import org.apache.hadoop.hive.ql.session.SessionState - import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.math._ -import org.apache.hadoop.hive.common.`type`.HiveDecimal +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory +import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hadoop.hive.shims.ShimLoader import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli._ import org.apache.hive.service.cli.operation.ExecuteStatementOperation import 
org.apache.hive.service.cli.session.HiveSession + import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.plans.logical.SetCommand import org.apache.spark.sql.catalyst.types._ -import org.apache.spark.sql.{Row => SparkRow, SQLConf, SchemaRDD} -import org.apache.spark.sql.hive.{HiveMetastoreTypes, HiveContext} +import org.apache.spark.sql.{Row => SparkRow, SchemaRDD} +import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ /** @@ -52,7 +50,7 @@ private[thriftserver] object HiveShim { val version = "0.13.1" def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = { - setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI)// is this alright? + setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI) } def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { @@ -117,6 +115,39 @@ private[hive] class SparkExecuteStatementOperation( logDebug("CLOSING") } + def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) { + dataTypes(ordinal) match { + case StringType => + to += from.get(ordinal).asInstanceOf[String] + case IntegerType => + to += from.getInt(ordinal) + case BooleanType => + to += from.getBoolean(ordinal) + case DoubleType => + to += from.getDouble(ordinal) + case FloatType => + to += from.getFloat(ordinal) + case DecimalType => + to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal + case LongType => + to += from.getLong(ordinal) + case ByteType => + to += from.getByte(ordinal) + case ShortType => + to += from.getShort(ordinal) + case TimestampType => + to += from.get(ordinal).asInstanceOf[Timestamp] + case BinaryType => + to += from.get(ordinal).asInstanceOf[String] + case _: ArrayType => + to += from.get(ordinal).asInstanceOf[String] + case _: StructType => + to += from.get(ordinal).asInstanceOf[String] + case _: MapType => + to += from.get(ordinal).asInstanceOf[String] + } + } + def 
getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) @@ -133,40 +164,14 @@ private[hive] class SparkExecuteStatementOperation( val row = ArrayBuffer[Any]() var curCol = 0 while (curCol < sparkRow.length) { - dataTypes(curCol) match { - case StringType => - row += sparkRow.get(curCol).asInstanceOf[String] - case IntegerType => - row += sparkRow.getInt(curCol) - case BooleanType => - row += sparkRow.getBoolean(curCol) - case DoubleType => - row += sparkRow.getDouble(curCol) - case FloatType => - row += sparkRow.getFloat(curCol) - case DecimalType => - row += sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal - case LongType => - row += sparkRow.getLong(curCol) - case ByteType => - row += sparkRow.getByte(curCol) - case ShortType => - row += sparkRow.getShort(curCol) - case TimestampType => - row += sparkRow.get(curCol).asInstanceOf[Timestamp] - case BinaryType => - row += sparkRow.get(curCol).asInstanceOf[String] - case _: ArrayType => - row += sparkRow.get(curCol).asInstanceOf[String] - case _: StructType => - row += sparkRow.get(curCol).asInstanceOf[String] - case _: MapType => - row += sparkRow.get(curCol).asInstanceOf[String] + if (sparkRow.isNullAt(curCol)) { + row += null + } else { + row += addNonNullColumnValue(sparkRow, row, curCol) } curCol += 1 } reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]]) - row.clear() curRow += 1 } reultRowSet From 4b681f4e612685e55f1eb844a80e3d87958ffa28 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 15 Oct 2014 22:22:18 -0700 Subject: [PATCH 05/18] enable thriftserver in profile hive-0.13.1 --- assembly/pom.xml | 6 ------ pom.xml | 4 +++- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index bfef95b8deb95..31a01e4d8e1de 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -197,12 +197,6 @@ spark-hive_${scala.binary.version} ${project.version} - - - - - hive-0.12.0 - 
org.apache.spark spark-hive-thriftserver_${scala.binary.version} diff --git a/pom.xml b/pom.xml index f2990bc8cbb6b..ddd8b100ae86d 100644 --- a/pom.xml +++ b/pom.xml @@ -1281,7 +1281,6 @@ false - sql/hive-thriftserver @@ -1296,6 +1295,9 @@ false + + sql/hive-thriftserver + 0.13.1 0.13.1 From 13afde04ac3b57a3c231e5447affeb50525f3aa6 Mon Sep 17 00:00:00 2001 From: scwf Date: Thu, 16 Oct 2014 01:05:36 -0700 Subject: [PATCH 06/18] fix small bug --- .../scala/org/apache/spark/sql/hive/thriftserver/Shim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala index 223189c98139a..c05311dabda16 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala @@ -167,7 +167,7 @@ private[hive] class SparkExecuteStatementOperation( if (sparkRow.isNullAt(curCol)) { row += null } else { - row += addNonNullColumnValue(sparkRow, row, curCol) + addNonNullColumnValue(sparkRow, row, curCol) } curCol += 1 } From 41f727bf0f99f93bda060a6dd48a2dd39bd22e92 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 22 Oct 2014 19:25:25 -0700 Subject: [PATCH 07/18] revert pom changes --- sql/hive-thriftserver/pom.xml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index 389ddbbb70d6d..6e950afbd6cf3 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -41,11 +41,6 @@ spark-hive_${scala.binary.version} ${project.version} - - org.scalatest - scalatest_${scala.binary.version} - test - org.spark-project.hive hive-cli @@ -61,6 +56,11 @@ hive-beeline ${hive.version} + + org.scalatest + scalatest_${scala.binary.version} + test + From 
3529e986528bed46933ce4d850938e51abdbdc03 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 26 Oct 2014 20:07:01 -0700 Subject: [PATCH 08/18] move hive module to hive profile --- dev/run-tests | 2 +- pom.xml | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 7d06c86eb4b41..f55497ae2bfbd 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -167,7 +167,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled. # This must be a single argument, as it is. if [ -n "$_RUN_SQL_TESTS" ]; then - SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" + SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" fi if [ -n "$_SQL_TESTS_ONLY" ]; then diff --git a/pom.xml b/pom.xml index 411407b08b81a..12b1fe8ddd2c3 100644 --- a/pom.xml +++ b/pom.xml @@ -94,8 +94,6 @@ streaming sql/catalyst sql/core - sql/hive - sql/hive-thriftserver repl assembly external/twitter @@ -1313,6 +1311,16 @@ + + hive + + false + + + sql/hive + sql/hive-thriftserver + + hive-0.12.0 From 52674a4c3ac6b44e833b09ca61a1028f7747fd22 Mon Sep 17 00:00:00 2001 From: scwf Date: Sun, 26 Oct 2014 20:32:09 -0700 Subject: [PATCH 09/18] sql/hive included since examples depend on it --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 12b1fe8ddd2c3..197ec4f53675d 100644 --- a/pom.xml +++ b/pom.xml @@ -94,6 +94,7 @@ streaming sql/catalyst sql/core + sql/hive repl assembly external/twitter @@ -1317,7 +1318,6 @@ false - sql/hive sql/hive-thriftserver From c3598225e1d8e7d6d9535785af12bf7578fe67bd Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 28 Oct 2014 09:46:30 -0700 Subject: [PATCH 10/18] reuse getCommandProcessor in hiveshim --- .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala | 3 ++- .../spark/sql/hive/thriftserver/SparkSQLCLIService.scala | 2 +- .../sql/hive/thriftserver/{Shim.scala => Shim12.scala} | 6 +----- 
.../sql/hive/thriftserver/{Shim.scala => Shim13.scala} | 6 +----- 4 files changed, 5 insertions(+), 12 deletions(-) rename sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/{Shim.scala => Shim12.scala} (98%) rename sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/{Shim.scala => Shim13.scala} (98%) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 6d9de63348624..2cd02ae9269f5 100755 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -38,7 +38,8 @@ import org.apache.hadoop.hive.shims.ShimLoader import org.apache.thrift.transport.TSocket import org.apache.spark.Logging -import org.apache.spark.sql.hive.thriftserver.HiveShim +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.thriftserver.HiveThriftServerShim private[hive] object SparkSQLCLIDriver { private var prompt = "spark-sql" diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index f0f7fe8cea122..a78311fc48635 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -51,7 +51,7 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext) try { HiveAuthFactory.loginFromKeytab(hiveConf) sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf) - HiveShim.setServerUserName(sparkServiceUGI, this) + HiveThriftServerShim.setServerUserName(sparkServiceUGI, 
this) } catch { case e @ (_: IOException | _: LoginException) => throw new ServiceException("Unable to login to kerberos with given principal/keytab", e) diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala similarity index 98% rename from sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala rename to sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala index fcfc5c5541b8e..bbd727c686bbc 100644 --- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala +++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala @@ -44,17 +44,13 @@ import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ /** * A compatibility layer for interacting with Hive version 0.12.0. */ -private[thriftserver] object HiveShim { +private[thriftserver] object HiveThriftServerShim { val version = "0.12.0" def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = { val serverUserName = ShimLoader.getHadoopShims.getShortUserName(sparkServiceUGI) setSuperField(sparkCliService, "serverUserName", serverUserName) } - - def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { - CommandProcessorFactory.get(cmd(0), conf) - } } private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext) diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala similarity index 98% rename from sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala rename to 
sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala index c05311dabda16..e59681bfbe43e 100644 --- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim.scala +++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala @@ -46,16 +46,12 @@ import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ /** * A compatibility layer for interacting with Hive version 0.12.0. */ -private[thriftserver] object HiveShim { +private[thriftserver] object HiveThriftServerShim { val version = "0.13.1" def setServerUserName(sparkServiceUGI: UserGroupInformation, sparkCliService:SparkSQLCLIService) = { setSuperField(sparkCliService, "serviceUGI", sparkServiceUGI) } - - def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { - CommandProcessorFactory.get(cmd, conf) - } } private[hive] class SparkSQLDriver(val _context: HiveContext = SparkSQLEnv.hiveContext) From f7c93ae312c7e06eafbdb214bcfeedb53e280df5 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 28 Oct 2014 15:16:23 -0700 Subject: [PATCH 11/18] adding build with hive 0.13 before running tests --- dev/run-tests | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 972c8c8a21567..00d7ca362bdbc 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -133,26 +133,51 @@ CURRENT_BLOCK=$BLOCK_PYTHON_STYLE echo "" echo "=========================================================================" -echo "Building Spark" +echo "Building Spark with hive 0.12" echo "=========================================================================" CURRENT_BLOCK=$BLOCK_BUILD { # We always build with Hive because the PySpark Spark SQL tests need it. 
- BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" + BUILD_MVN_PROFILE_ARGS_12="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" - echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS" + echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS_12" # NOTE: echo "q" is needed because sbt on encountering a build file with failure #+ (either resolution or compilation) prompts the user for input either q, r, etc #+ to quit or retry. This echo is there to make it not block. - # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a + # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS_12 or else it will be interpreted as a #+ single argument! # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? echo -e "q\n" \ - | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean package assembly/assembly \ + | sbt/sbt $BUILD_MVN_PROFILE_ARGS_12 clean package assembly/assembly \ + | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" +} + +echo "" +echo "=========================================================================" +echo "Building Spark with hive 0.13" +echo "=========================================================================" + +CURRENT_BLOCK=$BLOCK_BUILD + +{ + # We always build with Hive because the PySpark Spark SQL tests need it. + BUILD_MVN_PROFILE_ARGS_13="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.13.1" + + echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS_13" + + # NOTE: echo "q" is needed because sbt on encountering a build file with failure + #+ (either resolution or compilation) prompts the user for input either q, r, etc + #+ to quit or retry. This echo is there to make it not block. + # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS_13 or else it will be interpreted as a + #+ single argument! + # QUESTION: Why doesn't 'yes "q"' work? + # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? 
+ echo -e "q\n" \ + | sbt/sbt $BUILD_MVN_PROFILE_ARGS_13 clean package assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" } From 0d7f6cf958cb293051719c60020d26a552063d31 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 28 Oct 2014 15:44:39 -0700 Subject: [PATCH 12/18] address comments --- dev/run-tests | 35 ++++++++--------------------------- 1 file changed, 8 insertions(+), 27 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 00d7ca362bdbc..4e9315b79c37e 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -133,51 +133,32 @@ CURRENT_BLOCK=$BLOCK_PYTHON_STYLE echo "" echo "=========================================================================" -echo "Building Spark with hive 0.12" +echo "Building Spark" echo "=========================================================================" CURRENT_BLOCK=$BLOCK_BUILD { # We always build with Hive because the PySpark Spark SQL tests need it. - BUILD_MVN_PROFILE_ARGS_12="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" + BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" - echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS_12" + echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS" # NOTE: echo "q" is needed because sbt on encountering a build file with failure #+ (either resolution or compilation) prompts the user for input either q, r, etc #+ to quit or retry. This echo is there to make it not block. - # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS_12 or else it will be interpreted as a + # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a #+ single argument! # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? 
+ # First build with 0.12 to ensure patches do not break the hive 12 build echo -e "q\n" \ - | sbt/sbt $BUILD_MVN_PROFILE_ARGS_12 clean package assembly/assembly \ + | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thiftserver/compile \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" -} - -echo "" -echo "=========================================================================" -echo "Building Spark with hive 0.13" -echo "=========================================================================" -CURRENT_BLOCK=$BLOCK_BUILD - -{ - # We always build with Hive because the PySpark Spark SQL tests need it. - BUILD_MVN_PROFILE_ARGS_13="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.13.1" - - echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS_13" - - # NOTE: echo "q" is needed because sbt on encountering a build file with failure - #+ (either resolution or compilation) prompts the user for input either q, r, etc - #+ to quit or retry. This echo is there to make it not block. - # NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS_13 or else it will be interpreted as a - #+ single argument! - # QUESTION: Why doesn't 'yes "q"' work? - # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? 
+ # Then build with default version(0.13.1) because tests based on this version echo -e "q\n" \ - | sbt/sbt $BUILD_MVN_PROFILE_ARGS_13 clean package assembly/assembly \ + | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive clean hive/compile hive-thiftserver/compile \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" } From 8a4daf2a3d9a1529eb8f6598bbb619e6a0283002 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 28 Oct 2014 15:59:26 -0700 Subject: [PATCH 13/18] minor fix --- dev/run-tests | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 4e9315b79c37e..4e6dda1d24b65 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -153,12 +153,12 @@ CURRENT_BLOCK=$BLOCK_BUILD # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? # First build with 0.12 to ensure patches do not break the hive 12 build echo -e "q\n" \ - | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thiftserver/compile \ + | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" # Then build with default version(0.13.1) because tests based on this version echo -e "q\n" \ - | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive clean hive/compile hive-thiftserver/compile \ + | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive clean assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" } From fa21d090c8f9ac64b9c2f60c7b3f088760b1f6fc Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 28 Oct 2014 18:25:24 -0700 Subject: [PATCH 14/18] clean package assembly/assembly --- dev/run-tests | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index 4e6dda1d24b65..45e55c529f439 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -142,7 +142,6 @@ CURRENT_BLOCK=$BLOCK_BUILD # We always build with Hive because the PySpark Spark SQL tests need it. 
BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" - echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS" # NOTE: echo "q" is needed because sbt on encountering a build file with failure #+ (either resolution or compilation) prompts the user for input either q, r, etc @@ -152,13 +151,15 @@ CURRENT_BLOCK=$BLOCK_BUILD # QUESTION: Why doesn't 'yes "q"' work? # QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work? # First build with 0.12 to ensure patches do not break the hive 12 build + echo "[info] Compile with hive 0.12" echo -e "q\n" \ | sbt/sbt $BUILD_MVN_PROFILE_ARGS clean hive/compile hive-thriftserver/compile \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" - # Then build with default version(0.13.1) because tests based on this version + # Then build with default version(0.13.1) because tests are based on this version + echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive" echo -e "q\n" \ - | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive clean assembly/assembly \ + | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive clean package assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" } From 18fb1fff1c2a097604b573fffba92b9a7a3f3e8f Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 29 Oct 2014 18:34:36 -0700 Subject: [PATCH 15/18] exclude kryo in hive pom --- sql/hive/pom.xml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index db01363b4d629..67e36a951e506 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -65,6 +65,10 @@ commons-logging commons-logging + + com.esotericsoftware.kryo + kryo + From 578234d398ecd40c3ba024752f04bcf98a78710f Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 30 Oct 2014 13:37:05 -0700 Subject: [PATCH 16/18] use new shaded hive --- pom.xml | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 
96a945fe96fb5..6840af0a424dc 100644 --- a/pom.xml +++ b/pom.xml @@ -128,7 +128,7 @@ 1.4.0 3.4.5 - 0.13.1 + 0.13.1a 0.13.1 10.10.1.1 @@ -239,6 +239,18 @@ false + + + spark-staging-hive13 + Spring Staging Repository Hive 13 + https://oss.sonatype.org/content/repositories/orgspark-project-1089/ + + true + + + false + + From f5cac74bfcf683cfe9129ea7917fa44cc023c277 Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 30 Oct 2014 17:30:12 -0700 Subject: [PATCH 17/18] remove local hivecontext test --- python/pyspark/sql.py | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 7daf306f68479..23e418af41faa 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -1379,33 +1379,6 @@ def hql(self, hqlQuery): class LocalHiveContext(HiveContext): - """Starts up an instance of hive where metadata is stored locally. - - An in-process metadata data is created with data stored in ./metadata. - Warehouse data is stored in in ./warehouse. - - >>> import os - >>> hiveCtx = LocalHiveContext(sc) - >>> try: - ... supress = hiveCtx.sql("DROP TABLE src") - ... except Exception: - ... pass - >>> kv1 = os.path.join(os.environ["SPARK_HOME"], - ... 'examples/src/main/resources/kv1.txt') - >>> supress = hiveCtx.sql( - ... "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") - >>> supress = hiveCtx.sql("LOAD DATA LOCAL INPATH '%s' INTO TABLE src" - ... % kv1) - >>> results = hiveCtx.sql("FROM src SELECT value" - ... ).map(lambda r: int(r.value.split('_')[1])) - >>> num = results.count() - >>> reduce_sum = results.reduce(lambda x, y: x + y) - >>> num - 500 - >>> reduce_sum - 130091 - """ - def __init__(self, sparkContext, sqlContext=None): HiveContext.__init__(self, sparkContext, sqlContext) warnings.warn("LocalHiveContext is deprecated. 
" From f26f3beca0fdc522a04b4491cf7fa391edbce445 Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 30 Oct 2014 23:38:23 -0700 Subject: [PATCH 18/18] remove clean to save time --- dev/run-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/run-tests b/dev/run-tests index 45e55c529f439..0e9eefa76a18b 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -159,7 +159,7 @@ CURRENT_BLOCK=$BLOCK_BUILD # Then build with default version(0.13.1) because tests are based on this version echo "[info] Building Spark with these arguments: $SBT_MAVEN_PROFILES_ARGS -Phive" echo -e "q\n" \ - | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive clean package assembly/assembly \ + | sbt/sbt $SBT_MAVEN_PROFILES_ARGS -Phive package assembly/assembly \ | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including" }