From ead65c80461d65ce9fad4d091c0da92a060a0972 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 24 Feb 2021 21:08:26 -0800 Subject: [PATCH 1/5] Eliminate LEFT ANTI join to empty relation in AQE --- .../EliminateJoinToEmptyRelation.scala | 24 +++++++++++++- .../adaptive/AdaptiveQueryExecSuite.scala | 32 ++++++++++++++++++- 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala index cfdd20ec7565..9f7d8ace8e9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.execution.adaptive +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin -import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} +import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys} @@ -33,6 +34,9 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation * This applies to all Joins (sort merge join, shuffled hash join, and broadcast hash join), * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE * at the first place. + * + * 3. Join is left anti join without condition, and broadcasted join right side is not empty. + * This applies to broadcast nested loop join only. */ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] { @@ -53,5 +57,23 @@ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] { case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, EmptyHashedRelation) => LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + + case j @ Join(_, _, LeftAnti, None, _) => + val isNonEmptyBroadcastedRightSide = j.right match { + case LogicalQueryStage(_, stage: BroadcastQueryStageExec) + if stage.resultOption.get().isDefined => + stage.broadcast.relationFuture.get().value match { + // Match with Array[InternalRow] as this is the type of broadcast result + // in [[BroadcastNestedLoopJoinExec]]. + case v: Array[InternalRow] => v.nonEmpty + case _ => false + } + case _ => false + } + if (isNonEmptyBroadcastedRightSide) { + LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) + } else { + j + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 122bc2d1e59a..b9f704061b34 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.noop.NoopDataSource import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, REPARTITION, REPARTITION_WITH_NUM, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike} -import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions._ @@ -93,6 +93,13 @@ class AdaptiveQueryExecSuite (dfAdaptive.queryExecution.sparkPlan, adaptivePlan) } + private def findTopLevelBroadcastNestedLoopJoin( + plan: SparkPlan): Seq[BroadcastNestedLoopJoinExec] = { + collect(plan) { + case j: BroadcastNestedLoopJoinExec => j + } + } + private def findTopLevelBroadcastHashJoin(plan: SparkPlan): Seq[BroadcastHashJoinExec] = { collect(plan) { case j: BroadcastHashJoinExec => j @@ -1230,6 +1237,29 @@ class AdaptiveQueryExecSuite } } + test("SPARK-34533: Eliminate left anti join to empty relation") { + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { + withTable("emptyTestData") { + spark.range(0).write.saveAsTable("emptyTestData") + Seq( + // broadcast non-empty right side + ("SELECT /*+ broadcast(testData3) */ * FROM testData LEFT ANTI JOIN testData3", true), + // broadcast empty right side + ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI JOIN emptyTestData", false), + // broadcast left side + ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false), + ).foreach { case (query, isEliminated) => + val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) + val bnlj = findTopLevelBroadcastNestedLoopJoin(plan) + assert(bnlj.size == 1) + val join = findTopLevelBaseJoin(adaptivePlan) + assert(isEliminated == join.isEmpty) + } + } + } + } + test("SPARK-32753: Only copy tags to node with no tags") { withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") { withTempView("v1") { From e9e7b160aa5b892c47c4131fefe67744fec6352f Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 24 Feb 2021 21:32:50 -0800 Subject: [PATCH 2/5] Try to fix scala style --- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index b9f704061b34..45e785b52d87 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1246,7 +1246,8 @@ class AdaptiveQueryExecSuite // broadcast non-empty right side ("SELECT /*+ broadcast(testData3) */ * FROM testData LEFT ANTI JOIN testData3", true), // broadcast empty right side - ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI JOIN emptyTestData", false), + ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI JOIN emptyTestData", + false), // broadcast left side ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false), ).foreach { case (query, isEliminated) => From 255ee0754ffb35586135d12d87949a745fb1f082 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 24 Feb 2021 21:57:54 -0800 Subject: [PATCH 3/5] Try to fix scala style after retry on laptop --- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 45e785b52d87..86b18a550471 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -1249,7 +1249,7 @@ class AdaptiveQueryExecSuite ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI JOIN emptyTestData", false), // broadcast left side - ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false), + ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false) ).foreach { case (query, isEliminated) => val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) val bnlj = findTopLevelBroadcastNestedLoopJoin(plan) From 8e5f35d559c0c5fe33fdd29dfd2d5106dad1dd16 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 25 Feb 2021 11:46:30 -0800 Subject: [PATCH 4/5] Address comment to use row count stats --- .../EliminateJoinToEmptyRelation.scala | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala index 9f7d8ace8e9e..d6df52278e07 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.adaptive -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan} @@ -35,8 +34,7 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE * at the first place. * - * 3. Join is left anti join without condition, and broadcasted join right side is not empty. - * This applies to broadcast nested loop join only. + * 3. Join is left anti join without condition, and join right side is non-empty. */ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] { @@ -59,18 +57,12 @@ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] { LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) case j @ Join(_, _, LeftAnti, None, _) => - val isNonEmptyBroadcastedRightSide = j.right match { - case LogicalQueryStage(_, stage: BroadcastQueryStageExec) - if stage.resultOption.get().isDefined => - stage.broadcast.relationFuture.get().value match { - // Match with Array[InternalRow] as this is the type of broadcast result - // in [[BroadcastNestedLoopJoinExec]]. - case v: Array[InternalRow] => v.nonEmpty - case _ => false - } + val isNonEmptyRightSide = j.right match { + case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined => + stage.getRuntimeStatistics.rowCount.exists(_ > 0) case _ => false } - if (isNonEmptyBroadcastedRightSide) { + if (isNonEmptyRightSide) { LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming) } else { j From 934306e16d961a6d8373252082e0f0417181b7f4 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Thu, 25 Feb 2021 19:55:41 -0800 Subject: [PATCH 5/5] simplify test --- .../adaptive/AdaptiveQueryExecSuite.scala | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 86b18a550471..d7a1d5d26eae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.datasources.noop.NoopDataSource import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, REPARTITION, REPARTITION_WITH_NUM, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike} -import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.execution.joins.{BaseJoinExec, BroadcastHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate import org.apache.spark.sql.functions._ @@ -93,13 +93,6 @@ class AdaptiveQueryExecSuite (dfAdaptive.queryExecution.sparkPlan, adaptivePlan) } - private def findTopLevelBroadcastNestedLoopJoin( - plan: SparkPlan): Seq[BroadcastNestedLoopJoinExec] = { - collect(plan) { - case j: BroadcastNestedLoopJoinExec => j - } - } - private def findTopLevelBroadcastHashJoin(plan: SparkPlan): Seq[BroadcastHashJoinExec] = { collect(plan) { case j: BroadcastHashJoinExec => j @@ -1252,10 +1245,8 @@ class AdaptiveQueryExecSuite ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false) ).foreach { case (query, isEliminated) => val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query) - val bnlj = findTopLevelBroadcastNestedLoopJoin(plan) - assert(bnlj.size == 1) - val join = findTopLevelBaseJoin(adaptivePlan) - assert(isEliminated == join.isEmpty) + assert(findTopLevelBaseJoin(plan).size == 1) + assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated) } } }