
Commit f7cf209

[SPARK-20941][SQL] Fix SubqueryExec Reuse
### What changes were proposed in this pull request?

Before this PR, subquery reuse did not work. There were three issues:
- Subquery reuse never actually took effect.
- It shared the same `SQLConf` flag (`spark.sql.exchange.reuse`) with Exchange Reuse.
- No test case covered the subquery reuse rule.

This PR fixes all three issues:
- Ignore the physical operator `SubqueryExec` wrapper when canonicalizing and comparing two plans.
- Add a dedicated conf, `spark.sql.subquery.reuse`, for controlling Subquery Reuse.
- Add a test case verifying the behavior.

### How was this patch tested?

Added a test case to `SQLQuerySuite` (the last file below).

Author: Xiao Li <[email protected]>

Closes #18169 from gatorsmile/subqueryReuse.
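For illustration, a minimal sketch of the kind of query this fix targets (the `spark` session and table `t` are hypothetical here; the query shape mirrors the new test below). The same scalar subquery appears twice, and with reuse enabled the planner can execute it once and share the result:

// Hypothetical SparkSession `spark` and table `t`; for illustration only.
val df = spark.sql(
  """
    |SELECT key, (SELECT avg(key) FROM t)
    |FROM t
    |WHERE key > (SELECT avg(key) FROM t)
  """.stripMargin)
df.explain()  // with reuse on, both scalar subqueries share one SubqueryExec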
1 parent 0975019

4 files changed, 47 insertions(+), 1 deletion(-)


sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 0 deletions
@@ -552,6 +552,12 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
+    .internal()
+    .doc("When true, the planner will try to find out duplicated subqueries and re-use them.")
+    .booleanConf
+    .createWithDefault(true)
+
   val STATE_STORE_PROVIDER_CLASS =
     buildConf("spark.sql.streaming.stateStore.providerClass")
       .internal()
@@ -932,6 +938,8 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
+  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
+
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)

sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala

Lines changed: 3 additions & 0 deletions
@@ -595,6 +595,9 @@ case class OutputFakerExec(output: Seq[Attribute], child: SparkPlan) extends Spa
  */
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
+  // Ignore this wrapper for canonicalizing.
+  override lazy val canonicalized: SparkPlan = child.canonicalized
+
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))

sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala

Lines changed: 1 addition & 1 deletion
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
+    if (!conf.subqueryReuseEnabled) {
       return plan
     }
     // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls.
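For context, the guard above is the only change here; the deduplication that follows it is unchanged. Below is a hedged reconstruction of that loop's shape, not the verbatim source (consult subquery.scala for the exact code):

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.execution.{ExecSubqueryExpression, SparkPlan, SubqueryExec}
import org.apache.spark.sql.types.StructType

def reuseSubqueries(plan: SparkPlan): SparkPlan = {
  // Bucket candidates by schema so sameResult only runs within a bucket,
  // avoiding O(N*N) comparisons across all subqueries in the plan.
  val subqueries = mutable.HashMap[StructType, ArrayBuffer[SubqueryExec]]()
  plan transformAllExpressions {
    case sub: ExecSubqueryExpression =>
      val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer.empty)
      sameSchema.find(_.sameResult(sub.plan)) match {
        case Some(found) => sub.withNewPlan(found) // reuse the earlier, identical plan
        case None =>
          sameSchema += sub.plan                   // first occurrence: keep as candidate
          sub
      }
  }
}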

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 35 additions & 0 deletions
@@ -23,9 +23,12 @@ import java.net.{MalformedURLException, URL}
 import java.sql.Timestamp
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.spark.{AccumulatorSuite, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.execution.{ScalarSubquery, SubqueryExec}
 import org.apache.spark.sql.execution.aggregate
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
 import org.apache.spark.sql.functions._
@@ -700,6 +703,38 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       row => Seq.fill(16)(Row.merge(row, row))).collect().toSeq)
   }
 
+  test("Verify spark.sql.subquery.reuse") {
+    Seq(true, false).foreach { reuse =>
+      withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) {
+        val df = sql(
+          """
+            |SELECT key, (SELECT avg(key) FROM testData)
+            |FROM testData
+            |WHERE key > (SELECT avg(key) FROM testData)
+            |ORDER BY key
+            |LIMIT 3
+          """.stripMargin)
+
+        checkAnswer(df, Row(51, 50.5) :: Row(52, 50.5) :: Row(53, 50.5) :: Nil)
+
+        val subqueries = ArrayBuffer.empty[SubqueryExec]
+        df.queryExecution.executedPlan.transformAllExpressions {
+          case s @ ScalarSubquery(plan: SubqueryExec, _) =>
+            subqueries += plan
+            s
+        }
+
+        assert(subqueries.size == 2, "Two ScalarSubquery are expected in the plan")
+
+        if (reuse) {
+          assert(subqueries.distinct.size == 1, "Only one ScalarSubquery exists in the plan")
+        } else {
+          assert(subqueries.distinct.size == 2, "Reuse is not expected")
+        }
+      }
+    }
+  }
+
   test("cartesian product join") {
     withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
       checkAnswer(
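A note on what the identity check verifies: the query always contains two ScalarSubquery expressions, but with reuse enabled both end up pointing at the same SubqueryExec instance, so `subqueries.distinct.size` collapses to 1; with reuse disabled each expression keeps its own plan and the distinct count stays at 2.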
