Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,22 @@ private[parquet] class ParquetFilters(
*/
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
val nameToParquetField = getFieldMap(schema)
createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true)
createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true)
}

/**
* @param nameToParquetField a map from the field name to its field name and data type.
* This only includes the root fields whose types are primitive types.
* @param predicate the input filter predicates. Not all the predicates can be pushed down.
* @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
* down safely. Pushing ONLY one side of AND down is safe to
* do at the top level or none of its ancestors is NOT and OR.
* @return the Parquet-native filter predicates that are eligible for pushdown.
*/
private def createFilterHelper(
nameToParquetField: Map[String, ParquetField],
predicate: sources.Filter,
canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
// Decimal type must make sure that filter value's scale matched the file.
// If doesn't matched, which would cause data corruption.
def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
Expand Down Expand Up @@ -505,24 +514,28 @@ private[parquet] class ParquetFilters(
// Pushing one side of AND down is only safe to do at the top level or in the child
// AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
// can be safely removed.
val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
val lhsFilterOption =
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
val rhsFilterOption =
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)

(lhsFilterOption, rhsFilterOption) match {
case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter)
case _ => None
}

case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
lhsFilter <-
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = false)
rhsFilter <-
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false)
} yield FilterApi.or(lhsFilter, rhsFilter)

case sources.Not(pred) =>
createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
.map(FilterApi.not)

case sources.In(name, values) if canMakeFilterOn(name, values.head)
Expand Down