
Commit a8d981d

wangzhenhua authored and cloud-fan committed
[SPARK-20718][SQL] FileSourceScanExec with different filter orders should be the same after canonicalization
## What changes were proposed in this pull request?

Since `constraints` in `QueryPlan` is a set, the order of filters can differ. Usually this is fine because of canonicalization. However, in `FileSourceScanExec`, the data filters and partition filters are sequences whose order is not canonicalized, so `def sameResult` returns different results for different orders of data/partition filters. This leads to, for example, different decisions for `ReuseExchange`, and thus results in unstable performance.

## How was this patch tested?

Added a new test for `FileSourceScanExec.sameResult`.

Author: wangzhenhua <[email protected]>

Closes #17959 from wzhfy/canonicalizeFileSourceScanExec.

(cherry picked from commit c8da535)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 2cac317 commit a8d981d
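To see the problem and the shape of the fix outside of Spark, here is a minimal self-contained sketch. It is illustrative only: `Pred`, `Cmp`, and the hash-based sort are stand-ins for Catalyst's `Expression`, comparison predicates, and canonical reordering of commutative operators; none of these names come from the commit.

```scala
// Minimal stand-ins for Catalyst predicates (illustrative, not Spark classes).
object CanonicalizeFiltersDemo extends App {
  sealed trait Pred
  case class Cmp(col: String, op: String, value: Int) extends Pred
  case class And(left: Pred, right: Pred) extends Pred

  // Flatten a conjunction into its conjuncts, like splitConjunctivePredicates.
  def splitConjuncts(p: Pred): Seq[Pred] = p match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case leaf => Seq(leaf)
  }

  // Reduce the filters into a single And, then re-split in a deterministic
  // (here: hash-based) order, so the original Seq order no longer matters.
  def canonicalizeFilters(filters: Seq[Pred]): Seq[Pred] =
    if (filters.nonEmpty) splitConjuncts(filters.reduce(And)).sortBy(_.hashCode)
    else Nil

  // The same three conjuncts, listed in different orders.
  val filters1 = Seq(Cmp("a", ">", 1), Cmp("b", "<", 9), Cmp("c", ">", 1))
  val filters2 = Seq(Cmp("c", ">", 1), Cmp("a", ">", 1), Cmp("b", "<", 9))

  assert(filters1 != filters2)  // plain Seq equality is order-sensitive
  assert(canonicalizeFilters(filters1) == canonicalizeFilters(filters2))
  println("canonical filter lists match")
}
```

This mirrors the `canonicalizeFilters` helper added in the diff below, with the deterministic reordering made explicit via `sortBy` instead of relying on Spark's expression canonicalization.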

2 files changed (+62, -3 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 13 additions & 3 deletions

```diff
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport with PredicateHelper {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
@@ -519,8 +519,18 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      partitionFilters.map(QueryPlan.normalizeExprId(_, output)),
-      dataFilters.map(QueryPlan.normalizeExprId(_, output)),
+      canonicalizeFilters(partitionFilters, output),
+      canonicalizeFilters(dataFilters, output),
       None)
   }
+
+  private def canonicalizeFilters(filters: Seq[Expression], output: Seq[Attribute])
+    : Seq[Expression] = {
+    if (filters.nonEmpty) {
+      val normalizedFilters = QueryPlan.normalizeExprId(filters.reduce(And), output)
+      splitConjunctivePredicates(normalizedFilters)
+    } else {
+      Nil
+    }
+  }
 }
```
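A note on why the helper reduces the filters into a single `And` rather than normalizing each element (my reading; the commit message does not spell this out): Catalyst's expression canonicalization reorders the children of commutative operators such as `And` deterministically, so normalizing the combined conjunction and then calling `splitConjunctivePredicates` yields the conjuncts in a stable order. The old element-wise `partitionFilters.map(QueryPlan.normalizeExprId(_, output))` normalized expression IDs but preserved whatever arbitrary order the sequence arrived in, which is what made `sameResult` order-dependent.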
sql/core/src/test/scala/org/apache/spark/sql/execution/SameResultSuite.scala

Lines changed: 49 additions & 0 deletions
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.test.SharedSQLContext

/**
 * Tests for the sameResult function for [[SparkPlan]]s.
 */
class SameResultSuite extends QueryTest with SharedSQLContext {

  test("FileSourceScanExec: different orders of data filters and partition filters") {
    withTempPath { path =>
      val tmpDir = path.getCanonicalPath
      spark.range(10)
        .selectExpr("id as a", "id + 1 as b", "id + 2 as c", "id + 3 as d")
        .write
        .partitionBy("a", "b")
        .parquet(tmpDir)
      val df = spark.read.parquet(tmpDir)
      // partition filters: a > 1 AND b < 9
      // data filters: c > 1 AND d < 9
      val plan1 = getFileSourceScanExec(df.where("a > 1 AND b < 9 AND c > 1 AND d < 9"))
      val plan2 = getFileSourceScanExec(df.where("b < 9 AND a > 1 AND d < 9 AND c > 1"))
      assert(plan1.sameResult(plan2))
    }
  }

  private def getFileSourceScanExec(df: DataFrame): FileSourceScanExec = {
    df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
      .asInstanceOf[FileSourceScanExec]
  }
}
```
