From 65f8a329dab6e1b6a0383b49323cfe7de146dfaa Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 10 Mar 2021 15:38:17 -0800 Subject: [PATCH 1/2] Avoid unnecessary code generation in JoinCodegenSupport.genBuildSideVars --- .../joins/BroadcastHashJoinExec.scala | 2 +- .../execution/joins/JoinCodegenSupport.scala | 40 ++++++++++--------- .../execution/joins/ExistenceJoinSuite.scala | 2 +- .../sql/execution/joins/InnerJoinSuite.scala | 8 ++-- .../sql/execution/joins/OuterJoinSuite.scala | 2 +- 5 files changed, 29 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index 2a9e15851e9f1..cec1286c98a7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -46,7 +46,7 @@ case class BroadcastHashJoinExec( left: SparkPlan, right: SparkPlan, isNullAwareAntiJoin: Boolean = false) - extends HashJoin with CodegenSupport { + extends HashJoin { if (isNullAwareAntiJoin) { require(leftKeys.length == 1, "leftKeys length should be 1") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala index 6af4e119cb04e..96aa0be5f59ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.catalyst.expressions.{BindReferences, BoundReference} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ -import org.apache.spark.sql.catalyst.plans.InnerLike +import 
org.apache.spark.sql.catalyst.plans.{ExistenceJoin, InnerLike, LeftAnti, LeftOuter, LeftSemi, RightOuter} import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan} /** @@ -73,23 +73,27 @@ trait JoinCodegenSupport extends CodegenSupport with BaseJoinExec { ctx.INPUT_ROW = buildRow buildPlan.output.zipWithIndex.map { case (a, i) => val ev = BoundReference(i, a.dataType, a.nullable).genCode(ctx) - if (joinType.isInstanceOf[InnerLike]) { - ev - } else { - // the variables are needed even there is no matched rows - val isNull = ctx.freshName("isNull") - val value = ctx.freshName("value") - val javaType = CodeGenerator.javaType(a.dataType) - val code = code""" - |boolean $isNull = true; - |$javaType $value = ${CodeGenerator.defaultValue(a.dataType)}; - |if ($buildRow != null) { - | ${ev.code} - | $isNull = ${ev.isNull}; - | $value = ${ev.value}; - |} - """.stripMargin - ExprCode(code, JavaCode.isNullVariable(isNull), JavaCode.variable(value, a.dataType)) + joinType match { + case _: InnerLike | LeftSemi | LeftAnti | _: ExistenceJoin => + ev + case LeftOuter | RightOuter => + // the variables are needed even there is no matched rows + val isNull = ctx.freshName("isNull") + val value = ctx.freshName("value") + val javaType = CodeGenerator.javaType(a.dataType) + val code = code""" + |boolean $isNull = true; + |$javaType $value = ${CodeGenerator.defaultValue(a.dataType)}; + |if ($buildRow != null) { + | ${ev.code} + | $isNull = ${ev.isNull}; + | $value = ${ev.value}; + |} + """.stripMargin + ExprCode(code, JavaCode.isNullVariable(isNull), JavaCode.variable(value, a.dataType)) + case _ => + throw new IllegalArgumentException( + s"JoinCodegenSupport.genBuildSideVars should not take $joinType as the JoinType") } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala index fcbc0da9d5551..13848e5cde23b 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala @@ -103,7 +103,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSparkSession { ProjectExec(output, FilterExec(condition, join)) } - test(s"$testName using ShuffledHashJoin") { + testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin") { _ => extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index f476c15f59983..5262320134319 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -153,7 +153,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { } } - test(s"$testName using ShuffledHashJoin (build=left)") { + testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin (build=left)") { _ => extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) => @@ -165,7 +165,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { } } - test(s"$testName using ShuffledHashJoin (build=right)") { + testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin (build=right)") { _ => extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) => @@ 
-198,7 +198,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { } } - test(s"$testName using BroadcastNestedLoopJoin build left") { + testWithWholeStageCodegenOnAndOff(s"$testName using BroadcastNestedLoopJoin build left") { _ => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => BroadcastNestedLoopJoinExec(left, right, BuildLeft, Inner, Some(condition())), @@ -207,7 +207,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { } } - test(s"$testName using BroadcastNestedLoopJoin build right") { + testWithWholeStageCodegenOnAndOff(s"$testName using BroadcastNestedLoopJoin build right") { _ => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => BroadcastNestedLoopJoinExec(left, right, BuildRight, Inner, Some(condition())), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 238d37afe1075..150d40d0301fc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -104,7 +104,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession { ExtractEquiJoinKeys.unapply(join) } - test(s"$testName using ShuffledHashJoin") { + testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin") { _ => extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft From a0cb67694bf398e827f31e3df842501b71bc0207 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 10 Mar 2021 22:42:51 -0800 Subject: [PATCH 2/2] revert 3.1-compatible change per comment --- 
.../spark/sql/execution/joins/BroadcastHashJoinExec.scala | 2 +- .../apache/spark/sql/execution/joins/ExistenceJoinSuite.scala | 2 +- .../org/apache/spark/sql/execution/joins/InnerJoinSuite.scala | 4 ++-- .../org/apache/spark/sql/execution/joins/OuterJoinSuite.scala | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index cec1286c98a7e..2a9e15851e9f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -46,7 +46,7 @@ case class BroadcastHashJoinExec( left: SparkPlan, right: SparkPlan, isNullAwareAntiJoin: Boolean = false) - extends HashJoin { + extends HashJoin with CodegenSupport { if (isNullAwareAntiJoin) { require(leftKeys.length == 1, "leftKeys length should be 1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala index 13848e5cde23b..fcbc0da9d5551 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala @@ -103,7 +103,7 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSparkSession { ProjectExec(output, FilterExec(condition, join)) } - testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin") { _ => + test(s"$testName using ShuffledHashJoin") { extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (left: SparkPlan, right: SparkPlan) => diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 5262320134319..316201c60db2e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -153,7 +153,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { } } - testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin (build=left)") { _ => + test(s"$testName using ShuffledHashJoin (build=left)") { extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) => @@ -165,7 +165,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSparkSession { } } - testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin (build=right)") { _ => + test(s"$testName using ShuffledHashJoin (build=right)") { extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { checkAnswer2(leftRows, rightRows, (leftPlan: SparkPlan, rightPlan: SparkPlan) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 150d40d0301fc..238d37afe1075 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -104,7 +104,7 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession { ExtractEquiJoinKeys.unapply(join) } - testWithWholeStageCodegenOnAndOff(s"$testName using ShuffledHashJoin") { _ => + test(s"$testName using ShuffledHashJoin") { extractJoinParts().foreach { 
case (_, leftKeys, rightKeys, boundCondition, _, _, _) => withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") { val buildSide = if (joinType == LeftOuter) BuildRight else BuildLeft