@@ -24,6 +24,14 @@ import java.util.concurrent.TimeUnit
import java.util.regex.Pattern

import scala.collection.JavaConverters._
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.catalyst.expressions.{And, AttributeSet}
import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.rules.Rule

import scala.collection.mutable.HashMap
import scala.language.implicitConversions

@@ -141,7 +149,7 @@ class HiveContext private[hive](
* converted to a data source table, using the data source set by spark.sql.sources.default.
* The table in CTAS statement will be converted when it meets any of the following conditions:
* - The CTAS does not specify any of a SerDe (ROW FORMAT SERDE), a File Format (STORED AS), or
* a Storage Hanlder (STORED BY), and the value of hive.default.fileformat in hive-site.xml
* a Storage Handler (STORED BY), and the value of hive.default.fileformat in hive-site.xml
* is either TextFile or SequenceFile.
* - The CTAS statement specifies TextFile (STORED AS TEXTFILE) as the file format and no SerDe
* is specified (no ROW FORMAT SERDE clause).
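For illustration, a hedged pair of CTAS statements under the rules above (the table names and the source table src are hypothetical, and hive.default.fileformat is assumed to be the TextFile default):

// Converted to a data source table: no SerDe, file format, or storage handler is given.
hiveContext.sql("CREATE TABLE ctas_converted AS SELECT key, value FROM src")
// Kept as a Hive table: an explicit SerDe and a non-text file format are specified.
hiveContext.sql(
  """CREATE TABLE ctas_kept_as_hive
    |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
    |STORED AS RCFILE
    |AS SELECT key, value FROM src""".stripMargin)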
@@ -190,6 +198,12 @@ class HiveContext private[hive](
*/
protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)

/*
 * Calculate table statistics at runtime when they are not available in the metastore.
*/
protected[hive] def hiveCalculateStatsRuntime: Boolean =
getConf(HIVE_TABLE_CALCULATE_STATS_RUNTIME)

protected[hive] def hiveThriftServerSingleSession: Boolean =
sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean

@@ -440,7 +454,7 @@ class HiveContext private[hive](
// If users put any Spark SQL setting in the spark conf (e.g. spark-defaults.conf),
// this setConf will be called in the constructor of the SQLContext.
// Also, calling hiveconf will create a default session containing a HiveConf, which
// will interfer with the creation of executionHive (which is a lazy val). So,
// will interfere with the creation of executionHive (which is a lazy val). So,
// we put hiveconf.set at the end of this method.
hiveconf.set(key, value)
}
@@ -565,6 +579,31 @@ class HiveContext private[hive](
}
}

@transient
override protected[sql] lazy val optimizer: Optimizer = new Optimizer {
override protected val batches =
DefaultOptimizer.batches.asInstanceOf[Seq[Batch]] ++
Seq(Batch("Hive Table Stats", Once, new HiveTableStats))
}

class HiveTableStats extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan match {
Contributor review comment: plan foreach ?
case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) =>
// Filter out all predicates that only deal with partition keys, these are given to the
// hive table scan operator to be used for partition pruning.
val partitionKeyIds = AttributeSet(relation.partitionKeys)
val (pruningPredicates, _) = predicates.partition { predicate =>
predicate.references.nonEmpty &&
predicate.references.subsetOf(partitionKeyIds)
}
relation.pruningPredicates = pruningPredicates
plan
case u: UnaryNode => apply(u.child); plan
case b: BinaryNode => apply(b.left); apply(b.right); plan
case _ => plan
}
}
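
A minimal sketch of what the reviewer's "plan foreach ?" remark could mean, using the same imports added at the top of this file: traverse the whole plan with TreeNode.foreach instead of recursing by hand, and only record predicates when the filter finds any. The class name HiveTableStatsViaForeach is hypothetical, not part of this change.

class HiveTableStatsViaForeach extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    plan foreach {
      case PhysicalOperation(_, predicates, relation: MetastoreRelation) =>
        val partitionKeyIds = AttributeSet(relation.partitionKeys)
        val pruning = predicates.filter { predicate =>
          predicate.references.nonEmpty && predicate.references.subsetOf(partitionKeyIds)
        }
        // Guard against the bare relation node, which also matches but with no predicates.
        if (pruning.nonEmpty) {
          relation.pruningPredicates = pruning
        }
      case _ => // nothing to record for other nodes
    }
    plan
  }
}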

@transient
private val hivePlanner = new SparkPlanner with HiveStrategies {
val hiveContext = self
@@ -716,7 +755,11 @@ private[hive] object HiveContext {

val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
defaultValue = Some(true),
doc = "TODO")
doc = "hive thrift server use background spark sql thread pool to execute sql queries.")

val HIVE_TABLE_CALCULATE_STATS_RUNTIME = booleanConf("spark.sql.hive.calculate.stats.runtime",
defaultValue = Some(false),
doc = "Calculate table statistics at runtime when they are not available in the metastore.")

/** Constructs a configuration for hive, where the metastore is located in a temp directory. */
def newTemporaryConfiguration(): Map[String, String] = {
@@ -17,18 +17,28 @@

package org.apache.spark.sql.hive

import org.apache.hadoop.hive.common.StatsSetupConst._

import scala.collection.JavaConverters._
import scala.collection.mutable

import java.util

import com.google.common.base.Objects
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path, ContentSummary}
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.Warehouse
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.metastore.api.{MetaException, FieldSchema}
import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.exec.{TableScanOperator, Operator, Utilities}
import org.apache.hadoop.hive.ql.hooks.ReadEntity
import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils
import org.apache.hadoop.hive.ql.parse.ParseContext
import org.apache.hadoop.hive.ql.plan.{PartitionDesc, OperatorDesc, MapWork, TableDesc}

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
@@ -44,6 +54,7 @@ import org.apache.spark.sql.execution.{FileRelation, datasources}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.hive.execution.HiveNativeCommand
import org.apache.spark.sql.sources._
import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}

@@ -707,9 +718,11 @@ private[hive] case class InsertIntoHiveTable(
private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: HiveTable)
(@transient private val sqlContext: SQLContext)
(@transient private val sqlContext: HiveContext)
extends LeafNode with MultiInstanceRelation with FileRelation {

private[hive] var pruningPredicates: Seq[Expression] = Nil

override def equals(other: Any): Boolean = other match {
case relation: MetastoreRelation =>
databaseName == relation.databaseName &&
@@ -759,8 +772,6 @@ private[hive] case class MetastoreRelation

@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = {
val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore. An
@@ -771,9 +782,7 @@ private[hive] case class MetastoreRelation
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(sqlContext.conf.defaultSizeInBytes)))
calculateInput().filter(_ > 0).getOrElse(sqlContext.conf.defaultSizeInBytes))
}
)
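
To make the comment above concrete, a hedged sketch of why this estimate matters (hiveContext and the table names are assumptions): when sizeInBytes comes in under spark.sql.autoBroadcastJoinThreshold, the planner may broadcast this relation instead of shuffling it.

hiveContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10L * 1024 * 1024).toString)
hiveContext.sql("SELECT * FROM fact_table f JOIN dim_table d ON f.key = d.key").explain()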

@@ -782,17 +791,33 @@ private[hive] case class MetastoreRelation
lazy val allPartitions = table.getAllPartitions

def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
getHiveQlPartitions(predicates, bindPredicate(predicates))
}

def bindPredicate(predicates: Seq[Expression]): Option[Expression] = {
predicates.reduceLeftOption(org.apache.spark.sql.catalyst.expressions.And).map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
BindReferences.bindReference(pred, partitionKeys)
}
}

def getHiveQlPartitions(
predicates: Seq[Expression],
pruner: Option[Expression]): Seq[Partition] = {
val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
table.getPartitions(predicates)
} else {
allPartitions
}

rawPartitions.map { p =>
prunePartitions(rawPartitions, pruner).map { p =>
val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
tPartition.setDbName(databaseName)
tPartition.setTableName(tableName)
tPartition.setValues(p.values.asJava)
tPartition.setParameters(p.properties.asJava)

val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
tPartition.setSd(sd)
@@ -815,6 +840,86 @@ private[hive] case class MetastoreRelation
}
}
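
A hedged caller-side sketch of the new two-step API, assuming the catalyst expression classes and the Hive metadata Partition already imported in this file, and a relation whose first partition key is a string column such as ds:

def partitionsForDs(relation: MetastoreRelation, ds: String): Seq[Partition] = {
  val filter: Seq[Expression] = Seq(EqualTo(relation.partitionKeys.head, Literal(ds)))
  // Bind the predicate once, then prune and convert the matching partitions in a single pass.
  relation.getHiveQlPartitions(filter, relation.bindPredicate(filter))
}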

// moved from org.apache.spark.sql.hive.execution.HiveTableScan
private[hive] def prunePartitions(partitions: Seq[HivePartition], pruner: Option[Expression]) = {
pruner match {
case None => partitions
case Some(shouldKeep) => partitions.filter { part =>
val dataTypes = partitionKeys.map(_.dataType)
val castedValues = for ((value, dataType) <- part.values.zip(dataTypes)) yield {
castFromString(value, dataType)
}

// Only partitioned values are needed here, since the predicate has already been bound to
// partition key attribute references.
val row = InternalRow.fromSeq(castedValues)
shouldKeep.eval(row).asInstanceOf[Boolean]
}
}
}

private def calculateInput(): Option[Long] = {
var partitions: Seq[Partition] = Nil
if (hiveQlTable.isPartitioned) {
partitions = getHiveQlPartitions(pruningPredicates)
}

// try with stats in table/partition properties
val fromStats = getFromStats(hiveQlTable, partitions, TOTAL_SIZE).orElse(
getFromStats(hiveQlTable, partitions, RAW_DATA_SIZE))
if (fromStats.isDefined || !sqlContext.hiveCalculateStatsRuntime) {
return fromStats
}

// Build a dummy MapWork pointing at the table or partition locations so that Hive's
// Utilities.getInputSummary can sum their input size from the filesystem.
val dummy: MapWork = new MapWork
val alias: String = "_dummy"
val operator: Operator[_ <: OperatorDesc] = new TableScanOperator

dummy.getAliasToWork.put(alias, operator)
val pathToAliases = dummy.getPathToAliases
val pathToPartition = dummy.getPathToPartitionInfo
if (hiveQlTable.isPartitioned) {
for (partition <- partitions) {
val partPath = getDnsPath(partition.getDataLocation, sqlContext.hiveconf).toString
pathToAliases.put(partPath, new util.ArrayList(util.Arrays.asList(alias)))
pathToPartition.put(partPath, new PartitionDesc(partition, tableDesc))
}
} else {
val tablePath = getDnsPath(hiveQlTable.getDataLocation, sqlContext.hiveconf).toString
pathToAliases.put(tablePath, new util.ArrayList(util.Arrays.asList(alias)))
pathToPartition.put(tablePath, new PartitionDesc(tableDesc, null))
}
// calculate summary
Some(Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null).getLength)
}

private def getFromStats(table: Table, partitions: Seq[Partition], statKey: String):
Option[Long] = {
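    // For partitioned tables, every selected partition must report the stat; a single missing
    // or non-positive value returns None so the caller can fall back to a runtime calculation.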
if (table.isPartitioned) {
var totalSize: Long = 0
for (partition <- partitions) {
val partSize = Option(partition.getParameters.get(statKey)).map(_.toLong).filter(_ > 0)
if (partSize.isEmpty) {
return None
}
totalSize += partSize.get
}
Some(totalSize)
} else {
Option(table.getParameters.get(statKey)).map(_.toLong).filter(_ > 0)
}
}

private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}

private[this] def getDnsPath(path: Path, conf: Configuration): Path = {
val fs = path.getFileSystem(conf)
return new Path(fs.getUri.getScheme, fs.getUri.getAuthority, path.toUri.getPath)
}

/** Only compare database and tablename, not alias. */
override def sameResult(plan: LogicalPlan): Boolean = {
plan match {
@@ -42,6 +42,7 @@ private[hive] case class HiveStorageDescriptor(

private[hive] case class HivePartition(
values: Seq[String],
properties: Map[String, String],
storage: HiveStorageDescriptor)

private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)
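
A hedged helper showing what the new properties field enables, assuming org.apache.hadoop.hive.common.StatsSetupConst is imported:

// Pull a precomputed size out of the partition parameters, mirroring getFromStats above.
def sizeFromStats(partition: HivePartition): Option[Long] =
  partition.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong).filter(_ > 0)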
@@ -422,6 +422,7 @@ private[hive] class ClientWrapper(
val apiPartition = partition.getTPartition
HivePartition(
values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
properties = Option(apiPartition.getParameters).map(_.asScala.toMap).getOrElse(Map.empty),
storage = HiveStorageDescriptor(
location = apiPartition.getSd.getLocation,
inputFormat = apiPartition.getSd.getInputFormat,
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -31,7 +30,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.{BooleanType, DataType}

/**
* The Hive table scan operator. Column and partition pruning are both handled.
@@ -56,13 +54,7 @@ case class HiveTableScan(

// Bind all partition key attribute references in the partition pruning predicate for later
// evaluation.
private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")

BindReferences.bindReference(pred, relation.partitionKeys)
}
private[this] val boundPruningPred = relation.bindPredicate(partitionPruningPred)

// Create a local copy of hiveconf, so that scan specific modifications should not impact
// other queries
@@ -76,10 +68,6 @@ case class HiveTableScan(
private[this] val hadoopReader =
new HadoopTableReader(attributes, relation, context, hiveExtraConf)

private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}

private def addColumnMetadataToConf(hiveConf: HiveConf) {
// Specifies needed column IDs for those non-partitioning columns.
val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
@@ -107,33 +95,11 @@ case class HiveTableScan(
hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(","))
}

/**
* Prunes partitions not involve the query plan.
*
* @param partitions All partitions of the relation.
* @return Partitions that are involved in the query plan.
*/
private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
boundPruningPred match {
case None => partitions
case Some(shouldKeep) => partitions.filter { part =>
val dataTypes = relation.partitionKeys.map(_.dataType)
val castedValues = part.getValues.asScala.zip(dataTypes)
.map { case (value, dataType) => castFromString(value, dataType) }

// Only partitioned values are needed here, since the predicate has already been bound to
// partition key attribute references.
val row = InternalRow.fromSeq(castedValues)
shouldKeep.eval(row).asInstanceOf[Boolean]
}
}
}

protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
relation.getHiveQlPartitions(partitionPruningPred, boundPruningPred))
}

override def output: Seq[Attribute] = attributes
@@ -103,7 +103,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
|SELECT * FROM src
""".stripMargin).collect()

assert(queryTotalSize("analyzeTable_part") === hiveContext.conf.defaultSizeInBytes)
// Partition stats are populated by the insert query, so totalSize is known before ANALYZE runs.
assert(queryTotalSize("analyzeTable_part") === BigInt(17436))

sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
