
Commit f4a3274

wangzhenhua authored and cmonkey committed
[SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations
## What changes were proposed in this pull request?

Statistics in LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two different relations can share the same column name. CatalogTable, however, has no notion of attributes or of the broadcast hint carried in Statistics, so putting Statistics inside CatalogTable is confusing. We therefore define a separate statistics structure, CatalogStatistics, in CatalogTable; it is responsible only for interacting with the metastore and is converted to plan-level Statistics when used in a LogicalPlan.

## How was this patch tested?

Added test cases.

Author: wangzhenhua <[email protected]>
Author: Zhenhua Wang <[email protected]>

Closes apache#16323 from wzhfy/nameToAttr.
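To make the naming problem concrete, here is a hypothetical sketch (not part of this patch; the relation names are made up) of why a name-keyed Map breaks down at the plan level while an AttributeMap, keyed by each attribute's exprId, does not:

```scala
// Hypothetical illustration: two relations each expose a column named "id";
// after a join, only the exprId distinguishes the two attributes.
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
import org.apache.spark.sql.types.IntegerType

val idFromOrders = AttributeReference("id", IntegerType)()     // fresh exprId
val idFromCustomers = AttributeReference("id", IntegerType)()  // different exprId

// A name-keyed map collapses the two columns into a single entry...
val byName = Map(idFromOrders.name -> "orders stats", idFromCustomers.name -> "customers stats")
assert(byName.size == 1)

// ...whereas an AttributeMap keeps one entry per attribute.
val byAttr = AttributeMap(Seq(idFromOrders -> "orders stats", idFromCustomers -> "customers stats"))
assert(byAttr.size == 2)
```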
1 parent cb15853 · commit f4a3274

File tree

9 files changed: +95 −23 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
Lines changed: 31 additions & 3 deletions

@@ -21,8 +21,8 @@ import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.{StructField, StructType}

@@ -171,7 +171,7 @@ case class CatalogTable(
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
     properties: Map[String, String] = Map.empty,
-    stats: Option[Statistics] = None,
+    stats: Option[CatalogStatistics] = None,
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,

@@ -247,6 +247,34 @@ case class CatalogTable(
 }
 
 
+/**
+ * This class of statistics is used in [[CatalogTable]] to interact with metastore.
+ * We define this new class instead of directly using [[Statistics]] here because there are no
+ * concepts of attributes or broadcast hint in catalog.
+ */
+case class CatalogStatistics(
+    sizeInBytes: BigInt,
+    rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStat] = Map.empty) {
+
+  /**
+   * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
+   * on column names.
+   */
+  def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
+    val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
+    Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
+      attributeStats = AttributeMap(matched))
+  }
+
+  /** Readable string representation for the CatalogStatistics. */
+  def simpleString: String = {
+    val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
+    s"$sizeInBytes bytes$rowCountString"
+  }
+}
+
+
 case class CatalogTableType private(name: String)
 object CatalogTableType {
   val EXTERNAL = new CatalogTableType("EXTERNAL")
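A minimal sketch of the conversion above (hypothetical attributes and numbers): a plan whose output has columns c1 and c2, with catalog column stats recorded only for c1.

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.{IntegerType, StringType}

val c1 = AttributeReference("c1", IntegerType)()
val c2 = AttributeReference("c2", StringType)()
val catalogStats = CatalogStatistics(
  sizeInBytes = 1024,
  rowCount = Some(10),
  colStats = Map("c1" -> ColumnStat(10, None, None, 0, 4, 4)))

// toPlanStats matches the name-keyed colStats against the plan's output:
// "c1" resolves to the attribute c1, while c2 has no recorded stats and
// therefore does not appear in attributeStats at all.
val planStats = catalogStats.toPlanStats(Seq(c1, c2))
assert(planStats.sizeInBytes == 1024 && planStats.rowCount == Some(10))
assert(planStats.attributeStats.size == 1 && planStats.attributeStats.contains(c1))
```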

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
Lines changed: 2 additions & 2 deletions

@@ -41,13 +41,13 @@ import org.apache.spark.sql.types._
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
- * @param colStats Column-level statistics.
+ * @param attributeStats Statistics for Attributes.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
-    colStats: Map[String, ColumnStat] = Map.empty,
+    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
     isBroadcastable: Boolean = false) {
 
   override def toString: String = "Statistics(" + simpleString + ")"

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
Lines changed: 2 additions & 2 deletions

@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._

@@ -64,7 +64,7 @@ case class AnalyzeColumnCommand(
       AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
 
     // We also update table-level stats in order to keep them consistent with column-level stats.
-    val statistics = Statistics(
+    val statistics = CatalogStatistics(
       sizeInBytes = sizeInBytes,
       rowCount = Some(rowCount),
       // Newly computed column stats should override the existing ones.

sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
Lines changed: 5 additions & 5 deletions

@@ -25,8 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SessionState
 

@@ -62,9 +61,9 @@ case class AnalyzeTableCommand(
   def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
     val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
     val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-    var newStats: Option[Statistics] = None
+    var newStats: Option[CatalogStatistics] = None
     if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-      newStats = Some(Statistics(sizeInBytes = newTotalSize))
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
     }
     // We only set rowCount when noscan is false, because otherwise:
     // 1. when total size is not changed, we don't need to alter the table;

@@ -76,7 +75,8 @@ case class AnalyzeTableCommand(
       newStats = if (newStats.isDefined) {
         newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
       } else {
-        Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+        Some(CatalogStatistics(
+          sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
       }
     }
   }
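For context, a short sketch of the SQL commands that drive updateTableStats (assuming a SparkSession named spark and an existing table t):

```scala
// NOSCAN computes only the total size on disk, so only sizeInBytes changes;
// without NOSCAN the command also scans the table to count rows, which
// exercises the rowCount branch above.
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS NOSCAN")
spark.sql("ANALYZE TABLE t COMPUTE STATISTICS")
```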

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
Lines changed: 1 addition & 1 deletion

@@ -73,7 +73,7 @@ case class LogicalRelation(
   override lazy val cleanArgs: Seq[Any] = Seq(relation)
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+    catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
       Statistics(sizeInBytes = relation.sizeInBytes))
   }
 
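A small sketch of where this lazy val surfaces (hypothetical table t whose stats have already been collected): once catalog stats exist, the plan-level Statistics comes from toPlanStats rather than from the file size alone.

```scala
val df = spark.table("t")
// statistics on the optimized plan reflects the converted CatalogStatistics
println(df.queryExecution.optimizedPlan.statistics.simpleString)
```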

sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
Lines changed: 44 additions & 1 deletion

@@ -24,6 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf

@@ -39,7 +40,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext
   import testImplicits._
 
   private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
-    : Option[Statistics] = {
+    : Option[CatalogStatistics] = {
     val df = spark.table(tableName)
     val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
       assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)

@@ -260,4 +261,46 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  // This test will be run twice: with and without Hive support
+  test("conversion from CatalogStatistics to Statistics") {
+    withTable("ds_tbl", "hive_tbl") {
+      // Test data source table
+      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+      // Test hive serde table
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+      }
+    }
+  }
+
+  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+    // Create an empty table and run analyze command on it.
+    val createTableSql = if (isDatasourceTable) {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+    } else {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+    }
+    sql(createTableSql)
+    // Analyze only one column.
+    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
+    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
+      case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable)
+      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
+    }.head
+    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+    // Check catalog statistics
+    assert(catalogTable.stats.isDefined)
+    assert(catalogTable.stats.get.sizeInBytes == 0)
+    assert(catalogTable.stats.get.rowCount == Some(0))
+    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+
+    // Check relation statistics
+    assert(relation.statistics.sizeInBytes == 0)
+    assert(relation.statistics.rowCount == Some(0))
+    assert(relation.statistics.attributeStats.size == 1)
+    val (attribute, colStat) = relation.statistics.attributeStats.head
+    assert(attribute.name == "c1")
+    assert(colStat == emptyColStat)
+  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
Lines changed: 2 additions & 2 deletions

@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils

@@ -656,7 +656,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
     }
 
     table = table.copy(
-      stats = Some(Statistics(
+      stats = Some(CatalogStatistics(
        sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)),
        rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
        colStats = colStats.toMap)))
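A hedged sketch of observing the restored stats (hypothetical table name; getTableMetadata is the SessionCatalog API, and sessionState is accessible from code in the org.apache.spark.sql package, as in this patch's test suites):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

val md = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t", Some("default")))
// stats is Option[CatalogStatistics], rebuilt from the table properties above
md.stats.foreach(s => println(s.simpleString))  // e.g. "1024 bytes, 10 rows"
```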

sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
Lines changed: 1 addition & 1 deletion

@@ -113,7 +113,7 @@ private[hive] case class MetastoreRelation(
   }
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
+    catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
       sizeInBytes = {
         val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
         val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)

sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
Lines changed: 7 additions & 6 deletions

@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._

@@ -152,7 +152,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
   }
 
   private def checkTableStats(
-      stats: Option[Statistics],
+      stats: Option[CatalogStatistics],
       hasSizeInBytes: Boolean,
       expectedRowCounts: Option[Int]): Unit = {
     if (hasSizeInBytes || expectedRowCounts.nonEmpty) {

@@ -168,7 +168,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
       tableName: String,
       isDataSourceTable: Boolean,
       hasSizeInBytes: Boolean,
-      expectedRowCounts: Option[Int]): Option[Statistics] = {
+      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
     val df = sql(s"SELECT * FROM $tableName")
     val stats = df.queryExecution.analyzed.collect {
       case rel: MetastoreRelation =>

@@ -435,10 +435,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
   }
 
   /** Used to test refreshing cached metadata once table stats are updated. */
-  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean): (Statistics, Statistics) = {
+  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean)
+    : (CatalogStatistics, CatalogStatistics) = {
     val tableName = "tbl"
-    var statsBeforeUpdate: Statistics = null
-    var statsAfterUpdate: Statistics = null
+    var statsBeforeUpdate: CatalogStatistics = null
+    var statsAfterUpdate: CatalogStatistics = null
     withTable(tableName) {
       val tableIndent = TableIdentifier(tableName, Some("default"))
       val catalog = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]
