Commit 0c23e25
[SPARK-24167][SQL] ParquetFilters should not access SQLConf at executor side

## What changes were proposed in this pull request?

This PR is extracted from #21190, to make it easier to backport.

`ParquetFilters` is used in the file scan function, which is executed at the executor side, so we can't call `conf.parquetFilterPushDownDate` there.

## How was this patch tested?

It is tested in #21190.

Author: Wenchen Fan <[email protected]>

Closes #21224 from cloud-fan/minor2.

1 parent e646ae6 commit 0c23e25
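The underlying idea is the usual Spark pattern: read the session config once on the driver and let the scan closure capture the resulting plain value, so executor-side code never touches `SQLConf`. Below is a minimal, self-contained sketch of that capture pattern; `SessionConf`, `PartitionedFile`, `ReaderBuilder` and the simplified `ParquetFilters` are illustrative stand-ins, not Spark's real classes.

// Sketch of the driver-side capture pattern (stand-in types, not Spark's code).
final case class SessionConf(parquetFilterPushDownDate: Boolean)
final case class PartitionedFile(path: String)

// The flag is a plain constructor argument, so instances can be created on
// executors without ever reading session state.
class ParquetFilters(pushDownDate: Boolean) {
  def createFilter(predicate: String): Option[String] =
    if (predicate.startsWith("date") && !pushDownDate) None else Some(predicate)
}

object ReaderBuilder {
  // Runs on the driver: read the conf exactly once; the closure captures the Boolean.
  def buildReader(conf: SessionConf, predicates: Seq[String]): PartitionedFile => Seq[String] = {
    val pushDownDate = conf.parquetFilterPushDownDate
    (file: PartitionedFile) => {
      // Runs on executors: only the captured Boolean is used, never the conf object.
      predicates.flatMap(p => new ParquetFilters(pushDownDate).createFilter(p))
    }
  }
}

object Demo extends App {
  val reader = ReaderBuilder.buildReader(
    SessionConf(parquetFilterPushDownDate = false),
    Seq("date > 2018-05-09", "a = 1"))
  // The date predicate is dropped because the flag was captured as false on the driver.
  println(reader(PartitionedFile("part-00000.parquet")))  // List(a = 1)
}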

3 files changed: 15 additions, 13 deletions
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala

Lines changed: 2 additions & 1 deletion

@@ -342,6 +342,7 @@ class ParquetFileFormat
       sparkSession.sessionState.conf.parquetFilterPushDown
     // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
     val returningBatch = supportBatch(sparkSession, resultSchema)
+    val pushDownDate = sqlConf.parquetFilterPushDownDate
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -352,7 +353,7 @@ class ParquetFileFormat
           // Collects all converted Parquet filter predicates. Notice that not all predicates can be
           // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
           // is used here.
-          .flatMap(ParquetFilters.createFilter(requiredSchema, _))
+          .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _))
           .reduceOption(FilterApi.and)
       } else {
         None

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 7 additions & 8 deletions

@@ -25,14 +25,13 @@ import org.apache.parquet.io.api.Binary
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 
 /**
  * Some utility function to convert Spark data source filters to Parquet filters.
  */
-private[parquet] object ParquetFilters {
+private[parquet] class ParquetFilters(pushDownDate: Boolean) {
 
   private def dateToDays(date: Date): SQLDate = {
     DateTimeUtils.fromJavaDate(date)
@@ -59,7 +58,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.eq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -85,7 +84,7 @@ private[parquet] object ParquetFilters {
       (n: String, v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.notEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -108,7 +107,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.lt(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -131,7 +130,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
      (n: String, v: Any) =>
         FilterApi.ltEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.ltEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -154,7 +153,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.gt(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
@@ -177,7 +176,7 @@ private[parquet] object ParquetFilters {
     case BinaryType =>
       (n: String, v: Any) =>
         FilterApi.gtEq(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
-    case DateType if SQLConf.get.parquetFilterPushDownDate =>
+    case DateType if pushDownDate =>
       (n: String, v: Any) => FilterApi.gtEq(
         intColumn(n),
         Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 6 additions & 4 deletions

@@ -55,6 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2}
  */
 class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
+  private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate)
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     // Note that there are many tests here that require record-level filtering set to be true.
@@ -99,7 +101,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
       assert(selectedFilters.nonEmpty, "No filter is pushed down")
 
       selectedFilters.foreach { pred =>
-        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+        val maybeFilter = parquetFilters.createFilter(df.schema, pred)
         assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
         // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
         maybeFilter.exists(_.getClass === filterClass)
@@ -517,23 +519,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
         lt(intColumn("a"), 10: Integer),
         gt(doubleColumn("c"), 1.5: java.lang.Double)))
     ) {
-      ParquetFilters.createFilter(
+      parquetFilters.createFilter(
         schema,
         sources.And(
           sources.LessThan("a", 10),
           sources.GreaterThan("c", 1.5D)))
     }
 
     assertResult(None) {
-      ParquetFilters.createFilter(
+      parquetFilters.createFilter(
         schema,
         sources.And(
          sources.LessThan("a", 10),
          sources.StringContains("b", "prefix")))
     }
 
     assertResult(None) {
-      ParquetFilters.createFilter(
+      parquetFilters.createFilter(
         schema,
         sources.Not(
           sources.And(
