Commit fb036c4

wangzhenhua authored and cloud-fan committed
[SPARK-20318][SQL] Use Catalyst type for min/max in ColumnStat for ease of estimation
## What changes were proposed in this pull request?

Currently, when estimating predicates like `col > literal` or `col = literal`, we update min or max in column stats based on the literal value. However, the literal value is of Catalyst type (internal type), while min/max are of external type, so for the next predicate we again need a type conversion to compare and update the column stats. This is awkward and causes many unnecessary conversions during estimation.

To solve this, we use the Catalyst type for min/max in `ColumnStat`. Note that the persistent format in the metastore is still the external type, so there is no inconsistency for statistics in the metastore.

This PR also fixes a bug for boolean type in `IN` conditions.

## How was this patch tested?

The changes to `ColumnStat` are covered by existing tests. For the bug fix, a new test for boolean type in `IN` conditions is added.

Author: wangzhenhua <[email protected]>

Closes #17630 from wzhfy/refactorColumnStat.
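To make the internal-vs-external distinction above concrete, here is a minimal standalone sketch (not part of the patch; it assumes a Spark `spark-catalyst` jar on the classpath, and the object name and sample date are illustrative). `DateTimeUtils.fromJavaDate`/`toJavaDate` are the same helpers the patch uses:

```scala
import java.sql.Date
import org.apache.spark.sql.catalyst.util.DateTimeUtils

object InternalVsExternalSketch {
  def main(args: Array[String]): Unit = {
    // External (Row-level) value for DateType: java.sql.Date
    val external: Date = Date.valueOf("2017-04-14")
    // Catalyst internal value for DateType: days since the epoch, stored as Int
    val internal: Int = DateTimeUtils.fromJavaDate(external)
    println(s"external=$external internal=$internal")
    // Converting back yields the original external value
    println(s"roundTrip=${DateTimeUtils.toJavaDate(internal)}")
  }
}
```

Filter literals carry the internal form, so keeping `ColumnStat.min`/`max` in the same form removes one conversion per estimated predicate.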
1 parent 7536e28 commit fb036c4

File tree

10 files changed: +189 -182 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala

Lines changed: 65 additions & 30 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -74,11 +75,10 @@ case class Statistics(
  * Statistics collected for a column.
  *
  * 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the external data type (used in Row) for the
- *    corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
- *    TimestampType we store java.sql.Timestamp.
- * 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
- * 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ *    Catalyst data type. For example, the internal type of DateType is Int, and that the internal
+ *    type of TimestampType is Long.
+ * 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
  *    (sketches) might have been used, and the data collected can also be stale.
  *
  * @param distinctCount number of distinct values
@@ -104,22 +104,43 @@ case class ColumnStat(
   /**
    * Returns a map from string to string that can be used to serialize the column stats.
    * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
-   * representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
+   * representation for the value. min/max values are converted to the external data type. For
+   * example, for DateType we store java.sql.Date, and for TimestampType we store
+   * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
    *
    * As part of the protocol, the returned map always contains a key called "version".
    * In the case min/max values are null (None), they won't appear in the map.
    */
-  def toMap: Map[String, String] = {
+  def toMap(colName: String, dataType: DataType): Map[String, String] = {
     val map = new scala.collection.mutable.HashMap[String, String]
     map.put(ColumnStat.KEY_VERSION, "1")
     map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
     map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
     map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
     map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
-    min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
-    max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
+    min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
+    max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
     map.toMap
   }
+
+  /**
+   * Converts the given value from Catalyst data type to string representation of external
+   * data type.
+   */
+  private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+    val externalValue = dataType match {
+      case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
+      case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+      case BooleanType | _: IntegralType | FloatType | DoubleType => v
+      case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case _ =>
+        throw new AnalysisException("Column statistics deserialization is not supported for " +
+          s"column $colName of data type: $dataType.")
+    }
+    externalValue.toString
+  }
+
 }
 
 
@@ -150,28 +171,15 @@ object ColumnStat extends Logging {
    * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
    * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
    */
-  def fromMap(table: String, field: StructField, map: Map[String, String])
-    : Option[ColumnStat] = {
-    val str2val: (String => Any) = field.dataType match {
-      case _: IntegralType => _.toLong
-      case _: DecimalType => new java.math.BigDecimal(_)
-      case DoubleType | FloatType => _.toDouble
-      case BooleanType => _.toBoolean
-      case DateType => java.sql.Date.valueOf
-      case TimestampType => java.sql.Timestamp.valueOf
-      // This version of Spark does not use min/max for binary/string types so we ignore it.
-      case BinaryType | StringType => _ => null
-      case _ =>
-        throw new AnalysisException("Column statistics deserialization is not supported for " +
-          s"column ${field.name} of data type: ${field.dataType}.")
-    }
-
+  def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
     try {
       Some(ColumnStat(
         distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
         // Note that flatMap(Option.apply) turns Option(null) into None.
-        min = map.get(KEY_MIN_VALUE).map(str2val).flatMap(Option.apply),
-        max = map.get(KEY_MAX_VALUE).map(str2val).flatMap(Option.apply),
+        min = map.get(KEY_MIN_VALUE)
+          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
+        max = map.get(KEY_MAX_VALUE)
+          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
         nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
         avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
         maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong
@@ -183,6 +191,30 @@ object ColumnStat extends Logging {
     }
   }
 
+  /**
+   * Converts from string representation of external data type to the corresponding Catalyst data
+   * type.
+   */
+  private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+    dataType match {
+      case BooleanType => s.toBoolean
+      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case ByteType => s.toByte
+      case ShortType => s.toShort
+      case IntegerType => s.toInt
+      case LongType => s.toLong
+      case FloatType => s.toFloat
+      case DoubleType => s.toDouble
+      case _: DecimalType => Decimal(s)
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case BinaryType | StringType => null
+      case _ =>
+        throw new AnalysisException("Column statistics deserialization is not supported for " +
+          s"column $name of data type: $dataType.")
+    }
+  }
+
   /**
    * Constructs an expression to compute column statistics for a given column.
    *
@@ -232,11 +264,14 @@ object ColumnStat extends Logging {
   }
 
   /** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
-  def rowToColumnStat(row: Row): ColumnStat = {
+  def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
     ColumnStat(
       distinctCount = BigInt(row.getLong(0)),
-      min = Option(row.get(1)), // for string/binary min/max, get should return null
-      max = Option(row.get(2)),
+      // for string/binary min/max, get should return null
+      min = Option(row.get(1))
+        .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
+      max = Option(row.get(2))
+        .map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
      nullCount = BigInt(row.getLong(3)),
      avgLen = row.getLong(4),
      maxLen = row.getLong(5)
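The diff above keeps the metastore format external while holding internal values in memory. A hedged round-trip sketch of that contract for `DateType` (standalone helpers that mirror the private `toExternalString`/`fromExternalString` branches rather than calling them; the object name and the value 17270 are illustrative):

```scala
import java.sql.Date
import org.apache.spark.sql.catalyst.util.DateTimeUtils

object ColumnStatRoundTripSketch {
  // Mirrors the DateType branch of toExternalString: internal Int -> "yyyy-MM-dd"
  def dateToExternalString(days: Int): String = DateTimeUtils.toJavaDate(days).toString

  // Mirrors the DateType branch of fromExternalString: "yyyy-MM-dd" -> internal Int
  def dateFromExternalString(s: String): Int = DateTimeUtils.fromJavaDate(Date.valueOf(s))

  def main(args: Array[String]): Unit = {
    val minDays = 17270                         // an illustrative internal DateType min value
    val stored = dateToExternalString(minDays)  // what toMap would put in the metastore map
    val reread = dateFromExternalString(stored) // what fromMap would load back
    assert(reread == minDays)
    println(s"stored=$stored reread=$reread")
  }
}
```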

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala

Lines changed: 29 additions & 1 deletion
@@ -22,7 +22,7 @@ import scala.math.BigDecimal.RoundingMode
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DataType, StringType}
+import org.apache.spark.sql.types.{DecimalType, _}
 
 
 object EstimationUtils {
@@ -75,4 +75,32 @@
     // (simple computation of statistics returns product of children).
     if (outputRowCount > 0) outputRowCount * sizePerRow else 1
   }
+
+  /**
+   * For simplicity we use Decimal to unify operations for data types whose min/max values can be
+   * represented as numbers, e.g. Boolean can be represented as 0 (false) or 1 (true).
+   * The two methods below are the contract of conversion.
+   */
+  def toDecimal(value: Any, dataType: DataType): Decimal = {
+    dataType match {
+      case _: NumericType | DateType | TimestampType => Decimal(value.toString)
+      case BooleanType => if (value.asInstanceOf[Boolean]) Decimal(1) else Decimal(0)
+    }
+  }
+
+  def fromDecimal(dec: Decimal, dataType: DataType): Any = {
+    dataType match {
+      case BooleanType => dec.toLong == 1
+      case DateType => dec.toInt
+      case TimestampType => dec.toLong
+      case ByteType => dec.toByte
+      case ShortType => dec.toShort
+      case IntegerType => dec.toInt
+      case LongType => dec.toLong
+      case FloatType => dec.toFloat
+      case DoubleType => dec.toDouble
+      case _: DecimalType => dec
+    }
+  }
+
 }
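A small sketch of the `toDecimal`/`fromDecimal` contract introduced above, restricted to Boolean and the Int-based DateType internal for brevity (a standalone re-implementation, not a call into `EstimationUtils`; the object name is illustrative):

```scala
import org.apache.spark.sql.types.Decimal

object DecimalContractSketch {
  // Mirrors toDecimal for two representative cases:
  // BooleanType (true -> 1, false -> 0) and numeric/date internals via the string form.
  def toDecimal(value: Any): Decimal = value match {
    case b: Boolean => if (b) Decimal(1) else Decimal(0)
    case other => Decimal(other.toString)
  }

  def main(args: Array[String]): Unit = {
    // Booleans become orderable numbers, which is what the IN-list estimation relies on.
    val booleans: Set[Any] = Set(true, false)
    println(booleans.maxBy(toDecimal)) // true
    println(booleans.minBy(toDecimal)) // false

    // Round-trip for a DateType internal value, mirroring fromDecimal's `dec.toInt` branch.
    val days = 17270
    assert(toDecimal(days).toInt == days)
  }
}
```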

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala

Lines changed: 21 additions & 47 deletions
@@ -25,7 +25,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -301,30 +300,6 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
     }
   }
 
-  /**
-   * For a SQL data type, its internal data type may be different from its external type.
-   * For DateType, its internal type is Int, and its external data type is Java Date type.
-   * The min/max values in ColumnStat are saved in their corresponding external type.
-   *
-   * @param attrDataType the column data type
-   * @param litValue the literal value
-   * @return a BigDecimal value
-   */
-  def convertBoundValue(attrDataType: DataType, litValue: Any): Option[Any] = {
-    attrDataType match {
-      case DateType =>
-        Some(DateTimeUtils.toJavaDate(litValue.toString.toInt))
-      case TimestampType =>
-        Some(DateTimeUtils.toJavaTimestamp(litValue.toString.toLong))
-      case _: DecimalType =>
-        Some(litValue.asInstanceOf[Decimal].toJavaBigDecimal)
-      case StringType | BinaryType =>
-        None
-      case _ =>
-        Some(litValue)
-    }
-  }
-
   /**
    * Returns a percentage of rows meeting an equality (=) expression.
    * This method evaluates the equality predicate for all data types.
@@ -356,12 +331,16 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
     val statsRange = Range(colStat.min, colStat.max, attr.dataType)
     if (statsRange.contains(literal)) {
       if (update) {
-        // We update ColumnStat structure after apply this equality predicate.
-        // Set distinctCount to 1. Set nullCount to 0.
-        // Need to save new min/max using the external type value of the literal
-        val newValue = convertBoundValue(attr.dataType, literal.value)
-        val newStats = colStat.copy(distinctCount = 1, min = newValue,
-          max = newValue, nullCount = 0)
+        // We update ColumnStat structure after apply this equality predicate:
+        // Set distinctCount to 1, nullCount to 0, and min/max values (if exist) to the literal
+        // value.
+        val newStats = attr.dataType match {
+          case StringType | BinaryType =>
+            colStat.copy(distinctCount = 1, nullCount = 0)
+          case _ =>
+            colStat.copy(distinctCount = 1, min = Some(literal.value),
+              max = Some(literal.value), nullCount = 0)
+        }
         colStatsMap(attr) = newStats
       }
 
@@ -430,18 +409,14 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
       return Some(0.0)
     }
 
-    // Need to save new min/max using the external type value of the literal
-    val newMax = convertBoundValue(
-      attr.dataType, validQuerySet.maxBy(v => BigDecimal(v.toString)))
-    val newMin = convertBoundValue(
-      attr.dataType, validQuerySet.minBy(v => BigDecimal(v.toString)))
-
+    val newMax = validQuerySet.maxBy(EstimationUtils.toDecimal(_, dataType))
+    val newMin = validQuerySet.minBy(EstimationUtils.toDecimal(_, dataType))
     // newNdv should not be greater than the old ndv. For example, column has only 2 values
     // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
     newNdv = ndv.min(BigInt(validQuerySet.size))
     if (update) {
-      val newStats = colStat.copy(distinctCount = newNdv, min = newMin,
-        max = newMax, nullCount = 0)
+      val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
+        max = Some(newMax), nullCount = 0)
       colStatsMap(attr) = newStats
     }
 
@@ -478,8 +453,8 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
 
     val colStat = colStatsMap(attr)
     val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
-    val max = BigDecimal(statsRange.max)
-    val min = BigDecimal(statsRange.min)
+    val max = statsRange.max.toBigDecimal
+    val min = statsRange.min.toBigDecimal
     val ndv = BigDecimal(colStat.distinctCount)
 
     // determine the overlapping degree between predicate range and column's range
@@ -540,8 +515,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
     }
 
     if (update) {
-      // Need to save new min/max using the external type value of the literal
-      val newValue = convertBoundValue(attr.dataType, literal.value)
+      val newValue = Some(literal.value)
      var newMax = colStat.max
      var newMin = colStat.min
      var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
@@ -606,14 +580,14 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
     val colStatLeft = colStatsMap(attrLeft)
     val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
       .asInstanceOf[NumericRange]
-    val maxLeft = BigDecimal(statsRangeLeft.max)
-    val minLeft = BigDecimal(statsRangeLeft.min)
+    val maxLeft = statsRangeLeft.max
+    val minLeft = statsRangeLeft.min
 
     val colStatRight = colStatsMap(attrRight)
     val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType)
       .asInstanceOf[NumericRange]
-    val maxRight = BigDecimal(statsRangeRight.max)
-    val minRight = BigDecimal(statsRangeRight.min)
+    val maxRight = statsRangeRight.max
+    val minRight = statsRangeRight.min
 
     // determine the overlapping degree between predicate range and column's range
     val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
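On the Boolean `IN` fix mentioned in the commit message: the removed IN-list code above ordered candidate values with `BigDecimal(v.toString)`, which cannot parse Boolean literals, and that appears to be the issue the new `EstimationUtils.toDecimal` path avoids. A standalone illustration (not the project's regression test; the 0/1 mapping mirrors `toDecimal`):

```scala
object BooleanInListSketch {
  def main(args: Array[String]): Unit = {
    val validQuerySet: Set[Any] = Set(true, false) // e.g. boolCol IN (true, false)

    // Old ordering key: BigDecimal of the string form; "true" is not a number.
    val oldKey = scala.util.Try(validQuerySet.maxBy(v => BigDecimal(v.toString)))
    println(oldKey) // Failure(java.lang.NumberFormatException: ...)

    // New-style ordering key: map Booleans to 0/1 before comparing.
    val newKey: Any => BigDecimal = {
      case b: Boolean => if (b) BigDecimal(1) else BigDecimal(0)
      case other => BigDecimal(other.toString)
    }
    println(validQuerySet.maxBy(newKey)) // true
    println(validQuerySet.minBy(newKey)) // false
  }
}
```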
