Skip to content

Commit e8355e0

Browse files
author
Zhenhua Wang
committed
relation estimation
1 parent 64817c4 commit e8355e0

File tree

4 files changed

+36
-7
lines changed

4 files changed

+36
-7
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden
2727
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
2828
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
2929
import org.apache.spark.sql.catalyst.plans.logical._
30+
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
3031
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
3132
import org.apache.spark.sql.catalyst.util.quoteIdentifier
3233
import org.apache.spark.sql.types.StructType
@@ -367,10 +368,11 @@ case class CatalogStatistics(
367368
* on column names.
368369
*/
369370
def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
370-
if (cboEnabled) {
371-
val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
372-
Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
373-
attributeStats = AttributeMap(attrStats))
371+
if (cboEnabled && rowCount.isDefined) {
372+
val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
373+
// Estimate size as number of rows * row size.
374+
val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
375+
Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
374376
} else {
375377
// When CBO is disabled, we apply the size-only estimation strategy, so there's no need to
376378
// propagate other statistics from catalog to the plan.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
1919

2020
import org.apache.spark.sql.catalyst.expressions.AttributeMap
2121
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
22-
import org.apache.spark.sql.catalyst.plans.logical
2322
import org.apache.spark.sql.catalyst.plans.logical._
2423

2524
/**

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
224224

225225
// Check relation statistics
226226
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
227-
assert(relation.stats.sizeInBytes == 0)
227+
assert(relation.stats.sizeInBytes == 1)
228228
assert(relation.stats.rowCount == Some(0))
229229
assert(relation.stats.attributeStats.size == 1)
230230
val (attribute, colStat) = relation.stats.attributeStats.head

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,35 @@ import org.apache.spark.sql.types._
4141

4242

4343
class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
44-
test("Hive serde tables should fallback to HDFS for size estimation") {
44+
45+
test("size estimation for relations based on row size * number of rows") {
46+
val dsTbl = "rel_est_ds_table"
47+
val hiveTbl = "rel_est_hive_table"
48+
withTable(dsTbl, hiveTbl) {
49+
spark.range(1000L).write.format("parquet").saveAsTable(dsTbl)
50+
sql(s"CREATE TABLE $hiveTbl STORED AS parquet AS SELECT * FROM $dsTbl")
51+
52+
Seq(dsTbl, hiveTbl).foreach { tbl =>
53+
sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS")
54+
val catalogStats = getCatalogStatistics(tbl)
55+
withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {
56+
val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
57+
assert(relationStats.sizeInBytes == catalogStats.sizeInBytes)
58+
assert(relationStats.rowCount.isEmpty)
59+
}
60+
spark.sessionState.catalog.refreshTable(TableIdentifier(tbl))
61+
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
62+
val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
63+
// Due to compression in parquet files, in this test, file size is smaller than
64+
// in-memory size.
65+
assert(catalogStats.sizeInBytes < relationStats.sizeInBytes)
66+
assert(catalogStats.rowCount == relationStats.rowCount)
67+
}
68+
}
69+
}
70+
}
71+
72+
test("Hive serde tables should fallback to HDFS for size estimation") {
4573
withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
4674
withTable("csv_table") {
4775
withTempDir { tempDir =>

0 commit comments

Comments
 (0)