
Commit 18725f5

wangzhenhua authored and cloud-fan committed
[SPARK-20718][SQL][FOLLOWUP] Fix canonicalization for HiveTableScanExec
## What changes were proposed in this pull request?

Fix canonicalization for different filter orders in `HiveTableScanExec`.

## How was this patch tested?

Added a new test case.

Author: wangzhenhua <[email protected]>

Closes #17962 from wzhfy/canonicalizeHiveTableScanExec.

(cherry picked from commit 54b4f2a)
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 10b7244 commit 18725f5
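
For context: Spark decides whether two physical plans are interchangeable by comparing their canonicalized forms, so a Hive table scan whose partition filters arrive as `a = 1 AND b = 2` must canonicalize the same way as one whose filters arrive as `b = 2 AND a = 1`. The sketch below (mirroring the regression test added in this commit; the session setup, object name, and table name are illustrative) shows the invariant being restored. Note that `HiveTableScanExec` is `private[hive]`, so this file must sit under the `org.apache.spark.sql.hive` package to compile, just as the commit's own test does:

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hive.execution.HiveTableScanExec

object CanonicalizationDemo extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .enableHiveSupport()
    .getOrCreate()

  spark.sql("CREATE TABLE IF NOT EXISTS demo_part (id INT) PARTITIONED BY (a INT, b INT)")

  // Pull the HiveTableScanExec node out of a query's physical plan.
  def scanFor(query: String): HiveTableScanExec =
    spark.sql(query).queryExecution.sparkPlan.collectFirst {
      case s: HiveTableScanExec => s
    }.get

  val scan1 = scanFor("SELECT * FROM demo_part WHERE a = 1 AND b = 2")
  val scan2 = scanFor("SELECT * FROM demo_part WHERE b = 2 AND a = 1")

  // sameResult compares canonicalized plans; before this fix the two
  // conjunct orders canonicalized differently and this assertion failed.
  assert(scan1.sameResult(scan2))
}
```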

File tree

4 files changed: +39 -22 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 14 additions & 1 deletion
@@ -423,7 +423,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
 }
 
-object QueryPlan {
+object QueryPlan extends PredicateHelper {
   /**
    * Normalize the exprIds in the given expression, by updating the exprId in `AttributeReference`
    * with its referenced ordinal from input attributes. It's similar to `BindReferences` but we
@@ -442,4 +442,17 @@ object QueryPlan {
         }
     }.canonicalized.asInstanceOf[T]
   }
+
+  /**
+   * Composes the given predicates into a conjunctive predicate, which is normalized and reordered.
+   * Then returns a new sequence of predicates by splitting the conjunctive predicate.
+   */
+  def normalizePredicates(predicates: Seq[Expression], output: AttributeSeq): Seq[Expression] = {
+    if (predicates.nonEmpty) {
+      val normalized = normalizeExprId(predicates.reduce(And), output)
+      splitConjunctivePredicates(normalized)
+    } else {
+      Nil
+    }
+  }
 }
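
The new helper is a three-step pipeline: fold the predicate list into a single `And` tree, run `normalizeExprId` (whose `.canonicalized` step also reorders commutative children into a deterministic order), then split the result back into conjuncts with `splitConjunctivePredicates`. Below is a self-contained toy model of that pipeline; `Expr`, `EqualTo`, and the sort-by-string ordering are invented here purely to illustrate the shape of the transformation, and only that shape matches `QueryPlan.normalizePredicates`:

```scala
object NormalizePredicatesDemo extends App {
  sealed trait Expr
  case class EqualTo(attr: String, value: Int) extends Expr
  case class And(left: Expr, right: Expr) extends Expr

  // Stand-in for splitConjunctivePredicates: flatten nested Ands.
  def split(e: Expr): Seq[Expr] = e match {
    case And(l, r) => split(l) ++ split(r)
    case other     => Seq(other)
  }

  // Stand-in for canonicalization: flatten and reorder conjuncts
  // deterministically (Spark reorders commutative children similarly).
  def canonicalize(e: Expr): Expr =
    split(e).sortBy(_.toString).reduce(And(_, _))

  def normalizePredicates(predicates: Seq[Expr]): Seq[Expr] =
    if (predicates.nonEmpty) split(canonicalize(predicates.reduce(And(_, _))))
    else Nil

  // Both conjunct orders normalize to the same predicate sequence.
  val p1 = Seq(EqualTo("a", 1), EqualTo("b", 2))
  val p2 = Seq(EqualTo("b", 2), EqualTo("a", 1))
  assert(normalizePredicates(p1) == normalizePredicates(p2))
}
```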

sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala

Lines changed: 3 additions & 13 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-trait DataSourceScanExec extends LeafExecNode with CodegenSupport with PredicateHelper {
+trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
   val relation: BaseRelation
   val metastoreTableIdentifier: Option[TableIdentifier]
 
@@ -519,18 +519,8 @@ case class FileSourceScanExec(
       relation,
       output.map(QueryPlan.normalizeExprId(_, output)),
       requiredSchema,
-      canonicalizeFilters(partitionFilters, output),
-      canonicalizeFilters(dataFilters, output),
+      QueryPlan.normalizePredicates(partitionFilters, output),
+      QueryPlan.normalizePredicates(dataFilters, output),
       None)
   }
-
-  private def canonicalizeFilters(filters: Seq[Expression], output: Seq[Attribute])
-    : Seq[Expression] = {
-    if (filters.nonEmpty) {
-      val normalizedFilters = QueryPlan.normalizeExprId(filters.reduce(And), output)
-      splitConjunctivePredicates(normalizedFilters)
-    } else {
-      Nil
-    }
-  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala

Lines changed: 1 addition & 1 deletion
@@ -206,7 +206,7 @@ case class HiveTableScanExec(
     HiveTableScanExec(
       requestedAttributes.map(QueryPlan.normalizeExprId(_, input)),
       relation.canonicalized.asInstanceOf[CatalogRelation],
-      partitionPruningPred.map(QueryPlan.normalizeExprId(_, input)))(sparkSession)
+      QueryPlan.normalizePredicates(partitionPruningPred, input))(sparkSession)
   }
 
   override def otherCopyArgs: Seq[AnyRef] = Seq(sparkSession)
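
Why normalizing `partitionPruningPred` inside the canonicalization is sufficient: plan equality is delegated entirely to canonical forms, roughly per the sketch below (the `Plan` trait is a stand-in written for this note; Spark's real members live on catalyst's `QueryPlan`):

```scala
trait Plan {
  // Stable form produced by the plan's canonicalization (for
  // HiveTableScanExec, this now normalizes partitionPruningPred).
  def canonicalized: Plan

  // Two plans are interchangeable iff their canonical forms are equal,
  // so any order-sensitive field must be normalized above.
  final def sameResult(other: Plan): Boolean =
    canonicalized == other.canonicalized
}
```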

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala

Lines changed: 21 additions & 7 deletions
@@ -164,16 +164,30 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
              |PARTITION (p1='a',p2='c',p3='c',p4='d',p5='e')
              |SELECT v.id
            """.stripMargin)
-        val plan = sql(
-          s"""
-             |SELECT * FROM $table
-          """.stripMargin).queryExecution.sparkPlan
-        val scan = plan.collectFirst {
-          case p: HiveTableScanExec => p
-        }.get
+        val scan = getHiveTableScanExec(s"SELECT * FROM $table")
         val numDataCols = scan.relation.dataCols.length
         scan.rawPartitions.foreach(p => assert(p.getCols.size == numDataCols))
       }
     }
   }
+
+  test("HiveTableScanExec canonicalization for different orders of partition filters") {
+    val table = "hive_tbl_part"
+    withTable(table) {
+      sql(
+        s"""
+           |CREATE TABLE $table (id int)
+           |PARTITIONED BY (a int, b int)
+         """.stripMargin)
+      val scan1 = getHiveTableScanExec(s"SELECT * FROM $table WHERE a = 1 AND b = 2")
+      val scan2 = getHiveTableScanExec(s"SELECT * FROM $table WHERE b = 2 AND a = 1")
+      assert(scan1.sameResult(scan2))
+    }
+  }
+
+  private def getHiveTableScanExec(query: String): HiveTableScanExec = {
+    sql(query).queryExecution.sparkPlan.collectFirst {
+      case p: HiveTableScanExec => p
+    }.get
+  }
 }
