diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c0bb5af7d5c8..3ef507f8c5d1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -24,6 +24,14 @@ import java.util.concurrent.TimeUnit
 import java.util.regex.Pattern

 import scala.collection.JavaConverters._
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
+import org.apache.spark.sql.catalyst.ParserDialect
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet}
+import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.rules.Rule
+
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions

@@ -141,7 +149,7 @@ class HiveContext private[hive](
    * converted to a data source table, using the data source set by spark.sql.sources.default.
    * The table in CTAS statement will be converted when it meets any of the following conditions:
    *   - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
-   *     a Storage Hanlder (STORED BY), and the value of hive.default.fileformat in hive-site.xml
+   *     a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
    *     is either TextFile or SequenceFile.
    *   - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
    *     is specified (no ROW FORMAT SERDE clause).
@@ -190,6 +198,12 @@ class HiveContext private[hive](
    */
  protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)

+  /*
+   * Whether to calculate table statistics at runtime when they are missing from the metastore.
+   */
+  protected[hive] def hiveCalculateStatsRuntime: Boolean =
+    getConf(HIVE_TABLE_CALCULATE_STATS_RUNTIME)
+
  protected[hive] def hiveThriftServerSingleSession: Boolean =
    sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean

@@ -440,7 +454,7 @@ class HiveContext private[hive](
     // If users put any Spark SQL setting in the spark conf (e.g. spark-defaults.conf),
     // this setConf will be called in the constructor of the SQLContext.
     // Also, calling hiveconf will create a default session containing a HiveConf, which
-    // will interfer with the creation of executionHive (which is a lazy val). So,
+    // will interfere with the creation of executionHive (which is a lazy val). So,
     // we put hiveconf.set at the end of this method.
     hiveconf.set(key, value)
   }

@@ -565,6 +579,31 @@ class HiveContext private[hive](
     }
   }

+  @transient
+  override protected[sql] lazy val optimizer: Optimizer = new Optimizer {
+    override protected val batches =
+      DefaultOptimizer.batches.asInstanceOf[Seq[Batch]] ++
+        Seq(Batch("Hive Table Stats", Once, new HiveTableStats))
+  }
+
+  class HiveTableStats extends Rule[LogicalPlan] {
+    def apply(plan: LogicalPlan): LogicalPlan = plan match {
+      case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) =>
+        // Filter out all predicates that only deal with partition keys, these are given to the
+        // hive table scan operator to be used for partition pruning.
+        val partitionKeyIds = AttributeSet(relation.partitionKeys)
+        val (pruningPredicates, _) = predicates.partition { predicate =>
+          predicate.references.nonEmpty &&
+            predicate.references.subsetOf(partitionKeyIds)
+        }
+        relation.pruningPredicates = pruningPredicates
+        plan
+      case u: UnaryNode => apply(u.child); plan
+      case b: BinaryNode => apply(b.left); apply(b.right); plan
+      case _ => plan
+    }
+  }
+
   @transient
   private val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
@@ -716,7 +755,11 @@ private[hive] object HiveContext {

   val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
     defaultValue = Some(true),
-    doc = "TODO")
+    doc = "When true, the Hive Thrift server executes SQL queries on a background thread pool.")
+
+  val HIVE_TABLE_CALCULATE_STATS_RUNTIME = booleanConf("spark.sql.hive.calculate.stats.runtime",
+    defaultValue = Some(false),
+    doc = "Calculate table statistics at runtime when they are missing from the metastore.")

   /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
   def newTemporaryConfiguration(): Map[String, String] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f4d45714fae4..ee8d39e89bde 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,18 +17,28 @@
 package org.apache.spark.sql.hive

+import org.apache.hadoop.hive.common.StatsSetupConst._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable

+import java.util
+
 import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path, ContentSummary}
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.Warehouse
-import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.metastore.api.{MetaException, FieldSchema}
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.exec.{TableScanOperator, Operator, Utilities}
+import org.apache.hadoop.hive.ql.hooks.ReadEntity
 import org.apache.hadoop.hive.ql.metadata._
-import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils
+import org.apache.hadoop.hive.ql.parse.ParseContext
+import org.apache.hadoop.hive.ql.plan.{PartitionDesc, OperatorDesc, MapWork, TableDesc}

 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
@@ -44,6 +54,7 @@ import org.apache.spark.sql.execution.{FileRelation, datasources}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}

@@ -707,9 +718,11 @@ private[hive] case class InsertIntoHiveTable(
 private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: HiveTable)
-    (@transient private val sqlContext: SQLContext)
+    (@transient private val sqlContext: HiveContext)
   extends LeafNode with MultiInstanceRelation with FileRelation {
+  private[hive] var pruningPredicates: Seq[Expression] = Nil
+
   override def equals(other: Any): Boolean = other match {
     case relation: MetastoreRelation =>
       databaseName == relation.databaseName &&
@@ -759,8 +772,6 @@ private[hive] case class MetastoreRelation

   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
-      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
       // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
       // relatively cheap if parameters for the table are populated into the metastore.  An
@@ -771,9 +782,7 @@
         // When table is external,`totalSize` is always zero, which will influence join strategy
         // so when `totalSize` is zero, use `rawDataSize` instead
         // if the size is still less than zero, we use default size
-        Option(totalSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+        calculateInput().filter(_ > 0).getOrElse(sqlContext.conf.defaultSizeInBytes))
     }
   )

@@ -782,17 +791,33 @@
   lazy val allPartitions = table.getAllPartitions

   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
+    getHiveQlPartitions(predicates, bindPredicate(predicates))
+  }
+
+  def bindPredicate(predicates: Seq[Expression]): Option[Expression] = {
+    predicates.reduceLeftOption(org.apache.spark.sql.catalyst.expressions.And).map { pred =>
+      require(
+        pred.dataType == BooleanType,
+        s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
+      BindReferences.bindReference(pred, partitionKeys)
+    }
+  }
+
+  def getHiveQlPartitions(
+      predicates: Seq[Expression],
+      pruner: Option[Expression]): Seq[Partition] = {
     val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
       table.getPartitions(predicates)
     } else {
       allPartitions
     }

-    rawPartitions.map { p =>
+    prunePartitions(rawPartitions, pruner).map { p =>
       val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
       tPartition.setDbName(databaseName)
       tPartition.setTableName(tableName)
       tPartition.setValues(p.values.asJava)
+      tPartition.setParameters(p.properties.asJava)

       val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
       tPartition.setSd(sd)
@@ -815,6 +840,86 @@
     }
   }

+  // moved from org.apache.spark.sql.hive.execution.HiveTableScan
+  private[hive] def prunePartitions(partitions: Seq[HivePartition], pruner: Option[Expression]) = {
+    pruner match {
+      case None => partitions
+      case Some(shouldKeep) => partitions.filter { part =>
+        val dataTypes = partitionKeys.map(_.dataType)
+        val castedValues = for ((value, dataType) <- part.values.zip(dataTypes)) yield {
+          castFromString(value, dataType)
+        }
+
+        // Only partitioned values are needed here, since the predicate has already been bound to
+        // partition key attribute references.
+        val row = InternalRow.fromSeq(castedValues)
+        shouldKeep.eval(row).asInstanceOf[Boolean]
+      }
+    }
+  }
+
+  private def calculateInput(): Option[Long] = {
+    var partitions: Seq[Partition] = Nil
+    if (hiveQlTable.isPartitioned) {
+      partitions = getHiveQlPartitions(pruningPredicates)
+    }
+
+    // First, try the stats recorded in the table/partition properties.
+    val fromStats = getFromStats(hiveQlTable, partitions, TOTAL_SIZE).orElse(
+      getFromStats(hiveQlTable, partitions, RAW_DATA_SIZE))
+    if (fromStats.isDefined || !sqlContext.hiveCalculateStatsRuntime) {
+      return fromStats
+    }
+
+    // Build a dummy MapWork so Hive's Utilities.getInputSummary can compute the input size.
+    val dummy: MapWork = new MapWork
+    val alias: String = "_dummy"
+    val operator: Operator[_ <: OperatorDesc] = new TableScanOperator
+
+    dummy.getAliasToWork.put(alias, operator)
+    val pathToAliases = dummy.getPathToAliases
+    val pathToPartition = dummy.getPathToPartitionInfo
+    if (hiveQlTable.isPartitioned) {
+      for (partition <- partitions) {
+        val partPath = getDnsPath(partition.getDataLocation, sqlContext.hiveconf).toString
+        pathToAliases.put(partPath, new util.ArrayList(util.Arrays.asList(alias)))
+        pathToPartition.put(partPath, new PartitionDesc(partition, tableDesc))
+      }
+    } else {
+      val tablePath = getDnsPath(hiveQlTable.getDataLocation, sqlContext.hiveconf).toString
+      pathToAliases.put(tablePath, new util.ArrayList(util.Arrays.asList(alias)))
+      pathToPartition.put(tablePath, new PartitionDesc(tableDesc, null))
+    }
+    // Sum up the size of all input paths registered above.
+    Some(Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null).getLength)
+  }
+
+  private def getFromStats(table: Table, partitions: Seq[Partition], statKey: String):
+    Option[Long] = {
+    if (table.isPartitioned) {
+      var totalSize: Long = 0
+      for (partition <- partitions) {
+        val partSize = Option(partition.getParameters.get(statKey)).map(_.toLong).filter(_ > 0)
+        if (partSize.isEmpty) {
+          return None
+        }
+        totalSize += partSize.get
+      }
+      Some(totalSize)
+    } else {
+      Option(table.getParameters.get(statKey)).map(_.toLong).filter(_ > 0)
+    }
+  }
+
+  private[this] def castFromString(value: String, dataType: DataType) = {
+    Cast(Literal(value), dataType).eval(null)
+  }
+
+  private[this] def getDnsPath(path: Path, conf: Configuration): Path = {
+    val fs = path.getFileSystem(conf)
+    new Path(fs.getUri.getScheme, fs.getUri.getAuthority, path.toUri.getPath)
+  }
+
   /** Only compare database and tablename, not alias. */
   override def sameResult(plan: LogicalPlan): Boolean = {
     plan match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 9d9a55edd731..fe21970c8ba1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -42,6 +42,7 @@ private[hive] case class HiveStorageDescriptor(

 private[hive] case class HivePartition(
     values: Seq[String],
+    properties: Map[String, String],
     storage: HiveStorageDescriptor)

 private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 598ccdeee4ad..f6e3cc7d0e91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -422,6 +422,7 @@ private[hive] class ClientWrapper(
       val apiPartition = partition.getTPartition
       HivePartition(
         values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
+        properties = Option(apiPartition.getParameters).map(_.asScala.toMap).getOrElse(Map.empty),
         storage = HiveStorageDescriptor(
           location = apiPartition.getSd.getLocation,
           inputFormat = apiPartition.getSd.getInputFormat,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index 806d2b9b0b7d..e5657a1f3bb5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution
 import scala.collection.JavaConverters._

 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -31,7 +30,6 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.types.{BooleanType, DataType}

 /**
  * The Hive table scan operator. Column and partition pruning are both handled.
@@ -56,13 +54,7 @@ case class HiveTableScan(

   // Bind all partition key attribute references in the partition pruning predicate for later
   // evaluation.
-  private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
-    require(
-      pred.dataType == BooleanType,
-      s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
-
-    BindReferences.bindReference(pred, relation.partitionKeys)
-  }
+  private[this] val boundPruningPred = relation.bindPredicate(partitionPruningPred)

   // Create a local copy of hiveconf,so that scan specific modifications should not impact
   // other queries
@@ -76,10 +68,6 @@ case class HiveTableScan(
   private[this] val hadoopReader =
     new HadoopTableReader(attributes, relation, context, hiveExtraConf)

-  private[this] def castFromString(value: String, dataType: DataType) = {
-    Cast(Literal(value), dataType).eval(null)
-  }
-
   private def addColumnMetadataToConf(hiveConf: HiveConf) {
     // Specifies needed column IDs for those non-partitioning columns.
     val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
@@ -107,33 +95,11 @@ case class HiveTableScan(
     hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(","))
   }

-  /**
-   * Prunes partitions not involve the query plan.
-   *
-   * @param partitions All partitions of the relation.
-   * @return Partitions that are involved in the query plan.
-   */
-  private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
-    boundPruningPred match {
-      case None => partitions
-      case Some(shouldKeep) => partitions.filter { part =>
-        val dataTypes = relation.partitionKeys.map(_.dataType)
-        val castedValues = part.getValues.asScala.zip(dataTypes)
-          .map { case (value, dataType) => castFromString(value, dataType) }
-
-        // Only partitioned values are needed here, since the predicate has already been bound to
-        // partition key attribute references.
-        val row = InternalRow.fromSeq(castedValues)
-        shouldKeep.eval(row).asInstanceOf[Boolean]
-      }
-    }
-  }
-
   protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
     hadoopReader.makeRDDForTable(relation.hiveQlTable)
   } else {
     hadoopReader.makeRDDForPartitionedTable(
-      prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+      relation.getHiveQlPartitions(partitionPruningPred, boundPruningPred))
   }

   override def output: Seq[Attribute] = attributes
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index f775f1e95587..5984b05f6b1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -103,7 +103,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
         |SELECT * FROM src
       """.stripMargin).collect()

-    assert(queryTotalSize("analyzeTable_part") === hiveContext.conf.defaultSizeInBytes)
+    // Partition stats are populated by the INSERT query above, so the estimate is no longer the default size.
+    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))

     sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index 210d56674541..9cc55fedd7a4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -150,7 +150,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
         case p @ HiveTableScan(columns, relation, _) =>
           val columnNames = columns.map(_.name)
           val partValues = if (relation.table.isPartitioned) {
-            p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
+            relation.getHiveQlPartitions(p.partitionPruningPred).map(_.getValues.asScala)
           } else {
             Seq.empty
           }
@@ -160,7 +160,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
     assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch")
     assert(actualScannedColumns === expectedScannedColumns, "Scanned columns mismatch")

-    val actualPartitions = actualPartValues.map(_.asScala.mkString(",")).sorted
+    val actualPartitions = actualPartValues.map(_.mkString(",")).sorted
     val expectedPartitions = expectedPartValues.map(_.mkString(",")).sorted

     assert(actualPartitions === expectedPartitions, "Partitions selected do not match")
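
Usage sketch (not part of the patch): the Scala snippet below illustrates how the runtime-statistics path introduced above is meant to be exercised. The table names, columns, and partition value are hypothetical; the snippet assumes the corrected configuration key spark.sql.hive.calculate.stats.runtime and the Spark 1.x HiveContext/DataFrame API that this patch targets.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object RuntimeStatsExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("runtime-stats-example"))
        val hiveContext = new HiveContext(sc)

        // Opt in to the behavior added by this patch; the default remains false.
        hiveContext.setConf("spark.sql.hive.calculate.stats.runtime", "true")

        // With the flag on, MetastoreRelation.statistics first looks for totalSize/rawDataSize in
        // the table/partition properties and, when they are missing, sums the on-disk size of only
        // the partitions that survive pruning (ds = '2015-01-01' here). web_logs/users are
        // hypothetical tables used purely for illustration.
        val joined = hiveContext.sql(
          """SELECT l.user_id, u.name
            |FROM web_logs l JOIN users u ON l.user_id = u.id
            |WHERE l.ds = '2015-01-01'
          """.stripMargin)

        // If the estimated size falls below spark.sql.autoBroadcastJoinThreshold, the planner may
        // choose a broadcast join; the selected strategy is visible in the physical plan.
        joined.explain(true)
      }
    }

The point of making the estimate pruning-aware is that a query touching a single small partition of a large table can still come in under spark.sql.autoBroadcastJoinThreshold and be planned as a broadcast join, rather than being sized by the whole table.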