From b22ea803b9d62d79421754186e546147e8c6c051 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Sun, 19 May 2019 17:21:59 +0800
Subject: [PATCH] Add new method for getting pushed down filters in Parquet file reader

---
 .../parquet/ParquetFileFormat.scala          |   6 +-
 .../datasources/parquet/ParquetFilters.scala | 190 +++++++++-------
 .../parquet/ParquetFilterSuite.scala         | 214 +++++++++++++-----
 3 files changed, 277 insertions(+), 133 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index e37f2283e00c..f2da159c5c95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -372,13 +372,13 @@ class ParquetFileFormat
       // Try to push down filters when filter push-down is enabled.
       val pushed = if (enableParquetFilterPushDown) {
         val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
-          pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
+        val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
+          pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
         filters
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(parquetFilters.createFilter(parquetSchema, _))
+          .flatMap(parquetFilters.createFilter(_))
           .reduceOption(FilterApi.and)
       } else {
         None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 22d71659b928..2578b0ba4305 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -41,12 +41,38 @@ import org.apache.spark.unsafe.types.UTF8String
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
 private[parquet] class ParquetFilters(
+    schema: MessageType,
     pushDownDate: Boolean,
     pushDownTimestamp: Boolean,
     pushDownDecimal: Boolean,
     pushDownStartWith: Boolean,
     pushDownInFilterThreshold: Int,
     caseSensitive: Boolean) {
+  // A map from a Parquet field name to its data type, used when predicate push-down applies.
+  private val nameToParquetField : Map[String, ParquetField] = {
+    // Here we don't flatten the fields in the nested schema but just look up through
+    // the root fields. Currently, accessing nested fields does not push down filters,
+    // and creating filters for them is not supported.
+    val primitiveFields =
+      schema.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+        f.getName -> ParquetField(f.getName,
+          ParquetSchemaType(f.getOriginalType,
+            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
+      }
+    if (caseSensitive) {
+      primitiveFields.toMap
+    } else {
+      // Don't handle ambiguity here, i.e. when more than one field matches in case-insensitive
+      // mode; just skip pushdown for such fields, since they would trigger an exception when
+      // read. See SPARK-25132.
+      val dedupPrimitiveFields =
+        primitiveFields
+          .groupBy(_._1.toLowerCase(Locale.ROOT))
+          .filter(_._2.size == 1)
+          .mapValues(_.head._2)
+      CaseInsensitiveMap(dedupPrimitiveFields)
+    }
+  }

   /**
    * Holds a single field information stored in the underlying parquet file.
@@ -361,45 +387,95 @@ private[parquet] class ParquetFilters(
       FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }

-  /**
-   * Returns a map, which contains parquet field name and data type, if predicate push down applies.
-   */
-  private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = {
-    // Here we don't flatten the fields in the nested schema but just look up through
-    // root fields. Currently, accessing to nested fields does not push down filters
-    // and it does not support to create filters for them.
-    val primitiveFields =
-      dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
-        f.getName -> ParquetField(f.getName,
-          ParquetSchemaType(f.getOriginalType,
-            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
-      }
-    if (caseSensitive) {
-      primitiveFields.toMap
-    } else {
-      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
-      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
-      // See: SPARK-25132.
-      val dedupPrimitiveFields =
-        primitiveFields
-          .groupBy(_._1.toLowerCase(Locale.ROOT))
-          .filter(_._2.size == 1)
-          .mapValues(_.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields)
+  // Returns filters that can be pushed down when reading Parquet files.
+  def convertibleFilters(filters: Seq[sources.Filter]): Seq[sources.Filter] = {
+    filters.flatMap(convertibleFiltersHelper(_, canPartialPushDown = true))
+  }
+
+  private def convertibleFiltersHelper(
+      predicate: sources.Filter,
+      canPartialPushDown: Boolean): Option[sources.Filter] = {
+    predicate match {
+      case sources.And(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        (leftResultOptional, rightResultOptional) match {
+          case (Some(leftResult), Some(rightResult)) => Some(sources.And(leftResult, rightResult))
+          case (Some(leftResult), None) if canPartialPushDown => Some(leftResult)
+          case (None, Some(rightResult)) if canPartialPushDown => Some(rightResult)
+          case _ => None
+        }
+
+      case sources.Or(left, right) =>
+        val leftResultOptional = convertibleFiltersHelper(left, canPartialPushDown)
+        val rightResultOptional = convertibleFiltersHelper(right, canPartialPushDown)
+        if (leftResultOptional.isEmpty || rightResultOptional.isEmpty) {
+          None
+        } else {
+          Some(sources.Or(leftResultOptional.get, rightResultOptional.get))
+        }
+      case sources.Not(pred) =>
+        val resultOptional = convertibleFiltersHelper(pred, canPartialPushDown = false)
+        resultOptional.map(sources.Not)
+
+      case other =>
+        if (createFilter(other).isDefined) {
+          Some(other)
+        } else {
+          None
+        }
     }
   }

   /**
    * Converts data sources filters to Parquet filter predicates.
   */
-  def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
-    val nameToParquetField = getFieldMap(schema)
-    createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true)
+  def createFilter(predicate: sources.Filter): Option[FilterPredicate] = {
+    createFilterHelper(predicate, canPartialPushDownConjuncts = true)
+  }
+
+  // The Parquet type of a field in the given file must match the type of the value in the
+  // pushed filter in order to push the filter down to Parquet.
+  private def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
+    value == null || (nameToParquetField(name).fieldType match {
+      case ParquetBooleanType => value.isInstanceOf[JBoolean]
+      case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
+      case ParquetLongType => value.isInstanceOf[JLong]
+      case ParquetFloatType => value.isInstanceOf[JFloat]
+      case ParquetDoubleType => value.isInstanceOf[JDouble]
+      case ParquetStringType => value.isInstanceOf[String]
+      case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
+      case ParquetDateType => value.isInstanceOf[Date]
+      case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
+        value.isInstanceOf[Timestamp]
+      case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
+        isDecimalMatched(value, decimalMeta)
+      case _ => false
+    })
+  }
+
+  // For decimal types, the filter value's scale must match the scale in the file;
+  // a mismatch would lead to incorrect results.
+  private def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
+    case decimal: JBigDecimal =>
+      decimal.scale == decimalMeta.getScale
+    case _ => false
+  }
+
+  // Parquet does not allow dots in column names because dots are used as the column path
+  // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts filter predicates
+  // with missing columns, so pushing down a filter for a column whose name contains dots
+  // could return incorrect results. Thus, we do not push down such filters.
+  // See SPARK-20364.
+  private def canMakeFilterOn(name: String, value: Any): Boolean = {
+    nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
   }

   /**
-   * @param nameToParquetField a map from the field name to its field name and data type.
-   *                           This only includes the root fields whose types are primitive types.
    * @param predicate the input filter predicates. Not all the predicates can be pushed down.
    * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
    *                                    down safely. Pushing ONLY one side of AND down is safe to
    * @return the Parquet-native filter predicates that are eligible for pushdown.
    */
   private def createFilterHelper(
-      nameToParquetField: Map[String, ParquetField],
       predicate: sources.Filter,
       canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
-    // Decimal type must make sure that filter value's scale matched the file.
-    // If doesn't matched, which would cause data corruption.
-    def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
-      case decimal: JBigDecimal =>
-        decimal.scale == decimalMeta.getScale
-      case _ => false
-    }
-
-    // Parquet's type in the given file should be matched to the value's type
-    // in the pushed filter in order to push down the filter to Parquet.
-    def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
-      value == null || (nameToParquetField(name).fieldType match {
-        case ParquetBooleanType => value.isInstanceOf[JBoolean]
-        case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
-        case ParquetLongType => value.isInstanceOf[JLong]
-        case ParquetFloatType => value.isInstanceOf[JFloat]
-        case ParquetDoubleType => value.isInstanceOf[JDouble]
-        case ParquetStringType => value.isInstanceOf[String]
-        case ParquetBinaryType => value.isInstanceOf[Array[Byte]]
-        case ParquetDateType => value.isInstanceOf[Date]
-        case ParquetTimestampMicrosType | ParquetTimestampMillisType =>
-          value.isInstanceOf[Timestamp]
-        case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) =>
-          isDecimalMatched(value, decimalMeta)
-        case _ => false
-      })
-    }
-
-    // Parquet does not allow dots in the column name because dots are used as a column path
-    // delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
-    // with missing columns. The incorrect results could be got from Parquet when we push down
-    // filters for the column having dots in the names. Thus, we do not push down such filters.
-    // See SPARK-20364.
-    def canMakeFilterOn(name: String, value: Any): Boolean = {
-      nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
-    }
-
     // NOTE:
     //
     // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`,
@@ -515,9 +549,9 @@ private[parquet] class ParquetFilters(
         // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
         // can be safely removed.
         val lhsFilterOption =
-          createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
+          createFilterHelper(lhs, canPartialPushDownConjuncts)
         val rhsFilterOption =
-          createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
+          createFilterHelper(rhs, canPartialPushDownConjuncts)
         (lhsFilterOption, rhsFilterOption) match {
           case (Some(lhsFilter), Some(rhsFilter)) =>
             Some(FilterApi.and(lhsFilter, rhsFilter))
@@ -539,14 +573,12 @@ private[parquet] class ParquetFilters(
         // (a1 OR b1) AND (a1 OR b2) AND (a2 OR b1) AND (a2 OR b2)
         // As per the logical in And predicate, we can push down (a1 OR b1).
         for {
-          lhsFilter <-
-            createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
-          rhsFilter <-
-            createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
+          lhsFilter <- createFilterHelper(lhs, canPartialPushDownConjuncts)
+          rhsFilter <- createFilterHelper(rhs, canPartialPushDownConjuncts)
         } yield FilterApi.or(lhsFilter, rhsFilter)

       case sources.Not(pred) =>
-        createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
+        createFilterHelper(pred, canPartialPushDownConjuncts = false)
           .map(FilterApi.not)

       case sources.In(name, values) if canMakeFilterOn(name, values.head)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index f2b92ae61a52..28cb7611f0eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -24,6 +24,7 @@ import java.sql.{Date, Timestamp}
 import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators}
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.filter2.predicate.Operators.{Column => _, _}
+import org.apache.parquet.schema.MessageType

 import org.apache.spark.SparkException
 import org.apache.spark.sql._
@@ -59,10 +60,13 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {

-  private lazy val parquetFilters =
-    new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
+  private def createParquetFilters(
+      schema: MessageType,
+      caseSensitive: Option[Boolean] = None): ParquetFilters =
+    new ParquetFilters(schema, conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
       conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
-      conf.parquetFilterPushDownInFilterThreshold, conf.caseSensitiveAnalysis)
+      conf.parquetFilterPushDownInFilterThreshold,
+      caseSensitive.getOrElse(conf.caseSensitiveAnalysis))

   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -113,10 +117,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         val (_, selectedFilters, _) =
           DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
         assert(selectedFilters.nonEmpty, "No filter is pushed down")
-
+        val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
+        val parquetFilters = createParquetFilters(schema)
+        // In this test suite, all the simple predicates are convertible here.
+        assert(parquetFilters.convertibleFilters(selectedFilters) == selectedFilters)
         val pushedParquetFilters = selectedFilters.map { pred =>
-          val maybeFilter = parquetFilters.createFilter(
-            new SparkToParquetSchemaConverter(conf).convert(df.schema), pred)
+          val maybeFilter = parquetFilters.createFilter(pred)
           assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
           maybeFilter.get
         }
@@ -523,9 +529,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key ->
       ParquetOutputTimestampType.INT96.toString) {
       withParquetDataFrame(millisData.map(i => Tuple1(i))) { implicit df =>
+        val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
         assertResult(None) {
-          parquetFilters.createFilter(
-            new SparkToParquetSchemaConverter(conf).convert(df.schema), sources.IsNull("_1"))
+          createParquetFilters(schema).createFilter(sources.IsNull("_1"))
         }
       }
     }
@@ -587,24 +593,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     assert(decimal.scale() === scale)
     assert(decimal1.scale() === scale + 1)
+    val parquetFilters = createParquetFilters(parquetSchema)
     assertResult(Some(lt(intColumn("cdecimal1"), 1000: Integer))) {
-      parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal))
+      parquetFilters.createFilter(sources.LessThan("cdecimal1", decimal))
     }
     assertResult(None) {
-      parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal1", decimal1))
+      parquetFilters.createFilter(sources.LessThan("cdecimal1", decimal1))
     }
     assertResult(Some(lt(longColumn("cdecimal2"), 1000L: java.lang.Long))) {
-      parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal2", decimal))
+      parquetFilters.createFilter(sources.LessThan("cdecimal2", decimal))
     }
     assertResult(None) {
-      parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal2", decimal1))
+      parquetFilters.createFilter(sources.LessThan("cdecimal2", decimal1))
     }
-    assert(parquetFilters.createFilter(
-      parquetSchema, sources.LessThan("cdecimal3", decimal)).isDefined)
+    assert(parquetFilters.createFilter(sources.LessThan("cdecimal3", decimal)).isDefined)
     assertResult(None) {
-      parquetFilters.createFilter(parquetSchema, sources.LessThan("cdecimal3", decimal1))
+      parquetFilters.createFilter(sources.LessThan("cdecimal3", decimal1))
     }
   }
@@ -765,13 +771,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     ))
     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-
+    val parquetFilters = createParquetFilters(parquetSchema)
     assertResult(Some(and(
       lt(intColumn("a"), 10: Integer),
       gt(doubleColumn("c"), 1.5: java.lang.Double)))
     ) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.And(
           sources.LessThan("a", 10),
           sources.GreaterThan("c", 1.5D)))
     }
@@ -783,7 +788,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
     assertResult(Some(lt(intColumn("a"), 10: Integer))) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.And(
           sources.LessThan("a", 10),
           sources.StringContains("b", "prefix")))
     }
@@ -795,7 +799,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
     assertResult(Some(lt(intColumn("a"), 10: Integer))) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.And(
           sources.StringContains("b", "prefix"),
           sources.LessThan("a", 10)))
@@ -805,7 +808,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     assertResult(Some(
       FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.And(
           sources.And(
             sources.LessThan("a", 10),
@@ -818,7 +820,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     assertResult(Some(
       FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.And(
           sources.GreaterThan("a", 5),
           sources.And(
@@ -840,7 +841,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
     assertResult(None) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.Not(
           sources.And(
             sources.GreaterThan("a", 1),
@@ -860,7 +860,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
     assertResult(None) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.Not(
           sources.And(
             sources.StringContains("b", "prefix"),
@@ -879,7 +878,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     // val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
     assertResult(None) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.Not(
           sources.And(
             sources.And(
@@ -900,7 +898,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     // val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
     assertResult(None) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.Not(
           sources.And(
             sources.GreaterThan("a", 2),
@@ -918,6 +915,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     ))

     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
     // Testing
     // case sources.Or(lhs, rhs) =>
     // ...
@@ -925,7 +923,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     assertResult(Some(
       FilterApi.or(gt(intColumn("a"), 1: Integer), gt(intColumn("a"), 2: Integer)))) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.Or(
           sources.And(
             sources.GreaterThan("a", 1),
@@ -940,7 +937,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     assertResult(Some(
       FilterApi.or(gt(intColumn("a"), 2: Integer), gt(intColumn("a"), 1: Integer)))) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.Or(
           sources.GreaterThan("a", 2),
           sources.And(
@@ -956,7 +952,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     assertResult(Some(
       FilterApi.or(gt(intColumn("a"), 1: Integer), lt(intColumn("a"), 0: Integer)))) {
       parquetFilters.createFilter(
-        parquetSchema,
         sources.Or(
           sources.And(
             sources.GreaterThan("a", 1),
@@ -967,6 +962,128 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     }
   }

+  test("SPARK-27698 Convertible Parquet filter predicates") {
+    val schema = StructType(Seq(
+      StructField("a", IntegerType, nullable = false),
+      StructField("b", StringType, nullable = true),
+      StructField("c", DoubleType, nullable = true)
+    ))
+
+    val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
+    val parquetFilters = createParquetFilters(parquetSchema)
+    assertResult(Seq(sources.And(sources.LessThan("a", 10), sources.GreaterThan("c", 1.5D)))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.And(
+          sources.LessThan("a", 10),
+          sources.GreaterThan("c", 1.5D))))
+    }
+
+    assertResult(Seq(sources.LessThan("a", 10))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.And(
+          sources.LessThan("a", 10),
+          sources.StringContains("b", "prefix"))))
+    }
+
+    assertResult(Seq(sources.LessThan("a", 10))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.And(
+          sources.StringContains("b", "prefix"),
+          sources.LessThan("a", 10))))
+    }
+
+    // Testing complex And conditions
+    assertResult(Seq(sources.And(sources.LessThan("a", 10), sources.GreaterThan("a", 5)))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.And(
+          sources.And(
+            sources.LessThan("a", 10),
+            sources.StringContains("b", "prefix")
+          ),
+          sources.GreaterThan("a", 5))))
+    }
+
+    // Testing complex And conditions
+    assertResult(Seq(sources.And(sources.GreaterThan("a", 5), sources.LessThan("a", 10)))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.And(
+          sources.GreaterThan("a", 5),
+          sources.And(
+            sources.StringContains("b", "prefix"),
+            sources.LessThan("a", 10)
+          ))))
+    }
+
+    // Testing complex And conditions
+    assertResult(Seq(sources.Or(sources.GreaterThan("a", 1), sources.GreaterThan("a", 2)))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.Or(
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix")),
+          sources.GreaterThan("a", 2))))
+    }
+
+    // Testing complex And/Or conditions, the And condition under Or condition can't be pushed down.
+    assertResult(Seq(sources.And(sources.LessThan("a", 10),
+      sources.Or(sources.GreaterThan("a", 1), sources.GreaterThan("a", 2))))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.And(
+          sources.LessThan("a", 10),
+          sources.Or(
+            sources.And(
+              sources.GreaterThan("a", 1),
+              sources.StringContains("b", "prefix")),
+            sources.GreaterThan("a", 2)))))
+    }
+
+    assertResult(Seq(sources.Or(sources.GreaterThan("a", 2), sources.GreaterThan("c", 1.1)))) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.Or(
+          sources.GreaterThan("a", 2),
+          sources.And(
+            sources.GreaterThan("c", 1.1),
+            sources.StringContains("b", "prefix")))))
+    }
+
+    // Testing complex Not conditions.
+    assertResult(Seq.empty) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.Not(
+          sources.And(
+            sources.GreaterThan("a", 1),
+            sources.StringContains("b", "prefix")))))
+    }
+
+    assertResult(Seq.empty) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.Not(
+          sources.And(
+            sources.StringContains("b", "prefix"),
+            sources.GreaterThan("a", 1)))))
+    }
+
+    assertResult(Seq.empty) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.Not(
+          sources.And(
+            sources.And(
+              sources.GreaterThan("a", 1),
+              sources.StringContains("b", "prefix")),
+            sources.GreaterThan("a", 2)))))
+    }
+
+    assertResult(Seq.empty) {
+      parquetFilters.convertibleFilters(
+        Seq(sources.Not(
+          sources.And(
+            sources.GreaterThan("a", 2),
+            sources.And(
+              sources.GreaterThan("a", 1),
+              sources.StringContains("b", "prefix"))))))
+    }
+  }
+
   test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {
     withParquetDataFrame((1 to 4).map(i => Tuple1(Tuple1(i)))) { implicit df =>
       // Here the schema becomes as below:
@@ -1127,10 +1244,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
         Seq("1str1", "2str2", "3str3", "4str4").map(Row(_)))
     }
+    val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema)
     assertResult(None) {
-      parquetFilters.createFilter(
-        new SparkToParquetSchemaConverter(conf).convert(df.schema),
-        sources.StringStartsWith("_1", null))
+      createParquetFilters(schema).createFilter(sources.StringStartsWith("_1", null))
     }
   }
@@ -1147,18 +1263,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     ))

     val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-
+    val parquetFilters = createParquetFilters(parquetSchema)
     assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) {
-      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(null)))
+      parquetFilters.createFilter(sources.In("a", Array(null)))
     }

     assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
-      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10)))
+      parquetFilters.createFilter(sources.In("a", Array(10)))
     }

     // Remove duplicates
     assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) {
-      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 10)))
+      parquetFilters.createFilter(sources.In("a", Array(10, 10)))
     }

     assertResult(Some(or(or(
       FilterApi.eq(intColumn("a"), 10: Integer),
       FilterApi.eq(intColumn("a"), 20: Integer)),
       FilterApi.eq(intColumn("a"), 30: Integer)))
     ) {
-      parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 20, 30)))
+      parquetFilters.createFilter(sources.In("a", Array(10, 20, 30)))
     }

-    assert(parquetFilters.createFilter(parquetSchema, sources.In("a",
+    assert(parquetFilters.createFilter(sources.In("a",
       Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined)
-    assert(parquetFilters.createFilter(parquetSchema, sources.In("a",
+    assert(parquetFilters.createFilter(sources.In("a",
       Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty)

     import testImplicits._
@@ -1203,25 +1319,20 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   }

   test("SPARK-25207: Case-insensitive field resolution for pushdown when reading parquet") {
-    def createParquetFilter(caseSensitive: Boolean): ParquetFilters = {
-      new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
-        conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
-        conf.parquetFilterPushDownInFilterThreshold, caseSensitive)
-    }
-    val caseSensitiveParquetFilters = createParquetFilter(caseSensitive = true)
-    val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive = false)
-
     def testCaseInsensitiveResolution(
         schema: StructType,
         expected: FilterPredicate,
         filter: sources.Filter): Unit = {
       val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
-
+      val caseSensitiveParquetFilters =
+        createParquetFilters(parquetSchema, caseSensitive = Some(true))
+      val caseInsensitiveParquetFilters =
+        createParquetFilters(parquetSchema, caseSensitive = Some(false))
       assertResult(Some(expected)) {
-        caseInsensitiveParquetFilters.createFilter(parquetSchema, filter)
+        caseInsensitiveParquetFilters.createFilter(filter)
       }
       assertResult(None) {
-        caseSensitiveParquetFilters.createFilter(parquetSchema, filter)
+        caseSensitiveParquetFilters.createFilter(filter)
       }
     }
@@ -1278,9 +1389,10 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     val dupFieldSchema = StructType(
       Seq(StructField("cint", IntegerType), StructField("cINT", IntegerType)))
     val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema)
+    val dupCaseInsensitiveParquetFilters =
+      createParquetFilters(dupParquetSchema, caseSensitive = Some(false))
     assertResult(None) {
-      caseInsensitiveParquetFilters.createFilter(
-        dupParquetSchema, sources.EqualTo("CINT", 1000))
+      dupCaseInsensitiveParquetFilters.createFilter(sources.EqualTo("CINT", 1000))
     }
   }
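
Usage sketch (not part of the patch itself): after this change the Parquet schema is supplied once when constructing ParquetFilters, createFilter no longer takes the schema on every call, and the new convertibleFilters method reports which source filters survive conversion, possibly partially (for example, one side of an And). The snippet below is a minimal illustration assumed to live inside the parquet datasource package; the push-down flag values and the `conf`/`df` variables are hypothetical and not taken from the patch.

import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.spark.sql.sources

// Assumed to be available in the calling context (hypothetical example values).
val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(df.schema)

// The schema is now a constructor argument instead of a per-call parameter.
val parquetFilters = new ParquetFilters(
  parquetSchema,
  pushDownDate = true,
  pushDownTimestamp = true,
  pushDownDecimal = true,
  pushDownStartWith = true,
  pushDownInFilterThreshold = 10,
  caseSensitive = false)

val filters: Seq[sources.Filter] = Seq(
  sources.And(sources.LessThan("a", 10), sources.StringContains("b", "prefix")))

// New in this patch: the subset of source filters that can be pushed down.
// Here the And collapses to LessThan("a", 10), since StringContains is not convertible.
val convertible: Seq[sources.Filter] = parquetFilters.convertibleFilters(filters)

// Same conversion as before, but without passing the schema on every call,
// mirroring how ParquetFileFormat builds the pushed-down predicate.
val pushed = filters.flatMap(parquetFilters.createFilter(_)).reduceOption(FilterApi.and)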