From e4fdfd5ea1ccd2df4c9f27fc4d6998070d7e7699 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 8 Feb 2022 16:30:28 +0800 Subject: [PATCH 1/2] Materialize QueryPlan subqueries --- .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 2417ff904570..ff4825269855 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.rules.RuleId import org.apache.spark.sql.catalyst.rules.UnknownRuleId import org.apache.spark.sql.catalyst.trees.{AlwaysProcess, CurrentOrigin, TreeNode, TreeNodeTag} -import org.apache.spark.sql.catalyst.trees.TreePattern.OUTER_REFERENCE +import org.apache.spark.sql.catalyst.trees.TreePattern.{OUTER_REFERENCE, PLAN_EXPRESSION} import org.apache.spark.sql.catalyst.trees.TreePatternBits import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} @@ -427,8 +427,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] /** * All the top-level subqueries of the current plan node. Nested subqueries are not included. */ - def subqueries: Seq[PlanType] = { - expressions.flatMap(_.collect { + lazy val subqueries: Seq[PlanType] = { + expressions.filter(_.containsPattern(PLAN_EXPRESSION)).flatMap(_.collect { case e: PlanExpression[_] => e.plan.asInstanceOf[PlanType] }) } From d7080ee6784c5a8521fd5ee76b67992167247c00 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 8 Feb 2022 18:59:50 +0800 Subject: [PATCH 2/2] improve --- .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2 +- .../sql/execution/adaptive/InsertAdaptiveSparkPlan.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index ff4825269855..58f2425e5370 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -427,7 +427,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] /** * All the top-level subqueries of the current plan node. Nested subqueries are not included. */ - lazy val subqueries: Seq[PlanType] = { + @transient lazy val subqueries: Seq[PlanType] = { expressions.filter(_.containsPattern(PLAN_EXPRESSION)).flatMap(_.collect { case e: PlanExpression[_] => e.plan.asInstanceOf[PlanType] }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 68042d838410..5c208457004c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{ListQuery, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.{DYNAMIC_PRUNING_SUBQUERY, IN_SUBQUERY, SCALAR_SUBQUERY} +import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec} import org.apache.spark.sql.execution.datasources.v2.V2CommandExec @@ -118,7 +118,7 @@ case class InsertAdaptiveSparkPlan( if (!plan.containsAnyPattern(SCALAR_SUBQUERY, IN_SUBQUERY, DYNAMIC_PRUNING_SUBQUERY)) { return subqueryMap.toMap } - plan.foreach(_.expressions.foreach(_.foreach { + plan.foreach(_.expressions.filter(_.containsPattern(PLAN_EXPRESSION)).foreach(_.foreach { case expressions.ScalarSubquery(p, _, exprId, _) if !subqueryMap.contains(exprId.id) => val executedPlan = compileSubquery(p)