Commit a537dd0

fix.
1 parent 2a10513 commit a537dd0

3 files changed: +53 -12 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 6 additions & 12 deletions

@@ -1328,18 +1328,12 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
-    val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
-    val leftConditions = splitConjunctiveConditions
-      .filter(_.references.subsetOf(join.left.outputSet))
-    val rightConditions = splitConjunctiveConditions
-      .filter(_.references.subsetOf(join.right.outputSet))
-
-    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
-      filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
-    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
-      filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
 
     join.joinType match {
       case RightOuter if leftHasNonNullPredicate => Inner
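
The rewrite folds the filter's inferred constraints into the same list of conjuncts and only then restricts them to conditions that reference a single join side, so canFilterOutNull is the sole test for null rejection. The removed short-circuit had treated any IsNotNull constraint that merely touched a side as proof that the side's null-padded rows were filtered out, which misfires for a constraint such as IsNotNull(Coalesce(y.e, x.a)). A minimal sketch of the intended check, in plain Scala rather than Catalyst (the column names and the Option-based NULL model are illustrative only):

object NullFilteringSketch {
  // NULL-able columns modeled as Option[Int]; one combined output row of the join.
  type Row = Map[String, Option[Int]]

  // SQL COALESCE: first non-NULL argument, else NULL.
  def coalesce(vs: Option[Int]*): Option[Int] = vs.flatten.headOption

  def main(args: Array[String]): Unit = {
    val leftCols  = Set("x.a", "x.b", "x.c")
    val rightCols = Set("y.d", "y.e", "y.f")

    // Row a FULL OUTER join emits for an unmatched right tuple: the left side is NULL-padded.
    val nullPadded: Row = Map("x.a" -> None, "y.e" -> Some(5))

    // COALESCE(y.e, x.a) IS NOT NULL references both sides, so it is neither a
    // left-only nor a right-only condition and the fixed rule ignores it entirely...
    val refs = Set("x.a", "y.e")
    println(refs.subsetOf(leftCols) || refs.subsetOf(rightCols))       // false

    // ...and rightly so: it still holds on the NULL-padded row, so it cannot justify
    // discarding either side's unmatched tuples.
    println(coalesce(nullPadded("y.e"), nullPadded("x.a")).isDefined)  // true

    // By contrast, x.a IS NOT NULL is left-only and rejects the NULL-padded row, which
    // is what lets buildNewJoinType stop NULL-supplying the left side
    // (RightOuter becomes Inner, FullOuter becomes LeftOuter).
    println(Set("x.a").subsetOf(leftCols))  // true
    println(nullPadded("x.a").isDefined)    // false
  }
}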

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala

Lines changed: 39 additions & 0 deletions

@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -192,4 +193,42 @@ class OuterJoinEliminationSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("joins: no outer join elimination if the filter is not NULL eliminated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where(Coalesce("y.e".attr :: "x.a".attr :: Nil))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, FullOuter, Option("a".attr === "d".attr))
+        .where(Coalesce("e".attr :: "a".attr :: Nil)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: no outer join elimination if the filter's constraints are not NULL eliminated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where(IsNotNull(Coalesce("y.e".attr :: "x.a".attr :: Nil)))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, FullOuter, Option("a".attr === "d".attr))
+        .where(IsNotNull(Coalesce("e".attr :: "a".attr :: Nil))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }
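
Both new cases expect a no-op from the optimizer: the FULL OUTER join must survive because COALESCE(y.e, x.a), with or without an IS NOT NULL wrapper, references columns from both inputs and so never becomes a single-side condition for canFilterOutNull to examine. For reference, a rough model of that null-rejection check, pasteable into the Scala REPL (a simplification, not the Catalyst implementation; Option[Boolean] stands in for SQL's three-valued logic):

// A predicate filters out a side's NULL-padded rows only if, with every column it
// references set to NULL, it can no longer evaluate to TRUE (NULL and FALSE both
// discard the row under a WHERE clause).
def isNotNull(v: Option[Int]): Option[Boolean] = Some(v.isDefined)
def coalesce(vs: Option[Int]*): Option[Int] = vs.flatten.headOption
def rejectsAllNullRow(resultWithNullInputs: Option[Boolean]): Boolean =
  resultWithNullInputs != Some(true)

println(rejectsAllNullRow(isNotNull(None)))                  // true: a IS NOT NULL is null-rejecting
println(rejectsAllNullRow(isNotNull(coalesce(None, None))))  // also true, but irrelevant here:
// IsNotNull(Coalesce(y.e, x.a)) spans both join sides, so the fixed rule never counts
// it as evidence for either side and the FULL OUTER join is preserved.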

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 8 additions & 0 deletions

@@ -2602,6 +2602,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(s"$expected") :: Nil)
   }
 
+  test("SPARK-16991: Full outer join followed by inner join produces wrong results") {
+    val a = Seq((1, 2), (2, 3)).toDF("a", "b")
+    val b = Seq((2, 5), (3, 4)).toDF("a", "c")
+    val c = Seq((3, 1)).toDF("a", "d")
+    val ab = a.join(b, Seq("a"), "fullouter")
+    checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
+  }
+
   test("SPARK-15752 optimize metadata only query for datasource table") {
     withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
       withTable("srcpart_15752") {
