From b7f6d5e41bc5aa065524fd383e4d2fae971bb068 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 12 Jun 2015 06:27:40 +0900 Subject: [PATCH 1/7] [SPARK-8312] [SQL] Populate statistics info of hive tables if it's needed to be --- .../org/apache/spark/sql/SQLContext.scala | 4 +- .../apache/spark/sql/hive/HiveContext.scala | 37 +++++++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 93 ++++++++++++++++++- .../sql/hive/execution/HiveTableScan.scala | 38 +------- .../sql/hive/execution/PruningSuite.scala | 4 +- 5 files changed, 130 insertions(+), 46 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 39471d2fb79a..25cc24d7868a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -202,7 +202,9 @@ class SQLContext private[sql]( } @transient - protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer + protected[sql] lazy val optimizer: Optimizer = newOptimizer + + protected[sql] def newOptimizer: Optimizer = DefaultOptimizer @transient protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c0bb5af7d5c8..c8dc4da14210 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -24,6 +24,15 @@ import java.util.concurrent.TimeUnit import java.util.regex.Pattern import scala.collection.JavaConverters._ +import org.apache.hadoop.hive.common.StatsSetupConst +import org.apache.hadoop.hive.common.`type`.HiveDecimal +import org.apache.spark.sql.catalyst.ParserDialect +import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet} +import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.rules.Rule + +import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.language.implicitConversions @@ -141,7 +150,7 @@ class HiveContext private[hive]( * converted to a data source table, using the data source set by spark.sql.sources.default. * The table in CTAS statement will be converted when it meets any of the following conditions: * - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or - * a Storage Hanlder (STORED BY), and the value of hive.default.fileformat in hive-site.xml + * a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml * is either TextFile or SequenceFile. * - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe * is specified (no ROW FORMAT SERDE clause). @@ -440,7 +449,7 @@ class HiveContext private[hive]( // If users put any Spark SQL setting in the spark conf (e.g. spark-defaults.conf), // this setConf will be called in the constructor of the SQLContext. // Also, calling hiveconf will create a default session containing a HiveConf, which - // will interfer with the creation of executionHive (which is a lazy val). So, + // will interfere with the creation of executionHive (which is a lazy val). So, // we put hiveconf.set at the end of this method. 
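The SQLContext hunk above replaces the hard-wired DefaultOptimizer with an overridable newOptimizer factory, which is the hook this patch uses to splice a Hive-specific batch into the optimizer. Below is a minimal sketch of how a subclass could use that hook, assuming the 1.5-era SQLContext auxiliary constructor; the package, class, and ExtraRule names are illustrative only, and the asInstanceOf[Seq[Batch]] cast mirrors the workaround the patch itself needs for the path-dependent Batch type:

```scala
// Placed under org.apache.spark.sql so the protected[sql] override is legal.
package org.apache.spark.sql.extension

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule: a real rule would rewrite the plan (e.g. populate table stats).
object ExtraRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}

class ExtendedSQLContext(sc: SparkContext) extends SQLContext(sc) {
  // Reuse the default batches and append one extra batch that runs once per query.
  override protected[sql] def newOptimizer: Optimizer = new Optimizer {
    override protected val batches =
      DefaultOptimizer.batches.asInstanceOf[Seq[Batch]] ++
        Seq(Batch("Extra Rules", Once, ExtraRule))
  }
}
```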
hiveconf.set(key, value) } @@ -565,6 +574,30 @@ class HiveContext private[hive]( } } + override protected[sql] def newOptimizer: Optimizer = new Optimizer { + override protected val batches = + DefaultOptimizer.batches.asInstanceOf[Seq[Batch]] ++ + Seq(Batch("Hive Table Stats", Once, HiveTableStats)) + } + + object HiveTableStats extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan match { + case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => + // Filter out all predicates that only deal with partition keys, these are given to the + // hive table scan operator to be used for partition pruning. + val partitionKeyIds = AttributeSet(relation.partitionKeys) + val (pruningPredicates, _) = predicates.partition { predicate => + !predicate.references.isEmpty && + predicate.references.subsetOf(partitionKeyIds) + } + relation.prepareStats(pruningPredicates) + plan + case u: UnaryNode => apply(u.child); plan + case b: BinaryNode => apply(b.left); apply(b.right); plan + case _ => plan + } + } + @transient private val hivePlanner = new SparkPlanner with HiveStrategies { val hiveContext = self diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index f4d45714fae4..1dcf8c222f5e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -20,15 +20,23 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ import scala.collection.mutable +import java.util + import com.google.common.base.Objects import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path, ContentSummary} import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.Warehouse -import org.apache.hadoop.hive.metastore.api.FieldSchema +import org.apache.hadoop.hive.metastore.api.{MetaException, FieldSchema} +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.exec.{TableScanOperator, Operator, Utilities} +import org.apache.hadoop.hive.ql.hooks.ReadEntity import org.apache.hadoop.hive.ql.metadata._ -import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils +import org.apache.hadoop.hive.ql.parse.ParseContext +import org.apache.hadoop.hive.ql.plan.{PartitionDesc, OperatorDesc, MapWork, TableDesc} import org.apache.spark.Logging import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog} @@ -44,6 +52,7 @@ import org.apache.spark.sql.execution.{FileRelation, datasources} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.HiveNativeCommand import org.apache.spark.sql.sources._ +import org.apache.spark.sql.hive.execution.HiveTableScan import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode} @@ -707,7 +716,7 @@ private[hive] case class InsertIntoHiveTable( private[hive] case class MetastoreRelation (databaseName: String, tableName: String, alias: Option[String]) (val table: HiveTable) - (@transient private val sqlContext: SQLContext) + (@transient private val sqlContext: HiveContext) extends LeafNode with MultiInstanceRelation with FileRelation { 
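The HiveTableStats rule above keeps only the predicates that reference nothing but partition keys, since only those can prune partitions and therefore tighten the size estimate. The same split, extracted here as a standalone helper purely for illustration (not part of the patch):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}

// Returns (pruning predicates, remaining predicates): a predicate can drive partition
// pruning only if its references are a non-empty subset of the partition keys.
def splitPruningPredicates(
    predicates: Seq[Expression],
    partitionKeys: Seq[Attribute]): (Seq[Expression], Seq[Expression]) = {
  val partitionKeyIds = AttributeSet(partitionKeys)
  predicates.partition { predicate =>
    !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds)
  }
}
```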
override def equals(other: Any): Boolean = other match { @@ -782,13 +791,28 @@ private[hive] case class MetastoreRelation lazy val allPartitions = table.getAllPartitions def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { + getHiveQlPartitions(predicates, bindPredicate(predicates)) + } + + def bindPredicate(predicates: Seq[Expression]): Option[Expression] = { + predicates.reduceLeftOption(org.apache.spark.sql.catalyst.expressions.And).map { pred => + require( + pred.dataType == BooleanType, + s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") + BindReferences.bindReference(pred, partitionKeys) + } + } + + def getHiveQlPartitions( + predicates: Seq[Expression], + pruner: Option[Expression]): Seq[Partition] = { val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) { table.getPartitions(predicates) } else { allPartitions } - rawPartitions.map { p => + prunePartitions(rawPartitions, pruner).map { p => val tPartition = new org.apache.hadoop.hive.metastore.api.Partition tPartition.setDbName(databaseName) tPartition.setTableName(tableName) @@ -815,6 +839,65 @@ private[hive] case class MetastoreRelation } } + def prepareStats(partitionPruningPred: Seq[Expression]) { + val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) + val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) + if (!hiveQlTable.isNonNative && hiveQlTable.getDataLocation != null && + (partitionPruningPred.isEmpty && hiveQlTable.isPartitioned) || + Option(totalSize).map(_.toLong).filter(_ > 0).isEmpty && + Option(rawDataSize).map(_.toLong).filter(_ > 0).isEmpty) { + + val dummy: MapWork = new MapWork + val alias: String = "_dummy" + val operator: Operator[_ <: OperatorDesc] = new TableScanOperator + + dummy.getAliasToWork.put(alias, operator) + val pathToAliases = dummy.getPathToAliases + val pathToPartition = dummy.getPathToPartitionInfo + if (hiveQlTable.isPartitioned) { + for (partition <- getHiveQlPartitions(partitionPruningPred)) { + val partPath = getDnsPath(partition.getDataLocation, sqlContext.hiveconf).toString + pathToAliases.put(partPath, new util.ArrayList(util.Arrays.asList(alias))) + pathToPartition.put(partPath, new PartitionDesc(partition, tableDesc)) + } + } else { + val tablePath = getDnsPath(hiveQlTable.getDataLocation, sqlContext.hiveconf).toString + pathToAliases.put(tablePath, new util.ArrayList(util.Arrays.asList(alias))) + pathToPartition.put(tablePath, new PartitionDesc(tableDesc, null)) + } + // update + hiveQlTable.getParameters.put(StatsSetupConst.TOTAL_SIZE, + Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null).getLength.toString) + } + } + + // moved from org.apache.spark.sql.hive.execution.HiveTableScan + private[hive] def prunePartitions(partitions: Seq[HivePartition], pruner: Option[Expression]) = { + pruner match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = partitionKeys.map(_.dataType) + val castedValues = for ((value, dataType) <- part.values.zip(dataTypes)) yield { + castFromString(value, dataType) + } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. 
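bindPredicate and prunePartitions above reproduce the pruning logic that previously lived in HiveTableScan: the combined predicate is bound to the partition-key ordinals, each partition's string values are cast to the key types, and the bound predicate is evaluated once per partition. A self-contained sketch with a made-up hr partition key (not code from the patch, but using the same catalyst calls):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// Hypothetical partition key hr: int; the metastore hands partition values back as strings.
val hr = AttributeReference("hr", IntegerType)()

// castFromString: cast the raw string value to the partition key's data type.
val castedValue = Cast(Literal("11"), IntegerType).eval(null) // 11

// Bind hr = 11 against the single partition key, then evaluate it on the row of casted
// values, which is what prunePartitions does for every candidate partition.
val bound = BindReferences.bindReference(EqualTo(hr, Literal(11)), Seq(hr))
val keep = bound.eval(InternalRow.fromSeq(Seq(castedValue))).asInstanceOf[Boolean] // true
```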
+ val row = InternalRow.fromSeq(castedValues) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + + private[this] def castFromString(value: String, dataType: DataType) = { + Cast(Literal(value), dataType).eval(null) + } + + private[this] def getDnsPath(path: Path, conf: Configuration): Path = { + val fs = path.getFileSystem(conf) + return new Path(fs.getUri.getScheme, fs.getUri.getAuthority, path.toUri.getPath) + } + /** Only compare database and tablename, not alias. */ override def sameResult(plan: LogicalPlan): Boolean = { plan match { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index 806d2b9b0b7d..e5657a1f3bb5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConverters._ import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -31,7 +30,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive._ -import org.apache.spark.sql.types.{BooleanType, DataType} /** * The Hive table scan operator. Column and partition pruning are both handled. @@ -56,13 +54,7 @@ case class HiveTableScan( // Bind all partition key attribute references in the partition pruning predicate for later // evaluation. - private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => - require( - pred.dataType == BooleanType, - s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") - - BindReferences.bindReference(pred, relation.partitionKeys) - } + private[this] val boundPruningPred = relation.bindPredicate(partitionPruningPred) // Create a local copy of hiveconf,so that scan specific modifications should not impact // other queries @@ -76,10 +68,6 @@ case class HiveTableScan( private[this] val hadoopReader = new HadoopTableReader(attributes, relation, context, hiveExtraConf) - private[this] def castFromString(value: String, dataType: DataType) = { - Cast(Literal(value), dataType).eval(null) - } - private def addColumnMetadataToConf(hiveConf: HiveConf) { // Specifies needed column IDs for those non-partitioning columns. val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer) @@ -107,33 +95,11 @@ case class HiveTableScan( hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(",")) } - /** - * Prunes partitions not involve the query plan. - * - * @param partitions All partitions of the relation. - * @return Partitions that are involved in the query plan. 
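For context on the column pruning set up in addColumnMetadataToConf above, here is a standalone sketch of how the needed column IDs are derived; the table layout is hypothetical, and in the real code the ordinals come from relation.columnOrdinals and are handed to Hive via HiveShim.appendReadColumns:

```scala
// Hypothetical table layout: ordinal of each non-partition column in the schema.
val columnOrdinals: Map[String, Int] = Map("key" -> 0, "value" -> 1, "comment" -> 2)

// Columns actually requested by the query; anything not in the map (e.g. a partition
// key such as ds) simply drops out of the pruned list.
val requested = Seq("value", "ds")
val neededColumnIDs: Seq[Integer] = requested.flatMap(columnOrdinals.get).map(o => o: Integer)
// neededColumnIDs == Seq(1): only column 1 will be deserialized by the scan.
```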
- */ - private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { - boundPruningPred match { - case None => partitions - case Some(shouldKeep) => partitions.filter { part => - val dataTypes = relation.partitionKeys.map(_.dataType) - val castedValues = part.getValues.asScala.zip(dataTypes) - .map { case (value, dataType) => castFromString(value, dataType) } - - // Only partitioned values are needed here, since the predicate has already been bound to - // partition key attribute references. - val row = InternalRow.fromSeq(castedValues) - shouldKeep.eval(row).asInstanceOf[Boolean] - } - } - } - protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) { hadoopReader.makeRDDForTable(relation.hiveQlTable) } else { hadoopReader.makeRDDForPartitionedTable( - prunePartitions(relation.getHiveQlPartitions(partitionPruningPred))) + relation.getHiveQlPartitions(partitionPruningPred, boundPruningPred)) } override def output: Seq[Attribute] = attributes diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 210d56674541..9cc55fedd7a4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -150,7 +150,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { case p @ HiveTableScan(columns, relation, _) => val columnNames = columns.map(_.name) val partValues = if (relation.table.isPartitioned) { - p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues) + relation.getHiveQlPartitions(p.partitionPruningPred).map(_.getValues.asScala) } else { Seq.empty } @@ -160,7 +160,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch") assert(actualScannedColumns === expectedScannedColumns, "Scanned columns mismatch") - val actualPartitions = actualPartValues.map(_.asScala.mkString(",")).sorted + val actualPartitions = actualPartValues.map(_.mkString(",")).sorted val expectedPartitions = expectedPartValues.map(_.mkString(",")).sorted assert(actualPartitions === expectedPartitions, "Partitions selected do not match") From e312e9181fdbfee9c5fda16161bcf83778c11f72 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 18 Sep 2015 11:30:31 +0900 Subject: [PATCH 2/7] fixes scalastyle fail --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index c8dc4da14210..fd2dd2b39477 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.rules.Rule -import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import scala.language.implicitConversions From 80e84bc96f367458c8aa0699ca090df3df5b0a12 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 18 Sep 2015 17:56:17 +0900 Subject: [PATCH 3/7] fixed test fail --- .../main/scala/org/apache/spark/sql/hive/HiveContext.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 
deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index fd2dd2b39477..7eee58fc0bfa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -576,10 +576,10 @@ class HiveContext private[hive]( override protected[sql] def newOptimizer: Optimizer = new Optimizer { override protected val batches = DefaultOptimizer.batches.asInstanceOf[Seq[Batch]] ++ - Seq(Batch("Hive Table Stats", Once, HiveTableStats)) + Seq(Batch("Hive Table Stats", Once, new HiveTableStats)) } - object HiveTableStats extends Rule[LogicalPlan] { + class HiveTableStats extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan match { case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => // Filter out all predicates that only deal with partition keys, these are given to the From 4ce61087d807b900e1cc56b6299fb23623849da4 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 19 Nov 2015 11:36:07 +0900 Subject: [PATCH 4/7] addressed comments --- .../org/apache/spark/sql/SQLContext.scala | 4 +- .../apache/spark/sql/hive/HiveContext.scala | 6 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 61 +++++++++---------- 3 files changed, 32 insertions(+), 39 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 25cc24d7868a..39471d2fb79a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -202,9 +202,7 @@ class SQLContext private[sql]( } @transient - protected[sql] lazy val optimizer: Optimizer = newOptimizer - - protected[sql] def newOptimizer: Optimizer = DefaultOptimizer + protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer @transient protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 7eee58fc0bfa..b709e482df37 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -573,7 +573,7 @@ class HiveContext private[hive]( } } - override protected[sql] def newOptimizer: Optimizer = new Optimizer { + override protected[sql] lazy val optimizer: Optimizer = new Optimizer { override protected val batches = DefaultOptimizer.batches.asInstanceOf[Seq[Batch]] ++ Seq(Batch("Hive Table Stats", Once, new HiveTableStats)) @@ -586,10 +586,10 @@ class HiveContext private[hive]( // hive table scan operator to be used for partition pruning. 
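Patch 4 reworks the rule so that it no longer computes statistics eagerly: it only records the pruning predicates on the relation (see the hunks that follow), and the expensive size calculation is deferred to the lazily evaluated statistics. A self-contained sketch of that deferral pattern with deliberately simplified types (this is not Spark code):

```scala
object DeferredStatsSketch {
  final case class PartitionInfo(name: String, bytes: Long)

  class Relation(allPartitions: Seq[PartitionInfo], defaultSize: Long) {
    // Set by the optimizer rule; Nil means "no pruning information available".
    var pruningPredicates: Seq[PartitionInfo => Boolean] = Nil

    // Evaluated at most once, and only when a consumer (e.g. a join strategy) asks.
    lazy val sizeInBytes: Long = {
      val pruned = allPartitions.filter(p => pruningPredicates.forall(_(p)))
      val total = pruned.map(_.bytes).sum
      if (total > 0) total else defaultSize
    }
  }

  def main(args: Array[String]): Unit = {
    val rel = new Relation(Seq(PartitionInfo("ds=1", 100L), PartitionInfo("ds=2", 300L)), 1L)
    rel.pruningPredicates = Seq(_.name == "ds=2")
    println(rel.sizeInBytes) // prints 300: only the surviving partition is counted
  }
}
```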
val partitionKeyIds = AttributeSet(relation.partitionKeys) val (pruningPredicates, _) = predicates.partition { predicate => - !predicate.references.isEmpty && + predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds) } - relation.prepareStats(pruningPredicates) + relation.pruningPredicates = pruningPredicates plan case u: UnaryNode => apply(u.child); plan case b: BinaryNode => apply(b.left); apply(b.right); plan diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 1dcf8c222f5e..c1b28338d421 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -719,6 +719,8 @@ private[hive] case class MetastoreRelation (@transient private val sqlContext: HiveContext) extends LeafNode with MultiInstanceRelation with FileRelation { + private[hive] var pruningPredicates: Seq[Expression] = null + override def equals(other: Any): Boolean = other match { case relation: MetastoreRelation => databaseName == relation.databaseName && @@ -782,7 +784,8 @@ private[hive] case class MetastoreRelation // if the size is still less than zero, we use default size Option(totalSize).map(_.toLong).filter(_ > 0) .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0) - .getOrElse(sqlContext.conf.defaultSizeInBytes))) + .getOrElse(Option(calculateInput().getLength).filter(_ > 0) + .getOrElse(sqlContext.conf.defaultSizeInBytes)))) } ) @@ -839,38 +842,6 @@ private[hive] case class MetastoreRelation } } - def prepareStats(partitionPruningPred: Seq[Expression]) { - val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) - val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) - if (!hiveQlTable.isNonNative && hiveQlTable.getDataLocation != null && - (partitionPruningPred.isEmpty && hiveQlTable.isPartitioned) || - Option(totalSize).map(_.toLong).filter(_ > 0).isEmpty && - Option(rawDataSize).map(_.toLong).filter(_ > 0).isEmpty) { - - val dummy: MapWork = new MapWork - val alias: String = "_dummy" - val operator: Operator[_ <: OperatorDesc] = new TableScanOperator - - dummy.getAliasToWork.put(alias, operator) - val pathToAliases = dummy.getPathToAliases - val pathToPartition = dummy.getPathToPartitionInfo - if (hiveQlTable.isPartitioned) { - for (partition <- getHiveQlPartitions(partitionPruningPred)) { - val partPath = getDnsPath(partition.getDataLocation, sqlContext.hiveconf).toString - pathToAliases.put(partPath, new util.ArrayList(util.Arrays.asList(alias))) - pathToPartition.put(partPath, new PartitionDesc(partition, tableDesc)) - } - } else { - val tablePath = getDnsPath(hiveQlTable.getDataLocation, sqlContext.hiveconf).toString - pathToAliases.put(tablePath, new util.ArrayList(util.Arrays.asList(alias))) - pathToPartition.put(tablePath, new PartitionDesc(tableDesc, null)) - } - // update - hiveQlTable.getParameters.put(StatsSetupConst.TOTAL_SIZE, - Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null).getLength.toString) - } - } - // moved from org.apache.spark.sql.hive.execution.HiveTableScan private[hive] def prunePartitions(partitions: Seq[HivePartition], pruner: Option[Expression]) = { pruner match { @@ -889,6 +860,30 @@ private[hive] case class MetastoreRelation } } + private def calculateInput(): ContentSummary = { + // create dummy mapwork + val dummy: MapWork = new MapWork + val alias: String = 
"_dummy" + val operator: Operator[_ <: OperatorDesc] = new TableScanOperator + + dummy.getAliasToWork.put(alias, operator) + val pathToAliases = dummy.getPathToAliases + val pathToPartition = dummy.getPathToPartitionInfo + if (hiveQlTable.isPartitioned) { + for (partition <- getHiveQlPartitions(pruningPredicates)) { + val partPath = getDnsPath(partition.getDataLocation, sqlContext.hiveconf).toString + pathToAliases.put(partPath, new util.ArrayList(util.Arrays.asList(alias))) + pathToPartition.put(partPath, new PartitionDesc(partition, tableDesc)) + } + } else { + val tablePath = getDnsPath(hiveQlTable.getDataLocation, sqlContext.hiveconf).toString + pathToAliases.put(tablePath, new util.ArrayList(util.Arrays.asList(alias))) + pathToPartition.put(tablePath, new PartitionDesc(tableDesc, null)) + } + // calculate summary + Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null) + } + private[this] def castFromString(value: String, dataType: DataType) = { Cast(Literal(value), dataType).eval(null) } From 4362f9427f66c1dc54a30a41b556a39dfd9db0b2 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 19 Nov 2015 15:00:50 +0900 Subject: [PATCH 5/7] missed transient annotation for optimizer --- .../src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index b709e482df37..d2d0d8599a0a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -573,6 +573,7 @@ class HiveContext private[hive]( } } + @transient override protected[sql] lazy val optimizer: Optimizer = new Optimizer { override protected val batches = DefaultOptimizer.batches.asInstanceOf[Seq[Batch]] ++ From 84c45c0efd0b62d8d88c20395bfc0695e1ab4a6e Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 20 Nov 2015 14:32:23 +0900 Subject: [PATCH 6/7] fixed NPE --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c1b28338d421..9ae23b3c3b3b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -719,7 +719,7 @@ private[hive] case class MetastoreRelation (@transient private val sqlContext: HiveContext) extends LeafNode with MultiInstanceRelation with FileRelation { - private[hive] var pruningPredicates: Seq[Expression] = null + private[hive] var pruningPredicates: Seq[Expression] = Nil override def equals(other: Any): Boolean = other match { case relation: MetastoreRelation => From ec3e274d6407f3a189bce19d11f1b11d053e2f68 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Mon, 23 Nov 2015 10:26:04 +0900 Subject: [PATCH 7/7] propagate stats in api.partition to spark partition --- .../apache/spark/sql/hive/HiveContext.scala | 12 ++++- .../spark/sql/hive/HiveMetastoreCatalog.scala | 45 +++++++++++++++---- .../sql/hive/client/ClientInterface.scala | 1 + .../spark/sql/hive/client/ClientWrapper.scala | 1 + .../spark/sql/hive/StatisticsSuite.scala | 3 +- 5 files changed, 51 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index d2d0d8599a0a..3ef507f8c5d1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -198,6 +198,12 @@ class HiveContext private[hive]( */ protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC) + /* + * Calculate table statistics in runtime if needed. + */ + protected[hive] def hiveCalculateStatsRuntime: Boolean = + getConf(HIVE_TABLE_CALCULATE_STATS_RUNTIME) + protected[hive] def hiveThriftServerSingleSession: Boolean = sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean @@ -749,7 +755,11 @@ private[hive] object HiveContext { val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async", defaultValue = Some(true), - doc = "TODO") + doc = "hive thrift server use background spark sql thread pool to execute sql queries.") + + val HIVE_TABLE_CALCULATE_STATS_RUNTIME = booleanConf("spark.sql.hive.calulcate.stats.runtime", + defaultValue = Some(false), + doc = "Calculate table statistics in runtime if needed.") /** Constructs a configuration for hive, where the metastore is located in a temp directory. */ def newTemporaryConfiguration(): Map[String, String] = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9ae23b3c3b3b..ee8d39e89bde 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import org.apache.hadoop.hive.common.StatsSetupConst._ + import scala.collection.JavaConverters._ import scala.collection.mutable @@ -770,8 +772,6 @@ private[hive] case class MetastoreRelation @transient override lazy val statistics: Statistics = Statistics( sizeInBytes = { - val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) - val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) // TODO: check if this estimate is valid for tables after partition pruning. // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be // relatively cheap if parameters for the table are populated into the metastore. 
An @@ -782,10 +782,7 @@ private[hive] case class MetastoreRelation // When table is external,`totalSize` is always zero, which will influence join strategy // so when `totalSize` is zero, use `rawDataSize` instead // if the size is still less than zero, we use default size - Option(totalSize).map(_.toLong).filter(_ > 0) - .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0) - .getOrElse(Option(calculateInput().getLength).filter(_ > 0) - .getOrElse(sqlContext.conf.defaultSizeInBytes)))) + calculateInput().filter(_ > 0).getOrElse(sqlContext.conf.defaultSizeInBytes)) } ) @@ -820,6 +817,7 @@ private[hive] case class MetastoreRelation tPartition.setDbName(databaseName) tPartition.setTableName(tableName) tPartition.setValues(p.values.asJava) + tPartition.setParameters(p.properties.asJava) val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor() tPartition.setSd(sd) @@ -860,7 +858,19 @@ private[hive] case class MetastoreRelation } } - private def calculateInput(): ContentSummary = { + private def calculateInput(): Option[Long] = { + var partitions: Seq[Partition] = Nil + if (hiveQlTable.isPartitioned) { + partitions = getHiveQlPartitions(pruningPredicates) + } + + // try with stats in table/partition properties + val fromStats = getFromStats(hiveQlTable, partitions, TOTAL_SIZE).orElse( + getFromStats(hiveQlTable, partitions, RAW_DATA_SIZE)) + if (fromStats.isDefined || !sqlContext.hiveCalculateStatsRuntime) { + return fromStats + } + // create dummy mapwork val dummy: MapWork = new MapWork val alias: String = "_dummy" @@ -870,7 +880,7 @@ private[hive] case class MetastoreRelation val pathToAliases = dummy.getPathToAliases val pathToPartition = dummy.getPathToPartitionInfo if (hiveQlTable.isPartitioned) { - for (partition <- getHiveQlPartitions(pruningPredicates)) { + for (partition <- partitions) { val partPath = getDnsPath(partition.getDataLocation, sqlContext.hiveconf).toString pathToAliases.put(partPath, new util.ArrayList(util.Arrays.asList(alias))) pathToPartition.put(partPath, new PartitionDesc(partition, tableDesc)) @@ -881,7 +891,24 @@ private[hive] case class MetastoreRelation pathToPartition.put(tablePath, new PartitionDesc(tableDesc, null)) } // calculate summary - Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null) + Some(Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null).getLength) + } + + private def getFromStats(table: Table, partitions: Seq[Partition], statKey: String): + Option[Long] = { + if (table.isPartitioned) { + var totalSize: Long = 0 + for (partition <- partitions) { + val partSize = Option(partition.getParameters.get(statKey)).map(_.toLong).filter(_ > 0) + if (partSize.isEmpty) { + return None; + } + totalSize += partSize.get + } + Some(totalSize) + } else { + Option(table.getParameters.get(statKey)).map(_.toLong).filter(_ > 0) + } } private[this] def castFromString(value: String, dataType: DataType) = { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala index 9d9a55edd731..fe21970c8ba1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala @@ -42,6 +42,7 @@ private[hive] case class HiveStorageDescriptor( private[hive] case class HivePartition( values: Seq[String], + properties: Map[String, String], storage: HiveStorageDescriptor) private[hive] case class 
HiveColumn(name: String, @Nullable hiveType: String, comment: String) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala index 598ccdeee4ad..f6e3cc7d0e91 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala @@ -422,6 +422,7 @@ private[hive] class ClientWrapper( val apiPartition = partition.getTPartition HivePartition( values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty), + properties = Option(apiPartition.getParameters).map(_.asScala.toMap).getOrElse(Map.empty), storage = HiveStorageDescriptor( location = apiPartition.getSd.getLocation, inputFormat = apiPartition.getSd.getInputFormat, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index f775f1e95587..5984b05f6b1b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -103,7 +103,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton { |SELECT * FROM src """.stripMargin).collect() - assert(queryTotalSize("analyzeTable_part") === hiveContext.conf.defaultSizeInBytes) + // stats for partition is populated in insert query + assert(queryTotalSize("analyzeTable_part") === BigInt(17436)) sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
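Patch 7 makes getFromStats aggregate per-partition statistics, which is why StatisticsSuite now expects a concrete size (17436 bytes) rather than defaultSizeInBytes. A sketch of the aggregation contract, with the table/partition parameter maps reduced to plain options (not code from the patch):

```scala
// A per-partition stat (totalSize or rawDataSize) is only trusted when every selected
// partition carries a positive value; any missing or non-positive entry makes the whole
// answer None, so the caller can fall back to the other stat or to a filesystem scan.
def statFromPartitions(partitionStats: Seq[Option[Long]]): Option[Long] = {
  val positive = partitionStats.map(_.filter(_ > 0))
  if (positive.forall(_.isDefined)) Some(positive.flatten.sum) else None
}

// statFromPartitions(Seq(Some(10L), Some(20L)))  == Some(30L)
// statFromPartitions(Seq(Some(10L), None))       == None  (a partition was never analyzed)
// statFromPartitions(Seq(Some(10L), Some(0L)))   == None  (zero is treated as unknown)
```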