
Commit 021145f

wangyum authored and cloud-fan committed
[SPARK-24716][SQL] Refactor ParquetFilters
## What changes were proposed in this pull request?

Replace the DataFrame schema with the Parquet file schema when creating `ParquetFilters`. That way we can easily implement `Decimal` and `Timestamp` push down, something like this:

```scala
// DecimalType: 32BitDecimalType
case ParquetSchemaType(DECIMAL, INT32, decimal) if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.eq(
    intColumn(n),
    Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().intValue()
      .asInstanceOf[Integer]).orNull)

// DecimalType: 64BitDecimalType
case ParquetSchemaType(DECIMAL, INT64, decimal) if pushDownDecimal =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(_.asInstanceOf[JBigDecimal].unscaledValue().longValue()
      .asInstanceOf[java.lang.Long]).orNull)

// DecimalType: LegacyParquetFormat 32BitDecimalType & 64BitDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
    if pushDownDecimal && decimal.getPrecision <= Decimal.MAX_LONG_DIGITS =>
  (n: String, v: Any) => FilterApi.eq(
    binaryColumn(n),
    Option(v).map(d => decimalToBinaryUsingUnscaledLong(decimal.getPrecision,
      d.asInstanceOf[JBigDecimal])).orNull)

// DecimalType: ByteArrayDecimalType
case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, decimal)
    if pushDownDecimal && decimal.getPrecision > Decimal.MAX_LONG_DIGITS =>
  (n: String, v: Any) => FilterApi.eq(
    binaryColumn(n),
    Option(v).map(d => decimalToBinaryUsingUnscaledBytes(decimal.getPrecision,
      d.asInstanceOf[JBigDecimal])).orNull)
```

```scala
// INT96 doesn't support pushdown
case ParquetSchemaType(TIMESTAMP_MICROS, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp])
      .asInstanceOf[java.lang.Long]).orNull)

case ParquetSchemaType(TIMESTAMP_MILLIS, INT64, null) =>
  (n: String, v: Any) => FilterApi.eq(
    longColumn(n),
    Option(v).map(_.asInstanceOf[Timestamp].getTime.asInstanceOf[java.lang.Long]).orNull)
```

## How was this patch tested?

Unit tests.

Author: Yuming Wang <[email protected]>

Closes #21696 from wangyum/SPARK-24716.
1 parent b2deef6 commit 021145f
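For orientation only, here is a minimal sketch of how the refactored API ends up being driven (it mirrors the `ParquetFileFormat` change in this commit and is not code from the patch): the per-file footer schema is read as a Parquet `MessageType` and handed to `createFilter` in place of the Catalyst `StructType`. The helper name `pushedFilterFor` and its parameters are placeholders, and since `ParquetFilters` is `private[parquet]`, this only compiles inside that package.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.spark.sql.sources

// Hypothetical helper: convert data source filters against one file's footer schema.
def pushedFilterFor(
    conf: Configuration,            // placeholder Hadoop configuration
    filePath: Path,                 // placeholder path to a single Parquet file
    filters: Seq[sources.Filter]): Option[FilterPredicate] = {
  // Read only the footer metadata; row-group data is skipped.
  val parquetSchema = ParquetFileReader.readFooter(conf, filePath, SKIP_ROW_GROUPS)
    .getFileMetaData.getSchema
  val parquetFilters = new ParquetFilters(pushDownDate = true, pushDownStartWith = true)
  filters
    // createFilter returns None for predicates that cannot be expressed in Parquet,
    // so unconvertible filters simply drop out of the conjunction.
    .flatMap(parquetFilters.createFilter(parquetSchema, _))
    .reduceOption(FilterApi.and)
}
```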

4 files changed (+121, −101 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 19 additions & 15 deletions
```diff
@@ -353,38 +353,42 @@ class ParquetFileFormat
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
 
-      // Try to push down filters when filter push-down is enabled.
-      val pushed = if (enableParquetFilterPushDown) {
-        filters
-          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
-          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
-          // is used here.
-          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
-            .createFilter(requiredSchema, _))
-          .reduceOption(FilterApi.and)
-      } else {
-        None
-      }
-
       val fileSplit =
         new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
+      val filePath = fileSplit.getPath
 
       val split =
         new org.apache.parquet.hadoop.ParquetInputSplit(
-          fileSplit.getPath,
+          filePath,
           fileSplit.getStart,
           fileSplit.getStart + fileSplit.getLength,
           fileSplit.getLength,
           fileSplit.getLocations,
           null)
 
       val sharedConf = broadcastedHadoopConf.value.value
+
+      // Try to push down filters when filter push-down is enabled.
+      val pushed = if (enableParquetFilterPushDown) {
+        val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
+          .getFileMetaData.getSchema
+        filters
+          // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+          // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+          // is used here.
+          .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith)
+            .createFilter(parquetSchema, _))
+          .reduceOption(FilterApi.and)
+      } else {
+        None
+      }
+
       // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
       // *only* if the file was created by something other than "parquet-mr", so check the actual
       // writer here for this file. We have to do this per-file, as each file in the table may
       // have different writers.
       def isCreatedByParquetMr(): Boolean = {
-        val footer = ParquetFileReader.readFooter(sharedConf, fileSplit.getPath, SKIP_ROW_GROUPS)
+        val footer = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS)
         footer.getFileMetaData().getCreatedBy().startsWith("parquet-mr")
       }
       val convertTz =
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 93 additions & 80 deletions
```diff
@@ -19,187 +19,200 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.sql.Date
 
+import scala.collection.JavaConverters.asScalaBufferConverter
+
 import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.FilterApi._
 import org.apache.parquet.io.api.Binary
-import org.apache.parquet.schema.PrimitiveComparator
+import org.apache.parquet.schema.{DecimalMetadata, MessageType, OriginalType, PrimitiveComparator, PrimitiveType}
+import org.apache.parquet.schema.OriginalType._
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
 import org.apache.spark.sql.sources
-import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
 private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
 
+  private case class ParquetSchemaType(
+      originalType: OriginalType,
+      primitiveTypeName: PrimitiveTypeName,
+      decimalMetadata: DecimalMetadata)
+
+  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
+  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
+  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
+  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
+  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
+  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
+  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
+  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
+
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
   }
 
-  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
     // Binary.fromString and Binary.fromByteArray don't accept null values
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
-    case BinaryType =>
+    case ParquetBinaryType =>
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if pushDownDate =>
+    case ParquetDateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case BooleanType =>
+  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetBooleanType =>
       (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
-    case IntegerType =>
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
-    case BinaryType =>
+    case ParquetBinaryType =>
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if pushDownDate =>
+    case ParquetDateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
   }
 
-  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
+  private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
       (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
-    case LongType =>
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.lt(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.lt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.lt(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.lt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
-  private val makeLtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
+  private val makeLtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.ltEq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.ltEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.ltEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.ltEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.ltEq(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.ltEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.ltEq(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.ltEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
-  private val makeGt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
+  private val makeGt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.gt(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.gt(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.gt(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.gt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.gt(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.gt(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.gt(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.gt(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
-  private val makeGtEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
-    case IntegerType =>
-      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[java.lang.Integer])
-    case LongType =>
+  private val makeGtEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
+    case ParquetIntegerType =>
+      (n: String, v: Any) => FilterApi.gtEq(intColumn(n), v.asInstanceOf[Integer])
+    case ParquetLongType =>
       (n: String, v: Any) => FilterApi.gtEq(longColumn(n), v.asInstanceOf[java.lang.Long])
-    case FloatType =>
+    case ParquetFloatType =>
       (n: String, v: Any) => FilterApi.gtEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
-    case DoubleType =>
+    case ParquetDoubleType =>
       (n: String, v: Any) => FilterApi.gtEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
 
-    case StringType =>
+    case ParquetStringType =>
       (n: String, v: Any) =>
-        FilterApi.gtEq(binaryColumn(n),
-          Binary.fromString(v.asInstanceOf[String]))
-    case BinaryType =>
+        FilterApi.gtEq(binaryColumn(n), Binary.fromString(v.asInstanceOf[String]))
+    case ParquetBinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if pushDownDate =>
-      (n: String, v: Any) => FilterApi.gtEq(
-        intColumn(n),
-        Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
+    case ParquetDateType if pushDownDate =>
+      (n: String, v: Any) =>
+        FilterApi.gtEq(intColumn(n), dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
   }
 
   /**
    * Returns a map from name of the column to the data type, if predicate push down applies.
    */
-  private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
-    case StructType(fields) =>
+  private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match {
+    case m: MessageType =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
       // and it does not support to create filters for them.
-      fields.map(f => f.name -> f.dataType).toMap
-    case _ => Map.empty[String, DataType]
+      m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
+        f.getName -> ParquetSchemaType(
+          f.getOriginalType, f.getPrimitiveTypeName, f.getDecimalMetadata)
+      }.toMap
+    case _ => Map.empty[String, ParquetSchemaType]
   }
 
   /**
    * Converts data sources filters to Parquet filter predicates.
    */
-  def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
+  def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
     val nameToType = getFieldMap(schema)
 
     // Parquet does not allow dots in the column name because dots are used as a column path
```
sql/core/src/main/scala/org/apache/spark/sql/sources/filters.scala

Lines changed: 1 addition & 1 deletion
```diff
@@ -199,7 +199,7 @@ case class StringStartsWith(attribute: String, value: String) extends Filter {
 
 /**
  * A filter that evaluates to `true` iff the attribute evaluates to
- * a string that starts with `value`.
+ * a string that ends with `value`.
  *
  * @since 1.3.1
  */
```
