From ea7def39894d995bb7d8b39a408269937e68cccf Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 19 Dec 2020 05:36:51 +0900 Subject: [PATCH 1/8] Fix an issue that EXPLAIN CODEGEN doesn't show subquery code. --- .../spark/sql/execution/debug/package.scala | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 6c40104e52a5f..4a963562ebb97 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -107,12 +107,19 @@ package object debug { */ def codegenStringSeq(plan: SparkPlan): Seq[(String, String, ByteCodeStats)] = { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() - plan transform { - case s: WholeStageCodegenExec => - codegenSubtrees += s - s - case s => s + + def findSubtrees(plan: SparkPlan): Unit = { + plan transform { + case s: WholeStageCodegenExec => + codegenSubtrees += s + s + case s => + s.subqueries.foreach(findSubtrees) + s + } } + + findSubtrees(plan) codegenSubtrees.toSeq.sortBy(_.codegenStageId).map { subtree => val (_, source) = subtree.doCodeGen() val codeStats = try { From b0d34e7e8df5333849993b569bd14366635df12a Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 01:05:28 +0900 Subject: [PATCH 2/8] Add test. --- .../org/apache/spark/sql/ExplainSuite.scala | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 75372c5437f25..082882f4396ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -228,6 +228,28 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } + test("explain codegen - check presence of subquery") { + withTable("df1", "df2") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + withTable("df1") { + spark.range(1, 100) + .write + .format("parquet") + .mode("overwrite") + .saveAsTable("df1") + + val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df1)" + val expectedText = "Found 3 WholeStageCodegen subtrees." + + withNormalizedExplain(sqlText) { normalizedOutput => + assert(normalizedOutput.contains(expectedText)) + } + } + } + } + } + test("explain formatted - check presence of subquery in case of DPP") { withTable("df1", "df2") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", From 09c6fd1066b004858adbf121068a0eb2c51b79a6 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 06:36:19 +0900 Subject: [PATCH 3/8] Add JIRA ID to the test. --- sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 082882f4396ed..94ed02b3ee2b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -228,7 +228,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } } - test("explain codegen - check presence of subquery") { + test("SPARK-33853: explain codegen - check presence of subquery") { withTable("df1", "df2") { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { From 26551bfd3019f004af05e0b6b676bbc60a7bba38 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 14:26:23 +0900 Subject: [PATCH 4/8] Remove SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false". --- .../org/apache/spark/sql/ExplainSuite.scala | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 94ed02b3ee2b7..7704718f1d402 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -229,22 +229,19 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite } test("SPARK-33853: explain codegen - check presence of subquery") { - withTable("df1", "df2") { - withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", - SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { - withTable("df1") { - spark.range(1, 100) - .write - .format("parquet") - .mode("overwrite") - .saveAsTable("df1") - - val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df1)" - val expectedText = "Found 3 WholeStageCodegen subtrees." - - withNormalizedExplain(sqlText) { normalizedOutput => - assert(normalizedOutput.contains(expectedText)) - } + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { + withTable("df1") { + spark.range(1, 100) + .write + .format("parquet") + .mode("overwrite") + .saveAsTable("df1") + + val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df1)" + val expectedText = "Found 3 WholeStageCodegen subtrees." + + withNormalizedExplain(sqlText) { normalizedOutput => + assert(normalizedOutput.contains(expectedText)) } } } From 0f6a53b1e9f4f3fd01c0637793b3b0c6a83b3c70 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 15:18:39 +0900 Subject: [PATCH 5/8] Apply the change to BenchmarkQueryTest too. --- .../apache/spark/sql/execution/debug/package.scala | 4 +--- .../org/apache/spark/sql/BenchmarkQueryTest.scala | 14 ++++++++++---- .../scala/org/apache/spark/sql/ExplainSuite.scala | 7 ++----- 3 files changed, 13 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 4a963562ebb97..3cbebca14f7dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -109,13 +109,11 @@ package object debug { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() def findSubtrees(plan: SparkPlan): Unit = { - plan transform { + plan foreach { case s: WholeStageCodegenExec => codegenSubtrees += s - s case s => s.subqueries.foreach(findSubtrees) - s } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala index 2c3b37a1498ec..e3adac9e116ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala @@ -63,11 +63,17 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSparkSession { protected def checkGeneratedCode(plan: SparkPlan, checkMethodCodeSize: Boolean = true): Unit = { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() - plan foreach { - case s: WholeStageCodegenExec => - codegenSubtrees += s - case _ => + + def findSubtrees(paln: SparkPlan): Unit = { + plan foreach { + case s: WholeStageCodegenExec => + codegenSubtrees += s + case s => + s.subqueries.foreach(findSubtrees) + } } + + findSubtrees(plan) codegenSubtrees.toSeq.foreach { subtree => val code = subtree.doCodeGen()._2 val (_, ByteCodeStats(maxMethodCodeSize, _, _)) = try { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 7704718f1d402..22a6c320ee1bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -231,11 +231,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite test("SPARK-33853: explain codegen - check presence of subquery") { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { withTable("df1") { - spark.range(1, 100) - .write - .format("parquet") - .mode("overwrite") - .saveAsTable("df1") + val df1 = spark.range(1, 100) + df1.createTempView("df1") val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df1)" val expectedText = "Found 3 WholeStageCodegen subtrees." From 022d363f7abb8082fa6cec4a29370c86a9aeffc7 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 20 Dec 2020 15:39:37 +0900 Subject: [PATCH 6/8] Minor change. --- .../src/test/scala/org/apache/spark/sql/ExplainSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 22a6c320ee1bd..19beb92fc029f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -230,11 +230,11 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite test("SPARK-33853: explain codegen - check presence of subquery") { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { - withTable("df1") { + withTable("df") { val df1 = spark.range(1, 100) - df1.createTempView("df1") + df1.createTempView("df") - val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df1)" + val sqlText = "EXPLAIN CODEGEN SELECT (SELECT min(id) FROM df)" val expectedText = "Found 3 WholeStageCodegen subtrees." withNormalizedExplain(sqlText) { normalizedOutput => From c338832464d73fc31c4859553278aa5c28b76a38 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 21 Dec 2020 02:44:19 +0900 Subject: [PATCH 7/8] Fix typo. --- .../test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala index e3adac9e116ee..d58bf2c6260b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/BenchmarkQueryTest.scala @@ -64,7 +64,7 @@ abstract class BenchmarkQueryTest extends QueryTest with SharedSparkSession { protected def checkGeneratedCode(plan: SparkPlan, checkMethodCodeSize: Boolean = true): Unit = { val codegenSubtrees = new collection.mutable.HashSet[WholeStageCodegenExec]() - def findSubtrees(paln: SparkPlan): Unit = { + def findSubtrees(plan: SparkPlan): Unit = { plan foreach { case s: WholeStageCodegenExec => codegenSubtrees += s From 13cadebb9673c91ae5405e601a6f931579a5e675 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 21 Dec 2020 03:01:41 +0900 Subject: [PATCH 8/8] withTable -> withTempView --- sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 19beb92fc029f..1af0758db4d81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -230,7 +230,7 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite test("SPARK-33853: explain codegen - check presence of subquery") { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true") { - withTable("df") { + withTempView("df") { val df1 = spark.range(1, 100) df1.createTempView("df")