From 0dc1cb2a260d10456dd23d13e00ca8c353394194 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Wed, 19 Aug 2020 22:57:32 -0700
Subject: [PATCH 01/10] Optimize BHJ/SHJ inner/semi join with empty hashed relation

---
 .../spark/sql/execution/joins/HashJoin.scala | 24 ++++++---
 .../org/apache/spark/sql/JoinSuite.scala     | 54 ++++++++++++++++++-
 2 files changed, 70 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 1a7554c905c6c..d3991ed74c1c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -230,7 +230,9 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     val joinKeys = streamSideKeyGenerator()
     val joinedRow = new JoinedRow
 
-    if (hashedRelation.keyIsUnique) {
+    if (hashedRelation == EmptyHashedRelation) {
+      Iterator.empty
+    } else if (hashedRelation.keyIsUnique) {
       streamIter.filter { current =>
         val key = joinKeys(current)
         lazy val matched = hashedRelation.getValue(key)
@@ -432,7 +434,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
    * Generates the code for Inner join.
    */
  protected def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
-    val HashedRelationInfo(relationTerm, keyIsUnique, _) = prepareRelation(ctx)
+    val HashedRelationInfo(relationTerm, keyIsUnique, isEmptyHashedRelation) = prepareRelation(ctx)
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
     val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
@@ -442,7 +444,11 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
       case BuildRight => input ++ buildVars
     }
 
-    if (keyIsUnique) {
+    if (isEmptyHashedRelation) {
+      s"""
+         |// If HashedRelation is empty, hash inner join simply returns nothing.
+       """.stripMargin
+    } else if (keyIsUnique) {
       s"""
          |// generate join key for stream side
         |${keyEv.code}
@@ -559,12 +565,16 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
    * Generates the code for left semi join.
    */
  protected def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = {
-    val HashedRelationInfo(relationTerm, keyIsUnique, _) = prepareRelation(ctx)
+    val HashedRelationInfo(relationTerm, keyIsUnique, isEmptyHashedRelation) = prepareRelation(ctx)
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
     val (matched, checkCondition, _) = getJoinCondition(ctx, input)
     val numOutput = metricTerm(ctx, "numOutputRows")
 
-    if (keyIsUnique) {
+    if (isEmptyHashedRelation) {
+      s"""
+         |// If HashedRelation is empty, hash semi join simply returns nothing.
+       """.stripMargin
+    } else if (keyIsUnique) {
       s"""
          |// generate join key for stream side
         |${keyEv.code}
@@ -612,10 +622,10 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     val numOutput = metricTerm(ctx, "numOutputRows")
     if (isEmptyHashedRelation) {
       return s"""
-                |// If the right side is empty, Anti Join simply returns the left side.
+                |// If HashedRelation is empty, hash anti join simply returns the stream side.
                 |$numOutput.add(1);
                 |${consume(ctx, input)}
-                |""".stripMargin
+              """.stripMargin
     }
 
     val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index e7629a21f787a..93cd84713296b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.{Ascending, GenericRow, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, SortExec, SparkPlan}
+import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins._
@@ -1254,4 +1254,56 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       }
     }
   }
+
+  test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") {
+    val inputDFs = Seq(
+      // Test empty build side for inner join
+      (spark.range(30).selectExpr("id as k1"),
+        spark.range(10).selectExpr("id as k2").filter("k2 < -1"),
+        "inner"),
+      // Test empty build side for semi join
+      (spark.range(30).selectExpr("id as k1"),
+        spark.range(10).selectExpr("id as k2").filter("k2 < -1"),
+        "semi")
+    )
+    inputDFs.foreach { case (df1, df2, joinType) =>
+      // Test broadcast hash join
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
+        val bhjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+        assert(bhjCodegenDF.queryExecution.executedPlan.collect {
+          case WholeStageCodegenExec(_ : BroadcastHashJoinExec) => true
+          case WholeStageCodegenExec(ProjectExec(_, _ : BroadcastHashJoinExec)) => true
+        }.size === 1)
+        checkAnswer(bhjCodegenDF, Seq.empty)
+
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          val bhjNonCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+          assert(bhjNonCodegenDF.queryExecution.executedPlan.collect {
+            case _: BroadcastHashJoinExec => true }.size === 1)
+          checkAnswer(bhjNonCodegenDF, Seq.empty)
+        }
+      }
+
+      // Test shuffled hash join
+      withSQLConf(SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
+        // Set broadcast join threshold and number of shuffle partitions,
+        // as shuffled hash join depends on these two configs.
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
+        SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
+        val shjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+        assert(shjCodegenDF.queryExecution.executedPlan.collect {
+          case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
+          case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true
+        }.size === 1)
+        checkAnswer(shjCodegenDF, Seq.empty)
+
+        withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+          val shjNonCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+          assert(shjNonCodegenDF.queryExecution.executedPlan.collect {
+            case _: ShuffledHashJoinExec => true }.size === 1)
+          checkAnswer(shjNonCodegenDF, Seq.empty)
+        }
+      }
+    }
+  }
 }
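Note on PATCH 01: both the iterator path and the generated-code path now test for an empty build side before doing any per-row work, so inner and semi joins return immediately instead of probing a hashed relation that cannot contain matches. A minimal standalone sketch of the short-circuit, using simplified stand-in types (Rel, EmptyRel, MapRel and innerJoin below are illustrative only, not Spark's actual HashedRelation API):

    // Probing is skipped entirely once the build side is known to be empty.
    sealed trait Rel { def get(key: Long): Option[Long] }
    case object EmptyRel extends Rel { override def get(key: Long): Option[Long] = None }
    final case class MapRel(m: Map[Long, Long]) extends Rel {
      override def get(key: Long): Option[Long] = m.get(key)
    }

    def innerJoin(stream: Iterator[Long], build: Rel): Iterator[(Long, Long)] = build match {
      case EmptyRel => Iterator.empty // no key generation, no lookups, no filtering
      case rel => stream.flatMap(k => rel.get(k).map(v => (k, v)))
    }

The same reasoning applies to semi join (a stream row qualifies only if it finds a match), while anti join keeps its existing branch and instead passes every stream row through.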
From bee7e1ee7583034e0a2cb35def58f1d0fd06fc14 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Wed, 19 Aug 2020 23:04:48 -0700
Subject: [PATCH 02/10] Fix code

---
 .../scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index d3991ed74c1c9..888cc4d04c73e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -155,7 +155,9 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     val joinRow = new JoinedRow
     val joinKeys = streamSideKeyGenerator()
 
-    if (hashedRelation.keyIsUnique) {
+    if (hashedRelation == EmptyHashedRelation) {
+      Iterator.empty
+    } else if (hashedRelation.keyIsUnique) {
       streamIter.flatMap { srow =>
         joinRow.withLeft(srow)
         val matched = hashedRelation.getValue(joinKeys(srow))

From 5c6e9ce60ac52d9261b36df54c4cbf44c657d3d9 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Thu, 20 Aug 2020 21:41:35 -0700
Subject: [PATCH 03/10] Handle empty hashed relation in AQE optimization rule as well

---
 .../adaptive/AdaptiveSparkPlanExec.scala      |  2 +-
 .../adaptive/EliminateNullAwareAntiJoin.scala | 41 --------------
 .../OptimizeJoinToEmptyRelation.scala         | 55 +++++++++++++++++++
 .../adaptive/AdaptiveQueryExecSuite.scala     | 32 +++++++++++
 4 files changed, 88 insertions(+), 42 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index a4ed3d5683185..fa07664ffd4b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -80,7 +80,7 @@ case class AdaptiveSparkPlanExec(
   // TODO add more optimization rules
   override protected def batches: Seq[Batch] = Seq(
     Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)),
-    Batch("Eliminate Null Aware Anti Join", Once, EliminateNullAwareAntiJoin)
+    Batch("Optimize Join to Empty Relation", Once, OptimizeJoinToEmptyRelation)
   )
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
deleted file mode 100644
index afccde09040a4..0000000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateNullAwareAntiJoin.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.joins.HashedRelationWithAllNullKeys
-
-/**
- * This optimization rule detects and convert a NAAJ to an Empty LocalRelation
- * when buildSide is HashedRelationWithAllNullKeys.
- */
-object EliminateNullAwareAntiJoin extends Rule[LogicalPlan] {
-
-  private def canEliminate(plan: LogicalPlan): Boolean = plan match {
-    case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
-      && stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys => true
-    case _ => false
-  }
-
-  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
-    case j @ ExtractSingleColumnNullAwareAntiJoin(_, _) if canEliminate(j.right) =>
-      LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala
new file mode 100644
index 0000000000000..6e25737b81fbd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys}
+
+/**
+ * This optimization rule detects and converts a `Join` to an empty `LocalRelation`:
+ * 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation`
+ *    is `EmptyHashedRelationWithAllNullKeys`.
+ *
+ * 2. `Join` is inner or left semi join, and broadcasted `HashedRelation` is `EmptyHashedRelation`.
+ */
+object OptimizeJoinToEmptyRelation extends Rule[LogicalPlan] {
+
+  private def canEliminate(
+      plan: LogicalPlan,
+      expectedRelation: HashedRelation): Boolean = plan match {
+    case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
+      && stage.broadcast.relationFuture.get().value == expectedRelation => true
+    case _ => false
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+    case j @ ExtractSingleColumnNullAwareAntiJoin(_, _)
+        if canEliminate(j.right, HashedRelationWithAllNullKeys) =>
+      LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+
+    case j @ Join(_, _, Inner, _, _) if canEliminate(j.left, EmptyHashedRelation) ||
+        canEliminate(j.right, EmptyHashedRelation) =>
+      LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+
+    case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, EmptyHashedRelation) =>
+      LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 1dc239c0416f8..f766cb0daefcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1181,4 +1181,36 @@ class AdaptiveQueryExecSuite
       checkNumLocalShuffleReaders(adaptivePlan)
     }
   }
+
+  test("SPARK-32573, SPARK-32649: optimize join to empty relation") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+      // Test NULL-aware anti join
+      val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)")
+      val bhj = findTopLevelBroadcastHashJoin(plan)
+      assert(bhj.size == 1)
+      val join = findTopLevelBaseJoin(adaptivePlan)
+      assert(join.isEmpty)
+      checkNumLocalShuffleReaders(adaptivePlan)
+
+      // Test inner and left semi join
+      Seq(
+        // inner join (small table at right side)
+        "SELECT * FROM testData t1 join testData3 t2 ON t1.key = t2.a WHERE t2.b = 1",
+        // inner join (small table at left side)
+        "SELECT * FROM testData3 t1 join testData t2 ON t1.a = t2.key WHERE t1.b = 1",
+        // left semi join
+        "SELECT * FROM testData t1 left semi join testData3 t2 ON t1.key = t2.a AND t2.b = 1"
+      ).foreach(query => {
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+        val smj = findTopLevelSortMergeJoin(plan)
+        assert(smj.size == 1)
+        val join = findTopLevelBaseJoin(adaptivePlan)
+        assert(join.isEmpty)
+        checkNumLocalShuffleReaders(adaptivePlan)
+      })
+    }
+  }
 }
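Note on PATCH 03: the same emptiness knowledge is now applied at the logical-plan level. Once a broadcast query stage has materialized an EmptyHashedRelation, an inner or left semi join can never produce output, so the rule rewrites the whole Join into an empty LocalRelation and the remaining stages are skipped. A hedged spark-shell sketch of the observable effect (exact plan strings vary across Spark versions, so treat the expected output as an assumption rather than a guarantee):

    // With AQE on, an inner join against a provably empty build side should
    // re-plan into an empty local scan once the broadcast stage finishes.
    spark.conf.set("spark.sql.adaptive.enabled", "true")

    val big = spark.range(100).toDF("k1")
    val empty = spark.range(10).toDF("k2").filter("k2 < -1") // always empty

    val joined = big.join(empty, $"k1" === $"k2", "inner")
    assert(joined.collect().isEmpty)
    println(joined.queryExecution.executedPlan) // expect a LocalTableScan, no join node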
From 9f3703274dc98b7918f3b524bb8de907ecc26631 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Thu, 20 Aug 2020 23:48:18 -0700
Subject: [PATCH 04/10] Fix unit test in AdaptiveQueryExecSuite

---
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index f766cb0daefcd..d38f5da0e3bb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -226,7 +226,8 @@ class AdaptiveQueryExecSuite
     val df1 = spark.range(10).withColumn("a", 'id)
     val df2 = spark.range(10).withColumn("b", 'id)
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-      val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
+      val testDf = df1.where('a > 10).join(df2.where('b > 10), Seq("id"), "left_outer")
+        .groupBy('a).count()
       checkAnswer(testDf, Seq())
       val plan = testDf.queryExecution.executedPlan
       assert(find(plan)(_.isInstanceOf[SortMergeJoinExec]).isDefined)
@@ -238,7 +239,8 @@ class AdaptiveQueryExecSuite
     }
 
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
-      val testDf = df1.where('a > 10).join(df2.where('b > 10), "id").groupBy('a).count()
+      val testDf = df1.where('a > 10).join(df2.where('b > 10), Seq("id"), "left_outer")
+        .groupBy('a).count()
       checkAnswer(testDf, Seq())
       val plan = testDf.queryExecution.executedPlan
       assert(find(plan)(_.isInstanceOf[BroadcastHashJoinExec]).isDefined)

From ac5d34ddcb3186bd7c6cdbf3b5d8facb769b51f0 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Fri, 21 Aug 2020 10:59:42 -0700
Subject: [PATCH 05/10] Address comments

---
 .../adaptive/AdaptiveSparkPlanExec.scala        |  2 +-
 ...scala => EliminateJoinToEmptyRelation.scala} | 17 ++++++++---------
 .../spark/sql/execution/joins/HashJoin.scala    | 12 ++++++------
 .../adaptive/AdaptiveQueryExecSuite.scala       | 13 +++++++++----
 4 files changed, 24 insertions(+), 20 deletions(-)
 rename sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/{OptimizeJoinToEmptyRelation.scala => EliminateJoinToEmptyRelation.scala} (77%)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index fa07664ffd4b6..d4018f8ce3a95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -80,7 +80,7 @@ case class AdaptiveSparkPlanExec(
   // TODO add more optimization rules
   override protected def batches: Seq[Batch] = Seq(
     Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)),
-    Batch("Optimize Join to Empty Relation", Once, OptimizeJoinToEmptyRelation)
+    Batch("Eliminate Join to Empty Relation", Once, EliminateJoinToEmptyRelation)
   )
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
similarity index 77%
rename from sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala
rename to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
index 6e25737b81fbd..cffb3d20f7fbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
@@ -24,19 +24,18 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys}
 
 /**
- * This optimization rule detects and converts a `Join` to an empty `LocalRelation`:
- * 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation`
- *    is `EmptyHashedRelationWithAllNullKeys`.
+ * This optimization rule detects and converts a Join to an empty [[LocalRelation]]:
+ * 1. Join is single column NULL-aware anti join (NAAJ), and broadcasted [[HashedRelation]]
+ *    is [[HashedRelationWithAllNullKeys]].
  *
- * 2. `Join` is inner or left semi join, and broadcasted `HashedRelation` is `EmptyHashedRelation`.
+ * 2. Join is inner or left semi join, and broadcasted [[HashedRelation]]
+ is [[EmptyHashedRelation]].
  */
-object OptimizeJoinToEmptyRelation extends Rule[LogicalPlan] {
+object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
-  private def canEliminate(
-      plan: LogicalPlan,
-      expectedRelation: HashedRelation): Boolean = plan match {
+  private def canEliminate(plan: LogicalPlan, relation: HashedRelation): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
-      && stage.broadcast.relationFuture.get().value == expectedRelation => true
+      && stage.broadcast.relationFuture.get().value == relation => true
     case _ => false
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 888cc4d04c73e..085cc29289ddd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -447,9 +447,9 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     }
 
     if (isEmptyHashedRelation) {
-      s"""
-         |// If HashedRelation is empty, hash inner join simply returns nothing.
-       """.stripMargin
+      """
+        |// If HashedRelation is empty, hash inner join simply returns nothing.
+      """.stripMargin
     } else if (keyIsUnique) {
       s"""
          |// generate join key for stream side
@@ -573,9 +573,9 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
     val numOutput = metricTerm(ctx, "numOutputRows")
 
     if (isEmptyHashedRelation) {
-      s"""
-         |// If HashedRelation is empty, hash semi join simply returns nothing.
-       """.stripMargin
+      """
+        |// If HashedRelation is empty, hash semi join simply returns nothing.
+      """.stripMargin
     } else if (keyIsUnique) {
       s"""
          |// generate join key for stream side
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index d38f5da0e3bb2..c1f5132c6ee6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1170,6 +1170,7 @@ class AdaptiveQueryExecSuite
     }
   }
 
+<<<<<<< HEAD
   test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -1184,11 +1185,10 @@ class AdaptiveQueryExecSuite
     }
   }
 
-  test("SPARK-32573, SPARK-32649: optimize join to empty relation") {
+  test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
-      // Test NULL-aware anti join
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
       val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
         "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)")
       val bhj = findTopLevelBroadcastHashJoin(plan)
@@ -1196,8 +1196,13 @@ class AdaptiveQueryExecSuite
       val join = findTopLevelBaseJoin(adaptivePlan)
       assert(join.isEmpty)
       checkNumLocalShuffleReaders(adaptivePlan)
+    }
+  }
 
-      // Test inner and left semi join
+  test("SPARK-32649: Eliminate inner and semi join to empty relation") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
       Seq(
         // inner join (small table at right side)
         "SELECT * FROM testData t1 join testData3 t2 ON t1.key = t2.a WHERE t2.b = 1",
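Note on PATCH 05: besides the rename to EliminateJoinToEmptyRelation, the generated-code snippets drop the s interpolator. Since those literals contain no ${...} interpolation, the plain triple-quoted string is equivalent; a self-contained check of that equivalence:

    // With nothing to interpolate, s"""...""" and """...""" yield identical strings.
    val interpolated = s"""
      |// If HashedRelation is empty, hash inner join simply returns nothing.
    """.stripMargin
    val plain = """
      |// If HashedRelation is empty, hash inner join simply returns nothing.
    """.stripMargin
    assert(interpolated == plain)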
+ """.stripMargin } else if (keyIsUnique) { s""" |// generate join key for stream side diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index d38f5da0e3bb2..c1f5132c6ee6e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1170,6 +1170,7 @@ class AdaptiveQueryExecSuite } } +<<<<<<< HEAD test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", @@ -1184,11 +1185,10 @@ class AdaptiveQueryExecSuite } } - test("SPARK-32573, SPARK-32649: optimize join to empty relation") { + test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", - SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { - // Test NULL-aware anti join + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) { val (plan, adaptivePlan) = runAdaptiveAndVerifyResult( "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)") val bhj = findTopLevelBroadcastHashJoin(plan) @@ -1196,8 +1196,13 @@ class AdaptiveQueryExecSuite val join = findTopLevelBaseJoin(adaptivePlan) assert(join.isEmpty) checkNumLocalShuffleReaders(adaptivePlan) + } + } - // Test inner and left semi join + test("SPARK-32649: Eliminate inner and semi join to empty relation") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") { Seq( // inner join (small table at right side) "SELECT * FROM testData t1 join testData3 t2 ON t1.key = t2.a WHERE t2.b = 1", From c82d835d765b4adac84b40468bfa88b01c7b8ee5 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Fri, 21 Aug 2020 16:04:17 -0700 Subject: [PATCH 06/10] Fix comment --- .../sql/execution/adaptive/EliminateJoinToEmptyRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala index cffb3d20f7fbc..6133870fc836e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation * is [[HashedRelationWithAllNullKeys]]. * * 2. Join is inner or left semi join, and broadcasted [[HashedRelation]] - is [[EmptyHashedRelation]]. + * is [[EmptyHashedRelation]]. 
From ddd30e0b9ffc1fa2d9144975eb513c5fcf90742a Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Fri, 21 Aug 2020 16:35:34 -0700
Subject: [PATCH 07/10] Add comment for SMJ/SHJ changed to BHJ

---
 .../sql/execution/adaptive/EliminateJoinToEmptyRelation.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
index 6133870fc836e..cfdd20ec7565d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
@@ -30,6 +30,9 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
  *
  * 2. Join is inner or left semi join, and broadcasted [[HashedRelation]]
  *    is [[EmptyHashedRelation]].
+ * This applies to all Joins (sort merge join, shuffled hash join, and broadcast hash join),
+ * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE
+ * in the first place.
  */
 object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 

From 1f328a79649eaeeb925809f12516f0f5618fdf2a Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Sat, 22 Aug 2020 23:10:56 -0700
Subject: [PATCH 08/10] Rebase to latest master to resolve conflict

---
 .../sql/execution/adaptive/EliminateJoinToEmptyRelation.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
index cfdd20ec7565d..460ddfed4562d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
  */
 object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan, relation: HashedRelation): Boolean = plan match {
+  private def canEliminate(plan: LogicalPlan, emptyRelation: HashedRelation): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
-      && stage.broadcast.relationFuture.get().value == relation => true
+      && stage.broadcast.relationFuture.get().value == emptyRelation => true
     case _ => false
   }
 

From a7cdf6eb662a5d5d3005486c548b989ba4ed5ff6 Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Sat, 22 Aug 2020 23:11:31 -0700
Subject: [PATCH 09/10] Rename emptyRelation to relation in EliminateJoinToEmptyRelation

---
 .../sql/execution/adaptive/EliminateJoinToEmptyRelation.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
index 460ddfed4562d..cfdd20ec7565d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
  */
 object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
-  private def canEliminate(plan: LogicalPlan, emptyRelation: HashedRelation): Boolean = plan match {
+  private def canEliminate(plan: LogicalPlan, relation: HashedRelation): Boolean = plan match {
     case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
-      && stage.broadcast.relationFuture.get().value == emptyRelation => true
+      && stage.broadcast.relationFuture.get().value == relation => true
     case _ => false
   }
 

From 496eda790c95b1c9895967b7181d582fa87da0ff Mon Sep 17 00:00:00 2001
From: Cheng Su
Date: Sat, 22 Aug 2020 23:17:24 -0700
Subject: [PATCH 10/10] Fix the rebase error

---
 .../adaptive/AdaptiveQueryExecSuite.scala     | 15 ---------------
 1 file changed, 15 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index c1f5132c6ee6e..3bd079cf65433 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1170,21 +1170,6 @@ class AdaptiveQueryExecSuite
     }
   }
 
-<<<<<<< HEAD
-  test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {
-    withSQLConf(
-      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
-      val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
-        "SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)")
-      val bhj = findTopLevelBroadcastHashJoin(plan)
-      assert(bhj.size == 1)
-      val join = findTopLevelBaseJoin(adaptivePlan)
-      assert(join.isEmpty)
-      checkNumLocalShuffleReaders(adaptivePlan)
-    }
-  }
-
   test("SPARK-32573: Eliminate NAAJ when BuildSide is HashedRelationWithAllNullKeys") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",