From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 Nov 2015 18:30:26 -0800
Subject: [PATCH 01/20] SPARK-11633

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f4670b55bdba..5a5b71e52dd79 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -425,7 +425,8 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+          val attributeRewrites =
+            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3427152b2da02..5e00546a74c00 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,6 +51,8 @@ case class Order(
     state: String,
     month: Int)
 
+case class Individual(F1: Integer, F2: Integer)
+
 case class WindowData(
     month: Int,
     area: String,
@@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
+
+  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
+    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
+    val df = hiveContext.createDataFrame(rdd1)
+    df.registerTempTable("foo")
+    val df2 = sql("select f1, F2 as F2 from foo")
+    df2.registerTempTable("foo2")
+    df2.registerTempTable("foo3")
+
+    checkAnswer(sql(
+      """
+        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
+      """.stripMargin), Row(2) :: Row(1) :: Nil)
+  }
 }

From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:56:45 -0800
Subject: [PATCH 02/20] converge

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 15 ---------------
 2 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c9512fbd00aa..47962ebe6ef82 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -417,8 +417,7 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites =
-            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
+          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e00546a74c00..61d9dcd37572b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,8 +51,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class Individual(F1: Integer, F2: Integer)
-
 case class WindowData(
     month: Int,
     area: String,
@@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-
-  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
-    val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1)))
-    val df = hiveContext.createDataFrame(rdd1)
-    df.registerTempTable("foo")
-    val df2 = sql("select f1, F2 as F2 from foo")
-    df2.registerTempTable("foo2")
-    df2.registerTempTable("foo3")
-
-    checkAnswer(sql(
-      """
-        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
-      """.stripMargin), Row(2) :: Row(1) :: Nil)
   }
 }

From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:58:37 -0800
Subject: [PATCH 03/20] converge

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d9dcd37572b..3427152b2da02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-  }
 }

From 0bd177135bc1c8210fa0ea5a8c7a4c5144d6227e Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 6 Jan 2016 21:40:33 -0800
Subject: [PATCH 04/20] replace Intersect with Left-semi Join

---
 .../sql/catalyst/optimizer/Optimizer.scala         | 45 +++++++++++--------
 .../optimizer/SetOperationPushDownSuite.scala      | 13 +-----
 .../spark/sql/execution/SparkStrategies.scala      |  2 -
 .../spark/sql/execution/basicOperators.scala       | 12 -----
 .../org/apache/spark/sql/DataFrameSuite.scala      | 24 +++++++++-
 5 files changed, 49 insertions(+), 47 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0b1c74293bb8b..5a98a15795c66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -40,6 +40,8 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       Batch("Aggregate", FixedPoint(100),
         ReplaceDistinctWithAggregate,
         RemoveLiteralFromGroupExpressions) ::
+      Batch("Intersect", FixedPoint(100),
+        ReplaceIntersectWithLeftSemi) ::
       Batch("Operator Optimizations", FixedPoint(100),
         // Operator push down
         SetOperationPushDown,
@@ -93,18 +95,13 @@ object SamplePushDown extends Rule[LogicalPlan] {
 }
 
 /**
- * Pushes certain operations to both sides of a Union, Intersect or Except operator.
+ * Pushes certain operations to both sides of a Union or Except operator.
  * Operations that are safe to pushdown are listed as follows.
  * Union:
  *   Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
 *   safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT,
 *   we will not be able to pushdown Projections.
 *
- * Intersect:
- *   It is not safe to pushdown Projections through it because we need to get the
- *   intersect of rows by comparing the entire rows. It is fine to pushdown Filters
- *   with deterministic condition.
- *
 * Except:
 *   It is not safe to pushdown Projections through it because we need to get the
 *   intersect of rows by comparing the entire rows. It is fine to pushdown Filters
@@ -116,7 +113,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
    * Maps Attributes from the left side to the corresponding Attribute on the right side.
    */
   private def buildRewrites(bn: BinaryNode): AttributeMap[Attribute] = {
-    assert(bn.isInstanceOf[Union] || bn.isInstanceOf[Intersect] || bn.isInstanceOf[Except])
+    assert(bn.isInstanceOf[Union] || bn.isInstanceOf[Except])
     assert(bn.left.output.size == bn.right.output.size)
     AttributeMap(bn.left.output.zip(bn.right.output))
   }
@@ -124,7 +121,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
 
   /**
    * Rewrites an expression so that it can be pushed to the right side of a
-   * Union, Intersect or Except operator. This method relies on the fact that the output attributes
+   * Union or Except operator. This method relies on the fact that the output attributes
    * of a union/intersect/except are always equal to the left child's output.
    */
   private def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]) = {
@@ -175,17 +172,6 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
       p
 
-    // Push down filter through INTERSECT
-    case Filter(condition, i @ Intersect(left, right)) =>
-      val (deterministic, nondeterministic) = partitionByDeterministic(condition)
-      val rewrites = buildRewrites(i)
-      Filter(nondeterministic,
-        Intersect(
-          Filter(deterministic, left),
-          Filter(pushToRight(deterministic, rewrites), right)
-        )
-      )
-
     // Push down filter through EXCEPT
     case Filter(condition, e @ Except(left, right)) =>
       val (deterministic, nondeterministic) = partitionByDeterministic(condition)
@@ -965,6 +951,27 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[Intersect]] operator with a left-semi [[Join]] operator.
+ * {{{
+ *   SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
+ *   ==>  SELECT a1, a2 FROM Tab1, Tab2 ON a1<=>b1 AND a2<=>b2
+ * }}}
+ */
+object ReplaceIntersectWithLeftSemi extends Rule[LogicalPlan] {
+  private def buildCond (left: LogicalPlan, right: LogicalPlan): Seq[Expression] = {
+    require(left.output.length == right.output.length)
+    left.output.zip(right.output).map { case (l, r) =>
+      EqualNullSafe(l, r)
+    }
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Intersect(left, right) =>
+      Join(left, right, LeftSemi, buildCond(left, right).reduceLeftOption(And))
+  }
+}
+
 /**
  * Removes literals from group expressions in [[Aggregate]], as they have no effect to the result
 * but only makes the grouping key bigger.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
index a498b463a69e9..497cc48c8f1db 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala
@@ -37,27 +37,21 @@ class SetOperationPushDownSuite extends PlanTest {
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
   val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)
   val testUnion = Union(testRelation, testRelation2)
-  val testIntersect = Intersect(testRelation, testRelation2)
   val testExcept = Except(testRelation, testRelation2)
 
-  test("union/intersect/except: filter to each side") {
+  test("union/except: filter to each side") {
     val unionQuery = testUnion.where('a === 1)
-    val intersectQuery = testIntersect.where('b < 10)
     val exceptQuery = testExcept.where('c >= 5)
 
     val unionOptimized = Optimize.execute(unionQuery.analyze)
-    val intersectOptimized = Optimize.execute(intersectQuery.analyze)
     val exceptOptimized = Optimize.execute(exceptQuery.analyze)
 
     val unionCorrectAnswer =
       Union(testRelation.where('a === 1), testRelation2.where('d === 1)).analyze
-    val intersectCorrectAnswer =
-      Intersect(testRelation.where('b < 10), testRelation2.where('e < 10)).analyze
     val exceptCorrectAnswer =
       Except(testRelation.where('c >= 5), testRelation2.where('f >= 5)).analyze
 
     comparePlans(unionOptimized, unionCorrectAnswer)
-    comparePlans(intersectOptimized, intersectCorrectAnswer)
     comparePlans(exceptOptimized, exceptCorrectAnswer)
   }
 
@@ -70,13 +64,8 @@ class SetOperationPushDownSuite extends PlanTest {
   }
 
   test("SPARK-10539: Project should not be pushed down through Intersect or Except") {
-    val intersectQuery = testIntersect.select('b, 'c)
     val exceptQuery = testExcept.select('a, 'b, 'c)
-
-    val intersectOptimized = Optimize.execute(intersectQuery.analyze)
     val exceptOptimized = Optimize.execute(exceptQuery.analyze)
-
-    comparePlans(intersectOptimized, intersectQuery.analyze)
     comparePlans(exceptOptimized, exceptQuery.analyze)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 6cf75bc17039c..06a807e0ccc43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -351,8 +351,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Union(unionChildren.map(planLater)) :: Nil
       case logical.Except(left, right) =>
         execution.Except(planLater(left), planLater(right)) :: Nil
-      case logical.Intersect(left, right) =>
-        execution.Intersect(planLater(left), planLater(right)) :: Nil
       case g @ logical.Generate(generator, join, outer, _, _, child) =>
         execution.Generate(
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 95bef683238a7..74e60a646589c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -307,18 +307,6 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   }
 }
 
-/**
- * Returns the rows in left that also appear in right using the built in spark
- * intersection function.
- */
-case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
-  override def output: Seq[Attribute] = children.head.output
-
-  protected override def doExecute(): RDD[InternalRow] = {
-    left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
-  }
-}
-
 /**
  * A plan node that does nothing but lie about the output of its child. Used to spice a
  * (hopefully structurally equivalent) tree from a different optimization sequence into an already
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ade1391ecd74a..5b0871de51dc0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -25,7 +25,8 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
+import org.apache.spark.sql.catalyst.plans.LeftSemi
+import org.apache.spark.sql.catalyst.plans.logical.{Intersect, OneRowRelation, Join}
 import org.apache.spark.sql.execution.Exchange
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.functions._
@@ -322,13 +323,32 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("intersect") {
+    val intersectDF = lowerCaseData.intersect(lowerCaseData)
+
+    // Before the Optimizer runs, the operator is Intersect
+    assert(intersectDF.queryExecution.analyzed.collect {
+      case j@Intersect(_, _) => j
+    }.size === 1)
+
+    // After the Optimizer runs, the operator is converted to a LeftSemi Join
+    assert(intersectDF.queryExecution.optimizedPlan.collect {
+      case j@Join(_, _, LeftSemi, _) => j
+    }.size === 1)
+
     checkAnswer(
-      lowerCaseData.intersect(lowerCaseData),
+      intersectDF,
       Row(1, "a") ::
       Row(2, "b") ::
      Row(3, "c") ::
      Row(4, "d") :: Nil)
    checkAnswer(lowerCaseData.intersect(upperCaseData), Nil)
+
+    checkAnswer(
+      nullInts.intersect(nullInts),
+      Row(1) ::
+      Row(2) ::
+      Row(3) ::
+      Row(null) :: Nil)
   }
 
   test("udf") {

From bfa99c565ae1992a11c002ed38f892049c5bea9c Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 6 Jan 2016 22:48:54 -0800
Subject: [PATCH 05/20] address comments.
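
Rename ReplaceIntersectWithLeftSemi to ReplaceIntersectWithSemiJoin, inline the
condition building, and move the rewrite check into the new ReplaceOperatorSuite.
For illustration, a minimal standalone sketch of how the null-safe join condition
is assembled (plain Scala; Expr, Attr, EqualNullSafe and And are hypothetical
stand-ins here, not the actual Catalyst classes):

    sealed trait Expr
    case class Attr(name: String) extends Expr
    case class EqualNullSafe(l: Expr, r: Expr) extends Expr
    case class And(l: Expr, r: Expr) extends Expr

    // Pair up the two sides' output columns, build one <=> predicate per pair,
    // and AND them together, mirroring joinCond.reduceLeftOption(And) below.
    def buildJoinCond(left: Seq[Attr], right: Seq[Attr]): Option[Expr] = {
      require(left.length == right.length)
      left.zip(right)
        .map { case (l, r) => EqualNullSafe(l, r): Expr }
        .reduceLeftOption(And)
    }

    // buildJoinCond(Seq(Attr("a1"), Attr("a2")), Seq(Attr("b1"), Attr("b2")))
    // => Some(And(EqualNullSafe(Attr(a1),Attr(b1)), EqualNullSafe(Attr(a2),Attr(b2))))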
---
 .../sql/catalyst/optimizer/Optimizer.scala         | 17 +++----
 .../optimizer/ReplaceOperatorSuite.scala           | 46 +++++++++++++++++++
 .../org/apache/spark/sql/DataFrameSuite.scala      | 15 +-----
 3 files changed, 54 insertions(+), 24 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5a98a15795c66..1428bcb22bccb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -41,7 +41,7 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
         ReplaceDistinctWithAggregate,
         RemoveLiteralFromGroupExpressions) ::
       Batch("Intersect", FixedPoint(100),
-        ReplaceIntersectWithLeftSemi) ::
+        ReplaceIntersectWithSemiJoin) ::
       Batch("Operator Optimizations", FixedPoint(100),
         // Operator push down
         SetOperationPushDown,
@@ -955,20 +955,15 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
 * Replaces logical [[Intersect]] operator with a left-semi [[Join]] operator.
 * {{{
 *   SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
- *   ==>  SELECT a1, a2 FROM Tab1, Tab2 ON a1<=>b1 AND a2<=>b2
+ *   ==>  SELECT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
 * }}}
 */
-object ReplaceIntersectWithLeftSemi extends Rule[LogicalPlan] {
-  private def buildCond (left: LogicalPlan, right: LogicalPlan): Seq[Expression] = {
-    require(left.output.length == right.output.length)
-    left.output.zip(right.output).map { case (l, r) =>
-      EqualNullSafe(l, r)
-    }
-  }
-
+object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Intersect(left, right) =>
-      Join(left, right, LeftSemi, buildCond(left, right).reduceLeftOption(And))
+      val joinCond = left.output.zip(right.output).map { case (l, r) =>
+        EqualNullSafe(l, r) }
+      Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
new file mode 100644
index 0000000000000..5b3abf2fae1bd
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans.{LeftSemi, PlanTest}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+class ReplaceOperatorSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Intersect", FixedPoint(100),
+        ReplaceIntersectWithSemiJoin) :: Nil
+  }
+
+  test("replace Intersect with Left-semi Join") {
+    val table1 = LocalRelation('a.int, 'b.int)
+    val table2 = LocalRelation('c.int, 'd.int)
+
+    val query = Intersect(table1, table2)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer =
+      Join(table1, table2, LeftSemi, Option('a <=> 'c && 'b <=> 'd)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 5b0871de51dc0..ac108898f0b24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -323,26 +323,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
   }
 
   test("intersect") {
-    val intersectDF = lowerCaseData.intersect(lowerCaseData)
-
-    // Before the Optimizer runs, the operator is Intersect
-    assert(intersectDF.queryExecution.analyzed.collect {
-      case j@Intersect(_, _) => j
-    }.size === 1)
-
-    // After the Optimizer runs, the operator is converted to a LeftSemi Join
-    assert(intersectDF.queryExecution.optimizedPlan.collect {
-      case j@Join(_, _, LeftSemi, _) => j
-    }.size === 1)
-
     checkAnswer(
-      intersectDF,
+      lowerCaseData.intersect(lowerCaseData),
       Row(1, "a") ::
       Row(2, "b") ::
       Row(3, "c") ::
       Row(4, "d") :: Nil)
     checkAnswer(lowerCaseData.intersect(upperCaseData), Nil)
 
+    // check null equality
     checkAnswer(
       nullInts.intersect(nullInts),
       Row(1) ::

From 100174ac58e5fffdd5c6cc0709d528a48cc796cc Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 6 Jan 2016 22:52:08 -0800
Subject: [PATCH 06/20] clean code.

---
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index ac108898f0b24..e7a413532ac09 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -25,8 +25,7 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.LeftSemi
-import org.apache.spark.sql.catalyst.plans.logical.{Intersect, OneRowRelation, Join}
+import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation}
 import org.apache.spark.sql.execution.Exchange
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.functions._

From 9aad1cf54bb51cdc27f8eadfc38eef03d0f1121f Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 6 Jan 2016 22:52:46 -0800
Subject: [PATCH 07/20] clean code.

---
 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index e7a413532ac09..df5378598fb21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -25,7 +25,7 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation}
+import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
 import org.apache.spark.sql.execution.Exchange
 import org.apache.spark.sql.execution.aggregate.TungstenAggregate
 import org.apache.spark.sql.functions._

From 6742984f5ec0d480435988712e161838c3e8d6d5 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 6 Jan 2016 23:06:58 -0800
Subject: [PATCH 08/20] address comments.

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1428bcb22bccb..77f85686f4417 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -961,8 +961,8 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case Intersect(left, right) =>
-      val joinCond = left.output.zip(right.output).map { case (l, r) =>
-        EqualNullSafe(l, r) }
+      assert(left.output.size == right.output.size)
+      val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) }
       Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))
   }
 }

From e4c34f01acbb626a8084c99a5f0ad26fbfb175e2 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 6 Jan 2016 23:51:28 -0800
Subject: [PATCH 09/20] added one more case for duplicate values

---
 .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index df5378598fb21..bd97240be0749 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -337,6 +337,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Row(2) ::
       Row(3) ::
       Row(null) :: Nil)
+
+    // check duplicate values
+    checkAnswer(
+      allNulls.intersect(allNulls),
+      Row(null) ::
+      Row(null) ::
+      Row(null) ::
+      Row(null) :: Nil)
   }
 
   test("udf") {

From 9864b3f5a70c2b7f2e13f890339d04680e53daf6 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 7 Jan 2016 22:09:53 -0800
Subject: [PATCH 10/20] added an exception in conversion from logical to
 physical operators.
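
The optimizer is now expected to rewrite every Intersect into a left-semi join
before planning, so reaching this case in SparkStrategies indicates a bug. A
rough standalone sketch of the defensive pattern (hypothetical Plan/Scan types,
not the real planner):

    sealed trait Plan
    case class Scan(table: String) extends Plan
    case class Intersect(left: Plan, right: Plan) extends Plan

    def toPhysical(p: Plan): String = p match {
      case Scan(t) => s"scan($t)"
      case Intersect(_, _) =>
        // Unreachable once the optimizer has replaced Intersect with a semi join.
        throw new IllegalStateException(
          "logical intersect operator should have been replaced by semi-join in the optimizer")
    }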
---
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index fe524351be632..9d5190f9ca269 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -308,6 +308,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Distinct(child) =>
         throw new IllegalStateException(
           "logical distinct operator should have been replaced by aggregate in the optimizer")
+      case logical.Intersect(left, right) =>
+        throw new IllegalStateException(
+          "logical intersect operator should have been replaced by semi-join in the optimizer")
 
       case logical.MapPartitions(f, tEnc, uEnc, output, child) =>
         execution.MapPartitions(f, tEnc, uEnc, output, planLater(child)) :: Nil

From 24cea7d10c7575714464feca04dfaa60e2119c0a Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 7 Jan 2016 22:26:23 -0800
Subject: [PATCH 11/20] Add DISTINCT and test cases.

---
 .../spark/sql/catalyst/optimizer/Optimizer.scala    | 10 +++++-----
 .../scala/org/apache/spark/sql/DataFrameSuite.scala | 13 +++++++++----
 2 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 77f85686f4417..d00e98cce28d9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -37,11 +37,11 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
       // SubQueries are only needed for analysis and can be removed before execution.
       Batch("Remove SubQueries", FixedPoint(100),
         EliminateSubQueries) ::
+      Batch("Replace Operators", FixedPoint(100),
+        ReplaceIntersectWithSemiJoin,
+        ReplaceDistinctWithAggregate) ::
       Batch("Aggregate", FixedPoint(100),
-        ReplaceDistinctWithAggregate,
         RemoveLiteralFromGroupExpressions) ::
-      Batch("Intersect", FixedPoint(100),
-        ReplaceIntersectWithSemiJoin) ::
       Batch("Operator Optimizations", FixedPoint(100),
         // Operator push down
         SetOperationPushDown,
@@ -955,7 +955,7 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
 * Replaces logical [[Intersect]] operator with a left-semi [[Join]] operator.
 * {{{
 *   SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
- *   ==>  SELECT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
+ *   ==>  SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
 * }}}
 */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
@@ -963,7 +963,7 @@ object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
     case Intersect(left, right) =>
       assert(left.output.size == right.output.size)
       val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) }
-      Join(left, right, LeftSemi, joinCond.reduceLeftOption(And))
+      Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And)))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index bd97240be0749..e1bb7b0755a6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -338,13 +338,18 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Row(3) ::
       Row(null) :: Nil)
 
-    // check duplicate values
+    // check if values are de-duplicated
     checkAnswer(
       allNulls.intersect(allNulls),
-      Row(null) ::
-      Row(null) ::
-      Row(null) ::
       Row(null) :: Nil)
+
+    // check if values are de-duplicated
+    val df = Seq(("id1", 1), ("id1", 1), ("id", 1), ("id1", 2)).toDF("id", "value")
+    checkAnswer(
+      df.intersect(df),
+      Row("id1", 1) ::
+      Row("id", 1) ::
+      Row("id1", 2) :: Nil)
   }
 
   test("udf") {

From 27192be27708265aba6707d1100eefca537fb2b8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 7 Jan 2016 23:01:34 -0800
Subject: [PATCH 12/20] test case updates.

---
 .../spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 5b3abf2fae1bd..b2f1c211afe58 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -39,7 +39,7 @@ class ReplaceOperatorSuite extends PlanTest {
     val optimized = Optimize.execute(query.analyze)
 
     val correctAnswer =
-      Join(table1, table2, LeftSemi, Option('a <=> 'c && 'b <=> 'd)).analyze
+      Distinct(Join(table1, table2, LeftSemi, Option('a <=> 'c && 'b <=> 'd))).analyze
 
     comparePlans(optimized, correctAnswer)
   }

From 04a26bd3beaab9d8ea3a4b300eb9f413b5eaa704 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 8 Jan 2016 08:40:49 -0800
Subject: [PATCH 13/20] resolve the ambiguous attributes

---
 .../sql/catalyst/analysis/Analyzer.scala           | 110 ++++++++++--------
 .../plans/logical/basicOperators.scala             |  15 ++-
 .../sql/catalyst/analysis/AnalysisSuite.scala      |   5 +
 3 files changed, 78 insertions(+), 52 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e362b55d80cd1..6e6fe720568f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -344,6 +344,59 @@ class Analyzer(
       }
     }
 
+    def buildRightChild4Deduplication (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
+      val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+      logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
+        s"between $left and $right")
+
+      right.collect {
+        // Handle base relations that might appear more than once.
+        case oldVersion: MultiInstanceRelation
+            if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
+          val newVersion = oldVersion.newInstance()
+          (oldVersion, newVersion)
+
+        // Handle projects that create conflicting aliases.
+        case oldVersion @ Project(projectList, _)
+            if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
+          (oldVersion, oldVersion.copy(projectList = newAliases(projectList)))
+
+        case oldVersion @ Aggregate(_, aggregateExpressions, _)
+            if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
+          (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
+
+        case oldVersion: Generate
+            if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
+          val newOutput = oldVersion.generatorOutput.map(_.newInstance())
+          (oldVersion, oldVersion.copy(generatorOutput = newOutput))
+
+        case oldVersion @ Window(_, windowExpressions, _, _, child)
+            if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
+              .nonEmpty =>
+          (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
+      }
+      // Only handle first case, others will be fixed on the next pass.
+      .headOption match {
+        case None =>
+          /*
+           * No result implies that there is a logical plan node that produces new references
+           * that this rule cannot handle. When that is the case, there must be another rule
+           * that resolves these conflicts. Otherwise, the analysis will fail.
+           */
+          right
+        case Some((oldRelation, newRelation)) =>
+          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+          val newRight = right transformUp {
+            case r if r == oldRelation => newRelation
+          } transformUp {
+            case other => other transformExpressions {
+              case a: Attribute => attributeRewrites.get(a).getOrElse(a)
+            }
+          }
+          newRight
+      }
+    }
+
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case p: LogicalPlan if !p.childrenResolved => p
 
@@ -388,57 +441,14 @@ class Analyzer(
           .map(_.asInstanceOf[NamedExpression])
         a.copy(aggregateExpressions = expanded)
 
-      // Special handling for cases when self-join introduce duplicate expression ids.
+      // Special handling for cases when self-join introduces duplicate expression IDs.
       case j @ Join(left, right, _, _) if !j.selfJoinResolved =>
-        val conflictingAttributes = left.outputSet.intersect(right.outputSet)
-        logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} in $j")
-
-        right.collect {
-          // Handle base relations that might appear more than once.
-          case oldVersion: MultiInstanceRelation
-              if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
-            val newVersion = oldVersion.newInstance()
-            (oldVersion, newVersion)
-
-          // Handle projects that create conflicting aliases.
-          case oldVersion @ Project(projectList, _)
-              if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
-            (oldVersion, oldVersion.copy(projectList = newAliases(projectList)))
-
-          case oldVersion @ Aggregate(_, aggregateExpressions, _)
-              if findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
-            (oldVersion, oldVersion.copy(aggregateExpressions = newAliases(aggregateExpressions)))
-
-          case oldVersion: Generate
-              if oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
-            val newOutput = oldVersion.generatorOutput.map(_.newInstance())
-            (oldVersion, oldVersion.copy(generatorOutput = newOutput))
-
-          case oldVersion @ Window(_, windowExpressions, _, _, child)
-              if AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
-                .nonEmpty =>
-            (oldVersion, oldVersion.copy(windowExpressions = newAliases(windowExpressions)))
-        }
-        // Only handle first case, others will be fixed on the next pass.
-        .headOption match {
-          case None =>
-            /*
-             * No result implies that there is a logical plan node that produces new references
-             * that this rule cannot handle. When that is the case, there must be another rule
-             * that resolves these conflicts. Otherwise, the analysis will fail.
-             */
-            j
-          case Some((oldRelation, newRelation)) =>
-            val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
-            val newRight = right transformUp {
-              case r if r == oldRelation => newRelation
-            } transformUp {
-              case other => other transformExpressions {
-                case a: Attribute => attributeRewrites.get(a).getOrElse(a)
-              }
-            }
-            j.copy(right = newRight)
-        }
+        j.copy(right = buildRightChild4Deduplication(left, right))
+
+      // Special handling for cases when self-intersect introduces duplicate expression IDs.
+      // In Optimizer, Intersect will be converted to semi join.
+      case i @ Intersect(left, right) if !i.duplicateResolved =>
+        i.copy(right = buildRightChild4Deduplication(left, right))
 
       // When resolve `SortOrder`s in Sort based on child, don't report errors as
       // we still have chance to resolve it based on grandchild
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 79759b5a37b34..46144c862e027 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -98,7 +98,7 @@ abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends Binar
       leftAttr.withNullability(leftAttr.nullable || rightAttr.nullable)
     }
 
-  final override lazy val resolved: Boolean =
+  override lazy val resolved: Boolean =
     childrenResolved &&
       left.output.length == right.output.length &&
       left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
@@ -116,7 +116,18 @@ case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(lef
   }
 }
 
-case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)
+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  // Intersect is only resolved if it doesn't introduce ambiguous expression ids,
+  // since it will be converted to semi Join by optimizer.
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      left.output.length == right.output.length &&
+      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
+      duplicateResolved
+  }
+}
 
 case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
   /** We don't use right.output because those rows get excluded from the set. */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index fa823e3021835..9601797adf578 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -155,6 +155,11 @@ class AnalysisSuite extends AnalysisTest {
     checkAnalysis(plan, expected)
   }
 
+  test("self intersect should resolve duplicate expression IDs") {
+    val plan = testRelation.intersect(testRelation)
+    assertAnalysisSuccess(plan)
+  }
+
   test("SPARK-8654: invalid CAST in NULL IN(...) expression") {
     val plan = Project(Alias(In(Literal(null), Seq(Literal(1), Literal(2))), "a")() :: Nil,
       LocalRelation()

From f820c616fe217494ccaed0bf74a0a7410ce503cf Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 8 Jan 2016 19:48:37 -0800
Subject: [PATCH 14/20] resolve comments.

---
 .../sql/catalyst/analysis/Analyzer.scala           | 30 +++++++++++++------
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  8 ++---
 .../catalyst/plans/logical/LogicalPlan.scala       |  5 ++++
 .../plans/logical/basicOperators.scala             | 26 ++++++----------
 .../analysis/AnalysisErrorSuite.scala              |  3 +-
 5 files changed, 41 insertions(+), 31 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2bee1e0b8bc21..8cc0048709ae0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -343,7 +343,11 @@ class Analyzer(
       }
     }
 
-    def buildRightChild4Deduplication (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
+    /**
+     * Generate a new logical plan for the right child with different expression IDs
+     * for all conflicting attributes.
+     */
+    private def dedupRight (left: LogicalPlan, right: LogicalPlan): LogicalPlan = {
       val conflictingAttributes = left.outputSet.intersect(right.outputSet)
       logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} " +
         s"between $left and $right")
@@ -440,14 +444,22 @@ class Analyzer(
           .map(_.asInstanceOf[NamedExpression])
         a.copy(aggregateExpressions = expanded)
 
-      // Special handling for cases when self-join introduces duplicate expression IDs.
-      case j @ Join(left, right, _, _) if !j.selfJoinResolved =>
-        j.copy(right = buildRightChild4Deduplication(left, right))
-
-      // Special handling for cases when self-intersect introduces duplicate expression IDs.
-      // In Optimizer, Intersect will be converted to semi join.
-      case i @ Intersect(left, right) if !i.duplicateResolved =>
-        i.copy(right = buildRightChild4Deduplication(left, right))
+      // To resolve duplicate expression IDs for all BinaryNodes
+      case b: BinaryNode
+        if !b.duplicateResolved => b match {
+          case j @ Join(left, right, _, _) =>
+            j.copy(right = dedupRight(left, right))
+          case i @ Intersect(left, right) =>
+            i.copy(right = dedupRight(left, right))
+          case e @ Except(left, right) =>
+            e.copy(right = dedupRight(left, right))
+          case u @ Union(left, right) =>
+            u.copy(right = dedupRight(left, right))
+          case cg @ CoGroup(_, _, _, _, _, _, _, _, left, right) =>
+            cg.copy(right = dedupRight(left, right))
+          case other =>
+            other
+        }
 
       // When resolve `SortOrder`s in Sort based on child, don't report errors as
       // we still have chance to resolve it based on grandchild
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 2a2e0d27d9435..9148d9e8af371 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -206,12 +206,12 @@ trait CheckAnalysis {
               s"""Only a single table generating function is allowed in a SELECT clause, found:
                  | ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)
 
-        // Special handling for cases when self-join introduce duplicate expression ids.
-        case j @ Join(left, right, _, _) if left.outputSet.intersect(right.outputSet).nonEmpty =>
-          val conflictingAttributes = left.outputSet.intersect(right.outputSet)
+        case j: BinaryNode if !j.duplicateResolved =>
+          val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
           failAnalysis(
             s"""
-              |Failure when resolving conflicting references in Join:
+              |Failure when resolving conflicting references
+              |in operator ${operator.simpleString}:
              |$plan
              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
              |""".stripMargin)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 6d859551f8c52..563e39bbcfdcf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -315,4 +315,9 @@ abstract class BinaryNode extends LogicalPlan {
   def right: LogicalPlan
 
   override def children: Seq[LogicalPlan] = Seq(left, right)
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean =
+    expressions.forall(_.resolved) && childrenResolved && duplicateResolved
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index a3388efa04df4..58dff2c33af4f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -101,7 +102,8 @@ abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends Binar
   override lazy val resolved: Boolean =
     childrenResolved &&
       left.output.length == right.output.length &&
-      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
+      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
+      duplicateResolved
 }
@@ -116,18 +118,7 @@ case class Union(left: LogicalPlan, right: LogicalPlan) extends SetOperation(lef
   }
 }
 
-case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
-  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
-
-  // Intersect is only resolved if it doesn't introduce ambiguous expression ids,
-  // since it will be converted to semi Join by optimizer.
-  override lazy val resolved: Boolean = {
-    childrenResolved &&
-      left.output.length == right.output.length &&
-      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
-      duplicateResolved
-  }
-}
+case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right)
 
 case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
   /** We don't use right.output because those rows get excluded from the set. */
   override def output: Seq[Attribute] = left.output
@@ -155,13 +146,11 @@ case class Join(
     }
   }
 
-  def selfJoinResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
-
   // Joins are only resolved if they don't introduce ambiguous expression ids.
   override lazy val resolved: Boolean = {
     childrenResolved &&
       expressions.forall(_.resolved) &&
-      selfJoinResolved &&
+      duplicateResolved &&
       condition.forall(_.dataType == BooleanType)
   }
 }
@@ -235,7 +224,7 @@ case class Range(
     end: Long,
     step: Long,
     numSlices: Int,
-    output: Seq[Attribute]) extends LeafNode {
+    output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
   require(step != 0, "step cannot be 0")
   val numElements: BigInt = {
     val safeStart = BigInt(start)
@@ -248,6 +237,9 @@ case class Range(
     }
   }
 
+  override def newInstance(): Range =
+    Range(start, end, step, numSlices, output.map(_.newInstance()))
+
   override def statistics: Statistics = {
     val sizeInBytes = LongType.defaultSize * numElements
     Statistics( sizeInBytes = sizeInBytes )
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index fc35959f20547..48018255a4396 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -270,7 +270,8 @@ class AnalysisErrorSuite extends AnalysisTest {
     val error = intercept[AnalysisException] {
       SimpleAnalyzer.checkAnalysis(join)
     }
-    assert(error.message.contains("Failure when resolving conflicting references in Join"))
+    assert(error.message.contains("Failure when resolving conflicting references\n" +
+      "in operator 'Join"))
     assert(error.message.contains("Conflicting attributes"))
   }
 

From e566d79b12ab1d9ed9e3124fec8290cdba99549b Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 8 Jan 2016 20:16:35 -0800
Subject: [PATCH 15/20] style change.
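
No functional change. For context, the dedupRight step this match dispatches to
boils down to giving the right child's conflicting attributes fresh expression
IDs and remapping references to them; a simplified sketch (plain Scala; Attr
with a numeric ID is a hypothetical stand-in for Catalyst attributes):

    case class Attr(name: String, id: Long)

    // Attributes that appear on both sides get fresh IDs on the right,
    // loosely mirroring the AttributeMap rewrite inside dedupRight.
    def dedup(left: Seq[Attr], right: Seq[Attr], freshId: () => Long): Seq[Attr] = {
      val conflicting = left.toSet.intersect(right.toSet)
      val rewrites = conflicting.map(a => a -> a.copy(id = freshId())).toMap
      right.map(a => rewrites.getOrElse(a, a))
    }

    // var next = 10L
    // dedup(Seq(Attr("a", 1)), Seq(Attr("a", 1)), () => { next += 1; next })
    // => List(Attr("a", 11)) -- the self-intersect no longer shares IDs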
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8cc0048709ae0..c01dcde5bbb96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -445,8 +445,7 @@ class Analyzer(
         a.copy(aggregateExpressions = expanded)
 
       // To resolve duplicate expression IDs for all BinaryNodes
-      case b: BinaryNode
-        if !b.duplicateResolved => b match {
+      case b: BinaryNode if !b.duplicateResolved => b match {
           case j @ Join(left, right, _, _) =>
             j.copy(right = dedupRight(left, right))
           case i @ Intersect(left, right) =>
@@ -457,8 +456,7 @@ class Analyzer(
             u.copy(right = dedupRight(left, right))
           case cg @ CoGroup(_, _, _, _, _, _, _, _, left, right) =>
             cg.copy(right = dedupRight(left, right))
-          case other =>
-            other
+          case other => other
         }
 
       // When resolve `SortOrder`s in Sort based on child, don't report errors as

From 6a7979d35d5cf89d9cd14eb6d6a2a4418a44a669 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Sat, 23 Jan 2016 01:08:12 -0800
Subject: [PATCH 16/20] fix failed test cases

---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 ++----
 .../sql/catalyst/plans/logical/basicOperators.scala   | 4 ----
 .../sql/catalyst/analysis/HiveTypeCoercionSuite.scala | 9 ++++++---
 3 files changed, 8 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9d188b7db698f..0211ac9405766 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -453,10 +453,8 @@ class Analyzer(
             i.copy(right = dedupRight(left, right))
           case e @ Except(left, right) =>
             e.copy(right = dedupRight(left, right))
-          case u @ Union(left, right) =>
-            u.copy(right = dedupRight(left, right))
-          case cg @ CoGroup(_, _, _, _, _, _, _, _, left, right) =>
-            cg.copy(right = dedupRight(left, right))
+          case cg: CoGroup =>
+            cg.copy(right = dedupRight(cg.left, cg.right))
           case other => other
         }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 21f205b702de6..901762640bc38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -145,10 +145,6 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
     children.length > 1 && childrenResolved && allChildrenCompatible
   }
 
-  def duplicateResolved: Boolean =
-    children.tail.forall( child =>
-      child.outputSet.intersect(children.head.outputSet).isEmpty)
-
   override def statistics: Statistics = {
     val sizeInBytes = children.map(_.statistics.sizeInBytes).sum
     Statistics(sizeInBytes = sizeInBytes)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
index c30434a0063b0..7e6dcf071affe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
@@ -421,13 +421,16 @@ class HiveTypeCoercionSuite extends PlanTest {
     assert(r2.left.isInstanceOf[Project])
     assert(r2.right.isInstanceOf[Project])
 
+    // Even for a self Except, we still add a Project. The Except node will not be marked as
+    // analyzed unless its expression IDs are de-duplicated, so resolveOperators called in
+    // WidenSetOperationTypes does not skip the node before applying the rule.
     val r3 = wt(Except(firstTable, firstTable)).asInstanceOf[Except]
     checkOutput(r3.left, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))
     checkOutput(r3.right, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))
 
-    // Check if no Project is added
-    assert(r3.left.isInstanceOf[LocalRelation])
-    assert(r3.right.isInstanceOf[LocalRelation])
+    // Check if a Project is added
+    assert(r3.left.isInstanceOf[Project])
+    assert(r3.right.isInstanceOf[Project])
   }
 
   test("WidenSetOperationTypes for union") {

From e566d79b12ab1d9ed9e3124fec8290cdba99549b Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 26 Jan 2016 22:22:53 -0800
Subject: [PATCH 17/20] address comments.

---
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 372c935497536..72e31ada9c51b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -127,10 +127,7 @@ object EliminateSerialization extends Rule[LogicalPlan] {
 }
 
 /**
- * Pushes certain operations to both sides of a Union, Intersect or Except operator.
-=======
 * Pushes certain operations to both sides of a Union or Except operator.
->>>>>>> IntersectBySemiJoinMerged
 * Operations that are safe to pushdown are listed as follows.
 * Union:
 *   Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is
@@ -1050,6 +1047,8 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
 *   SELECT a1, a2 FROM Tab1 INTERSECT SELECT b1, b2 FROM Tab2
 *   ==>  SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
 * }}}
+ *
+ * This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
 */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {

From 3be78c4197519edb4659d8064468125b5223dd29 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 27 Jan 2016 22:27:42 -0800
Subject: [PATCH 18/20] address comments.
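
Limit the deduplication to Join and Intersect and drop a leftover merge-conflict
marker in Optimizer.scala. Intersect needs distinct expression IDs on its two
sides because the optimizer turns it into a join; with shared IDs the generated
condition would compare each column with itself. A small illustration (plain
Scala; Attr is a hypothetical stand-in):

    case class Attr(name: String, id: Long)

    // Render the <=> condition the rewrite would build for each column pair.
    def cond(l: Seq[Attr], r: Seq[Attr]): Seq[String] =
      l.zip(r).map { case (a, b) => s"${a.name}#${a.id} <=> ${b.name}#${b.id}" }

    val t = Seq(Attr("a", 1), Attr("b", 2))
    cond(t, t)
    // List(a#1 <=> a#1, b#2 <=> b#2) -- trivially true, keeps every row
    cond(t, Seq(Attr("a", 3), Attr("b", 4)))
    // List(a#1 <=> a#3, b#2 <=> b#4) -- meaningful after deduplication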
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   | 4 ----
 .../org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 1 -
 2 files changed, 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0211ac9405766..86b03e00dc42f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -451,10 +451,6 @@ class Analyzer(
         j.copy(right = dedupRight(left, right))
       case i @ Intersect(left, right) =>
         i.copy(right = dedupRight(left, right))
-      case e @ Except(left, right) =>
-        e.copy(right = dedupRight(left, right))
-      case cg: CoGroup =>
-        cg.copy(right = dedupRight(cg.left, cg.right))
       case other => other
     }

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 72e31ada9c51b..9a32204a0d44a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -113,7 +113,6 @@ object SamplePushDown extends Rule[LogicalPlan] {
 }
 
 /**
-<<<<<<< HEAD
 * Removes cases where we are unnecessarily going between the object and serialized (InternalRow)
 * representation of data item. For example back to back map operations.
 */

From e51de8f98b25281b338ca57f0f13645b626c7673 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 28 Jan 2016 13:46:16 -0800
Subject: [PATCH 19/20] fixed the failed cases.

---
 .../sql/catalyst/analysis/Analyzer.scala      | 13 ++++------
 .../sql/catalyst/analysis/CheckAnalysis.scala | 12 ++++++++-
 .../sql/catalyst/optimizer/Optimizer.scala    |  5 +++-
 .../catalyst/plans/logical/LogicalPlan.scala  |  4 +--
 .../plans/logical/basicOperators.scala        | 26 +++++++++++++------
 .../analysis/HiveTypeCoercionSuite.scala      |  9 +++----
 6 files changed, 42 insertions(+), 27 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 86b03e00dc42f..5fe700ee00673 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -445,14 +445,11 @@ class Analyzer(
           .map(_.asInstanceOf[NamedExpression])
         a.copy(aggregateExpressions = expanded)
 
-      // To resolve duplicate expression IDs for all the BinaryNode
-      case b: BinaryNode if !b.duplicateResolved => b match {
-        case j @ Join(left, right, _, _) =>
-          j.copy(right = dedupRight(left, right))
-        case i @ Intersect(left, right) =>
-          i.copy(right = dedupRight(left, right))
-        case other => other
-      }
+      // To resolve duplicate expression IDs for Join and Intersect
+      case j @ Join(left, right, _, _) if !j.duplicateResolved =>
+        j.copy(right = dedupRight(left, right))
+      case i @ Intersect(left, right) if !i.duplicateResolved =>
+        i.copy(right = dedupRight(left, right))
 
       // When resolve `SortOrder`s in Sort based on child, don't report errors as
       // we still have chance to resolve it based on grandchild
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9bac1c387fab1..a88478d96c7c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -214,7 +214,7 @@ trait CheckAnalysis {
             s"""Only a single table generating function is allowed in a SELECT clause, found:
                | ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)
 
-        case j: BinaryNode if !j.duplicateResolved =>
+        case j: Join if !j.duplicateResolved =>
           val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
           failAnalysis(
             s"""
@@ -224,6 +224,16 @@ trait CheckAnalysis {
               |Conflicting attributes: ${conflictingAttributes.mkString(",")}
               |""".stripMargin)
 
+        case i: Intersect if !i.duplicateResolved =>
+          val conflictingAttributes = i.left.outputSet.intersect(i.right.outputSet)
+          failAnalysis(
+            s"""
+              |Failure when resolving conflicting references
+              |in operator ${operator.simpleString}:
+              |$plan
+              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
+              |""".stripMargin)
+
         case o if !o.resolved =>
           failAnalysis(
             s"unresolved operator ${operator.simpleString}")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9a32204a0d44a..f156b5d10acc2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1047,7 +1047,10 @@ object ReplaceDistinctWithAggregate {
 * ==> SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
 * }}}
 *
- * This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
+ * Note:
+ * 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
+ * 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
+ *    join conditions will be incorrect.
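+ *
+ * For illustration only (hypothetical attribute IDs): in a self-intersect such as
+ * SELECT a1 FROM Tab1 INTERSECT SELECT a1 FROM Tab1, both children initially expose the
+ * same attribute a1#0, so zipping the two outputs would generate the degenerate condition
+ * a1#0 <=> a1#0. After de-duplication the right child exposes a fresh a1#1, and the
+ * condition a1#0 <=> a1#1 correctly compares the two sides.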
  */
 object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 563e39bbcfdcf..61ce85086ffee 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -316,8 +316,6 @@ abstract class BinaryNode extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = Seq(left, right)
 
-  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
-
   override lazy val resolved: Boolean =
-    expressions.forall(_.resolved) && childrenResolved && duplicateResolved
+    expressions.forall(_.resolved) && childrenResolved
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 901762640bc38..16f4b355b1b6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -91,14 +91,7 @@ case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
 }
 
-abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
-
-  final override lazy val resolved: Boolean =
-    childrenResolved &&
-      left.output.length == right.output.length &&
-      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
-      duplicateResolved
-}
+abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode
 
 private[sql] object SetOperation {
   def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
@@ -106,15 +99,30 @@ private[sql] object SetOperation {
 
 case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
 
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
   override def output: Seq[Attribute] =
     left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
       leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
     }
+
+  // Intersect is only resolved if it doesn't introduce ambiguous expression ids,
+  // since the Optimizer will convert Intersect to Join.
+  override lazy val resolved: Boolean =
+    childrenResolved &&
+      left.output.length == right.output.length &&
+      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
+      duplicateResolved
 }
 
 case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
   /** We don't use right.output because those rows get excluded from the set. */
   override def output: Seq[Attribute] = left.output
+
+  override lazy val resolved: Boolean =
+    childrenResolved &&
+      left.output.length == right.output.length &&
+      left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
 }
 
 /** Factory for constructing new `Union` nodes. */
@@ -172,6 +180,8 @@ case class Join(
     }
   }
 
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
   // Joins are only resolved if they don't introduce ambiguous expression ids.
   override lazy val resolved: Boolean = {
     childrenResolved &&
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
index 7e6dcf071affe..c30434a0063b0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala
@@ -421,16 +421,13 @@ class HiveTypeCoercionSuite extends PlanTest {
     assert(r2.left.isInstanceOf[Project])
     assert(r2.right.isInstanceOf[Project])
 
-    // Even if we are doing a self Except, we still add a Project. The node Except will not be
-    // marked as analyzed unless its children's exprIds are de-duplicated. Thus, resolveOperators,
-    // called in WidenSetOperationTypes, does not skip the node before applying the rule.
     val r3 = wt(Except(firstTable, firstTable)).asInstanceOf[Except]
     checkOutput(r3.left, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))
     checkOutput(r3.right, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))
 
-    // Check if a Project is added
-    assert(r3.left.isInstanceOf[Project])
-    assert(r3.right.isInstanceOf[Project])
+    // Check if no Project is added
+    assert(r3.left.isInstanceOf[LocalRelation])
+    assert(r3.right.isInstanceOf[LocalRelation])
   }
 
   test("WidenSetOperationTypes for union") {

From b600089d4736e9768a8968fb4dead08d28014ac1 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 29 Jan 2016 01:09:48 -0800
Subject: [PATCH 20/20] addressed comments.

---
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  6 ++----
 .../catalyst/plans/logical/LogicalPlan.scala   |  3 ---
 .../catalyst/analysis/AnalysisErrorSuite.scala |  3 +--
 .../optimizer/AggregateOptimizeSuite.scala     | 12 ------------
 .../optimizer/ReplaceOperatorSuite.scala       | 17 +++++++++++++--
 5 files changed, 18 insertions(+), 23 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index a88478d96c7c0..4a2f2b8bc6e4c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -218,8 +218,7 @@ trait CheckAnalysis {
           val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
           failAnalysis(
             s"""
-              |Failure when resolving conflicting references
-              |in operator ${operator.simpleString}:
+              |Failure when resolving conflicting references in Join:
               |$plan
              |Conflicting attributes: ${conflictingAttributes.mkString(",")}
              |""".stripMargin)
@@ -228,8 +227,7 @@ trait CheckAnalysis {
           val conflictingAttributes = i.left.outputSet.intersect(i.right.outputSet)
           failAnalysis(
             s"""
-              |Failure when resolving conflicting references
-              |in operator ${operator.simpleString}:
+              |Failure when resolving conflicting references in Intersect:
               |$plan
               |Conflicting attributes: ${conflictingAttributes.mkString(",")}
               |""".stripMargin)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 61ce85086ffee..6d859551f8c52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -315,7 +315,4 @@ abstract class BinaryNode extends LogicalPlan {
   def right: LogicalPlan
 
   override def children: Seq[LogicalPlan] = Seq(left, right)
-
-  override lazy val resolved: Boolean =
-    expressions.forall(_.resolved) && childrenResolved
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 48018255a4396..fc35959f20547 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -270,8 +270,7 @@ class AnalysisErrorSuite extends AnalysisTest {
     val error = intercept[AnalysisException] {
       SimpleAnalyzer.checkAnalysis(join)
     }
-    assert(error.message.contains("Failure when resolving conflicting references\n" +
-      "in operator 'Join"))
+    assert(error.message.contains("Failure when resolving conflicting references in Join"))
     assert(error.message.contains("Conflicting attributes"))
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index 37148a226f293..a4a12c0d62e92 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -28,21 +28,9 @@ class AggregateOptimizeSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("Aggregate", FixedPoint(100),
-      ReplaceDistinctWithAggregate,
       RemoveLiteralFromGroupExpressions) :: Nil
   }
 
-  test("replace distinct with aggregate") {
-    val input = LocalRelation('a.int, 'b.int)
-
-    val query = Distinct(input)
-    val optimized = Optimize.execute(query.analyze)
-
-    val correctAnswer = Aggregate(input.output, input.output, input)
-
-    comparePlans(optimized, correctAnswer)
-  }
-
   test("remove literals in grouping expression") {
     val input = LocalRelation('a.int, 'b.int)
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index b2f1c211afe58..f8ae5d9be2084 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -27,7 +27,8 @@ class ReplaceOperatorSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
-      Batch("Intersect", FixedPoint(100),
+      Batch("Replace Operators", FixedPoint(100),
+        ReplaceDistinctWithAggregate,
         ReplaceIntersectWithSemiJoin) :: Nil
   }
 
@@ -39,7 +40,19 @@ {
     val optimized = Optimize.execute(query.analyze)
 
     val correctAnswer =
-      Distinct(Join(table1, table2, LeftSemi, Option('a <=> 'c && 'b <=> 'd))).analyze
+      Aggregate(table1.output, table1.output,
+        Join(table1, table2, LeftSemi, Option('a <=> 'c && 'b <=> 'd))).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("replace Distinct with Aggregate") {
+    val input = LocalRelation('a.int, 'b.int)
+
+    val query = Distinct(input)
+    val optimized = Optimize.execute(query.analyze)
+
+    val correctAnswer = Aggregate(input.output, input.output, input)
 
     comparePlans(optimized, correctAnswer)
   }
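
Taken together, the expected plans in ReplaceOperatorSuite pin down the shape of the final
rule. The following is a reconstruction assembled from the rule's doc comment and the test
expectations above; a sketch, not necessarily the exact body of the merged commit:

    import org.apache.spark.sql.catalyst.expressions.{And, EqualNullSafe}
    import org.apache.spark.sql.catalyst.plans.LeftSemi
    import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Intersect, Join, LogicalPlan}
    import org.apache.spark.sql.catalyst.rules.Rule

    object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case Intersect(left, right) =>
          assert(left.output.length == right.output.length)
          // Null-safe equality (<=>), matching the doc comment's ON a1<=>b1 AND a2<=>b2,
          // so rows whose columns are NULL on both sides still intersect.
          val joinCond = left.output.zip(right.output).map { case (l, r) => EqualNullSafe(l, r) }
          // INTERSECT returns distinct rows only, hence the Distinct on top of the semi join.
          Distinct(Join(left, right, LeftSemi, joinCond.reduceLeftOption(And)))
      }
    }

Because the suite's "Replace Operators" batch also runs ReplaceDistinctWithAggregate, the
Distinct produced here surfaces in the expected plan as Aggregate(table1.output,
table1.output, ...), exactly as the first test asserts.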