From 7db83bc9cb78368e768ef88de4b0733edd6c8d4f Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Wed, 10 Aug 2016 09:07:43 -0700 Subject: [PATCH 1/2] [SPARK-16991][SQL] Fix `EliminateOuterJoin` optimizer to check output columns except join condition --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 11 +++++++++-- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 8 ++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e34a478818e9..27e739b1670a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1334,12 +1334,19 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { val rightConditions = splitConjunctiveConditions .filter(_.references.subsetOf(join.right.outputSet)) + val joinAttributeSet = if (join.condition.isDefined) { + join.condition.get.references + } else { + AttributeSet.empty + } + val leftOuterAttributeSet = join.left.outputSet -- joinAttributeSet + val rightOuterAttributeSet = join.right.outputSet -- joinAttributeSet val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) || filter.constraints.filter(_.isInstanceOf[IsNotNull]) - .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty) + .exists(expr => leftOuterAttributeSet.intersect(expr.references).nonEmpty) val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) || filter.constraints.filter(_.isInstanceOf[IsNotNull]) - .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty) + .exists(expr => rightOuterAttributeSet.intersect(expr.references).nonEmpty) join.joinType match { case RightOuter if leftHasNonNullPredicate => Inner diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4fcde58833d7..f1c3070461c5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2602,6 +2602,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Row(s"$expected") :: Nil) } + test("SPARK-16991: Full outer join followed by inner join produces wrong results") { + val a = Seq((1, 2), (2, 3)).toDF("a", "b") + val b = Seq((2, 5), (3, 4)).toDF("a", "c") + val c = Seq((3, 1)).toDF("a", "d") + val ab = a.join(b, Seq("a"), "fullouter") + checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil) + } + test("SPARK-15752 optimize metadata only query for datasource table") { withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") { withTable("srcpart_15752") { From af189d6f0ee49f99a32c3db570036c96ae73076b Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 15 Aug 2016 11:44:21 -0700 Subject: [PATCH 2/2] Address review comments: simplify `joinAttributeSet` with `Option.map`/`getOrElse`. 
--- .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 27e739b1670a..de51187f1fb0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1334,11 +1334,7 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { val rightConditions = splitConjunctiveConditions .filter(_.references.subsetOf(join.right.outputSet)) - val joinAttributeSet = if (join.condition.isDefined) { - join.condition.get.references - } else { - AttributeSet.empty - } + val joinAttributeSet = join.condition.map(_.references).getOrElse(AttributeSet.empty) val leftOuterAttributeSet = join.left.outputSet -- joinAttributeSet val rightOuterAttributeSet = join.right.outputSet -- joinAttributeSet val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||