ParquetFileFormat.scala
@@ -372,13 +372,13 @@ class ParquetFileFormat
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
// is used here.
.flatMap(parquetFilters.createFilter(parquetSchema, _))
.flatMap(parquetFilters.createFilter(_))
.reduceOption(FilterApi.and)
} else {
None
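With this change the footer schema moves into the ParquetFilters constructor, so an instance is bound to one Parquet file and createFilter only takes the data source predicate. A minimal sketch of the new call pattern, with a hand-built schema and arbitrary option values for illustration (ParquetFilters is private[parquet], so this only compiles from code in that package):

import org.apache.parquet.schema.MessageTypeParser
import org.apache.spark.sql.sources

// Hypothetical footer schema with a single INT32 column "a"; real callers take it
// from footerFileMetaData.getSchema as in the hunk above.
val schema = MessageTypeParser.parseMessageType(
  "message spark_schema { optional int32 a; }")
val parquetFilters = new ParquetFilters(
  schema,
  pushDownDate = true, pushDownTimestamp = true, pushDownDecimal = true,
  pushDownStartWith = true, pushDownInFilterThreshold = 10, caseSensitive = false)

// The schema is no longer passed on every call; only the predicate is.
val pushed = parquetFilters.createFilter(sources.GreaterThan("a", 1))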
ParquetFilters.scala
@@ -41,12 +41,38 @@ import org.apache.spark.unsafe.types.UTF8String
* Some utility functions to convert Spark data source filters to Parquet filters.
*/
private[parquet] class ParquetFilters(
schema: MessageType,
pushDownDate: Boolean,
pushDownTimestamp: Boolean,
pushDownDecimal: Boolean,
pushDownStartWith: Boolean,
pushDownInFilterThreshold: Int,
caseSensitive: Boolean) {
// A map from the Parquet field name to its data type, used when predicate push-down applies.
private val nameToParquetField : Map[String, ParquetField] = {
// Here we don't flatten the fields in the nested schema but just look up the
// root fields. Currently, accessing nested fields does not push down filters,
// and creating filters for them is not supported.
val primitiveFields =
schema.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
f.getName -> ParquetField(f.getName,
ParquetSchemaType(f.getOriginalType,
f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
}
if (caseSensitive) {
primitiveFields.toMap
} else {
// Don't consider ambiguity here: if more than one field matches in case-insensitive
// mode, just skip pushdown for those fields; they will trigger an exception when read.
// See SPARK-25132.
val dedupPrimitiveFields =
primitiveFields
.groupBy(_._1.toLowerCase(Locale.ROOT))
.filter(_._2.size == 1)
.mapValues(_.head._2)
CaseInsensitiveMap(dedupPrimitiveFields)
}
}
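The case-insensitive branch deliberately drops any root field whose lower-cased name collides with another, rather than guessing which one a filter refers to. A standalone sketch of the dedup rule using plain Scala collections (the field names and types here are made up; CaseInsensitiveMap is the Spark-internal wrapper used above):

import java.util.Locale

val primitiveFields = Seq("A" -> "int32", "a" -> "int64", "b" -> "binary")
val dedup = primitiveFields
  .groupBy(_._1.toLowerCase(Locale.ROOT))   // "A" and "a" fall into the same group
  .filter(_._2.size == 1)                   // ambiguous groups are dropped entirely
  .map { case (name, fields) => name -> fields.head._2 }
// dedup == Map("b" -> "binary"): filters on "A"/"a" are simply not pushed down (SPARK-25132).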

/**
* Holds the information of a single field stored in the underlying parquet file.
@@ -361,96 +387,104 @@ private[parquet] class ParquetFilters(
FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
}

/**
* Returns a map from the Parquet field name to its data type, used when predicate push-down applies.
*/
private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = {
Review comment (Member):
@gengliangwang, don't have to move codes around to make it easier to track ...

// Here we don't flatten the fields in the nested schema but just look up the
// root fields. Currently, accessing nested fields does not push down filters,
// and creating filters for them is not supported.
val primitiveFields =
dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
f.getName -> ParquetField(f.getName,
ParquetSchemaType(f.getOriginalType,
f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
}
if (caseSensitive) {
primitiveFields.toMap
} else {
// Don't consider ambiguity here: if more than one field matches in case-insensitive
// mode, just skip pushdown for those fields; they will trigger an exception when read.
// See SPARK-25132.
val dedupPrimitiveFields =
primitiveFields
.groupBy(_._1.toLowerCase(Locale.ROOT))
.filter(_._2.size == 1)
.mapValues(_.head._2)
CaseInsensitiveMap(dedupPrimitiveFields)
// Returns filters that can be pushed down when reading Parquet files.
def convertibleFilters(filters: Seq[sources.Filter]): Seq[sources.Filter] = {
filters.flatMap(convertibleFiltersHelper(_, canPartialPushDown = true))
}

private def convertibleFiltersHelper(
predicate: sources.Filter,
canPartialPushDown: Boolean): Option[sources.Filter] = {
predicate match {
case sources.And(left, right) =>
val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
(leftResultOptional, rightResultOptional) match {
case (Some(leftResult), Some(rightResult)) => Some(sources.And(leftResult, rightResult))
case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
case _ => None
}

case sources.Or(left, right) =>
val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
None
} else {
Some(sources.Or(leftResultOptional.get, rightResultOptional.get))
}
case sources.Not(pred) =>
val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
resultOptional.map(sources.Not)

case other =>
if (createFilter(other).isDefined) {
Some(other)
} else {
None
}
}
}
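The net effect of the three cases above, assuming a ParquetFilters instance built over a schema where column a is a pushable INT32 root field and where the dotted name makes the second filter unconvertible (see canMakeFilterOn below); the expected results are sketched as comments:

import org.apache.spark.sql.{sources => s}

val pushable   = s.EqualTo("a", 1)
val unpushable = s.EqualTo("b.c", 2)   // dotted column name, never pushed down (SPARK-20364)

// AND may drop an unsupported conjunct at the top level:
//   convertibleFilters(Seq(s.And(pushable, unpushable)))        => Seq(s.EqualTo("a", 1))
// OR needs both sides to be convertible:
//   convertibleFilters(Seq(s.Or(pushable, unpushable)))         => Seq()
// NOT disables partial pushdown for everything underneath it:
//   convertibleFilters(Seq(s.Not(s.And(pushable, unpushable)))) => Seq()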

/**
* Converts data source filters to Parquet filter predicates.
*/
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
val nameToParquetField = getFieldMap(schema)
createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true)
def createFilter(predicate: sources.Filter): Option[FilterPredicate] = {
createFilterHelper(predicate, canPartialPushDownConjuncts = true)
}

// The Parquet type of the column in the given file must match the value's type in the
// pushed filter in order to push the filter down to Parquet.
private def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
value == null || (nameToParquetField(name).fieldType match {
case ParquetBooleanType => value.isInstanceOf[JBoolean]
case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
case ParquetLongType => value.isInstanceOf[JLong]
case ParquetFloatType => value.isInstanceOf[JFloat]
case ParquetDoubleType => value.isInstanceOf[JDouble]
case ParquetStringType => value.isInstanceOf[String]
case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
case ParquetDateType => value.isInstanceOf[Date]
case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
value.isInstanceOf[Timestamp]
case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
isDecimalMatched(value, decimalMeta)
case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
isDecimalMatched(value, decimalMeta)
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
isDecimalMatched(value, decimalMeta)
case _ => false
})
}

// For decimal types, the filter value's scale must match the scale declared in the file.
// A mismatch would cause data corruption.
private def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
case decimal: JBigDecimal =>
decimal.scale == decimalMeta.getScale
case _ => false
}
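A quick illustration of the scale check with java.math.BigDecimal values (the DecimalMetadata itself would come from the file schema, here assumed to declare scale = 2):

import java.math.{BigDecimal => JBigDecimal}

new JBigDecimal("12.30").scale   // 2 -> matches a DECIMAL(_, 2) column, the filter is pushable
new JBigDecimal("12.3").scale    // 1 -> scale mismatch, the filter stays in Spark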

// Parquet does not allow dots in column names because dots are used as the column path
// delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter predicates
// with missing columns, so pushing down filters for columns whose names contain dots
// could return incorrect results. Thus, we do not push down such filters.
// See SPARK-20364.
private def canMakeFilterOn(name: String, value: Any): Boolean = {
nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
}
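Putting the three gates together (the field exists among the deduplicated root fields, its name contains no dot, and the value's type matches), some illustrative outcomes, assuming a file schema with an INT64 root column id and no column literally named a.b:

import org.apache.spark.sql.{sources => s}

// createFilter(s.EqualTo("id", java.lang.Long.valueOf(1L)))  => Some(...)  all gates pass
// createFilter(s.EqualTo("id", 1))                           => None       Int value against an INT64 column
// createFilter(s.EqualTo("a.b", 1))                          => None       dotted name (SPARK-20364)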

/**
* @param nameToParquetField a map from the field name to its Parquet field name and data type.
* This only includes the root fields whose types are primitive types.
* @param predicate the input filter predicates. Not all the predicates can be pushed down.
* @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
* down safely. Pushing ONLY one side of AND down is safe to
* do at the top level or when none of its ancestors is NOT or OR.
* @return the Parquet-native filter predicates that are eligible for pushdown.
*/
private def createFilterHelper(
nameToParquetField: Map[String, ParquetField],
predicate: sources.Filter,
canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
// For decimal types, the filter value's scale must match the scale declared in the file.
// A mismatch would cause data corruption.
def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
case decimal: JBigDecimal =>
decimal.scale == decimalMeta.getScale
case _ => false
}

// The Parquet type of the column in the given file must match the value's type in the
// pushed filter in order to push the filter down to Parquet.
def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
value == null || (nameToParquetField(name).fieldType match {
case ParquetBooleanType => value.isInstanceOf[JBoolean]
case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
case ParquetLongType => value.isInstanceOf[JLong]
case ParquetFloatType => value.isInstanceOf[JFloat]
case ParquetDoubleType => value.isInstanceOf[JDouble]
case ParquetStringType => value.isInstanceOf[String]
case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
case ParquetDateType => value.isInstanceOf[Date]
case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
value.isInstanceOf[Timestamp]
case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
isDecimalMatched(value, decimalMeta)
case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
isDecimalMatched(value, decimalMeta)
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
isDecimalMatched(value, decimalMeta)
case _ => false
})
}

// Parquet does not allow dots in column names because dots are used as the column path
// delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter predicates
// with missing columns, so pushing down filters for columns whose names contain dots
// could return incorrect results. Thus, we do not push down such filters.
// See SPARK-20364.
def canMakeFilterOn(name: String, value: Any): Boolean = {
nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
}

// NOTE:
//
// For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -515,9 +549,9 @@ private[parquet] class ParquetFilters(
// AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
// can be safely removed.
val lhsFilterOption =
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
createFilterHelper(lhs, canPartialPushDownConjuncts)
val rhsFilterOption =
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
createFilterHelper(rhs, canPartialPushDownConjuncts)

(lhsFilterOption, rhsFilterOption) match {
case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
@@ -539,14 +573,12 @@ private[parquet] class ParquetFilters(
// (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
// As per the logic in the And predicate above, we can push down (a1 OR b1).
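// Concrete instance (illustrative): for ("x > 1" AND <unsupported>) OR ("y < 5" AND <unsupported>),
// each side partially converts to "x > 1" and "y < 5", so the whole predicate can still be
// pushed down as ("x > 1" OR "y < 5") without filtering out rows the original would keep.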
for {
lhsFilter <-
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
rhsFilter <-
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
lhsFilter <- createFilterHelper(lhs, canPartialPushDownConjuncts)
rhsFilter <- createFilterHelper(rhs, canPartialPushDownConjuncts)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(pred) =>
createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
createFilterHelper(pred, canPartialPushDownConjuncts = false)
.map(FilterApi.not)

case sources.In(name, values) if canMakeFilterOn(name, values.head)