Commit 7a8ff15

[SPARK-26865][SQL] DataSourceV2Strategy should push normalized filters
## What changes were proposed in this pull request?

This PR aims to make `DataSourceV2Strategy` normalize filters in the same way as [FileSourceStrategy](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L150-L158) does when it pushes them into `SupportsPushDownFilters.pushFilters`.

## How was this patch tested?

Pass Jenkins with the newly added test case.

Closes #23770 from dongjoon-hyun/SPARK-26865.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent a829234 commit 7a8ff15
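To make the intent concrete, here is a hedged repro sketch (not from the PR; the format name, path, and column names are hypothetical). With Spark's default case-insensitive analysis, a user filter can reference a column with different casing than the source schema, and before this change a V2 source received the user's casing verbatim:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical scenario: "exampleV2Source" stands for any data source that
// implements SupportsPushDownFilters; its schema defines a column "id".
val spark = SparkSession.builder().master("local[*]").getOrCreate()

spark.read.format("exampleV2Source").load("/tmp/example")
  .filter("ID IS NOT NULL") // the user spells the column "ID"
  .show()
// Before this patch the source's pushFilters received IsNotNull("ID");
// with it, the pushed filter is normalized to IsNotNull("id").
```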

4 files changed (+29, -11 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 16 additions & 0 deletions
```diff
@@ -426,6 +426,22 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with
 }
 
 object DataSourceStrategy {
+  /**
+   * The attribute name of predicate could be different than the one in schema in case of
+   * case insensitive, we should change them to match the one in schema, so we do not need to
+   * worry about case sensitivity anymore.
+   */
+  protected[sql] def normalizeFilters(
+      filters: Seq[Expression],
+      attributes: Seq[AttributeReference]): Seq[Expression] = {
+    filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
+      e transform {
+        case a: AttributeReference =>
+          a.withName(attributes.find(_.semanticEquals(a)).get.name)
+      }
+    }
+  }
+
   /**
    * Tries to translate a Catalyst [[Expression]] into data source [[Filter]].
    *
```
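For reference, a minimal sketch of the new helper in use, mirroring the test added in this commit. It assumes a caller inside the `org.apache.spark.sql` package (since `normalizeFilters` is `protected[sql]`) and the Catalyst test DSL:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.IsNotNull
import org.apache.spark.sql.execution.datasources.DataSourceStrategy

// Attribute as the relation's schema defines it.
val cint = 'cint.int
// The same attribute referenced with different casing, as a user might write it.
val predicate = IsNotNull(cint.withName("CiNt"))

// semanticEquals matches attributes by exprId rather than by name, so the
// lookup finds "cint" and the filter is rewritten to the schema's casing.
val normalized = DataSourceStrategy.normalizeFilters(Seq(predicate), Seq(cint))
assert(normalized == Seq(IsNotNull(cint)))
```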

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala

Lines changed: 1 addition & 9 deletions
```diff
@@ -147,15 +147,7 @@ object FileSourceStrategy extends Strategy with Logging {
       // - filters that need to be evaluated again after the scan
       val filterSet = ExpressionSet(filters)
 
-      // The attribute name of predicate could be different than the one in schema in case of
-      // case insensitive, we should change them to match the one in schema, so we do not need to
-      // worry about case sensitivity anymore.
-      val normalizedFilters = filters.filterNot(SubqueryExpression.hasSubquery).map { e =>
-        e transform {
-          case a: AttributeReference =>
-            a.withName(l.output.find(_.semanticEquals(a)).get.name)
-        }
-      }
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, l.output)
 
       val partitionColumns =
         l.resolve(
```

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 5 additions & 2 deletions
```diff
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.mutable
 
 import org.apache.spark.sql.{sources, AnalysisException, SaveMode, Strategy}
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, Repartition}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
@@ -104,10 +104,13 @@ object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters, relation: DataSourceV2Relation) =>
       val scanBuilder = relation.newScanBuilder()
+
+      val normalizedFilters = DataSourceStrategy.normalizeFilters(filters, relation.output)
+
       // `pushedFilters` will be pushed down and evaluated in the underlying data sources.
       // `postScanFilters` need to be evaluated after the scan.
       // `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
-      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, filters)
+      val (pushedFilters, postScanFilters) = pushFilters(scanBuilder, normalizedFilters)
       val (scan, output) = pruneColumns(scanBuilder, relation, project ++ postScanFilters)
       logInfo(
         s"""
```

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -219,6 +219,13 @@ class DataSourceStrategySuite extends PlanTest with SharedSQLContext {
       IsNotNull(attrInt))), None)
   }
 
+  test("SPARK-26865 DataSourceV2Strategy should push normalized filters") {
+    val attrInt = 'cint.int
+    assertResult(Seq(IsNotNull(attrInt))) {
+      DataSourceStrategy.normalizeFilters(Seq(IsNotNull(attrInt.withName("CiNt"))), Seq(attrInt))
+    }
+  }
+
   /**
    * Translate the given Catalyst [[Expression]] into data source [[sources.Filter]]
    * then verify against the given [[sources.Filter]].
```
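The added test exercises the renaming branch. A companion sketch (hypothetical, not part of this commit) could pin down the other branch of `normalizeFilters`, which drops filters containing subqueries instead of normalizing them:

```scala
// Assumes the suite's existing imports plus:
// import org.apache.spark.sql.catalyst.expressions.Exists
// import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

// Hypothetical companion test: predicates containing a subquery are removed
// by filterNot(SubqueryExpression.hasSubquery) rather than normalized.
test("SPARK-26865 normalizeFilters drops filters that contain a subquery") {
  val attrInt = 'cint.int
  val subqueryFilter = Exists(LocalRelation(attrInt))
  assertResult(Seq(IsNotNull(attrInt))) {
    DataSourceStrategy.normalizeFilters(
      Seq(IsNotNull(attrInt), subqueryFilter), Seq(attrInt))
  }
}
```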
