Skip to content

Commit 90b8717

Browse files
committed
address comments
1 parent 1ea94cc commit 90b8717

File tree

1 file changed

+29
-16
lines changed

1 file changed

+29
-16
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,10 @@ private[parquet] class ParquetFilters(
4949
caseSensitive: Boolean) {
5050

5151
private case class ParquetField(
52-
resolvedName: String,
53-
schema: ParquetSchemaType)
52+
// field name in parquet file
53+
fieldName: String,
54+
// field schema type in parquet file
55+
fieldSchema: ParquetSchemaType)
5456

5557
private case class ParquetSchemaType(
5658
originalType: OriginalType,
@@ -387,7 +389,7 @@ private[parquet] class ParquetFilters(
387389
* Converts data sources filters to Parquet filter predicates.
388390
*/
389391
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
390-
val nameToParquet = getFieldMap(schema)
392+
val nameToParquetField = getFieldMap(schema)
391393

392394
// Decimal type must make sure that filter value's scale matched the file.
393395
// If doesn't matched, which would cause data corruption.
@@ -400,7 +402,7 @@ private[parquet] class ParquetFilters(
400402
// Parquet's type in the given file should be matched to the value's type
401403
// in the pushed filter in order to push down the filter to Parquet.
402404
def valueCanMakeFilterOn(name: String, value: Any): Boolean = {
403-
value == null || (nameToParquet(name).schema match {
405+
value == null || (nameToParquetField(name).fieldSchema match {
404406
case ParquetBooleanType => value.isInstanceOf[JBoolean]
405407
case ParquetByteType | ParquetShortType | ParquetIntegerType => value.isInstanceOf[Number]
406408
case ParquetLongType => value.isInstanceOf[JLong]
@@ -427,7 +429,7 @@ private[parquet] class ParquetFilters(
427429
// filters for the column having dots in the names. Thus, we do not push down such filters.
428430
// See SPARK-20364.
429431
def canMakeFilterOn(name: String, value: Any): Boolean = {
430-
nameToParquet.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
432+
nameToParquetField.contains(name) && !name.contains(".") && valueCanMakeFilterOn(name, value)
431433
}
432434

433435
// NOTE:
@@ -447,29 +449,39 @@ private[parquet] class ParquetFilters(
447449

448450
predicate match {
449451
case sources.IsNull(name) if canMakeFilterOn(name, null) =>
450-
makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, null))
452+
makeEq.lift(nameToParquetField(name).fieldSchema)
453+
.map(_(nameToParquetField(name).fieldName, null))
451454
case sources.IsNotNull(name) if canMakeFilterOn(name, null) =>
452-
makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, null))
455+
makeNotEq.lift(nameToParquetField(name).fieldSchema)
456+
.map(_(nameToParquetField(name).fieldName, null))
453457

454458
case sources.EqualTo(name, value) if canMakeFilterOn(name, value) =>
455-
makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
459+
makeEq.lift(nameToParquetField(name).fieldSchema)
460+
.map(_(nameToParquetField(name).fieldName, value))
456461
case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name, value) =>
457-
makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
462+
makeNotEq.lift(nameToParquetField(name).fieldSchema)
463+
.map(_(nameToParquetField(name).fieldName, value))
458464

459465
case sources.EqualNullSafe(name, value) if canMakeFilterOn(name, value) =>
460-
makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
466+
makeEq.lift(nameToParquetField(name).fieldSchema)
467+
.map(_(nameToParquetField(name).fieldName, value))
461468
case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name, value) =>
462-
makeNotEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
469+
makeNotEq.lift(nameToParquetField(name).fieldSchema)
470+
.map(_(nameToParquetField(name).fieldName, value))
463471

464472
case sources.LessThan(name, value) if canMakeFilterOn(name, value) =>
465-
makeLt.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
473+
makeLt.lift(nameToParquetField(name).fieldSchema)
474+
.map(_(nameToParquetField(name).fieldName, value))
466475
case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
467-
makeLtEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
476+
makeLtEq.lift(nameToParquetField(name).fieldSchema)
477+
.map(_(nameToParquetField(name).fieldName, value))
468478

469479
case sources.GreaterThan(name, value) if canMakeFilterOn(name, value) =>
470-
makeGt.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
480+
makeGt.lift(nameToParquetField(name).fieldSchema)
481+
.map(_(nameToParquetField(name).fieldName, value))
471482
case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name, value) =>
472-
makeGtEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, value))
483+
makeGtEq.lift(nameToParquetField(name).fieldSchema)
484+
.map(_(nameToParquetField(name).fieldName, value))
473485

474486
case sources.And(lhs, rhs) =>
475487
// At here, it is not safe to just convert one side if we do not understand the
@@ -496,7 +508,8 @@ private[parquet] class ParquetFilters(
496508
case sources.In(name, values) if canMakeFilterOn(name, values.head)
497509
&& values.distinct.length <= pushDownInFilterThreshold =>
498510
values.distinct.flatMap { v =>
499-
makeEq.lift(nameToParquet(name).schema).map(_(nameToParquet(name).resolvedName, v))
511+
makeEq.lift(nameToParquetField(name).fieldSchema)
512+
.map(_(nameToParquetField(name).fieldName, v))
500513
}.reduceLeftOption(FilterApi.or)
501514

502515
case sources.StringStartsWith(name, prefix)

0 commit comments

Comments
 (0)