From 2ebdcab0fc73cdaf046d327c2746a531c71092d3 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 30 Mar 2019 19:45:20 -0700
Subject: [PATCH 1/3] fix

---
 .../sql/catalyst/expressions/subquery.scala | 7 ++-
 .../spark/sql/catalyst/plans/QueryPlan.scala | 2 +-
 .../spark/sql/execution/SparkPlanInfo.scala | 1 +
 .../execution/basicPhysicalOperators.scala | 51 ++++++++++++++++---
 .../apache/spark/sql/execution/subquery.scala | 20 +++++---
 .../sql/execution/ui/SparkPlanGraph.scala | 5 ++
 .../org/apache/spark/sql/SQLQuerySuite.scala | 29 +----------
 .../org/apache/spark/sql/SubquerySuite.scala | 37 +++++++++++++-
 8 files changed, 106 insertions(+), 46 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index 0431134920746..cb361ad5b532a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -37,6 +37,9 @@ abstract class PlanExpression[T <: QueryPlan[_]] extends Expression {
/** Updates the expression with a new plan. */ def withNewPlan(plan: T): PlanExpression[T] + /** Defines how the canonicalization should work for this expression. */ + def canonicalize(attrs: AttributeSeq): PlanExpression[T] + protected def conditionString: String = children.mkString("[", " && ", "]") }
@@ -58,11 +61,11 @@ abstract class SubqueryExpression(
children.zip(p.children).forall(p => p._1.semanticEquals(p._2)) case _ => false } - def canonicalize(attrs: AttributeSeq): SubqueryExpression = { + override def canonicalize(attrs: AttributeSeq): SubqueryExpression = { // Normalize the outer references in the subquery plan.
val normalizedPlan = plan.transformAllExpressions { case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs)) - } + }.canonicalized withNewPlan(normalizedPlan).canonicalized.asInstanceOf[SubqueryExpression] } }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 8f5444ed8a5a7..3d4de5c7e8710 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -279,7 +279,7 @@ object QueryPlan extends PredicateHelper {
*/ def normalizeExprId[T <: Expression](e: T, input: AttributeSeq): T = { e.transformUp { - case s: SubqueryExpression => s.canonicalize(input) + case s: PlanExpression[_] => s.canonicalize(input) case ar: AttributeReference => val ordinal = input.indexOf(ar.exprId) if (ordinal == -1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
index f554ff0aa775f..3cd02b984d333 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala
@@ -52,6 +52,7 @@ private[execution] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { val children = plan match { case ReusedExchangeExec(_, child) => child :: Nil + case ReusedSubqueryExec(child) => child :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 731e7daf2acdf..546640d2cb72d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -660,21 +660,28 @@ object CoalesceExec {
} /** - * Physical plan for a subquery. + * Parent class for different types of subquery plans */ -case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode { - - override lazy val metrics = Map( - "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), - "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect")) +abstract class BaseSubqueryExec extends SparkPlan { + def name: String + def child: SparkPlan override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering +} - override def doCanonicalize(): SparkPlan = child.canonicalized +/** + * Physical plan for a subquery.
+ */ +case class SubqueryExec(name: String, child: SparkPlan) + extends BaseSubqueryExec with UnaryExecNode { + + override lazy val metrics = Map( + "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"), + "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect")) @transient private lazy val relationFuture: Future[Array[InternalRow]] = {
@@ -698,6 +705,10 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
}(SubqueryExec.executionContext) } + protected override def doCanonicalize(): SparkPlan = { + SubqueryExec("Subquery", child.canonicalized) + } + protected override def doPrepare(): Unit = { relationFuture }
@@ -715,3 +726,29 @@ object SubqueryExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) } + +/** + * A wrapper for reused [[BaseSubqueryExec]]. + */ +case class ReusedSubqueryExec(child: BaseSubqueryExec) + extends BaseSubqueryExec with LeafExecNode { + + override def name: String = child.name + + override def output: Seq[Attribute] = child.output + override def doCanonicalize(): SparkPlan = child.canonicalized + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def outputPartitioning: Partitioning = child.outputPartitioning + + protected override def doPrepare(): Unit = { + child.prepare() + } + + protected override def doExecute(): RDD[InternalRow] = { + child.execute() + } + + override def executeCollect(): Array[InternalRow] = { + child.executeCollect() + } +}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index b7f10baaca7bd..c70f17fa49b7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, InSet, Literal, PlanExpression} +import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, Expression, ExprId, InSet, Literal, PlanExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf
@@ -31,11 +31,16 @@ import org.apache.spark.sql.types.{BooleanType, DataType, StructType}
/** * The base class for subquery that is used in SparkPlan. */ -abstract class ExecSubqueryExpression extends PlanExpression[SubqueryExec] { +abstract class ExecSubqueryExpression extends PlanExpression[BaseSubqueryExec] { /** * Fill the expression with collected result from executed plan. */ def updateResult(): Unit + + override def canonicalize(attrs: AttributeSeq): ExecSubqueryExpression = { + withNewPlan(plan.canonicalized.asInstanceOf[BaseSubqueryExec]) + .asInstanceOf[ExecSubqueryExpression] + } } object ExecSubqueryExpression {
@@ -56,7 +61,7 @@ object ExecSubqueryExpression {
* This is the physical copy of ScalarSubquery to be used inside SparkPlan.
*/ case class ScalarSubquery( - plan: SubqueryExec, + plan: BaseSubqueryExec, exprId: ExprId) extends ExecSubqueryExpression {
@@ -64,7 +69,7 @@ case class ScalarSubquery(
override def children: Seq[Expression] = Nil override def nullable: Boolean = true override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields) - override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query) + override def withNewPlan(query: BaseSubqueryExec): ScalarSubquery = copy(plan = query) override def semanticEquals(other: Expression): Boolean = other match { case s: ScalarSubquery => plan.sameResult(s.plan)
@@ -129,13 +134,14 @@ case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
return plan } // Build a hash map using schema of subqueries to avoid O(N*N) sameResult calls. - val subqueries = mutable.HashMap[StructType, ArrayBuffer[SubqueryExec]]() + val subqueries = mutable.HashMap[StructType, ArrayBuffer[BaseSubqueryExec]]() plan transformAllExpressions { case sub: ExecSubqueryExpression => - val sameSchema = subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[SubqueryExec]()) + val sameSchema = + subqueries.getOrElseUpdate(sub.plan.schema, ArrayBuffer[BaseSubqueryExec]()) val sameResult = sameSchema.find(_.sameResult(sub.plan)) if (sameResult.isDefined) { - sub.withNewPlan(sameResult.get) + sub.withNewPlan(ReusedSubqueryExec(sameResult.get)) } else { sameSchema += sub.plan sub
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 838ee76c8d43e..b864ad1c71083 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -103,6 +103,11 @@ object SparkPlanGraph {
// Point to the re-used subquery val node = exchanges(planInfo) edges += SparkPlanGraphEdge(node.id, parent.id) + case "ReusedSubquery" => + // Re-used subquery might appear before the original subquery, so skip this node and let + // the previous `case` make sure the re-used and the original point to the same node.
+ buildSparkPlanGraphNode( + planInfo.children.head, nodeIdGenerator, nodes, edges, parent, subgraph, exchanges) case "ReusedExchange" if exchanges.contains(planInfo.children.head) => // Point to the re-used exchange val node = exchanges(planInfo.children.head)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 5916cbb7e6812..e8d1eccd329d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils -import org.apache.spark.sql.execution.{aggregate, ScalarSubquery, SubqueryExec} +import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -113,33 +113,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
} } - test("Reuse Subquery") { - Seq(true, false).foreach { reuse => - withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) { - val df = sql( - """ - |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData) - |FROM testData - |LIMIT 1 - """.stripMargin) - - import scala.collection.mutable.ArrayBuffer - val subqueries = ArrayBuffer[SubqueryExec]() - df.queryExecution.executedPlan.transformAllExpressions { - case s @ ScalarSubquery(plan: SubqueryExec, _) => - subqueries += plan - s - } - - if (reuse) { - assert(subqueries.distinct.size == 1, "Subquery reusing not working correctly") - } else { - assert(subqueries.distinct.size == 2, "There should be 2 subqueries when not reusing") - } - } - } - } - test("SPARK-6743: no columns from cache") { Seq( (83, 0, 38),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 53eb7ea7a0a05..f8a5c55660ff3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,8 +21,9 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort} -import org.apache.spark.sql.execution.{ExecSubqueryExpression, FileSourceScanExec, WholeStageCodegenExec} +import org.apache.spark.sql.execution.{ExecSubqueryExpression, FileSourceScanExec, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec} import org.apache.spark.sql.execution.datasources.FileScanRDD +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext class SubquerySuite extends QueryTest with SharedSQLContext {
@@ -1337,4 +1338,38 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
checkAnswer(df3, Seq(Row("a", 2, "a"), Row("a", 2, "b"))) } } + + test("SPARK-27279: Reuse Subquery") { + Seq(true, false).foreach { reuse => + withSQLConf(SQLConf.SUBQUERY_REUSE_ENABLED.key -> reuse.toString) { + val df = sql( + """ + |SELECT (SELECT avg(key) FROM testData) + (SELECT avg(key) FROM testData) + |FROM testData +
|LIMIT 1 + """.stripMargin) + + df.explain(true) + var countSubqueryExec = 0 + var countReuseSubqueryExec = 0 + df.queryExecution.executedPlan.transformAllExpressions { + case s @ ScalarSubquery(_: SubqueryExec, _) => + countSubqueryExec = countSubqueryExec + 1 + s + case s @ ScalarSubquery(_: ReusedSubqueryExec, _) => + countReuseSubqueryExec = countReuseSubqueryExec + 1 + s + } + + if (reuse) { + assert(countSubqueryExec == 1, "Subquery reusing not working correctly") + assert(countReuseSubqueryExec == 1, "Subquery reusing not working correctly") + } else { + assert(countSubqueryExec == 2, "expect 2 SubqueryExec when not reusing") + assert(countReuseSubqueryExec == 0, + "expect 0 ReusedSubqueryExec when not reusing") + } + } + } + } }

From 14f439b707fc18e7cd13c48dbecf0dc042fd2689 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 30 Mar 2019 19:53:57 -0700
Subject: [PATCH 2/3] remove explain

---
 sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index f8a5c55660ff3..28a9905459674 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1349,7 +1349,6 @@ class SubquerySuite extends QueryTest with SharedSQLContext {
|LIMIT 1 """.stripMargin) - df.explain(true) var countSubqueryExec = 0 var countReuseSubqueryExec = 0 df.queryExecution.executedPlan.transformAllExpressions {

From 7036cfab887d30a2ee3dcd66a58d995dd64ae7bc Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 4 Apr 2019 14:20:12 -0700
Subject: [PATCH 3/3] address comments.

---
 .../spark/sql/catalyst/expressions/subquery.scala | 2 +-
 .../spark/sql/execution/basicPhysicalOperators.scala | 12 +++---------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 3 +--
 3 files changed, 5 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
index cb361ad5b532a..bf9f2970ded16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
@@ -65,7 +65,7 @@ abstract class SubqueryExpression(
// Normalize the outer references in the subquery plan.
val normalizedPlan = plan.transformAllExpressions { case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs)) - }.canonicalized + } withNewPlan(normalizedPlan).canonicalized.asInstanceOf[SubqueryExpression] } }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 546640d2cb72d..f00a10b57cbcb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -740,15 +740,9 @@ case class ReusedSubqueryExec(child: BaseSubqueryExec)
override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = child.outputPartitioning - protected override def doPrepare(): Unit = { - child.prepare() - } + protected override def doPrepare(): Unit = child.prepare() - protected override def doExecute(): RDD[InternalRow] = { - child.execute() - } + protected override def doExecute(): RDD[InternalRow] = child.execute() - override def executeCollect(): Array[InternalRow] = { - child.executeCollect() - } + override def executeCollect(): Array[InternalRow] = child.executeCollect() }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e8d1eccd329d9..db896b3b36ac4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.{AccumulatorSuite, SparkException} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.catalyst.util.StringUtils -import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
@@ -261,7 +260,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
val df = sql(sqlText) // First, check if we have GeneratedAggregate. val hasGeneratedAgg = df.queryExecution.sparkPlan - .collect { case _: aggregate.HashAggregateExec => true } + .collect { case _: HashAggregateExec => true } .nonEmpty if (!hasGeneratedAgg) { fail(