
Commit f1fe805

adrian-wang authored and gatorsmile committed
[SPARK-27279][SQL] Reuse subquery should compare child plan of SubqueryExec
## What changes were proposed in this pull request?

Currently, `ReuseSubquery` in Spark compares two subqueries at the `SubqueryExec` level, which invalidates the `ReuseSubquery` rule: the `SubqueryExec` wrapper carries an instance-specific name, so two otherwise identical subqueries never canonicalize to the same plan. This pull request fixes this by canonicalizing `SubqueryExec` to its child plan, and adds a configuration key exclusively for subquery reuse.

## How was this patch tested?

Added a unit test.

Closes #24214 from adrian-wang/reuse.

Authored-by: Daoyuan Wang <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
1 parent 956b52b commit f1fe805
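
To make the failure mode concrete: before this patch, `SubqueryExec`'s canonical form retained the wrapper node itself, name included, so two subqueries wrapping structurally identical child plans never compared equal and the reuse rule never fired. Below is a minimal plain-Scala model of that behavior, a sketch only: `Plan`, `Scan`, `SubqueryBefore`, `SubqueryAfter`, and `sameResult` are illustrative stand-ins, not Spark's real classes.

```scala
// Minimal model of the canonicalization bug (stand-in types, not Spark APIs).
sealed trait Plan { def canonicalized: Plan }

// A leaf plan whose canonical form is itself.
case class Scan(table: String) extends Plan {
  def canonicalized: Plan = this
}

// Before the fix: the wrapper's instance-specific name survives
// canonicalization, so equal children still yield unequal plans.
case class SubqueryBefore(name: String, child: Plan) extends Plan {
  def canonicalized: Plan = SubqueryBefore(name, child.canonicalized)
}

// After the fix: canonicalization skips the wrapper, mirroring
// `override def doCanonicalize(): SparkPlan = child.canonicalized`.
case class SubqueryAfter(name: String, child: Plan) extends Plan {
  def canonicalized: Plan = child.canonicalized
}

object CanonicalizeDemo extends App {
  // A sameResult-style check: compare canonical forms.
  def sameResult(a: Plan, b: Plan): Boolean = a.canonicalized == b.canonicalized

  val child = Scan("testData")
  // Different auto-generated names defeat reuse before the fix...
  assert(!sameResult(SubqueryBefore("subquery#1", child), SubqueryBefore("subquery#2", child)))
  // ...and no longer matter after it.
  assert(sameResult(SubqueryAfter("subquery#1", child), SubqueryAfter("subquery#2", child)))
  println("canonicalization model behaves as expected")
}
```

Making the wrapper transparent to canonical comparison changes nothing about how the subquery executes; it only lets equality checks see through to the child plan.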

File tree

2 files changed: +30 -1 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -674,6 +674,8 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def doCanonicalize(): SparkPlan = child.canonicalized
+
   @transient
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
```
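
For context on why canonical equality matters here: the `ReuseSubquery` rule walks the physical plan and swaps later subqueries for an earlier one that produces the same result, and that same-result check compares canonicalized plans. A hedged sketch of the dedup mechanism follows, using stand-in types (`Subquery` with a string `canonicalKey`) rather than Spark's `SubqueryExec` API; Spark's real rule also buckets candidates by schema to avoid quadratic comparisons.

```scala
import scala.collection.mutable

// Stand-in for a planned subquery; `canonicalKey` models what
// SubqueryExec.canonicalized yields after this patch (the child's
// canonical form), reduced to a string for the sketch.
final case class Subquery(name: String, canonicalKey: String)

object ReuseSketch extends App {
  // Keep the first subquery seen for each canonical form; replace later
  // duplicates with it so the shared result is computed only once.
  def reuseSubqueries(found: Seq[Subquery]): Seq[Subquery] = {
    val seen = mutable.HashMap[String, Subquery]()
    found.map(sub => seen.getOrElseUpdate(sub.canonicalKey, sub))
  }

  // Two scalar subqueries over the same aggregate, as in the test below:
  val subs = Seq(
    Subquery("subquery#1", "Aggregate(avg(key), Scan(testData))"),
    Subquery("subquery#2", "Aggregate(avg(key), Scan(testData))"))

  // Both collapse to the first instance once their canonical keys match.
  assert(reuseSubqueries(subs).distinct.size == 1)
  println("duplicate subqueries collapsed to one")
}
```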

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 28 additions & 1 deletion
```diff
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
-import org.apache.spark.sql.execution.aggregate
+import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -113,6 +113,33 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("Reuse Subquery") {
+    Seq(true, false).foreach { reuse =>
+      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+        val df = sql(
+          """
+            |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
+            |FROM testData
+            |LIMIT 1
+          """.stripMargin)
+
+        import scala.collection.mutable.ArrayBuffer
+        val subqueries = ArrayBuffer[SubqueryExec]()
+        df.queryExecution.executedPlan.transformAllExpressions {
+          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+            subqueries += plan
+            s
+        }
+
+        if (reuse) {
+          assert(subqueries.distinct.size == 1, "Subquery reusing not working correctly")
+        } else {
+          assert(subqueries.distinct.size == 2, "There should be 2 subqueries when not reusing")
+        }
+      }
+    }
+  }
+
   test("SPARK-6743: no columns from cache") {
     Seq(
       (83, 0, 38),
```
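
The commit message also mentions a dedicated configuration key, which the test reaches via `SQLConf.SUBQUERY_REUSE_ENABLED`. Below is a hedged sketch of observing the behavior from a standalone session; the literal key string `spark.sql.execution.reuseSubquery` is my assumption for what that constant resolves to, and the `testData` view is a stand-in for the suite's fixture.

```scala
// Hedged sketch: observing subquery reuse outside the test suite.
// The config key string is an assumption (see lead-in), not part of this diff.
import org.apache.spark.sql.SparkSession

object ReuseInspection extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("subquery-reuse-demo")
    .getOrCreate()

  // Stand-in for the suite's testData fixture.
  spark.range(100).selectExpr("id AS key").createOrReplaceTempView("testData")

  spark.conf.set("spark.sql.execution.reuseSubquery", "true")

  val df = spark.sql(
    """
      |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
      |FROM testData
      |LIMIT 1
    """.stripMargin)

  // With reuse working, both scalar subqueries should refer to the same
  // physical subquery plan in the explain output.
  df.explain(true)

  spark.stop()
}
```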
