diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e34a478818e9..de51187f1fb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1334,12 +1334,15 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
     val rightConditions = splitConjunctiveConditions
       .filter(_.references.subsetOf(join.right.outputSet))
 
+    val joinAttributeSet = join.condition.map(_.references).getOrElse(AttributeSet.empty)
+    val leftOuterAttributeSet = join.left.outputSet -- joinAttributeSet
+    val rightOuterAttributeSet = join.right.outputSet -- joinAttributeSet
     val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
       filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
+        .exists(expr => leftOuterAttributeSet.intersect(expr.references).nonEmpty)
     val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
       filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+        .exists(expr => rightOuterAttributeSet.intersect(expr.references).nonEmpty)
 
     join.joinType match {
       case RightOuter if leftHasNonNullPredicate => Inner
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4fcde58833d7..f1c3070461c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2602,6 +2602,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(s"$expected") :: Nil)
   }
 
+  test("SPARK-16991: Full outer join followed by inner join produces wrong results") {
+    val a = Seq((1, 2), (2, 3)).toDF("a", "b")
+    val b = Seq((2, 5), (3, 4)).toDF("a", "c")
+    val c = Seq((3, 1)).toDF("a", "d")
+    val ab = a.join(b, Seq("a"), "fullouter")
+    checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
+  }
+
   test("SPARK-15752 optimize metadata only query for datasource table") {
     withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
       withTable("srcpart_15752") {