@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -367,13 +368,14 @@ case class CatalogStatistics(
    * on column names.
    */
   def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
-    if (cboEnabled) {
-      val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
-      Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
-        attributeStats = AttributeMap(attrStats))
+    if (cboEnabled && rowCount.isDefined) {
+      val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+      // Estimate size as number of rows * row size.
+      val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
+      Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
     } else {
-      // When CBO is disabled, we apply the size-only estimation strategy, so there's no need to
-      // propagate other statistics from catalog to the plan.
+      // When CBO is disabled or the table doesn't have other statistics, we apply the size-only
+      // estimation strategy and only propagate sizeInBytes in statistics.
       Statistics(sizeInBytes = sizeInBytes)
     }
   }
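For context: the key change is that `sizeInBytes` is now derived from `rowCount` when CBO is enabled and a row count exists, rather than echoing the catalog's on-disk size. Below is a minimal, self-contained sketch of that estimation strategy, size = number of rows * estimated row width with a floor of 1 byte, as the new comment describes. The fixed per-type widths and the `SizeEstimationSketch` name are hypothetical stand-ins; the real `EstimationUtils.getOutputSize` consults per-attribute column statistics instead.

```scala
// Simplified sketch of the rows * row-size estimate; not Spark's implementation.
// Assumption: each column contributes a fixed per-type width when no column
// statistics are available (real code uses ColumnStat, e.g. avgLen).
object SizeEstimationSketch {
  // Hypothetical fixed widths per type name, for illustration only.
  private val defaultWidth: Map[String, Int] =
    Map("int" -> 4, "bigint" -> 8, "double" -> 8, "string" -> 20)

  /** Estimate output size in bytes as rowCount * (8-byte row overhead + column widths). */
  def getOutputSize(columnTypes: Seq[String], rowCount: BigInt): BigInt = {
    val rowWidth = 8 + columnTypes.map(t => defaultWidth.getOrElse(t, 8)).sum
    // Never return 0: a zero sizeInBytes would zero out multiplicative
    // estimates (e.g. join output size) further up the plan.
    if (rowCount > 0) rowCount * rowWidth else 1
  }
}

// Example: 1000 rows of (bigint, string) => 1000 * (8 + 8 + 20) = 36000 bytes.
// SizeEstimationSketch.getOutputSize(Seq("bigint", "string"), 1000) == 36000
```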
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
 
 import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
@@ -224,7 +224,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
 
     // Check relation statistics
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
-      assert(relation.stats.sizeInBytes == 0)
+      assert(relation.stats.sizeInBytes == 1)
       assert(relation.stats.rowCount == Some(0))
       assert(relation.stats.attributeStats.size == 1)
       val (attribute, colStat) = relation.stats.attributeStats.head
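The expected value changes from 0 to 1 because the size of an empty relation now comes from the rows-times-row-size path, which bottoms out at 1 byte rather than 0 (a zero size would collapse multiplicative estimates higher in the plan). Using the hypothetical sketch above:

```scala
// rowCount == 0, so the estimate floors at 1 byte instead of 0.
SizeEstimationSketch.getOutputSize(Seq("int"), rowCount = 0)  // == 1
```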
@@ -41,7 +41,35 @@ import org.apache.spark.sql.types._
 
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
-  test("Hive serde tables should fallback to HDFS for size estimation") {
+
+  test("size estimation for relations is based on row size * number of rows") {
+    val dsTbl = "rel_est_ds_table"
+    val hiveTbl = "rel_est_hive_table"
+    withTable(dsTbl, hiveTbl) {
+      spark.range(1000L).write.format("parquet").saveAsTable(dsTbl)
+      spark.range(1000L).write.format("hive").saveAsTable(hiveTbl)
+
+      Seq(dsTbl, hiveTbl).foreach { tbl =>
+        sql(s"ANALYZE TABLE $tbl COMPUTE STATISTICS")
+        val catalogStats = getCatalogStatistics(tbl)
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          assert(relationStats.sizeInBytes == catalogStats.sizeInBytes)
+          assert(relationStats.rowCount.isEmpty)
+        }
+        spark.sessionState.catalog.refreshTable(TableIdentifier(tbl))
+        withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+          val relationStats = spark.table(tbl).queryExecution.optimizedPlan.stats
+          // Due to compression in parquet files, in this test, file size is smaller than
+          // in-memory size.
+          assert(catalogStats.sizeInBytes < relationStats.sizeInBytes)
+          assert(catalogStats.rowCount == relationStats.rowCount)
+        }
+      }
+    }
+  }
+
+  test("Hive serde tables should fallback to HDFS for size estimation") {
     withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
       withTable("csv_table") {
         withTempDir { tempDir =>
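The same behavior can be observed outside the test harness by comparing plan-level statistics with CBO off and on. A sketch, assuming a Spark shell where `LogicalPlan.stats` takes no arguments (Spark 2.3+) and using `spark.sql.cbo.enabled`, the SQL conf key behind `SQLConf.CBO_ENABLED`:

```scala
// Sketch: compare file-based stats with the CBO rows * row-size estimate.
spark.range(1000L).write.format("parquet").saveAsTable("t")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")

spark.conf.set("spark.sql.cbo.enabled", "false")
// With CBO off, sizeInBytes is the catalog's on-disk (compressed) size.
println(spark.table("t").queryExecution.optimizedPlan.stats.sizeInBytes)

spark.conf.set("spark.sql.cbo.enabled", "true")
spark.catalog.refreshTable("t")  // drop the cached relation so stats are recomputed
// With CBO on, sizeInBytes is rowCount * estimated row width, typically larger.
println(spark.table("t").queryExecution.optimizedPlan.stats.sizeInBytes)
```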