EliminateJoinToEmptyRelation.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.planning.ExtractSingleColumnNullAwareAntiJoin
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.{Inner, LeftAnti, LeftSemi}
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation, HashedRelationWithAllNullKeys}
@@ -33,6 +33,8 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
  * This applies to all Joins (sort merge join, shuffled hash join, and broadcast hash join),
  * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE
  * at the first place.
+ *
+ * 3. Join is left anti join without condition, and join right side is non-empty.
  */
 object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
@@ -53,5 +55,17 @@ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
 
     case j @ Join(_, _, LeftSemi, _, _) if canEliminate(j.right, EmptyHashedRelation) =>
       LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+
+    case j @ Join(_, _, LeftAnti, None, _) =>
+      val isNonEmptyRightSide = j.right match {
+        case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined =>
+          stage.getRuntimeStatistics.rowCount.exists(_ > 0)
+        case _ => false
+      }
+      if (isNonEmptyRightSide) {
+        LocalRelation(j.output, data = Seq.empty, isStreaming = j.isStreaming)
+      } else {
+        j
+      }
   }
 }
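To make the new case (3) concrete, here is a small usage sketch. It is not part of the patch: the session setup and the table names t1/t2 are illustrative. A left anti join with no join condition can return a left-side row only when the right side is empty, so once AQE has materialized a non-empty right side the join can produce no rows and is rewritten to an empty LocalRelation.

// Sketch only: assumes a SparkSession `spark` with AQE enabled
// (spark.sql.adaptive.enabled=true); t1/t2 are illustrative names.
spark.range(10).createOrReplaceTempView("t1")
spark.range(5).createOrReplaceTempView("t2")   // non-empty right side

// No join condition: with a non-empty right side the anti join returns no rows,
// so the rule above replaces it with an empty LocalRelation at runtime.
val df = spark.sql("SELECT * FROM t1 LEFT ANTI JOIN t2")
df.collect()   // empty result
df.explain()   // final adaptive plan should show a LocalTableScan rather than a join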
AdaptiveQueryExecSuite.scala
@@ -1230,6 +1230,28 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-34533: Eliminate left anti join to empty relation") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withTable("emptyTestData") {
+        spark.range(0).write.saveAsTable("emptyTestData")
+        Seq(
+          // broadcast non-empty right side
+          ("SELECT /*+ broadcast(testData3) */ * FROM testData LEFT ANTI JOIN testData3", true),
+          // broadcast empty right side
+          ("SELECT /*+ broadcast(emptyTestData) */ * FROM testData LEFT ANTI JOIN emptyTestData",
+            false),
+          // broadcast left side
+          ("SELECT /*+ broadcast(testData) */ * FROM testData LEFT ANTI JOIN testData3", false)
+        ).foreach { case (query, isEliminated) =>
+          val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(query)
+          assert(findTopLevelBaseJoin(plan).size == 1)
+          assert(findTopLevelBaseJoin(adaptivePlan).isEmpty == isEliminated)
+        }
+      }
+    }
+  }
+
   test("SPARK-32753: Only copy tags to node with no tags") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("v1") {