Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,8 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {

/** Reports the child's output ordering unchanged. */
override def outputOrdering: Seq[SortOrder] = {
  child.outputOrdering
}

/**
 * The canonical form of this node is the canonical form of its child.
 *
 * NOTE(review): presumably this keeps `name` out of canonical plan comparison so that
 * equivalent subqueries can be matched for reuse -- confirm.
 */
override def doCanonicalize(): SparkPlan = {
  child.canonicalized
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#18169 fixed this problem in the same way before, but I don't know why it was reverted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gatorsmile could you please share why it was reverted, if you remember? Thanks.


@transient
private lazy val relationFuture: Future[Array[InternalRow]] = {
// relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
Expand Down
29 changes: 28 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
Expand Down Expand Up @@ -113,6 +113,33 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("Reuse Subquery") {
  // Run the same query once with subquery reuse enabled and once with it disabled.
  for (reuseEnabled <- Seq(true, false)) {
    withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuseEnabled.toString) {
      // The projection contains the same scalar subquery text twice.
      val query =
        """
          |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData)
          |FROM testData
          |LIMIT 1
          |""".stripMargin
      val executedPlan = sql(query).queryExecution.executedPlan

      // Gather the SubqueryExec behind every ScalarSubquery expression in the executed plan.
      // The traversal is used only for this side effect: each expression is returned as-is.
      val seenSubqueries = scala.collection.mutable.ArrayBuffer.empty[SubqueryExec]
      executedPlan.transformAllExpressions {
        case scalar @ ScalarSubquery(subqueryPlan: SubqueryExec, _) =>
          seenSubqueries += subqueryPlan
          scalar
      }

      // Count the distinct SubqueryExec nodes collected: 1 is expected with reuse, 2 without.
      val distinctCount = seenSubqueries.distinct.size
      if (!reuseEnabled) {
        assert(distinctCount == 2, "There should be 2 subqueries when not reusing")
      } else {
        assert(distinctCount == 1, "Subquery reusing not working correctly")
      }
    }
  }
}

test("SPARK-6743: no columns from cache") {
Seq(
(83, 0, 38),
Expand Down