@@ -25,6 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -74,11 +75,10 @@ case class Statistics(
* Statistics collected for a column.
*
* 1. Supported data types are defined in `ColumnStat.supportsType`.
* 2. The JVM data type stored in min/max is the external data type (used in Row) for the
* corresponding Catalyst data type. For example, for DateType we store java.sql.Date, and for
* TimestampType we store java.sql.Timestamp.
* 3. For integral types, they are all upcasted to longs, i.e. shorts are stored as longs.
* 4. There is no guarantee that the statistics collected are accurate. Approximation algorithms
* 2. The JVM data type stored in min/max is the internal data type for the corresponding
* Catalyst data type. For example, the internal type of DateType is Int, and the internal
* type of TimestampType is Long.
* 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
* (sketches) might have been used, and the data collected can also be stale.
*
* @param distinctCount number of distinct values
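
As a quick aside on point 2 above, a minimal sketch (not part of this patch) of what "internal" vs. "external" representation means for DateType, assuming the catalyst DateTimeUtils helpers imported in this file:

import org.apache.spark.sql.catalyst.util.DateTimeUtils

// External (Row-level) value vs. the internal value that min/max now store.
val external = java.sql.Date.valueOf("2017-04-14")        // external type: java.sql.Date
val internal: Int = DateTimeUtils.fromJavaDate(external)  // internal type: Int, days since epoch
val roundTrip: java.sql.Date = DateTimeUtils.toJavaDate(internal)
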
@@ -104,22 +104,43 @@ case class ColumnStat(
/**
* Returns a map from string to string that can be used to serialize the column stats.
* The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
* representation for the value. The deserialization side is defined in [[ColumnStat.fromMap]].
* representation for the value. min/max values are converted to the external data type. For
* example, for DateType we store java.sql.Date, and for TimestampType we store
* java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
*
* As part of the protocol, the returned map always contains a key called "version".
* In the case min/max values are null (None), they won't appear in the map.
*/
def toMap: Map[String, String] = {
def toMap(colName: String, dataType: DataType): Map[String, String] = {
val map = new scala.collection.mutable.HashMap[String, String]
map.put(ColumnStat.KEY_VERSION, "1")
map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, v.toString) }
max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, v.toString) }
min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
map.toMap
}

/**
* Converts the given value from its Catalyst (internal) data type to the string representation
* of the corresponding external data type.
*/
private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
val externalValue = dataType match {

Review thread on this match block:

Contributor: Why do we get the externalValue first and then call toString? This means for long we will do l.toString.toLong.toString.

@wzhfy (Contributor, author), Apr 14, 2017: Yea, good point. I should use asInstanceOf to replace all these toString/toLong, then call toString after the conversion.

Contributor: We should just return a string in each case of this pattern match.

case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
case BooleanType | _: IntegralType | FloatType | DoubleType => v
case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
// This version of Spark does not use min/max for binary/string types so we ignore it.
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $colName of data type: $dataType.")
}
externalValue.toString
}

}
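
For reference, a hedged sketch of what the reviewers' suggestion above could look like, returning a String directly from each case instead of materializing an external value first. The name toExternalStringAlt is invented for this illustration; it is not what the patch does:

private def toExternalStringAlt(v: Any, colName: String, dataType: DataType): String = dataType match {
  case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int]).toString
  case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long]).toString
  case BooleanType | _: IntegralType | FloatType | DoubleType => v.toString
  case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal.toString
  // min/max are not tracked for binary/string types in this version, so anything else is an error.
  case _ => throw new AnalysisException(
    s"Column statistics serialization is not supported for column $colName of data type: $dataType.")
}
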


@@ -150,28 +171,15 @@ object ColumnStat extends Logging {
* Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
* from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
*/
def fromMap(table: String, field: StructField, map: Map[String, String])
: Option[ColumnStat] = {
val str2val: (String => Any) = field.dataType match {
case _: IntegralType => _.toLong
case _: DecimalType => new java.math.BigDecimal(_)
case DoubleType | FloatType => _.toDouble
case BooleanType => _.toBoolean
case DateType => java.sql.Date.valueOf
case TimestampType => java.sql.Timestamp.valueOf
// This version of Spark does not use min/max for binary/string types so we ignore it.
case BinaryType | StringType => _ => null
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column ${field.name} of data type: ${field.dataType}.")
}

def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
try {
Some(ColumnStat(
distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
// Note that flatMap(Option.apply) turns Option(null) into None.
min = map.get(KEY_MIN_VALUE).map(str2val).flatMap(Option.apply),
max = map.get(KEY_MAX_VALUE).map(str2val).flatMap(Option.apply),
min = map.get(KEY_MIN_VALUE)
.map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
max = map.get(KEY_MAX_VALUE)
.map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong
@@ -183,6 +191,30 @@
}
}

/**
* Converts the string representation of a value in the external data type to the corresponding
* Catalyst (internal) data type.
*/
private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
dataType match {
case BooleanType => s.toBoolean
case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
case ByteType => s.toByte
case ShortType => s.toShort
case IntegerType => s.toInt
case LongType => s.toLong
case FloatType => s.toFloat
case DoubleType => s.toDouble
case _: DecimalType => Decimal(s)
// This version of Spark does not use min/max for binary/string types so we ignore it.
case BinaryType | StringType => null
case _ =>
throw new AnalysisException("Column statistics deserialization is not supported for " +
s"column $name of data type: $dataType.")
}
}
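
Putting the new toMap and fromMap together, a hypothetical round trip for a timestamp column could look like the sketch below; the table/column names and values are invented, and it assumes the imports at the top of this file:

val field = StructField("event_time", TimestampType)
val stat = ColumnStat(
  distinctCount = BigInt(10),
  min = Some(0L),                  // internal representation: microseconds since epoch
  max = Some(1492128000000000L),   // 2017-04-14 00:00:00 UTC in microseconds
  nullCount = BigInt(0),
  avgLen = 8L,
  maxLen = 8L)
// min/max are rendered as java.sql.Timestamp strings in the serialized map.
val serialized: Map[String, String] = stat.toMap(field.name, field.dataType)
// fromMap parses them back into the internal Long representation.
val restored: Option[ColumnStat] = ColumnStat.fromMap("some_table", field, serialized)
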

/**
* Constructs an expression to compute column statistics for a given column.
*
@@ -232,11 +264,14 @@ object ColumnStat extends Logging {
}

/** Convert a struct for column stats (defined in statExprs) into [[ColumnStat]]. */
def rowToColumnStat(row: Row): ColumnStat = {
def rowToColumnStat(row: Row, attr: Attribute): ColumnStat = {
ColumnStat(
distinctCount = BigInt(row.getLong(0)),
min = Option(row.get(1)), // for string/binary min/max, get should return null
max = Option(row.get(2)),
// for string/binary min/max, get should return null
min = Option(row.get(1))
.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
max = Option(row.get(2))
.map(v => fromExternalString(v.toString, attr.name, attr.dataType)).flatMap(Option.apply),
nullCount = BigInt(row.getLong(3)),
avgLen = row.getLong(4),
maxLen = row.getLong(5)
@@ -22,7 +22,7 @@ import scala.math.BigDecimal.RoundingMode
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.sql.types.{DecimalType, _}


object EstimationUtils {
@@ -75,4 +75,32 @@ object EstimationUtils {
// (simple computation of statistics returns product of children).
if (outputRowCount > 0) outputRowCount * sizePerRow else 1
}

/**
* For simplicity we use Decimal to unify operations for data types whose min/max values can be
* represented as numbers, e.g. Boolean can be represented as 0 (false) or 1 (true).
* The two methods below define the conversion contract.
*/
def toDecimal(value: Any, dataType: DataType): Decimal = {
dataType match {
case _: NumericType | DateType | TimestampType => Decimal(value.toString)
case BooleanType => if (value.asInstanceOf[Boolean]) Decimal(1) else Decimal(0)
}
}

def fromDecimal(dec: Decimal, dataType: DataType): Any = {
dataType match {
case BooleanType => dec.toLong == 1
case DateType => dec.toInt
case TimestampType => dec.toLong
case ByteType => dec.toByte
case ShortType => dec.toShort
case IntegerType => dec.toInt
case LongType => dec.toLong
case FloatType => dec.toFloat
case DoubleType => dec.toDouble
case _: DecimalType => dec
}
}

}
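
A small usage sketch for the two helpers above (illustrative only, values arbitrary), assuming it runs alongside EstimationUtils with the spark.sql.types imports available:

import org.apache.spark.sql.types.{BooleanType, DateType}

val boolAsDec = EstimationUtils.toDecimal(true, BooleanType)         // Decimal(1)
val boolBack  = EstimationUtils.fromDecimal(boolAsDec, BooleanType)  // true
val dateAsDec = EstimationUtils.toDecimal(17270, DateType)           // internal Int (days since epoch) -> Decimal
val dateBack  = EstimationUtils.fromDecimal(dateAsDec, DateType)     // 17270: Int
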
@@ -25,7 +25,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Filter, LeafNode, Statistics}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -301,30 +300,6 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
}
}

/**
* For a SQL data type, its internal data type may be different from its external type.
* For DateType, its internal type is Int, and its external data type is Java Date type.
* The min/max values in ColumnStat are saved in their corresponding external type.
*
* @param attrDataType the column data type
* @param litValue the literal value
* @return a BigDecimal value
*/
def convertBoundValue(attrDataType: DataType, litValue: Any): Option[Any] = {
attrDataType match {
case DateType =>
Some(DateTimeUtils.toJavaDate(litValue.toString.toInt))
case TimestampType =>
Some(DateTimeUtils.toJavaTimestamp(litValue.toString.toLong))
case _: DecimalType =>
Some(litValue.asInstanceOf[Decimal].toJavaBigDecimal)
case StringType | BinaryType =>
None
case _ =>
Some(litValue)
}
}

/**
* Returns a percentage of rows meeting an equality (=) expression.
* This method evaluates the equality predicate for all data types.
@@ -356,12 +331,16 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
val statsRange = Range(colStat.min, colStat.max, attr.dataType)
if (statsRange.contains(literal)) {
if (update) {
// We update ColumnStat structure after apply this equality predicate.
// Set distinctCount to 1. Set nullCount to 0.
// Need to save new min/max using the external type value of the literal
val newValue = convertBoundValue(attr.dataType, literal.value)
val newStats = colStat.copy(distinctCount = 1, min = newValue,
max = newValue, nullCount = 0)
// We update the ColumnStat structure after applying this equality predicate:
// set distinctCount to 1, nullCount to 0, and min/max values (if they exist) to the
// literal value.
val newStats = attr.dataType match {
case StringType | BinaryType =>
colStat.copy(distinctCount = 1, nullCount = 0)
case _ =>
colStat.copy(distinctCount = 1, min = Some(literal.value),
max = Some(literal.value), nullCount = 0)
}
colStatsMap(attr) = newStats
}
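
To make the new branch concrete, here is a hypothetical before/after for an equality filter such as date_col = DATE'2017-04-14' (all numbers invented; 17270 is the internal Int for 2017-04-14, which lies inside the assumed min/max range so the update branch applies):

val before = ColumnStat(distinctCount = BigInt(73), min = Some(17167), max = Some(17298),
  nullCount = BigInt(5), avgLen = 4L, maxLen = 4L)
// After evaluateEquality with update = true: ndv collapses to 1, nulls drop to 0,
// and min/max keep the literal's internal Int form.
val after = before.copy(distinctCount = BigInt(1), min = Some(17270), max = Some(17270),
  nullCount = BigInt(0))
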

@@ -430,18 +409,14 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
return Some(0.0)
}

// Need to save new min/max using the external type value of the literal
val newMax = convertBoundValue(
attr.dataType, validQuerySet.maxBy(v => BigDecimal(v.toString)))
val newMin = convertBoundValue(
attr.dataType, validQuerySet.minBy(v => BigDecimal(v.toString)))

val newMax = validQuerySet.maxBy(EstimationUtils.toDecimal(_, dataType))
val newMin = validQuerySet.minBy(EstimationUtils.toDecimal(_, dataType))
// newNdv should not be greater than the old ndv. For example, column has only 2 values
// 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
newNdv = ndv.min(BigInt(validQuerySet.size))
if (update) {
val newStats = colStat.copy(distinctCount = newNdv, min = newMin,
max = newMax, nullCount = 0)
val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
max = Some(newMax), nullCount = 0)
colStatsMap(attr) = newStats
}

@@ -478,8 +453,8 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging

val colStat = colStatsMap(attr)
val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
val max = BigDecimal(statsRange.max)
val min = BigDecimal(statsRange.min)
val max = statsRange.max.toBigDecimal
val min = statsRange.min.toBigDecimal
val ndv = BigDecimal(colStat.distinctCount)

// determine the overlapping degree between predicate range and column's range
@@ -540,8 +515,7 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
}

if (update) {
// Need to save new min/max using the external type value of the literal
val newValue = convertBoundValue(attr.dataType, literal.value)
val newValue = Some(literal.value)
var newMax = colStat.max
var newMin = colStat.min
var newNdv = (ndv * percent).setScale(0, RoundingMode.HALF_UP).toBigInt()
@@ -606,14 +580,14 @@ case class FilterEstimation(plan: Filter, catalystConf: SQLConf) extends Logging
val colStatLeft = colStatsMap(attrLeft)
val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
.asInstanceOf[NumericRange]
val maxLeft = BigDecimal(statsRangeLeft.max)
val minLeft = BigDecimal(statsRangeLeft.min)
val maxLeft = statsRangeLeft.max
val minLeft = statsRangeLeft.min

val colStatRight = colStatsMap(attrRight)
val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType)
.asInstanceOf[NumericRange]
val maxRight = BigDecimal(statsRangeRight.max)
val minRight = BigDecimal(statsRangeRight.min)
val maxRight = statsRangeRight.max
val minRight = statsRangeRight.min

// determine the overlapping degree between predicate range and column's range
val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)