From a537dd07823339e63a10e6628f333f44fa528b54 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 16 Aug 2016 00:17:09 -0700
Subject: [PATCH 1/3] fix.

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 18 +++------
 .../optimizer/OuterJoinEliminationSuite.scala | 39 +++++++++++++++++++
 .../org/apache/spark/sql/SQLQuerySuite.scala  |  8 ++++
 3 files changed, 53 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e34a478818e9..507f58638f5c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1328,18 +1328,12 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 
   private def buildNewJoinType(filter: Filter, join: Join): JoinType = {
-    val splitConjunctiveConditions: Seq[Expression] = splitConjunctivePredicates(filter.condition)
-    val leftConditions = splitConjunctiveConditions
-      .filter(_.references.subsetOf(join.left.outputSet))
-    val rightConditions = splitConjunctiveConditions
-      .filter(_.references.subsetOf(join.right.outputSet))
-
-    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull) ||
-      filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.left.outputSet.intersect(expr.references).nonEmpty)
-    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull) ||
-      filter.constraints.filter(_.isInstanceOf[IsNotNull])
-        .exists(expr => join.right.outputSet.intersect(expr.references).nonEmpty)
+    val conditions = splitConjunctivePredicates(filter.condition) ++ filter.constraints
+    val leftConditions = conditions.filter(_.references.subsetOf(join.left.outputSet))
+    val rightConditions = conditions.filter(_.references.subsetOf(join.right.outputSet))
+
+    val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
+    val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)
 
     join.joinType match {
       case RightOuter if leftHasNonNullPredicate => Inner
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
index 41754adef421..c168a55e40c5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OuterJoinEliminationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.{Coalesce, IsNotNull}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -192,4 +193,42 @@ class OuterJoinEliminationSuite extends PlanTest {
 
     comparePlans(optimized, correctAnswer)
   }
+
+  test("joins: no outer join elimination if the filter is not NULL eliminated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where(Coalesce("y.e".attr :: "x.a".attr :: Nil))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
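+    // Coalesce("y.e", "x.a") can still return a non-null value when every column of one join
+    // side is null, so it is not a null-filtering predicate and the optimizer must keep the
+    // FullOuter join in the expected plan below.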
+
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, FullOuter, Option("a".attr === "d".attr))
+        .where(Coalesce("e".attr :: "a".attr :: Nil)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("joins: no outer join elimination if the filter's constraints are not NULL eliminated") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+
+    val originalQuery =
+      x.join(y, FullOuter, Option("x.a".attr === "y.d".attr))
+        .where(IsNotNull(Coalesce("y.e".attr :: "x.a".attr :: Nil)))
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+
+    val left = testRelation
+    val right = testRelation1
+    val correctAnswer =
+      left.join(right, FullOuter, Option("a".attr === "d".attr))
+        .where(IsNotNull(Coalesce("e".attr :: "a".attr :: Nil))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 4fcde58833d7..f1c3070461c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2602,6 +2602,14 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(s"$expected") :: Nil)
   }
 
+  test("SPARK-16991: Full outer join followed by inner join produces wrong results") {
+    val a = Seq((1, 2), (2, 3)).toDF("a", "b")
+    val b = Seq((2, 5), (3, 4)).toDF("a", "c")
+    val c = Seq((3, 1)).toDF("a", "d")
+    val ab = a.join(b, Seq("a"), "fullouter")
+    checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
+  }
+
   test("SPARK-15752 optimize metadata only query for datasource table") {
     withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
       withTable("srcpart_15752") {

From cedabd5a31837cd6a87255ef6b385b6695a8f145 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 24 Aug 2016 14:17:07 -0700
Subject: [PATCH 2/3] added test cases

---
 .../org/apache/spark/sql/SQLQuerySuite.scala | 49 +++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b107d969d568..b58c19fc3cee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2594,6 +2594,55 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
   }
 
+  test("SPARK-17099: Incorrect result when HAVING clause is added to group by query") {
+    sparkContext.parallelize(
+      Seq(-234, 145, 367, 975, 298)).toDF("int_col_5").createOrReplaceTempView("t1")
+    sparkContext.parallelize(
+      Seq(
+        (-769, -244),
+        (-800, -409),
+        (940, 86),
+        (-507, 304),
+        (-367, 158))
+      ).toDF("int_col_2", "int_col_5").createOrReplaceTempView("t2")
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT
+          |  (SUM(COALESCE(t1.int_col_5, t2.int_col_2))),
+          |  ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
+          |FROM t1
+          |RIGHT JOIN t2
+          |  ON (t2.int_col_2) = (t1.int_col_5)
+          |GROUP BY GREATEST(COALESCE(t2.int_col_5, 109), COALESCE(t1.int_col_5, -449)),
+          |         COALESCE(t1.int_col_5, t2.int_col_2)
+          |HAVING (SUM(COALESCE(t1.int_col_5, t2.int_col_2)))
+          |       > ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
+        """.stripMargin),
+      Row(-367, -734) :: Row(-800, -1600) :: Row(-769, -1538) :: Row(-507, -1014) :: Nil)
+  }
+
+  test("SPARK-17120: Analyzer incorrectly optimizes plan to empty LocalRelation") {
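+    // The filter isnotnull(coalesce(t2.int_col_1, t1.int_col_6)) does not filter out the
+    // null-supplying side of the LEFT JOIN, so the join must not be rewritten to an inner
+    // join; with the `ON false` condition that would fold the plan to an empty LocalRelation.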
+    sparkContext.parallelize(Seq(97)).toDF("int_col_6").createOrReplaceTempView("t1")
+    sparkContext.parallelize(Seq(0)).toDF("int_col_1").createOrReplaceTempView("t2")
+
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      checkAnswer(
+        sql(
+          """
+            |SELECT *
+            |FROM (
+            |SELECT
+            |  COALESCE(t2.int_col_1, t1.int_col_6) AS int_col
+            | FROM t1
+            | LEFT JOIN t2 ON false
+            |) t where (t.int_col) is not null
+          """.stripMargin),
+        Row(97) :: Nil)
+    }
+  }
+
   test("SPARK-15752 optimize metadata only query for datasource table") {
     withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
       withTable("srcpart_15752") {

From ebc79fcb3981cfcaaf40450358a5b6c369e27132 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 24 Aug 2016 14:48:50 -0700
Subject: [PATCH 3/3] moved test cases

---
 .../resources/sql-tests/inputs/outer-join.sql | 36 ++++++++++
 .../sql-tests/results/outer-join.sql.out      | 72 +++++++++++++++++++
 .../apache/spark/sql/DataFrameJoinSuite.scala |  8 +++
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 57 ---------------
 4 files changed, 116 insertions(+), 57 deletions(-)
 create mode 100644 sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
 create mode 100644 sql/core/src/test/resources/sql-tests/results/outer-join.sql.out

diff --git a/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
new file mode 100644
index 000000000000..f50f1ebad970
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/outer-join.sql
@@ -0,0 +1,36 @@
+-- SPARK-17099: Incorrect result when HAVING clause is added to group by query
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(-234), (145), (367), (975), (298)
+as t1(int_col1);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+(-769, -244), (-800, -409), (940, 86), (-507, 304), (-367, 158)
+as t2(int_col0, int_col1);
+
+SELECT
+  (SUM(COALESCE(t1.int_col1, t2.int_col0))),
+  ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+FROM t1
+RIGHT JOIN t2
+  ON (t2.int_col0) = (t1.int_col1)
+GROUP BY GREATEST(COALESCE(t2.int_col1, 109), COALESCE(t1.int_col1, -449)),
+         COALESCE(t1.int_col1, t2.int_col0)
+HAVING (SUM(COALESCE(t1.int_col1, t2.int_col0)))
+       > ((COALESCE(t1.int_col1, t2.int_col0)) * 2);
+
+
+-- SPARK-17120: Analyzer incorrectly optimizes plan to empty LocalRelation
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1);
+
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1);
+
+SELECT *
+FROM (
+SELECT
+  COALESCE(t2.int_col1, t1.int_col1) AS int_col
+ FROM t1
+ LEFT JOIN t2 ON false
+) t where (t.int_col) is not null;
+
+
+
diff --git a/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
new file mode 100644
index 000000000000..b39fdb0e5872
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/outer-join.sql.out
@@ -0,0 +1,72 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 6
+
+
+-- !query 0
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES
+(-234), (145), (367), (975), (298)
+as t1(int_col1)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES
+(-769, -244), (-800, -409), (940, 86), (-507, 304), (-367, 158)
+as t2(int_col0, int_col1)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+SELECT
+  (SUM(COALESCE(t1.int_col1, t2.int_col0))),
+  ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+FROM t1
+RIGHT JOIN t2
+  ON (t2.int_col0) = (t1.int_col1)
+GROUP BY GREATEST(COALESCE(t2.int_col1, 109), COALESCE(t1.int_col1, -449)),
+         COALESCE(t1.int_col1, t2.int_col0)
+HAVING (SUM(COALESCE(t1.int_col1, t2.int_col0)))
+       > ((COALESCE(t1.int_col1, t2.int_col0)) * 2)
+-- !query 2 schema
+struct
+-- !query 2 output
+-367 -734
+-507 -1014
+-769 -1538
+-800 -1600
+
+
+-- !query 3
+CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (97) as t1(int_col1)
+-- !query 3 schema
+struct<>
+-- !query 3 output
+
+
+
+-- !query 4
+CREATE OR REPLACE TEMPORARY VIEW t2 AS SELECT * FROM VALUES (0) as t2(int_col1)
+-- !query 4 schema
+struct<>
+-- !query 4 output
+
+
+
+-- !query 5
+SELECT *
+FROM (
+SELECT
+  COALESCE(t2.int_col1, t1.int_col1) AS int_col
+ FROM t1
+ LEFT JOIN t2 ON false
+) t where (t.int_col) is not null
+-- !query 5 schema
+struct
+-- !query 5 output
+97
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 4342c039aefc..4abf5e42b9c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -225,4 +225,12 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
       Row(1, null) :: Row(null, 2) :: Nil
     )
   }
+
+  test("SPARK-16991: Full outer join followed by inner join produces wrong results") {
+    val a = Seq((1, 2), (2, 3)).toDF("a", "b")
+    val b = Seq((2, 5), (3, 4)).toDF("a", "c")
+    val c = Seq((3, 1)).toDF("a", "d")
+    val ab = a.join(b, Seq("a"), "fullouter")
+    checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b58c19fc3cee..eac266cba55b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2586,63 +2586,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(s"$expected") :: Nil)
   }
 
-  test("SPARK-16991: Full outer join followed by inner join produces wrong results") {
-    val a = Seq((1, 2), (2, 3)).toDF("a", "b")
-    val b = Seq((2, 5), (3, 4)).toDF("a", "c")
-    val c = Seq((3, 1)).toDF("a", "d")
-    val ab = a.join(b, Seq("a"), "fullouter")
-    checkAnswer(ab.join(c, "a"), Row(3, null, 4, 1) :: Nil)
-  }
-
-  test("SPARK-17099: Incorrect result when HAVING clause is added to group by query") {
-    sparkContext.parallelize(
-      Seq(-234, 145, 367, 975, 298)).toDF("int_col_5").createOrReplaceTempView("t1")
-    sparkContext.parallelize(
-      Seq(
-        (-769, -244),
-        (-800, -409),
-        (940, 86),
-        (-507, 304),
-        (-367, 158))
-      ).toDF("int_col_2", "int_col_5").createOrReplaceTempView("t2")
-
-    checkAnswer(
-      sql(
-        """
-          |SELECT
-          |  (SUM(COALESCE(t1.int_col_5, t2.int_col_2))),
-          |  ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
-          |FROM t1
-          |RIGHT JOIN t2
-          |  ON (t2.int_col_2) = (t1.int_col_5)
-          |GROUP BY GREATEST(COALESCE(t2.int_col_5, 109), COALESCE(t1.int_col_5, -449)),
-          |         COALESCE(t1.int_col_5, t2.int_col_2)
-          |HAVING (SUM(COALESCE(t1.int_col_5, t2.int_col_2)))
-          |       > ((COALESCE(t1.int_col_5, t2.int_col_2)) * 2)
-        """.stripMargin),
-      Row(-367, -734) :: Row(-800, -1600) :: Row(-769, -1538) :: Row(-507, -1014) :: Nil)
-  }
-
-  test("SPARK-17120: Analyzer incorrectly optimizes plan to empty LocalRelation") {
-    sparkContext.parallelize(Seq(97)).toDF("int_col_6").createOrReplaceTempView("t1")
-    sparkContext.parallelize(Seq(0)).toDF("int_col_1").createOrReplaceTempView("t2")
-
-    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
-      checkAnswer(
-        sql(
-          """
-            |SELECT *
-            |FROM (
-            |SELECT
-            |  COALESCE(t2.int_col_1, t1.int_col_6) AS int_col
-            | FROM t1
-            | LEFT JOIN t2 ON false
-            |) t where (t.int_col) is not null
-          """.stripMargin),
-        Row(97) :: Nil)
-    }
-  }
-
   test("SPARK-15752 optimize metadata only query for datasource table") {
     withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> "true") {
       withTable("srcpart_15752") {