From e8355e08a12b7f335ab7ff9ebede1824acbf8092 Mon Sep 17 00:00:00 2001
From: Zhenhua Wang
Date: Tue, 28 Nov 2017 19:03:06 +0800
Subject: [PATCH 1/2] relation estimation

---
 .../sql/catalyst/catalog/interface.scala             | 10 ++++---
 .../SizeInBytesOnlyStatsPlanVisitor.scala            |  1 -
 .../sql/StatisticsCollectionTestBase.scala           |  2 +-
 .../spark/sql/hive/StatisticsSuite.scala             | 30 ++++++++++++++++++-
 4 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index b10ce054cbf0..dc53a9f39db1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIden
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -367,10 +368,11 @@ case class CatalogStatistics(
    * on column names.
    */
   def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
-    if (cboEnabled) {
-      val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
-      Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
-        attributeStats = AttributeMap(attrStats))
+    if (cboEnabled && rowCount.isDefined) {
+      val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+      // Estimate size as number of rows * row size.
+      val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
+      Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
     } else {
       // When CBO is disabled, we apply the size-only estimation strategy, so there's no need to
       // propagate other statistics from catalog to the plan.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
index d701a956887a..5e1c4e0bd606 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
 
 import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 0a0407d5dbac..65ccc1915882 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -224,7 +224,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
 
     // Check relation statistics
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
-      assert(relation.stats.sizeInBytes == 0)
+      assert(relation.stats.sizeInBytes == 1)
       assert(relation.stats.rowCount == Some(0))
       assert(relation.stats.attributeStats.size == 1)
       val (attribute, colStat) = relation.stats.attributeStats.head
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 7427948fe138..5d37fc8bd9f9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -41,7 +41,35 @@ import org.apache.spark.sql.types._
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
 
-  test("Hive serde tables should fallback to HDFS for size estimation") {
+
+  test("size estimation for relations based on row size * number of rows") {
+    val dsTbl = "rel_est_ds_table"
+    val hiveTbl = "rel_est_hive_table"
+    withTable(dsTbl, hiveTbl) {
+      spark.range(1000L).write.format("parquet").saveAsTable(dsTbl)
+      sql(s"CREATE TABLE $hiveTbl STORED AS parquet AS SELECT * FROM $dsTbl")
+
+      Seq(dsTbl, hiveTbl).foreach { tbl =>
+        sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS")
+        val catalogStats = getCatalogStatistics(tbl)
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          assert(relationStats.sizeInBytes == catalogStats.sizeInBytes)
+          assert(relationStats.rowCount.isEmpty)
+        }
+        spark.sessionState.catalog.refreshTable(TableIdentifier(tbl))
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          // Due to compression in parquet files, in this test, file size is smaller than
+          // in-memory size.
+          assert(catalogStats.sizeInBytes < relationStats.sizeInBytes)
+          assert(catalogStats.rowCount == relationStats.rowCount)
+        }
+      }
+    }
+  }
+
+  test("Hive serde tables should fallback to HDFS for size estimation") {
     withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
       withTable("csv_table") {
         withTempDir { tempDir =>

From acc498ca072d5cf1cbe8a282df5efdae5a1a7d0c Mon Sep 17 00:00:00 2001
From: Zhenhua Wang
Date: Tue, 28 Nov 2017 23:37:43 +0800
Subject: [PATCH 2/2] fix comments

---
 .../org/apache/spark/sql/catalyst/catalog/interface.scala | 4 ++--
 .../scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index dc53a9f39db1..95b6fbb0cd61 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -374,8 +374,8 @@ case class CatalogStatistics(
       val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
       Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
     } else {
-      // When CBO is disabled, we apply the size-only estimation strategy, so there's no need to
-      // propagate other statistics from catalog to the plan.
+      // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
+      // estimation strategy and only propagate sizeInBytes in statistics.
       Statistics(sizeInBytes = sizeInBytes)
     }
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 5d37fc8bd9f9..0cdd9305c6b6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -42,12 +42,12 @@ import org.apache.spark.sql.types._
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
 
 
-  test("size estimation for relations based on row size * number of rows") {
+  test("size estimation for relations is based on row size * number of rows") {
     val dsTbl = "rel_est_ds_table"
     val hiveTbl = "rel_est_hive_table"
    withTable(dsTbl, hiveTbl) {
       spark.range(1000L).write.format("parquet").saveAsTable(dsTbl)
-      sql(s"CREATE TABLE $hiveTbl STORED AS parquet AS SELECT * FROM $dsTbl")
+      spark.range(1000L).write.format("hive").saveAsTable(hiveTbl)
 
       Seq(dsTbl, hiveTbl).foreach { tbl =>
         sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS")
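Illustration (not part of the patches above): with CBO enabled and a catalog row count available, toPlanStats now derives sizeInBytes from rowCount * estimated row width via EstimationUtils.getOutputSize, rather than reusing the raw file size stored in the catalog. The standalone Scala sketch below models that arithmetic under assumed constants (an 8-byte per-row overhead and an 8-byte width for a LongType column, when no column statistics exist); the object and method names are illustrative only and are not Spark APIs.

object RelationSizeEstimationSketch {
  // Assumed width of a single LongType field when no column statistics exist.
  val defaultLongFieldSize: Long = 8L

  // Assumed fixed per-row overhead; the real EstimationUtils.getOutputSize
  // derives per-column widths from column statistics when they are present.
  val assumedRowOverhead: Long = 8L

  // sizeInBytes ~= rowCount * rowWidth, clamped to at least 1 so an empty
  // relation is never reported with size 0.
  def estimateSizeInBytes(rowCount: BigInt, fieldSizes: Seq[Long]): BigInt = {
    val rowWidth = assumedRowOverhead + fieldSizes.sum
    (rowCount * BigInt(rowWidth)).max(BigInt(1))
  }

  def main(args: Array[String]): Unit = {
    // 1000 rows of a single LongType column, as in the test's spark.range(1000L) tables.
    println(estimateSizeInBytes(BigInt(1000), Seq(defaultLongFieldSize))) // 16000
    // Zero-row relation: the estimate is clamped to 1, consistent with the
    // updated expectation in StatisticsCollectionTestBase (sizeInBytes == 1).
    println(estimateSizeInBytes(BigInt(0), Seq(defaultLongFieldSize)))    // 1
  }
}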