
Commit ec3e274

propagate stats in api.partition to spark partition
1 parent 84c45c0 commit ec3e274

5 files changed, +51 -11 lines changed


sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 11 additions & 1 deletion
@@ -198,6 +198,12 @@ class HiveContext private[hive](
    */
   protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
+  /*
+   * Calculate table statistics in runtime if needed.
+   */
+  protected[hive] def hiveCalculateStatsRuntime: Boolean =
+    getConf(HIVE_TABLE_CALCULATE_STATS_RUNTIME)
+
   protected[hive] def hiveThriftServerSingleSession: Boolean =
     sc.conf.get("spark.sql.hive.thriftServer.singleSession", "false").toBoolean
 
@@ -749,7 +755,11 @@ private[hive] object HiveContext {
 
   val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
     defaultValue = Some(true),
-    doc = "TODO")
+    doc = "hive thrift server use background spark sql thread pool to execute sql queries.")
+
+  val HIVE_TABLE_CALCULATE_STATS_RUNTIME = booleanConf("spark.sql.hive.calulcate.stats.runtime",
+    defaultValue = Some(false),
+    doc = "Calculate table statistics in runtime if needed.")
 
   /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
   def newTemporaryConfiguration(): Map[String, String] = {
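A rough usage sketch of the new flag (assuming a Spark 1.x-style HiveContext; the key string is copied verbatim from the diff above):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("stats-demo").setMaster("local[*]"))
    val hiveContext = new HiveContext(sc)

    // Off by default. When enabled, size estimation may fall back to scanning the
    // input paths if neither totalSize nor rawDataSize is found in the
    // table/partition parameters.
    hiveContext.setConf("spark.sql.hive.calulcate.stats.runtime", "true")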

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 36 additions & 9 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.hive.common.StatsSetupConst._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -770,8 +772,6 @@ private[hive] case class MetastoreRelation
 
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = {
-      val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
-      val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
       // TODO: check if this estimate is valid for tables after partition pruning.
       // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
       // relatively cheap if parameters for the table are populated into the metastore. An
@@ -782,10 +782,7 @@ private[hive] case class MetastoreRelation
       // When table is external,`totalSize` is always zero, which will influence join strategy
       // so when `totalSize` is zero, use `rawDataSize` instead
       // if the size is still less than zero, we use default size
-      Option(totalSize).map(_.toLong).filter(_ > 0)
-        .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(Option(calculateInput().getLength).filter(_ > 0)
-            .getOrElse(sqlContext.conf.defaultSizeInBytes))))
+      calculateInput().filter(_ > 0).getOrElse(sqlContext.conf.defaultSizeInBytes))
     }
   )
 
@@ -820,6 +817,7 @@ private[hive] case class MetastoreRelation
       tPartition.setDbName(databaseName)
       tPartition.setTableName(tableName)
       tPartition.setValues(p.values.asJava)
+      tPartition.setParameters(p.properties.asJava)
 
       val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
       tPartition.setSd(sd)
@@ -860,7 +858,19 @@ private[hive] case class MetastoreRelation
     }
   }
 
-  private def calculateInput(): ContentSummary = {
+  private def calculateInput(): Option[Long] = {
+    var partitions: Seq[Partition] = Nil
+    if (hiveQlTable.isPartitioned) {
+      partitions = getHiveQlPartitions(pruningPredicates)
+    }
+
+    // try with stats in table/partition properties
+    val fromStats = getFromStats(hiveQlTable, partitions, TOTAL_SIZE).orElse(
+      getFromStats(hiveQlTable, partitions, RAW_DATA_SIZE))
+    if (fromStats.isDefined || !sqlContext.hiveCalculateStatsRuntime) {
+      return fromStats
+    }
+
     // create dummy mapwork
     val dummy: MapWork = new MapWork
     val alias: String = "_dummy"
@@ -870,7 +880,7 @@ private[hive] case class MetastoreRelation
     val pathToAliases = dummy.getPathToAliases
     val pathToPartition = dummy.getPathToPartitionInfo
     if (hiveQlTable.isPartitioned) {
-      for (partition <- getHiveQlPartitions(pruningPredicates)) {
+      for (partition <- partitions) {
         val partPath = getDnsPath(partition.getDataLocation, sqlContext.hiveconf).toString
         pathToAliases.put(partPath, new util.ArrayList(util.Arrays.asList(alias)))
         pathToPartition.put(partPath, new PartitionDesc(partition, tableDesc))
@@ -881,7 +891,24 @@ private[hive] case class MetastoreRelation
       pathToPartition.put(tablePath, new PartitionDesc(tableDesc, null))
     }
     // calculate summary
-    Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null)
+    Some(Utilities.getInputSummary(new Context(sqlContext.hiveconf), dummy, null).getLength)
+  }
+
+  private def getFromStats(table: Table, partitions: Seq[Partition], statKey: String):
+      Option[Long] = {
+    if (table.isPartitioned) {
+      var totalSize: Long = 0
+      for (partition <- partitions) {
+        val partSize = Option(partition.getParameters.get(statKey)).map(_.toLong).filter(_ > 0)
+        if (partSize.isEmpty) {
+          return None;
+        }
+        totalSize += partSize.get
+      }
+      Some(totalSize)
+    } else {
+      Option(table.getParameters.get(statKey)).map(_.toLong).filter(_ > 0)
+    }
   }
 
   private[this] def castFromString(value: String, dataType: DataType) = {
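The behavioral change in calculateInput() is that metastore stats are now consulted per partition before any filesystem scan, and a partitioned table only gets a stats-based size when every pruned partition carries a positive value for the requested key. A standalone sketch of that aggregation rule (illustrative names, not the actual MetastoreRelation members):

    object StatsFromProperties {
      // Sum statKey across all partitions, but give up (None) as soon as one
      // partition lacks a positive value, so a partial sum is never reported
      // as the total size.
      def fromPartitionParams(
          partitionParams: Seq[Map[String, String]],
          statKey: String): Option[Long] = {
        val sizes = partitionParams.map(_.get(statKey).map(_.toLong).filter(_ > 0))
        if (sizes.forall(_.isDefined)) Some(sizes.flatten.sum) else None
      }

      def main(args: Array[String]): Unit = {
        val parts = Seq(Map("totalSize" -> "5812"), Map("totalSize" -> "5812"))
        println(fromPartitionParams(parts, "totalSize"))   // Some(11624)
        println(fromPartitionParams(parts, "rawDataSize")) // None: key missing in every partition
      }
    }

As in the diff, the caller still applies .filter(_ > 0) to the result and falls back to sqlContext.conf.defaultSizeInBytes when nothing usable is found.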

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ private[hive] case class HiveStorageDescriptor(
 
 private[hive] case class HivePartition(
     values: Seq[String],
+    properties: Map[String, String],
     storage: HiveStorageDescriptor)
 
 private[hive] case class HiveColumn(name: String, @Nullable hiveType: String, comment: String)

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 1 addition & 0 deletions
@@ -422,6 +422,7 @@ private[hive] class ClientWrapper(
     val apiPartition = partition.getTPartition
     HivePartition(
       values = Option(apiPartition.getValues).map(_.asScala).getOrElse(Seq.empty),
+      properties = Option(apiPartition.getParameters).map(_.asScala.toMap).getOrElse(Map.empty),
       storage = HiveStorageDescriptor(
         location = apiPartition.getSd.getLocation,
         inputFormat = apiPartition.getSd.getInputFormat,
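The Option wrapper matters because the Thrift getter can return null; the pattern mirrors the existing handling of getValues. A minimal standalone sketch of just that conversion (the helper name is made up for illustration):

    import scala.collection.JavaConverters._

    // Convert a possibly-null java.util.Map from the Thrift Partition into an
    // immutable Scala Map, defaulting to empty.
    def paramsOf(javaParams: java.util.Map[String, String]): Map[String, String] =
      Option(javaParams).map(_.asScala.toMap).getOrElse(Map.empty)

    // paramsOf(null)                                                     returns Map()
    // paramsOf(java.util.Collections.singletonMap("totalSize", "5812"))  returns Map("totalSize" -> "5812")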

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 2 additions & 1 deletion
@@ -103,7 +103,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
         |SELECT * FROM src
       """.stripMargin).collect()
 
-    assert(queryTotalSize("analyzeTable_part") === hiveContext.conf.defaultSizeInBytes)
+    // stats for partition is populated in insert query
+    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
 
     sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
 
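Why the expectation changes: with Hive's stats autogather (on by default), each INSERT ... PARTITION records a totalSize parameter on the partition it writes, and the new getFromStats path sums those values, so the estimate is exact before ANALYZE is ever run. If, as in the usual suite setup, src is the 5812-byte kv1.txt fixture and it is copied into three partitions, the asserted value is just the sum (a hypothetical sanity check, not part of the suite):

    // Hypothetical arithmetic check for the asserted size.
    val perPartitionTotalSize = 5812L  // one copy of src (kv1.txt)
    val partitionCount = 3
    assert(partitionCount * perPartitionTotalSize == 17436L)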