diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
index 15d4561b47a23..05b61638cf72c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis.CastSupport
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules._
  *    - Join with one or two empty children (including Intersect/Except).
+ *    - Join with a false condition (Inner/Cross/LeftOuter/RightOuter/LeftSemi/LeftAnti).
  * 2. Unary-node Logical Plans
  *    - Project/Filter/Sample/Join/Limit/Repartition with all empty children.
  *    - Aggregate with all empty children and at least one grouping expression.
  *    - Generate(Explode) with all empty children. Others like Hive UDTF may return results.
  */
@@ -71,24 +73,33 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
     // Joins on empty LocalRelations generated from streaming sources are not eliminated
     // as stateful streaming joins need to perform other state management operations other than
     // just processing the input data.
-    case p @ Join(_, _, joinType, _, _)
+    case p @ Join(_, _, joinType, conditionOpt, _)
         if !p.children.exists(_.isStreaming) =>
       val isLeftEmpty = isEmptyLocalRelation(p.left)
       val isRightEmpty = isEmptyLocalRelation(p.right)
-      if (isLeftEmpty || isRightEmpty) {
+      // A join whose condition is the literal `false` can never pair up any rows, so it is
+      // simplified here as well, even when neither child is an empty LocalRelation.
+      val isFalseCondition = conditionOpt.contains(FalseLiteral)
+      if (isLeftEmpty || isRightEmpty || isFalseCondition) {
         joinType match {
           case _: InnerLike => empty(p)
           // Intersect is handled as LeftSemi by `ReplaceIntersectWithSemiJoin` rule.
           // Except is handled as LeftAnti by `ReplaceExceptWithAntiJoin` rule.
           case LeftOuter | LeftSemi | LeftAnti if isLeftEmpty => empty(p)
-          case LeftSemi if isRightEmpty => empty(p)
-          case LeftAnti if isRightEmpty => p.left
+          case LeftSemi if isRightEmpty || isFalseCondition => empty(p)
+          case LeftAnti if isRightEmpty || isFalseCondition => p.left
           case FullOuter if isLeftEmpty && isRightEmpty => empty(p)
           case LeftOuter | FullOuter if isRightEmpty =>
             Project(p.left.output ++ nullValueProjectList(p.right), p.left)
           case RightOuter if isRightEmpty => empty(p)
           case RightOuter | FullOuter if isLeftEmpty =>
             Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
+          // With a false condition no row finds a match: keep every row of the preserved side
+          // and fill the other side's columns with nulls. FullOuter falls through to `case _`.
+          case LeftOuter if isFalseCondition =>
+            Project(p.left.output ++ nullValueProjectList(p.right), p.left)
+          case RightOuter if isFalseCondition =>
+            Project(nullValueProjectList(p.left) ++ p.right.output, p.right)
           case _ => p
         }
       } else {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index 54c692c13b781..b5dcb8aa67646 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -22,6 +22,7 @@ import
org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -155,6 +156,38 @@ class PropagateEmptyRelationSuite extends PlanTest {
     }
   }
 
+  test("SPARK-28220: Propagate empty relation through Join if condition is FalseLiteral") {
+    // Expected plans for the one-sided outer joins: the preserved relation projected together
+    // with a null literal cast to IntegerType in place of the other relation's column.
+    val leftPaddedWithNullB =
+      Project(Seq('a, Literal(null).cast(IntegerType).as('b)), testRelation1).analyze
+    val rightPaddedWithNullA =
+      Project(Seq(Literal(null).cast(IntegerType).as('a), 'b), testRelation2).analyze
+
+    // Join type paired with its expected optimized plan. `None` means the expected plan is
+    // whatever `OptimizeWithoutPropagateEmptyRelation` produces for the same query.
+    val expectedPlans = Seq(
+      (Inner, Some(LocalRelation('a.int, 'b.int))),
+      (Cross, Some(LocalRelation('a.int, 'b.int))),
+      (LeftOuter, Some(leftPaddedWithNullB)),
+      (RightOuter, Some(rightPaddedWithNullA)),
+      (FullOuter, None),
+      (LeftAnti, Some(testRelation1)),
+      (LeftSemi, Some(LocalRelation('a.int)))
+    )
+
+    expectedPlans.foreach { case (joinKind, expected) =>
+      val analyzed = testRelation1
+        .join(testRelation2, joinType = joinKind, condition = Some(FalseLiteral))
+        .analyze
+      val optimized = Optimize.execute(analyzed)
+      // `getOrElse` takes its argument by name, so the fallback optimizer only runs for `None`.
+      val correctAnswer =
+        expected.getOrElse(OptimizeWithoutPropagateEmptyRelation.execute(analyzed))
+      comparePlans(optimized, correctAnswer)
+    }
+  }
+
   test("propagate empty relation through UnaryNode") {
     val query = testRelation1
       .where(false)