From 7d5fce2f1dad4a4f42c21b82e8feabf3dbe50903 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Fri, 8 Aug 2014 10:47:18 -0700 Subject: [PATCH 01/19] test --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4ab027bad55c0..796619ed72b70 100644 --- a/pom.xml +++ b/pom.xml @@ -1,4 +1,4 @@ - +test org.apache.avro @@ -109,6 +73,74 @@ + + hive-default + + + !hive.version + + + + + com.twitter + parquet-hive-bundle + 1.5.0 + + + org.spark-project.hive + hive-metastore + ${hive.version} + + + org.spark-project.hive + hive-exec + ${hive.version} + + + commons-logging + commons-logging + + + + + org.spark-project.hive + hive-serde + ${hive.version} + + + commons-logging + commons-logging + + + commons-logging + commons-logging-api + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-default-sources + generate-sources + + add-source + + + + v0.12/src/main/scala + + + + + + + + hive @@ -135,6 +167,82 @@ + + hive-0.13 + + + hive.version + 0.13.1 + + + + + org.apache.hive + hive-metastore + ${hive.version} + + + org.apache.hive + hive-exec + ${hive.version} + + + commons-logging + commons-logging + + + + + org.apache.hive + hive-serde + ${hive.version} + + + commons-logging + commons-logging + + + commons-logging + commons-logging-api + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-v13-sources + generate-sources + + add-source + + + + v0.13/src/main/scala + + + + + add-scala-test-sources + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index ab7862f4f9e06..cac5b1b4ee190 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -24,11 +24,12 @@ import java.util.Date import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} -import org.apache.hadoop.hive.ql.plan.FileSinkDesc import org.apache.hadoop.mapred._ import org.apache.hadoop.io.Writable import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} +import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc} +import org.apache.spark.sql.hive.HiveShim._ /** * Internal helper class that saves an RDD using a Hive OutputFormat. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index d9b2bc7348ad2..717cdafac1e15 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.Table import org.apache.hadoop.hive.ql.processors._ import org.apache.hadoop.hive.ql.session.SessionState -import org.apache.hadoop.hive.ql.stats.StatsSetupConst import org.apache.hadoop.hive.serde2.io.TimestampWritable import org.apache.spark.SparkContext @@ -46,6 +45,8 @@ import org.apache.spark.sql.execution.ExtractPythonUdfs import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.execution.{Command => PhysicalCommand} import org.apache.spark.sql.hive.execution.DescribeHiveTableCommand +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.HiveShim._ /** * DEPRECATED: Use HiveContext instead. @@ -170,13 +171,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val tableParameters = relation.hiveQlTable.getParameters val oldTotalSize = - Option(tableParameters.get(StatsSetupConst.TOTAL_SIZE)).map(_.toLong).getOrElse(0L) + Option(tableParameters.get(HiveShim.getStatsSetupConstTotalSize)). + map(_.toLong).getOrElse(0L) val newTotalSize = getFileSizeForTable(hiveconf, relation.hiveQlTable) // Update the Hive metastore if the total size of the table is different than the size // recorded in the Hive metastore. // This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats(). if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - tableParameters.put(StatsSetupConst.TOTAL_SIZE, newTotalSize.toString) + tableParameters.put(HiveShim.getStatsSetupConstTotalSize, newTotalSize.toString) val hiveTTable = relation.hiveQlTable.getTTable hiveTTable.setParameters(tableParameters) val tableFullName = @@ -286,24 +288,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { val cmd_trimmed: String = cmd.trim() val tokens: Array[String] = cmd_trimmed.split("\\s+") val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim() - val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hiveconf) - - SessionState.start(sessionState) + val proc: CommandProcessor = HiveShim.getCommandProcessor(Array(tokens(0)), hiveconf) proc match { case driver: Driver => - driver.init() - val results = new JArrayList[String] val response: CommandProcessorResponse = driver.run(cmd) // Throw an exception if there is an error in query processing. 
if (response.getResponseCode != 0) { - driver.destroy() + driver.close throw new QueryExecutionException(response.getErrorMessage) } driver.setMaxRows(maxRows) driver.getResults(results) - driver.destroy() + driver.close results case _ => sessionState.out.println(tokens(0) + " " + cmd_1) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 943bbaa8ce25e..068a8285859c1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.types._ /* Implicit conversions */ import scala.collection.JavaConversions._ +import org.apache.spark.sql.hive.HiveShim private[hive] trait HiveInspectors { @@ -137,7 +138,7 @@ private[hive] trait HiveInspectors { /** Converts native catalyst types to the types expected by Hive */ def wrap(a: Any): AnyRef = a match { - case s: String => new hadoopIo.Text(s) // TODO why should be Text? + case s: String => HiveShim.convertCatalystString2Hive(s) case i: Int => i: java.lang.Integer case b: Boolean => b: java.lang.Boolean case f: Float => f: java.lang.Float diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 6571c35499ef4..1f5d64328a228 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -23,7 +23,6 @@ import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, Ser import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.hive.ql.stats.StatsSetupConst import org.apache.hadoop.hive.serde2.Deserializer import org.apache.spark.annotation.DeveloperApi @@ -39,6 +38,8 @@ import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.util.Utils +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.HiveShim._ /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -59,7 +60,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val table = client.getTable(databaseName, tblName) val partitions: Seq[Partition] = if (table.isPartitioned) { - client.getAllPartitionsForPruner(table).toSeq + client.getAllPartitionsOf(table).toSeq } else { Nil } @@ -180,6 +181,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with override def unregisterAllTables() = {} } + /** * :: DeveloperApi :: * Provides conversions between Spark SQL data types and Hive Metastore types. @@ -196,7 +198,7 @@ object HiveMetastoreTypes extends RegexParsers { "bigint" ^^^ LongType | "binary" ^^^ BinaryType | "boolean" ^^^ BooleanType | - "decimal" ^^^ DecimalType | + HiveShim.metastoreDecimal ^^^ DecimalType | "timestamp" ^^^ TimestampType | "varchar\\((\\d+)\\)".r ^^^ StringType @@ -280,13 +282,13 @@ private[hive] case class MetastoreRelation // of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`, // `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future. 
BigInt( - Option(hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)) + Option(hiveQlTable.getParameters.get(HiveShim.getStatsSetupConstTotalSize)) .map(_.toLong) .getOrElse(sqlContext.defaultSizeInBytes)) } ) - val tableDesc = new TableDesc( + val tableDesc = HiveShim.getTableDesc( Class.forName( hiveQlTable.getSerializationLib, true, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index a4dd6be5f9e35..a30f8c3d35188 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.types._ +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.conf.HiveConf /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -210,7 +212,13 @@ private[hive] object HiveQl { /** * Returns the AST for the given SQL string. */ - def getAst(sql: String): ASTNode = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql)) + def getAst(sql: String): ASTNode = { + val hContext = new Context(new HiveConf()) + val node = ParseUtils.findRootNonNullToken((new ParseDriver).parse(sql, hContext)) + hContext.clear + node + } + /** Returns a LogicalPlan for a given HiveQL string. */ def parseSql(sql: String): LogicalPlan = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 329f80cad471e..552e71777b64f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -35,6 +35,7 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast} import org.apache.spark.sql.catalyst.types.DataType +import org.apache.spark.sql.hive.HiveShim._ /** * A trait for subclasses that handle table scans. 
@@ -142,7 +143,7 @@ class HadoopTableReader( filterOpt: Option[PathFilter]): RDD[Row] = { val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) => val partDesc = Utilities.getPartitionDesc(partition) - val partPath = partition.getPartitionPath + val partPath = partition.getDataLocationPath val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) val ifc = partDesc.getInputFileFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index a013f3f7a805f..aacff83ce24c8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ - +import org.apache.hadoop.hive.ql.processors._ /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -76,7 +76,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // For some hive test case which contain ${system:test.tmp.dir} System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath) - + CommandProcessorFactory.clean(hiveconf); configure() // Must be called before initializing the catalog below. /** The location of the compiled hive distribution */ @@ -297,6 +297,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { * tests. */ protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames + HiveShim.createDefaultDBIfNeeded(this) /** * Resets the test instance by deleting any tables that have been created. diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala index a40e89e0d382b..7f891a5171cc6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} import org.apache.spark.sql.execution.{Command, LeafNode} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} +import org.apache.spark.sql.hive.HiveShim /** * Implementation for "describe [extended] table". @@ -49,7 +50,8 @@ case class DescribeHiveTableCommand( case (name, dataType, comment) => String.format("%-" + alignment + "s", name) + delim + String.format("%-" + alignment + "s", dataType) + delim + - String.format("%-" + alignment + "s", Option(comment).getOrElse("None")) + String.format("%-" + alignment + "s", Option(comment). 
+ getOrElse(HiveShim.getEmptyCommentsFieldValue)) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index 577ca928b43b6..5e6b2ee76aa63 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.spark.sql.hive.HiveShim import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import org.apache.hadoop.hive.serde2.objectinspector.primitive._ @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive._ + /** * :: DeveloperApi :: * The Hive table scan operator. Column and partition pruning are both handled. @@ -71,14 +72,14 @@ case class HiveTableScan( Cast(Literal(value), dataType).eval(null) } + private def addColumnMetadataToConf(hiveConf: HiveConf) { // Specifies needed column IDs for those non-partitioning columns. val neededColumnIDs = attributes.map(a => relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0) - ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) - ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name)) + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name)) // Specifies types and object inspectors of columns to be scanned. 
val structOI = ObjectInspectorUtils diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 39033bdeac4b0..33efb4b84744a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -21,11 +21,10 @@ import scala.collection.JavaConversions._ import java.util.{HashMap => JHashMap} -import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.common.`type`.{HiveVarchar} import org.apache.hadoop.hive.metastore.MetaStoreUtils -import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.ql.plan.{TableDesc} import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -40,6 +39,9 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} +import org.apache.spark.sql.hive.{ShimFileSinkDesc => FileSinkDesc, ShimContext => Context} +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.hive.HiveShim._ /** * :: DeveloperApi :: @@ -76,7 +78,7 @@ case class InsertIntoHiveTable( new HiveVarchar(s, s.size) case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => - new HiveDecimal(bd.underlying()) + HiveShim.createDecimal(bd.underlying()) case (row: Row, oi: StandardStructObjectInspector) => val struct = oi.create() @@ -176,7 +178,7 @@ case class InsertIntoHiveTable( // instances within the closure, since Serializer is not serializable while TableDesc is. 
val tableDesc = table.tableDesc val tableLocation = table.hiveQlTable.getDataLocation - val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) + val tmpLocation = hiveContext.getExternalTmpPath(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) val rdd = childRdd.mapPartitions { iter => val serializer = newSerializer(fileSinkConf.getTableInfo) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 7d1ad53d8bdb3..537e21eac7b0f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -18,8 +18,6 @@ package org.apache.spark.sql.hive import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.hive.common.`type`.HiveDecimal import org.apache.hadoop.hive.ql.exec.UDF import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry} import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} @@ -33,6 +31,7 @@ import org.apache.spark.util.Utils.getContextOrSparkClassLoader /* Implicit conversions */ import scala.collection.JavaConversions._ +import org.apache.spark.sql.hive.HiveShim private[hive] abstract class HiveFunctionRegistry extends analysis.FunctionRegistry with HiveInspectors { @@ -110,7 +109,8 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[ val primitiveClasses = Seq( Integer.TYPE, classOf[java.lang.Integer], classOf[java.lang.String], java.lang.Double.TYPE, classOf[java.lang.Double], java.lang.Long.TYPE, classOf[java.lang.Long], - classOf[HiveDecimal], java.lang.Byte.TYPE, classOf[java.lang.Byte], + classOf[org.apache.hadoop.hive.common.`type`.HiveDecimal], + java.lang.Byte.TYPE, classOf[java.lang.Byte], classOf[java.sql.Timestamp] ) val matchingConstructor = argClass.getConstructors.find { c => @@ -128,7 +128,7 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[ } else { constructor.newInstance(a match { case i: Int => i: java.lang.Integer - case bd: BigDecimal => new HiveDecimal(bd.underlying()) + case bd: BigDecimal => HiveShim.createDecimal(bd.underlying()) case other: AnyRef => other }).asInstanceOf[AnyRef] } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index a35c40efdc207..0bc1ffa7c26a4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.HiveShim class StatisticsSuite extends QueryTest with BeforeAndAfterAll { TestHive.reset() @@ -79,9 +80,9 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect() sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() - - assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) - + if (HiveShim.version.equals("0.12.0")) { + assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) + } sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan") 
assert(queryTotalSize("analyzeTable") === BigInt(11624)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index c4abb3eb4861f..ebfd754f8666e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.{Row, SchemaRDD} +import org.apache.spark.sql.hive.HiveShim case class TestData(a: Int, b: String) @@ -451,14 +452,14 @@ class HiveQuerySuite extends HiveComparisonTest { // Describe a partition is a native command assertResult( Array( - Array("key", "int", "None"), - Array("value", "string", "None"), - Array("dt", "string", "None"), + Array("key", "int", HiveShim.getEmptyCommentsFieldValue), + Array("value", "string", HiveShim.getEmptyCommentsFieldValue), + Array("dt", "string", HiveShim.getEmptyCommentsFieldValue), Array("", "", ""), Array("# Partition Information", "", ""), Array("# col_name", "data_type", "comment"), Array("", "", ""), - Array("dt", "string", "None")) + Array("dt", "string", HiveShim.getEmptyCommentsFieldValue)) ) { sql("DESCRIBE test_describe_commands1 PARTITION (dt='2008-06-08')") .select('result) @@ -538,8 +539,8 @@ class HiveQuerySuite extends HiveComparisonTest { sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) - assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql(s"SET").collect().map(_.getString(0)) + assertResult(Set(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { + sql(s"SET").collect().map(_.getString(0)).toSet } // "set key" @@ -566,8 +567,8 @@ class HiveQuerySuite extends HiveComparisonTest { sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) - assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { - sql("SET").collect().map(_.getString(0)) + assertResult(Set(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) { + sql("SET").collect().map(_.getString(0)).toSet } assertResult(Array(s"$testKey=$testVal")) { diff --git a/sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala new file mode 100644 index 0000000000000..5dca21bf2eca5 --- /dev/null +++ b/sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.util.Properties +import scala.language.implicitConversions +import org.apache.hadoop.hive.ql.metadata.Partition +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import scala.collection.JavaConversions._ +import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.common.`type`.HiveDecimal +import java.net.URI +import org.apache.hadoop.{io => hadoopIo} +import org.apache.hadoop.hive.ql.stats.StatsSetupConst +import org.apache.hadoop.mapred.InputFormat +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.metadata.Table +import org.apache.hadoop.hive.ql.processors._ +import org.apache.hadoop.hive.conf.HiveConf + +/*hive-0.12.0 support shimmer layer*/ +object HiveShim { + val version = "0.12.0" + val metastoreDecimal = "decimal" + def getTableDesc(serdeClass: Class[_ <: Deserializer], inputFormatClass: Class[_ <: InputFormat[_, _]], outputFormatClass: Class[_], properties: Properties) = { + new TableDesc(serdeClass, inputFormatClass, outputFormatClass, properties) + } + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE + def createDefaultDBIfNeeded(context: HiveContext) ={ } + + /*handle the difference in "None" and empty ""*/ + def getEmptyCommentsFieldValue = "None" + + def convertCatalystString2Hive(s: String) = new hadoopIo.Text(s) + + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + CommandProcessorFactory.get(cmd(0), conf) + } + + def createDecimal(bd: java.math.BigDecimal): HiveDecimal = { + new HiveDecimal(bd) + } + + def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) { + ColumnProjectionUtils.appendReadColumnIDs(conf, ids) + ColumnProjectionUtils.appendReadColumnNames(conf, names) + } + + implicit class wrapperToPartition(p: Partition) { + def getDataLocationPath: Path = p.getPartitionPath + } + implicit class wrapperToHive(client: Hive) { + def getAllPartitionsOf(tbl: Table) = { + client.getAllPartitionsForPruner(tbl) + } + } +} + +class ShimContext(conf: Configuration) extends Context(conf) { + def getExternalTmpPath(uri: URI): String = { + super.getExternalTmpFileURI(uri) + } +} + +class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) + extends FileSinkDesc(dir, tableInfo, compressed) { +} diff --git a/sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala new file mode 100644 index 0000000000000..2834184cbd1c1 --- /dev/null +++ b/sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory +import scala.language.implicitConversions +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.common.`type`.{HiveDecimal} +import scala.collection.JavaConversions._ +import org.apache.spark.Logging +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.metadata.Partition +import org.apache.hadoop.{io => hadoopIo} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.common.StatsSetupConst +import org.apache.hadoop.mapred.InputFormat +import java.util.Properties +import org.apache.hadoop.hive.serde2.Deserializer + +/*hive-0.13.1 support shimmer layer*/ +object HiveShim { + val version = "0.13.1" + /* + * hive-0.13 support DECIMAL(precision, scale), DECIMAL in hive-0.12 is actually DECIMAL(10,0) + * Full support of new decimal feature need to be fixed in seperate PR. + */ + val metastoreDecimal = "decimal(10,0)" + def getTableDesc(serdeClass: Class[_ <: Deserializer], inputFormatClass: Class[_ <: InputFormat[_, _]], outputFormatClass: Class[_], properties: Properties) = { + new TableDesc(inputFormatClass, outputFormatClass, properties) + } + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE + def createDefaultDBIfNeeded(context: HiveContext) ={ + context.runSqlHive("CREATE DATABASE default") + context.runSqlHive("USE default") + } + /*handle the difference in HiveQuerySuite*/ + def getEmptyCommentsFieldValue = "" + + def convertCatalystString2Hive(s: String) = s + + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + CommandProcessorFactory.get(cmd, conf) + } + + def createDecimal(bd: java.math.BigDecimal): HiveDecimal = { + HiveDecimal.create(bd) + } + + /* + * This function in hive-0.13 become private, but we have to do this to walkaround hive bug + * */ + private def appendReadColumnNames(conf: Configuration, cols: Seq[String]) { + val old: String = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "") + val result: StringBuilder = new StringBuilder(old) + var first: Boolean = old.isEmpty + + for (col <- cols) { + if (first) { + first = false + } + else { + result.append(',') + } + result.append(col) + } + conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, result.toString) + } + + /* + * Cannot use ColumnProjectionUtils.appendReadColumns directly, if ids is null or empty + * */ + def appendReadColumns(conf: Configuration, ids: Seq[Integer], names: Seq[String]) { + if (ids != null && ids.size > 0) { + ColumnProjectionUtils.appendReadColumns(conf, ids) + } else { + appendReadColumnNames(conf, names) + } + } + + /* + * Bug introdiced in hive-0.13. FileSinkDesc is serilizable, but its member path is not. + * Fix it through wrapper. 
+ * */ + implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = { + var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed) + f.setCompressed(w.compressed) + f.setCompressCodec(w.compressCodec) + f.setCompressType(w.compressType) + f.setTableInfo(w.tableInfo) + f + } + + implicit class wrapperToPartition(p: Partition) { + def getDataLocationPath: Path = p.getDataLocation + } +} + +class ShimContext(conf: Configuration) extends Context(conf) { + def getExternalTmpPath(path: Path): Path = { + super.getExternalTmpPath (path.toUri) + } +} + +class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean) + extends Serializable with Logging { + var compressCodec: String = _ + var compressType: String = _ + var destTableId: Int = _ + + def setCompressed(compressed: Boolean) { + this.compressed = compressed + } + def getDirName = dir + def setDestTableId(destTableId: Int) { + this.destTableId = destTableId + } + + def setTableInfo(tableInfo: TableDesc) { + this.tableInfo = tableInfo + } + + def setCompressCodec(intermediateCompressorCodec: String) { + compressCodec = intermediateCompressorCodec + } + + def setCompressType(intermediateCompressType: String) { + compressType = intermediateCompressType + } +} From 5f5619f153be2af9f24e15164a8c0aae9a094b4b Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 10 Sep 2014 13:18:20 -0700 Subject: [PATCH 08/19] restructure the directory and different hive version support --- assembly/pom.xml | 3 +-- pom.xml | 4 +--- sql/hive/pom.xml | 7 +++---- .../src/main/scala/org/apache/spark/sql/hive/Shim.scala | 0 .../src/main/scala/org/apache/spark/sql/hive/Shim.scala | 0 5 files changed, 5 insertions(+), 9 deletions(-) rename sql/hive/{v0.12 => v0.12.0}/src/main/scala/org/apache/spark/sql/hive/Shim.scala (100%) rename sql/hive/{v0.13 => v0.13.1}/src/main/scala/org/apache/spark/sql/hive/Shim.scala (100%) diff --git a/assembly/pom.xml b/assembly/pom.xml index cb66a14a48d02..a1a2048e5f003 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -189,11 +189,10 @@ - hive-0.13 + hive-versions hive.version - 0.13.1 diff --git a/pom.xml b/pom.xml index 94302f97b7951..d4650aff7b364 100644 --- a/pom.xml +++ b/pom.xml @@ -1246,17 +1246,15 @@ - hive-0.13 + hive-versions hive.version - 0.13.1 10.10.1.1 - diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 67f0f46507070..35cccb9e5803d 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -132,7 +132,7 @@ - v0.12/src/main/scala + v${hive.version}/src/main/scala @@ -168,11 +168,10 @@ - hive-0.13 + hive-versions hive.version - 0.13.1 @@ -222,7 +221,7 @@ - v0.13/src/main/scala + v${hive.version}/src/main/scala diff --git a/sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala similarity index 100% rename from sql/hive/v0.12/src/main/scala/org/apache/spark/sql/hive/Shim.scala rename to sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala diff --git a/sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala similarity index 100% rename from sql/hive/v0.13/src/main/scala/org/apache/spark/sql/hive/Shim.scala rename to sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala From 9412d2439f94105eb8843d7a31bd4fc942cad477 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 17 Sep 2014 09:24:01 -0700 Subject: [PATCH 09/19] address review comments --- 
.../src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala | 2 +- .../v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index 537e21eac7b0f..8c1e39b76161c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -28,10 +28,10 @@ import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.util.Utils.getContextOrSparkClassLoader +import org.apache.spark.sql.hive.HiveShim /* Implicit conversions */ import scala.collection.JavaConversions._ -import org.apache.spark.sql.hive.HiveShim private[hive] abstract class HiveFunctionRegistry extends analysis.FunctionRegistry with HiveInspectors { diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala index 5dca21bf2eca5..b23d1f027e2e8 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -49,7 +49,7 @@ object HiveShim { /*handle the difference in "None" and empty ""*/ def getEmptyCommentsFieldValue = "None" - def convertCatalystString2Hive(s: String) = new hadoopIo.Text(s) + def convertCatalystString2Hive(s: String) = new hadoopIo.Text(s) // TODO why should be Text? def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { CommandProcessorFactory.get(cmd(0), conf) From 3ced0d7cc6bdc37825755756184636426d62915c Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 1 Oct 2014 15:32:54 -0700 Subject: [PATCH 10/19] rebase --- pom.xml | 12 ++++++++++++ sql/hive/pom.xml | 6 +++--- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 4d3c3db4efb55..fd19dba90a2c3 100644 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,18 @@ + + spark-hive + + Spark hive Repository + https://oss.sonatype.org/content/repositories/orgspark-project-1079/ + + true + + + false + + central diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index d788f0f2ac72f..0e51e1c2f474c 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -185,12 +185,12 @@ - org.apache.hive + org.spark-project.hive hive-metastore ${hive.version} - org.apache.hive + org.spark-project.hive hive-exec ${hive.version} @@ -201,7 +201,7 @@ - org.apache.hive + org.spark-project.hive hive-serde ${hive.version} From 6bc9204fe2c1b1af12cb3f22fa170b41ea1b86f6 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 1 Oct 2014 21:01:05 -0700 Subject: [PATCH 11/19] rebase and remove temparory repo --- pom.xml | 12 ------------ sql/hive/pom.xml | 6 +++--- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/pom.xml b/pom.xml index fd19dba90a2c3..4d3c3db4efb55 100644 --- a/pom.xml +++ b/pom.xml @@ -143,18 +143,6 @@ - - spark-hive - - Spark hive Repository - https://oss.sonatype.org/content/repositories/orgspark-project-1079/ - - true - - - false - - central diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 0e51e1c2f474c..d788f0f2ac72f 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -185,12 +185,12 @@ - org.spark-project.hive + org.apache.hive hive-metastore ${hive.version} - org.spark-project.hive + org.apache.hive hive-exec ${hive.version} @@ -201,7 +201,7 @@ - 
org.spark-project.hive + org.apache.hive hive-serde ${hive.version} From 10c35655327dfccb0a150875c8ed2dd21e63a130 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Sun, 5 Oct 2014 00:48:39 -0700 Subject: [PATCH 12/19] address review comments --- .../org/apache/spark/sql/hive/HiveQl.scala | 2 +- .../org/apache/spark/sql/hive/TestHive.scala | 4 ++- .../spark/sql/hive/StatisticsSuite.scala | 2 +- .../sql/hive/execution/HiveQuerySuite.scala | 3 ++- .../org/apache/spark/sql/hive/Shim.scala | 2 +- .../org/apache/spark/sql/hive/Shim.scala | 26 +++++++++---------- 6 files changed, 21 insertions(+), 18 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 08892c428ff5f..27557cb3453fe 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -216,7 +216,7 @@ private[hive] object HiveQl { */ def getAst(sql: String): ASTNode = { /* - * Context has to be passed in in hive0.13.1. + * Context has to be passed in hive0.13.1. * Otherwise, there will be Null pointer exception, * when retrieving properties form HiveConf. */ diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 6ea657f74673d..1d6f43ec224a9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -61,6 +61,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // By clearing the port we force Spark to pick a new one. This allows us to rerun tests // without restarting the JVM. System.clearProperty("spark.hostPort") + CommandProcessorFactory.clean(hiveconf) lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath @@ -79,7 +80,6 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { // For some hive test case which contain ${system:test.tmp.dir} System.setProperty("test.tmp.dir", testTempDir.getCanonicalPath) - CommandProcessorFactory.clean(hiveconf) configure() // Must be called before initializing the catalog below. /** The location of the compiled hive distribution */ @@ -371,6 +371,8 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { * tests. */ protected val originalUdfs: JavaSet[String] = FunctionRegistry.getFunctionNames + + // Database default may not exist in 0.13.1, create it if not exist HiveShim.createDefaultDBIfNeeded(this) /** diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 629725faf840e..5dfc3ae428108 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -81,7 +81,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect() - // TODO: How it works? needs to add it back for other hive version. + // TODO: How does it works? needs to add it back for other hive version. 
if (HiveShim.version =="0.12.0") { assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 16ca48f9c8c22..8e797caaf3417 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -557,7 +557,8 @@ class HiveQuerySuite extends HiveComparisonTest { |WITH serdeproperties('s1'='9') """.stripMargin) } - // now only verify 0.12.0, and ignore other versions due to binary compatability + // Now only verify 0.12.0, and ignore other versions due to binary compatability + // current TestSerDe.jar is from 0.12.0 if (HiveShim.version == "0.12.0") { sql(s"ADD JAR $testJar") sql( diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala index faaeacbe1f5de..2901fe9414c7c 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -69,7 +69,7 @@ private[hive] object HiveShim { ColumnProjectionUtils.appendReadColumnNames(conf, names) } - def getExternalTmpPath(context: Context, uri: URI): String = { + def getExternalTmpPath(context: Context, uri: URI) = { context.getExternalTmpFileURI(uri) } diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala index 25af0af34bd50..d4ca9562f21a4 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -40,10 +40,10 @@ import scala.language.implicitConversions private[hive] object HiveShim { val version = "0.13.1" /* - * TODO: hive-0.13 support DECIMAL(precision, scale), DECIMAL in hive-0.12 is actually DECIMAL(10,0) + * TODO: hive-0.13 support DECIMAL(precision, scale), DECIMAL in hive-0.12 is actually DECIMAL(38,unbounded) * Full support of new decimal feature need to be fixed in seperate PR. */ - val metastoreDecimal = "decimal(10,0)" + val metastoreDecimal = "decimal\\((\\d+),(\\d+)\\)".r def getTableDesc( serdeClass: Class[_ <: Deserializer], @@ -82,8 +82,7 @@ private[hive] object HiveShim { for (col <- cols) { if (first) { first = false - } - else { + } else { result.append(',') } result.append(col) @@ -98,7 +97,9 @@ private[hive] object HiveShim { if (ids != null && ids.size > 0) { ColumnProjectionUtils.appendReadColumns(conf, ids) } - appendReadColumnNames(conf, names) + if (names == null && names.size > 0) { + appendReadColumnNames(conf, names) + } } def getExternalTmpPath(context: Context, path: Path) = { @@ -110,17 +111,16 @@ private[hive] object HiveShim { def getAllPartitionsOf(client: Hive, tbl: Table) = client.getAllPartitionsOf(tbl) /* - * Bug introdiced in hive-0.13. FileSinkDesc is serilizable, but its member path is not. + * Bug introdiced in hive-0.13. FileSinkDesc is serializable, but its member path is not. * Fix it through wrapper. 
* */ implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = { - var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed) - f.setCompressed(w.compressed) - f.setCompressCodec(w.compressCodec) - f.setCompressType(w.compressType) - f.setTableInfo(w.tableInfo) - f.setDestTableId(w.destTableId) - f + var f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed) + f.setCompressCodec(w.compressCodec) + f.setCompressType(w.compressType) + f.setTableInfo(w.tableInfo) + f.setDestTableId(w.destTableId) + f } } From 20f6cf7507f608930bef36112b577672c81e44f6 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Mon, 6 Oct 2014 20:50:48 -0700 Subject: [PATCH 13/19] solve compatability issue --- .../scala/org/apache/spark/sql/hive/HiveContext.scala | 9 +++++++-- .../src/main/scala/org/apache/spark/sql/hive/Shim.scala | 7 +++++-- .../src/main/scala/org/apache/spark/sql/hive/Shim.scala | 7 +++++-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 2febe5c0379e9..30bd25a088f14 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -293,7 +293,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { proc match { case driver: Driver => - val results = new JArrayList[String] + val results = HiveShim.createDriverResultsArray val response: CommandProcessorResponse = driver.run(cmd) // Throw an exception if there is an error in query processing. if (response.getResponseCode != 0) { @@ -303,7 +303,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { driver.setMaxRows(maxRows) driver.getResults(results) driver.close() - results + results.map { r => + r match { + case s: String => s + case o => o.toString + } + } case _ => sessionState.out.println(tokens(0) + " " + cmd_1) Seq(proc.run(cmd_1).getResponseCode.toString) diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala index 2901fe9414c7c..c9e9ff04916af 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.net.URI +import java.util.{ArrayList => JArrayList} import java.util.Properties import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -49,14 +50,16 @@ private[hive] object HiveShim { new TableDesc(serdeClass, inputFormatClass, outputFormatClass, properties) } + def createDriverResultsArray = new JArrayList[String] + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE - def createDefaultDBIfNeeded(context: HiveContext) ={ } + def createDefaultDBIfNeeded(context: HiveContext) = { } /** The string used to denote an empty comments field in the schema. 
*/ def getEmptyCommentsFieldValue = "None" - def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { CommandProcessorFactory.get(cmd(0), conf) } diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala index 3ab6eeabaa2ca..62a1f2aa7d744 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hive +import java.util.{ArrayList => JArrayList} import java.util.Properties import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -53,9 +54,11 @@ private[hive] object HiveShim { new TableDesc(inputFormatClass, outputFormatClass, properties) } + def createDriverResultsArray = new JArrayList[Object] + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE - def createDefaultDBIfNeeded(context: HiveContext) ={ + def createDefaultDBIfNeeded(context: HiveContext) = { context.runSqlHive("CREATE DATABASE default") context.runSqlHive("USE default") } @@ -63,7 +66,7 @@ private[hive] object HiveShim { /* The string used to denote an empty comments field in the schema. */ def getEmptyCommentsFieldValue = "" - def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { + def getCommandProcessor(cmd: Array[String], conf: HiveConf) = { CommandProcessorFactory.get(cmd, conf) } From cb228638d4ee35eb3fc1cc184c9c14498286eee2 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 9 Oct 2014 11:59:30 -0700 Subject: [PATCH 14/19] correct the typo --- .../v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala index 62a1f2aa7d744..d8a34e07820ca 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -100,7 +100,7 @@ private[hive] object HiveShim { if (ids != null && ids.size > 0) { ColumnProjectionUtils.appendReadColumns(conf, ids) } - if (names == null && names.size > 0) { + if (names != null && names.size > 0) { appendReadColumnNames(conf, names) } } From b0478c093c48fe608b8011fdaf76648f9f67ecda Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 12 Oct 2014 23:27:07 -0700 Subject: [PATCH 15/19] Changes to simplify the build of SPARK-2706 This simply uses two profiles and defines the source location based on the profiles. It also removes a bunch of other excludes. Finally, this doesn't use dynamic profile activiation which could interfere with third-party libraries that try to link against the spark-hive artifact. 
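The two-profile approach described above works because both version-specific source trees are expected to export the same compile-time surface: each Shim.scala defines a HiveShim object (plus the Shim* wrapper classes) with identical member signatures, so the shared sql/hive sources compile unchanged against whichever tree the active profile adds. A minimal sketch of a caller under that assumption follows; the ShimDemo object is hypothetical and not part of this patch series, and it only touches members the shims actually define (version, getCommandProcessor, getStatsSetupConstTotalSize).

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.conf.HiveConf

// Hypothetical caller, placed in this package because HiveShim is
// private[hive]; it compiles against whichever Shim.scala the active
// profile (hive-0.12.0 or hive-0.13.1) adds to the source path.
object ShimDemo {
  def main(args: Array[String]): Unit = {
    val conf = new HiveConf()
    // Resolves to CommandProcessorFactory.get(cmd(0), conf) on 0.12.0
    // and to CommandProcessorFactory.get(cmd, conf) on 0.13.1.
    val proc = HiveShim.getCommandProcessor(Array("set"), conf)
    // Same metastore key on both versions, read from different classes
    // (ql.stats.StatsSetupConst on 0.12.0, common.StatsSetupConst on 0.13.1).
    println("Hive " + HiveShim.version + ": totalSize key = " +
      HiveShim.getStatsSetupConstTotalSize + ", got processor " + proc)
  }
}

Because the member names and parameter lists match across v0.12.0 and v0.13.1, no reflection or runtime dispatch is needed; the profile decides at build time which implementation is linked.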
--- assembly/pom.xml | 6 ++ docs/_config.yml | 2 - docs/building-spark.md | 14 ++- pom.xml | 33 ++++---- sql/hive/pom.xml | 187 +++++++++++------------------------------ 5 files changed, 82 insertions(+), 160 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 81219a37a0e6b..ccdc150e143da 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -197,6 +197,12 @@ spark-hive_${scala.binary.version} ${project.version} + + + + + hive-0.12.0 + org.apache.spark spark-hive-thriftserver_${scala.binary.version} diff --git a/docs/_config.yml b/docs/_config.yml index f4bf242ac191b..78c92281a49d5 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -1,7 +1,5 @@ highlighter: pygments markdown: kramdown -gems: - - jekyll-redirect-from # For some reason kramdown seems to behave differently on different # OS/packages wrt encoding. So we hard code this config. diff --git a/docs/building-spark.md b/docs/building-spark.md index b2940ee4029e8..2150f223f4915 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -97,12 +97,20 @@ mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -DskipTests clean package {% endhighlight %} + + # Building With Hive and JDBC Support To enable Hive integration for Spark SQL along with its JDBC server and CLI, -add the `-Phive` profile to your existing build options. +add the `-Phive` profile to your existing build options. By default Spark +will build with Hive 0.12.0 bindings. You can also build for Hive 0.13.1 using +the `-Phive-0.13.1` profile. NOTE: currently the JDBC server is only +supported for Hive 12. {% highlight bash %} -# Apache Hadoop 2.4.X with Hive support +# Apache Hadoop 2.4.X with Hive 12 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package + +# Apache Hadoop 2.4.X with Hive 13 support +mvn -Pyarn -Phive-0.13.1 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package {% endhighlight %} # Spark Tests in Maven @@ -192,4 +200,4 @@ To run test suites of a specific sub project as follows: compiler. When run locally as a background process, it speeds up builds of Scala-based projects like Spark. Developers who regularly recompile Spark with Maven will be the most interested in Zinc. The project site gives instructions for building and running `zinc`; OS X users can -install it using `brew install zinc`. \ No newline at end of file +install it using `brew install zinc`. 
diff --git a/pom.xml b/pom.xml index 8e5720aa8cc8c..31f456a03bb3b 100644 --- a/pom.xml +++ b/pom.xml @@ -127,6 +127,11 @@ 0.94.6 1.4.0 3.4.5 + + 0.12.0-protobuf-2.5 + + 0.12.0 + 10.4.2.0 1.4.3 1.2.3 8.1.14.v20131031 @@ -1260,34 +1265,28 @@ - hive-default - - - !hive.version - - - - 0.12.0 - 10.4.2.0 - - - - hive + hive-0.12.0 false + sql/hive-thriftserver + + 0.12.0-protobuf-2.5 + 0.12.0-protobuf-2.5 + 10.4.2.0 + - hive-versions + hive-0.13.1 - - hive.version - + false + 0.13.1 + 0.13.1 10.10.1.1 diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index d788f0f2ac72f..cdeba8accb70d 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -36,6 +36,11 @@ + + com.twitter + parquet-hive-bundle + 1.5.0 + org.apache.spark spark-core_${scala.binary.version} @@ -46,15 +51,46 @@ spark-sql_${scala.binary.version} ${project.version} + + org.spark-project.hive + hive-metastore + ${hive.version} + commons-httpclient commons-httpclient 3.1 + + org.spark-project.hive + hive-exec + ${hive.version} + + + commons-logging + commons-logging + + + org.codehaus.jackson jackson-mapper-asl + + org.spark-project.hive + hive-serde + ${hive.version} + + + commons-logging + commons-logging + + + commons-logging + commons-logging-api + + + org.apache.avro @@ -80,76 +116,7 @@ test - - - hive-default - - - !hive.version - - - - - com.twitter - parquet-hive-bundle - 1.5.0 - - - org.spark-project.hive - hive-metastore - ${hive.version} - - - org.spark-project.hive - hive-exec - ${hive.version} - - - commons-logging - commons-logging - - - - - org.spark-project.hive - hive-serde - ${hive.version} - - - commons-logging - commons-logging - - - commons-logging - commons-logging-api - - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-default-sources - generate-sources - - add-source - - - - v${hive.version}/src/main/scala - - - - - - - - hive @@ -176,91 +143,35 @@ - - hive-versions - - - hive.version - - - - - org.apache.hive - hive-metastore - ${hive.version} - - - org.apache.hive - hive-exec - ${hive.version} - - - commons-logging - commons-logging - - - - - org.apache.hive - hive-serde - ${hive.version} - - - commons-logging - commons-logging - - - commons-logging - commons-logging-api - - - - - - + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + org.scalatest + scalatest-maven-plugin + + org.codehaus.mojo build-helper-maven-plugin - add-v13-sources + add-default-sources generate-sources add-source - v${hive.version}/src/main/scala - - - - - add-scala-test-sources - generate-test-sources - - add-test-source - - - - src/test/scala + v${hive.version.short}/src/main/scala - - - - - - - target/scala-${scala.binary.version}/classes - target/scala-${scala.binary.version}/test-classes - - - org.scalatest - scalatest-maven-plugin - From 8fad1cf35f891cdf5cbc42a3425f240366d94098 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Mon, 13 Oct 2014 13:50:53 -0700 Subject: [PATCH 16/19] change the pom file and make hive-0.13.1 as the default --- assembly/pom.xml | 15 --------------- docs/building-spark.md | 24 ++++++++++++------------ pom.xml | 8 ++++---- sql/hive/pom.xml | 18 +++++++++++++----- 4 files changed, 29 insertions(+), 36 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index ccdc150e143da..bfef95b8deb95 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -210,21 +210,6 @@ - - hive-versions - - - hive.version - - - - - org.apache.spark - spark-hive_${scala.binary.version} - ${project.version} - - - spark-ganglia-lgpl diff --git 
a/docs/building-spark.md b/docs/building-spark.md index 2150f223f4915..11fd56c145c01 100644 --- a/docs/building-spark.md +++ b/docs/building-spark.md @@ -102,15 +102,15 @@ mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -Dski # Building With Hive and JDBC Support To enable Hive integration for Spark SQL along with its JDBC server and CLI, add the `-Phive` profile to your existing build options. By default Spark -will build with Hive 0.12.0 bindings. You can also build for Hive 0.13.1 using -the `-Phive-0.13.1` profile. NOTE: currently the JDBC server is only -supported for Hive 12. +will build with Hive 0.13.1 bindings. You can also build for Hive 0.12.0 using +the `-Phive-0.12.0` profile. NOTE: currently the JDBC server is only +supported for Hive 0.12.0. {% highlight bash %} -# Apache Hadoop 2.4.X with Hive 12 support +# Apache Hadoop 2.4.X with Hive 13 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package -# Apache Hadoop 2.4.X with Hive 13 support -mvn -Pyarn -Phive-0.13.1 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package +# Apache Hadoop 2.4.X with Hive 12 support +mvn -Pyarn -Phive-0.12.0 -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package {% endhighlight %} # Spark Tests in Maven @@ -119,8 +119,8 @@ Tests are run by default via the [ScalaTest Maven plugin](http://www.scalatest.o Some of the tests require Spark to be packaged first, so always run `mvn package` with `-DskipTests` the first time. The following is an example of a correct (build, test) sequence: - mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive clean package - mvn -Pyarn -Phadoop-2.3 -Phive test + mvn -Pyarn -Phadoop-2.3 -DskipTests -Phive -Phive-0.12.0 clean package + mvn -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 test The ScalaTest plugin also supports running only a specific test suite as follows: @@ -183,16 +183,16 @@ can be set to control the SBT build. For example: Some of the tests require Spark to be packaged first, so always run `sbt/sbt assembly` the first time. 
The following is an example of a correct (build, test) sequence: - sbt/sbt -Pyarn -Phadoop-2.3 -Phive assembly - sbt/sbt -Pyarn -Phadoop-2.3 -Phive test + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 assembly + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 test To run only a specific test suite as follows: - sbt/sbt -Pyarn -Phadoop-2.3 -Phive "test-only org.apache.spark.repl.ReplSuite" + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 "test-only org.apache.spark.repl.ReplSuite" To run test suites of a specific sub project as follows: - sbt/sbt -Pyarn -Phadoop-2.3 -Phive core/test + sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-0.12.0 core/test # Speeding up Compilation with Zinc diff --git a/pom.xml b/pom.xml index 6318a1ba0d933..efab5823a89ea 100644 --- a/pom.xml +++ b/pom.xml @@ -128,10 +128,10 @@ 1.4.0 3.4.5 - 0.12.0-protobuf-2.5 + 0.13.1 - 0.12.0 - 10.4.2.0 + 0.13.1 + 10.10.1.1 1.4.3 1.2.3 8.1.14.v20131031 @@ -1287,7 +1287,7 @@ 0.12.0-protobuf-2.5 - 0.12.0-protobuf-2.5 + 0.12.0 10.4.2.0 diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index cdeba8accb70d..aa67951d28423 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -36,11 +36,6 @@ - - com.twitter - parquet-hive-bundle - 1.5.0 - org.apache.spark spark-core_${scala.binary.version} @@ -143,6 +138,19 @@ + + hive-0.12.0 + + false + + + + com.twitter + parquet-hive-bundle + 1.5.0 + + + From cbb46911f881a0704fa16a36f2a362a930db6ade Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Mon, 13 Oct 2014 22:35:44 -0700 Subject: [PATCH 17/19] change run-test for new options --- dev/run-tests | 4 ++-- sql/hive/pom.xml | 37 ++++++++++++++++++------------------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/dev/run-tests b/dev/run-tests index f47fcf66ff7e7..7d06c86eb4b41 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -140,7 +140,7 @@ CURRENT_BLOCK=$BLOCK_BUILD { # We always build with Hive because the PySpark Spark SQL tests need it. - BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" + BUILD_MVN_PROFILE_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" echo "[info] Building Spark with these arguments: $BUILD_MVN_PROFILE_ARGS" @@ -167,7 +167,7 @@ CURRENT_BLOCK=$BLOCK_SPARK_UNIT_TESTS # If the Spark SQL tests are enabled, run the tests with the Hive profiles enabled. # This must be a single argument, as it is. 
if [ -n "$_RUN_SQL_TESTS" ]; then - SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive" + SBT_MAVEN_PROFILES_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-0.12.0" fi if [ -n "$_SQL_TESTS_ONLY" ]; then diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index aa67951d28423..db01363b4d629 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -161,25 +161,24 @@ org.scalatest scalatest-maven-plugin - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-default-sources - generate-sources - - add-source - - - - v${hive.version.short}/src/main/scala - - - - - + + org.codehaus.mojo + build-helper-maven-plugin + + + add-default-sources + generate-sources + + add-source + + + + v${hive.version.short}/src/main/scala + + + + + From 410b668b6aaab197350b47e3a58b9635db770322 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 16 Oct 2014 10:47:08 -0700 Subject: [PATCH 18/19] solve review comments --- .../scala/org/apache/spark/sql/hive/HiveContext.scala | 7 +------ .../src/main/scala/org/apache/spark/sql/hive/Shim.scala | 2 ++ .../src/main/scala/org/apache/spark/sql/hive/Shim.scala | 9 +++++++++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 8f963e764cdc3..8322f6da625db 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -304,12 +304,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { driver.setMaxRows(maxRows) driver.getResults(results) driver.close() - results.map { r => - r match { - case s: String => s - case o => o.toString - } - } + HiveShim.processResults(results) case _ => sessionState.out.println(tokens(0) + " " + cmd_1) Seq(proc.run(cmd_1).getResponseCode.toString) diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala index c9e9ff04916af..6dde636965afd 100644 --- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -52,6 +52,8 @@ private[hive] object HiveShim { def createDriverResultsArray = new JArrayList[String] + def processResults(results: JArrayList[String]) = results + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE def createDefaultDBIfNeeded(context: HiveContext) = { } diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala index d8a34e07820ca..8678c0c475db4 100644 --- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala +++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim.scala @@ -56,6 +56,15 @@ private[hive] object HiveShim { def createDriverResultsArray = new JArrayList[Object] + def processResults(results: JArrayList[Object]) = { + results.map { r => + r match { + case s: String => s + case a: Array[Object] => a(0).asInstanceOf[String] + } + } + } + def getStatsSetupConstTotalSize = StatsSetupConst.TOTAL_SIZE def createDefaultDBIfNeeded(context: HiveContext) = { From 3ece9051a2b29ce20fc7557b22605e3fe8198f55 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Tue, 21 Oct 2014 00:20:09 -0700 Subject: [PATCH 19/19] minor fix --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 3 --- 1 file changed, 3 deletions(-) diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 8322f6da625db..34ed57b001637 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -284,9 +284,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    */
   protected def runHive(cmd: String, maxRows: Int = 1000): Seq[String] = {
     try {
-      // Session state must be initilized before the CommandProcessor is created .
-      SessionState.start(sessionState)
-
       val cmd_trimmed: String = cmd.trim()
       val tokens: Array[String] = cmd_trimmed.split("\\s+")
       val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
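A closing note on the runHive changes: patch 18 routes the driver output through HiveShim.processResults because the two bindings hand results back in different shapes. The 0.12.0 shim collects plain Strings and passes them through untouched, while the 0.13.1 shim may receive each row as an Array[Object] and keeps its first cell. The code below is a standalone sketch of that normalization; the object name and the sample rows are invented for illustration and are not part of the patch.

// Sketch of the row normalization performed by the 0.13.1 processResults:
// rows may arrive as plain Strings or as Array[Object], and callers such as
// runHive expect a Seq[String] either way.
import java.util.{ArrayList => JArrayList}
import scala.collection.JavaConversions._

object ProcessResultsSketch {  // hypothetical name, not in the patch
  def processResults(results: JArrayList[Object]): Seq[String] =
    results.map {
      case s: String        => s
      case a: Array[Object] => a(0).asInstanceOf[String]  // keep the first cell, as the shim does
    }

  def main(args: Array[String]): Unit = {
    val results = new JArrayList[Object]
    results.add("ok")                           // String-shaped row
    results.add(Array[Object]("col0", "col1"))  // Array-shaped row
    processResults(results).foreach(println)    // prints "ok" then "col0"
  }
}

Since HiveContext only ever sees the Seq[String] coming out of HiveShim.processResults, the difference in row shape stays confined to the version-specific Shim.scala files.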