diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
index cfdd20ec7565d..d6df52278e079 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys}
@@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
  * This applies to all Joins (sort merge join, shuffled hash join, and broadcast hash join),
  * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE
  * at the first place.
+ *
+ * 3. Join is left anti join without condition, and join right side is non-empty.
  */
 object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
@@ -53,5 +55,17 @@ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
     case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, EmptyHashedRelation) =>
       LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+
+    case j @ Join(_, _, LeftAnti, None, _) =>
+      val isNonEmptyRightSide = j.right match {
+        case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined =>
+          stage.getRuntimeStatistics.rowCount.exists(_ > 0)
+        case _ => false
+      }
+      if (isNonEmptyRightSide) {
+        LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+      } else {
+        j
+      }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 122bc2d1e59a6..d7a1d5d26eae1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1230,6 +1230,28 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-34533: Eliminate left anti join to empty relation") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withTable("emptyTestData") {
+        spark.range(0).write.saveAsTable("emptyTestData")
+        Seq(
+          // broadcast non-empty right side
+          ("SELECT /*+ broadcast(testData3) */ * FROM testData LEFT ANTI JOIN testData3", true),
+          // broadcast empty right side
+          ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI JOIN emptyTestData",
+            false),
+          // broadcast left side
+          ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false)
+        ).foreach { case (query, isEliminated) =>
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+          assert(findTopLevelBaseJoin(plan).size == 1)
+          assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated)
+        }
+      }
+    }
+  }
+
   test("SPARK-32753: Only copy tags to node with no tags") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("v1") {