From 4fb10ab43370af48e3f7a78bff825907546b3dbe Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 4 Oct 2022 08:47:29 +0200 Subject: [PATCH 1/5] Revert "[SPARK-40618][SQL] Fix bug in MergeScalarSubqueries rule with nested subqueries" This reverts commit 9ac9cd56087d6b35e0437986fea4e4aaf4867185. --- .../optimizer/MergeScalarSubqueries.scala | 6 ------ .../org/apache/spark/sql/SubquerySuite.scala | 20 +++++-------------- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala index 69f77e8f3f460..4369ad9f96a6c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala @@ -210,12 +210,6 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { cachedPlan: LogicalPlan): Option[(LogicalPlan, AttributeMap[Attribute])] = { checkIdenticalPlans(newPlan, cachedPlan).map(cachedPlan -> _).orElse( (newPlan, cachedPlan) match { - case (_, _) if newPlan.containsPattern(SCALAR_SUBQUERY_REFERENCE) || - cachedPlan.containsPattern(SCALAR_SUBQUERY_REFERENCE) => - // Subquery expressions with nested subquery expressions within are not supported for now. - // TODO: support this optimization by collecting the transitive subquery references in the - // new plan and recording them in order to suppress merging the new plan into those. - None case (np: Project, cp: Project) => tryMergePlans(np.child, cp.child).map { case (mergedChild, outputMap) => val (mergedProjectList, newOutputMap) = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 765c2a5223759..b6b65c66bef9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2157,7 +2157,7 @@ class SubquerySuite extends QueryTest } } - test("SPARK-40618: Do not merge scalar subqueries with nested subqueries inside") { + test("Merge non-correlated scalar subqueries from different parent plans") { Seq(false, true).foreach { enableAQE => withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString) { @@ -2189,13 +2189,13 @@ class SubquerySuite extends QueryTest } if (enableAQE) { - assert(subqueryIds.size == 4, "Missing or unexpected SubqueryExec in the plan") - assert(reusedSubqueryIds.size == 2, - "Missing or unexpected reused ReusedSubqueryExec in the plan") - } else { assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in the plan") assert(reusedSubqueryIds.size == 3, "Missing or unexpected reused ReusedSubqueryExec in the plan") + } else { + assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in the plan") + assert(reusedSubqueryIds.size == 4, + "Missing or unexpected reused ReusedSubqueryExec in the plan") } } } @@ -2327,14 +2327,4 @@ class SubquerySuite extends QueryTest assert(findProject(df2).size == 3) } } - - test("SPARK-40618: Regression test for merging subquery bug with nested subqueries") { - // This test contains a subquery expression with another subquery expression nested inside. - // It acts as a regression test to ensure that the MergeScalarSubqueries rule does not attempt - // to merge them together. - withTable("t") { - sql("create table t(col int) using csv") - checkAnswer(sql("select(select sum((select sum(col) from t)) from t)"), Row(null)) - } - } } From 51cac97a7bf20d0af837f4a943fb6362f771010c Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Mon, 3 Oct 2022 10:00:39 +0200 Subject: [PATCH 2/5] fix --- .../optimizer/MergeScalarSubqueries.scala | 54 ++++++++++++------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala index 4369ad9f96a6c..186c9b3c38e21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.expressions._ @@ -127,7 +128,11 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { * false) due to an extra [[Project]] node in one of them. In that case * `attributes.size` remains 1 after merging, but the merged flag becomes true. */ - case class Header(attributes: Seq[Attribute], plan: LogicalPlan, merged: Boolean) + case class Header( + attributes: Seq[Attribute], + plan: LogicalPlan, + merged: Boolean, + references: Set[Int]) private def extractCommonScalarSubqueries(plan: LogicalPlan) = { val cache = ArrayBuffer.empty[Header] @@ -166,26 +171,39 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { // "Header". private def cacheSubquery(plan: LogicalPlan, cache: ArrayBuffer[Header]): (Int, Int) = { val output = plan.output.head - cache.zipWithIndex.collectFirst(Function.unlift { case (header, subqueryIndex) => - checkIdenticalPlans(plan, header.plan).map { outputMap => - val mappedOutput = mapAttributes(output, outputMap) - val headerIndex = header.attributes.indexWhere(_.exprId == mappedOutput.exprId) - subqueryIndex -> headerIndex - }.orElse(tryMergePlans(plan, header.plan).map { - case (mergedPlan, outputMap) => + val references = mutable.HashSet.empty[Int] + plan.transformAllExpressionsWithPruning(_.containsAnyPattern(SCALAR_SUBQUERY_REFERENCE)) { + case ssr: ScalarSubqueryReference => + references += ssr.subqueryIndex + references ++= cache(ssr.subqueryIndex).references + ssr + } + + cache.zipWithIndex.collectFirst(Function.unlift { + case (header, subqueryIndex) if !references.contains(subqueryIndex) => + checkIdenticalPlans(plan, header.plan).map { outputMap => val mappedOutput = mapAttributes(output, outputMap) - var headerIndex = header.attributes.indexWhere(_.exprId == mappedOutput.exprId) - val newHeaderAttributes = if (headerIndex == -1) { - headerIndex = header.attributes.size - header.attributes :+ mappedOutput - } else { - header.attributes - } - cache(subqueryIndex) = Header(newHeaderAttributes, mergedPlan, true) + val headerIndex = header.attributes.indexWhere(_.exprId == mappedOutput.exprId) subqueryIndex -> headerIndex - }) + }.orElse{ + tryMergePlans(plan, header.plan).map { + case (mergedPlan, outputMap) => + val mappedOutput = mapAttributes(output, outputMap) + var headerIndex = header.attributes.indexWhere(_.exprId == mappedOutput.exprId) + val newHeaderAttributes = if (headerIndex == -1) { + headerIndex = header.attributes.size + header.attributes :+ mappedOutput + } else { + header.attributes + } + cache(subqueryIndex) = + Header(newHeaderAttributes, mergedPlan, true, header.references ++ references) + subqueryIndex -> headerIndex + } + } + case _ => None }).getOrElse { - cache += Header(Seq(output), plan, false) + cache += Header(Seq(output), plan, false, references.toSet) cache.length - 1 -> 0 } } From dbcc46845430d596a4d4309e094926dafef93c08 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 4 Oct 2022 09:07:42 +0200 Subject: [PATCH 3/5] add back test --- .../scala/org/apache/spark/sql/SubquerySuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index b6b65c66bef9a..79c8173c413b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2327,4 +2327,14 @@ class SubquerySuite extends QueryTest assert(findProject(df2).size == 3) } } + + test("SPARK-40618: Regression test for merging subquery bug with nested subqueries") { + // This test contains a subquery expression with another subquery expression nested inside. + // It acts as a regression test to ensure that the MergeScalarSubqueries rule does not attempt + // to merge them together. + withTable("t") { + sql("create table t(col int) using csv") + checkAnswer(sql("select(select sum((select sum(col) from t)) from t)"), Row(null)) + } + } } From 2ef522c1be5da3b5d89f00c210c1320b814ba2fa Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 4 Oct 2022 10:03:18 +0200 Subject: [PATCH 4/5] add transitive references test --- .../org/apache/spark/sql/SubquerySuite.scala | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 79c8173c413b7..a86ddf388a9b8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -2332,9 +2332,32 @@ class SubquerySuite extends QueryTest // This test contains a subquery expression with another subquery expression nested inside. // It acts as a regression test to ensure that the MergeScalarSubqueries rule does not attempt // to merge them together. - withTable("t") { + withTable("t", "t2") { sql("create table t(col int) using csv") checkAnswer(sql("select(select sum((select sum(col) from t)) from t)"), Row(null)) + + checkAnswer(sql( + """ + |select + | (select sum( + | (select sum( + | (select sum(col) from t)) + | from t)) + | from t) + |""".stripMargin), + Row(null)) + + sql("create table t2(col int) using csv") + checkAnswer(sql( + """ + |select + | (select sum( + | (select sum( + | (select sum(col) from t)) + | from t2)) + | from t) + |""".stripMargin), + Row(null)) } } } From 2faae155fb65019e4091e394481c04a2bd4727a3 Mon Sep 17 00:00:00 2001 From: Peter Toth Date: Tue, 4 Oct 2022 10:30:45 +0200 Subject: [PATCH 5/5] add scaladoc --- .../spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala index 186c9b3c38e21..1cb3f3f157cfd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala @@ -127,6 +127,8 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { * merged as there can be subqueries that are different ([[checkIdenticalPlans]] is * false) due to an extra [[Project]] node in one of them. In that case * `attributes.size` remains 1 after merging, but the merged flag becomes true. + * @param references A set of subquery indexes in the cache to track all (including transitive) + * nested subqueries. */ case class Header( attributes: Seq[Attribute],