From aa060722fbf28543da78d88ae81cd5fa185ad8e4 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 20 Jun 2014 13:15:23 +0800 Subject: [PATCH 01/51] Reformat the code style Reformat the code style as indent 4 --- .../apache/spark/sql/execution/joins.scala | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 31cc26962ad9..6cf7b21929b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -236,3 +236,107 @@ case class BroadcastNestedLoopJoin( streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches)) } } + +/* +In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key +rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd +with the largest key. +Then,join the two table with the buildtable. +Finally,union the two result rdd. +*/ +@DeveloperApi +case class SkewJoin( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + buildSide: BuildSide, + left: SparkPlan, + right: SparkPlan, + @transient sc: SparkContext) extends BinaryNode { + override def outputPartitioning: Partitioning = left.outputPartitioning + + override def requiredChildDistribution = + ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil + + val (buildPlan, streamedPlan) = buildSide match { + case BuildLeft => (left, right) + case BuildRight => (right, left) + } + val (buildKeys, streamedKeys) = buildSide match { + case BuildLeft => (leftKeys, rightKeys) + case BuildRight => (rightKeys, leftKeys) + } + + def output = left.output ++ right.output + + @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) + @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output) + + + def execute() = { + val streamedtable = streamedPlan.execute() + //This will later write as configuration + val sample = streamedtable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect() + val sortedsample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode()) + var max = 0 + var num = sample.size - 1 + var temp = 0 + var maxrowkey = sortedsample(0) + //find the largest key + if (sortedsample.size > 1) { + for (i <- 1 to num) { + if (sortedsample(i - 1) == sortedsample(i)) temp += 1 + else { + if (temp > max) { + max = temp + maxrowkey = sortedsample(i - 1) + } + temp = 0 + } + } + } + val maxkeystreamedtable = streamedtable.filter(row => { + streamSideKeyGenerator(row).toString().equals(maxrowkey.toString()) + }) + val mainstreamedtable = streamedtable.filter(row => { + !streamSideKeyGenerator(row).toString().equals(maxrowkey.toString()) + }) + val buildrdd = buildPlan.execute() + val maxkeyjoinedrdd = maxkeystreamedtable.map(_.copy()).cartesian(buildrdd.map(_.copy())).map { + case (l: Row, r: Row) => buildRow(l ++ r) + } + val mainjoinedrdd = mainstreamedtable.map(_.copy()).cartesian(buildrdd.map(_.copy())).map { + case (l: Row, r: Row) => buildRow(l ++ r) + } + sc.union(maxkeyjoinedrdd, mainjoinedrdd) + } +} + + +object SkewJoin extends Strategy with PredicateHelper { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + // Find inner joins where at least some predicates can be evaluated by matching hash keys + // using the HashFilteredJoin pattern.
    case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) =>
      val hashJoin =
        execution.SkewJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext)
      condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
    case _ => Nil
  }
}

object SkewFilteredJoin extends Logging with PredicateHelper {
  /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
  type ReturnType =
    (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)

  def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
    // All predicates can be evaluated for inner join (i.e., those that are in the ON
    // clause and WHERE clause.)
    case FilteredOperation(predicates, join@Join(left, right, Inner, condition)) =>
      logger.debug(s"Considering Skew inner join on: ${predicates ++ condition}")
      splitPredicates(predicates ++ condition, join)
    case join@Join(left, right, joinType, condition) =>
      logger.debug(s"Considering Skew join on: $condition")
      splitPredicates(condition.toSeq, join)
    case _ => None
  } From 4eb43ec21a6bf890f52c61dd1502c8a7893d3824 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 20 Jun 2014 15:57:28 +0800 Subject: [PATCH 02/51] Update basicOperators.scala Hi all, I want to submit an Except operator in basicOperators.scala. In SQL, two tables can be combined with the EXCEPT operator: select * from table1 except select * from table2 This operator supports the subtract function: it returns a table with the elements from `this` that are not in `other`. --- .../spark/sql/execution/basicOperators.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8969794c6993..fbbb349aad9a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -204,3 +204,18 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { override def execute() = rdd } +/** + * :: DeveloperApi :: + * This operator support the substract function .Return an table with the elements from `this` that are not in `other`. + */ +@DeveloperApi +case class Except(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan { + // TODO:The input children:Seq[SparkPlan] should only contain two SparkPlan + override def output = children.head.output + + override def execute() = { + children(0).execute().subtract(children(1).execute()) + } + + override def otherCopyArgs = sc :: Nil +} From 68815b2bc1ba1eb045eb7c12e403e44de3d82d29 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 20 Jun 2014 16:52:34 +0800 Subject: [PATCH 03/51] Reformat the code style Reformat the names and annotations. Thanks rxin. If there are other code style problems, I will try to reformat them.
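For context on what the SkewJoin code above is doing: it samples the streamed side, takes the most frequent join key as the skewed one, and handles rows with that key separately from the rest before unioning the results. Below is a minimal, runnable sketch of just that splitting step on a plain RDD (illustrative only, not part of the patch; key extraction is identity here, and the sample is assumed non-empty):

import org.apache.spark.{SparkConf, SparkContext}

object SkewSplitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("skew-sketch").setMaster("local[2]"))
    val rows = sc.parallelize(Seq("k1", "k1", "k1", "k1", "k2", "k3"))
    // Sample the RDD and take the most frequent sampled key as the skewed one,
    // mirroring the sample(false, 0.3, 9) + sort + scan in SkewJoin.execute().
    val sampled = rows.sample(withReplacement = false, fraction = 0.5, seed = 9).collect()
    val maxKey = sampled.groupBy(identity).maxBy(_._2.length)._1
    // Split into the skewed part and the rest; SkewJoin joins each part
    // against the build side separately and unions the two result RDDs.
    val skewed = rows.filter(_ == maxKey)
    val main = rows.filter(_ != maxKey)
    println(s"maxKey=$maxKey skewed=${skewed.count()} main=${main.count()}")
    sc.stop()
  }
}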
--- .../apache/spark/sql/execution/joins.scala | 45 ++++++++++--------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 6cf7b21929b9..8e3a6758b33b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -237,13 +237,14 @@ case class BroadcastNestedLoopJoin( } } -/* -In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key -rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd -with the largest key. -Then,join the two table with the buildtable. -Finally,union the two result rdd. -*/ +/** + * :: DeveloperApi :: + *In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key + *rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd + *with the largest key. + *Then,join the two table with the buildtable. + *Finally,union the two result rdd. + */ @DeveloperApi case class SkewJoin( leftKeys: Seq[Expression], @@ -273,41 +274,41 @@ case class SkewJoin( def execute() = { - val streamedtable = streamedPlan.execute() + val streamedTable = streamedPlan.execute() //This will later write as configuration - val sample = streamedtable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect() - val sortedsample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode()) + val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect() + val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode()) var max = 0 var num = sample.size - 1 var temp = 0 - var maxrowkey = sortedsample(0) + var maxrowKey = sortedSample(0) //find the largest key - if (sortedsample.size > 1) { + if (sortedSample.size > 1) { for (i <- 1 to num) { - if (sortedsample(i - 1) == sortedsample(i)) temp += 1 + if (sortedSample(i - 1) == sortedSample(i)) temp += 1 else { if (temp > max) { max = temp - maxrowkey = sortedsample(i - 1) + maxrowKey = sortedSample(i - 1) } temp = 0 } } } - val maxkeystreamedtable = streamedtable.filter(row => { - streamSideKeyGenerator(row).toString().equals(maxrowkey.toString()) + val maxKeyStreamedTable = streamedTable.filter(row => { + streamSideKeyGenerator(row).toString().equals(maxrowKey.toString()) }) - val mainstreamedtable = streamedtable.filter(row => { - !streamSideKeyGenerator(row).toString().equals(maxrowkey.toString()) + val mainStreamedTable = streamedTable.filter(row => { + !streamSideKeyGenerator(row).toString().equals(maxrowKey.toString()) }) - val buildrdd = buildPlan.execute() - val maxkeyjoinedrdd = maxkeystreamedtable.map(_.copy()).cartesian(buildrdd.map(_.copy())).map { + val buildRdd = buildPlan.execute() + val maxKeyJoinedRdd = maxKeyStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map { case (l: Row, r: Row) => buildRow(l ++ r) } - val mainjoinedrdd = mainstreamedtable.map(_.copy()).cartesian(buildrdd.map(_.copy())).map { + val mainJoinedRdd = mainStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map { case (l: Row, r: Row) => buildRow(l ++ r) } - sc.union(maxkeyjoinedrdd, mainjoinedrdd) + sc.union(maxKeyJoinedRdd, mainJoinedRdd) } } From 
dd329804ffb32033e371a445262836ddcc46c5d3 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Sun, 22 Jun 2014 08:53:53 +0800 Subject: [PATCH 04/51] update 1: 1. delete sc 2. change args to left, right 3. annotation limited to less than 100 characters --- .../org/apache/spark/sql/execution/basicOperators.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index fbbb349aad9a..fbc4a05190c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -206,15 +206,16 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { /** * :: DeveloperApi :: - * This operator support the substract function .Return an table with the elements from `this` that are not in `other`. + * This operator support the substract function . + * Return an table with the elements from `this` that are not in `other`. */ @DeveloperApi -case class Except(children: Seq[SparkPlan])(@transient sc: SparkContext) extends SparkPlan { +case class Subtract(left:SparkPlan,right:SparkPlan) extends SparkPlan { // TODO:The input children:Seq[SparkPlan] should only contain two SparkPlan override def output = children.head.output override def execute() = { - children(0).execute().subtract(children(1).execute()) + left.execute().subtract(right.execute()) } override def otherCopyArgs = sc :: Nil From 2b98962b3b0adce25225ff0632e74739c3b40354 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Sun, 22 Jun 2014 23:16:55 +0800 Subject: [PATCH 05/51] Update SqlParser.scala --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index b3a3a1ef1b5e..b16606ab44c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -133,6 +133,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val TRUE = Keyword("TRUE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val WHERE = Keyword("EXCEPT") + // Use reflection to find the reserved words defined in this class.
protected val reservedWords = From 8945835c0ad6affb3aa0092e5d657e6fe3ff95ea Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Sun, 22 Jun 2014 23:36:07 +0800 Subject: [PATCH 06/51] object SkewJoin extends Strategy --- .../apache/spark/sql/execution/SparkStrategies.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f763106da4e0..1faea19e6ea8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -193,3 +193,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } } + +object SkewJoin extends Strategy with PredicateHelper { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + // Find inner joins where at least some predicates can be evaluated by matching hash keys + // using the HashFilteredJoin pattern. + case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => + val hashJoin = + execution.SkewJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext) + condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil + case _ => Nil + } +} From cf5b9d0e87bd3e87079951cd1145bfaa5a3ba0eb Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 09:21:29 +0800 Subject: [PATCH 07/51] Update basicOperators.scala --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index fbc4a05190c6..1d3ca10204c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -217,6 +217,4 @@ case class Subtract(left:SparkPlan,right:SparkPlan) extends SparkPlan { override def execute() = { left.execute().subtract(right.execute()) } - - override def otherCopyArgs = sc :: Nil } From 7a98c37b4220f5efc9f56fd304d9038e8382429e Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 09:23:47 +0800 Subject: [PATCH 08/51] Update basicOperators.scala --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 1d3ca10204c1..91b41a9a5245 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -210,7 +210,7 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { * Return an table with the elements from `this` that are not in `other`. 
*/ @DeveloperApi -case class Subtract(left:SparkPlan,right:SparkPlan) extends SparkPlan { +case class Subtract(left:SparkPlan,right:SparkPlan) extends BinaryNode { // TODO:The input children:Seq[SparkPlan] should only contain two SparkPlan override def output = children.head.output From 3305e409f05380dedf0f7200f36a9e9ada5bd8a8 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 09:30:16 +0800 Subject: [PATCH 09/51] SqlParser --- .../src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 0cc4592047b1..12d86e14404e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -119,6 +119,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val SUBTRACT = Keyword("SUBTRACT") // Use reflection to find the reserved words defined in this class. protected val reservedWords = From 1fe96c0b0c73686a3953867280193f00ee29e6d0 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 09:35:23 +0800 Subject: [PATCH 10/51] HiveQl.scala update --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index ec653efcc8c5..979973119081 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -614,6 +614,8 @@ private[hive] object HiveQl { queries.reduceLeft(Union) case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right)) + + case Token("TOK_SUBTRACT", left :: right :: Nil) => Subtract(nodeToPlan(left), nodeToPlan(right)) case a: ASTNode => throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") From a8a194803b9e9f27d88b062bed5fd6375e2db452 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 09:41:49 +0800 Subject: [PATCH 11/51] SparkStrategies --- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4694f25d6d63..578f837e15e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -236,6 +236,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sparkContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil + case Subtract(left,right) => //new subtract operator + execution.Subtract(left,right) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => From 08089210ebbf85fc9bf4080cb9e358834c5f4361 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 09:52:13 +0800 Subject: [PATCH 12/51] 
SQLQuerySuite --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e9360b0fc791..8c858859dbb8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -369,6 +369,17 @@ class SQLQuerySuite extends QueryTest { (3, null))) } + test("subtract") { + checkAnswer( + sql("SELECT * FROM lowerCaseData SUBTRACT SELECT * FROM upperCaseData "), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData SUBTRACT SELECT * FROM lowerCaseData "), Nil) + } + test("SET commands semantics using sql()") { clear() val testKey = "test.key.0" From 2d4bfbdb090a350082b993d18678fd46aa767677 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 09:53:28 +0800 Subject: [PATCH 13/51] Update SQLQuerySuite.scala --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8c858859dbb8..e5165488e964 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -373,9 +373,9 @@ class SQLQuerySuite extends QueryTest { checkAnswer( sql("SELECT * FROM lowerCaseData SUBTRACT SELECT * FROM upperCaseData "), (1, "a") :: - (2, "b") :: - (3, "c") :: - (4, "d") :: Nil) + (2, "b") :: + (3, "c") :: + 4, "d") :: Nil) checkAnswer( sql("SELECT * FROM lowerCaseData SUBTRACT SELECT * FROM lowerCaseData "), Nil) } From 4bdd5205a4beef1f45abf726a555de0f790fbaff Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 11:13:25 +0800 Subject: [PATCH 14/51] Update SqlParser.scala --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 12d86e14404e..51bf1eeb98be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -140,7 +140,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } | + SUBTRACT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Subtract(q1, q2) } ) | insert | cache ) From 4bf80b1071a327b1e2a66396e0180e6c955ecc80 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 13:20:29 +0800 Subject: [PATCH 15/51] update subtract to except --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 51bf1eeb98be..5bcba7a6d440 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -119,7 +119,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") - protected val SUBTRACT = Keyword("SUBTRACT") + protected val SUBTRACT = Keyword("EXCEPT") // Use reflection to find the reserved words defined in this class. protected val reservedWords = From aab3785863c432c91fb7576d16b2711317371156 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 13:21:52 +0800 Subject: [PATCH 16/51] Update SQLQuerySuite.scala --- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e5165488e964..0aece34bd345 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -375,7 +375,7 @@ class SQLQuerySuite extends QueryTest { (1, "a") :: (2, "b") :: (3, "c") :: - 4, "d") :: Nil) + (4, "d") :: Nil) checkAnswer( sql("SELECT * FROM lowerCaseData SUBTRACT SELECT * FROM lowerCaseData "), Nil) } From 052346dcadbcb159f3518b3e9c04889a36721581 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 15:19:59 +0800 Subject: [PATCH 17/51] SUBTRACT conflicts with Subtract(e1,e2) The SUBTRACT keyword conflicts with the Subtract(e1,e2) expression in this file, so I changed the keyword to EXCEPT; in the basic operators file I keep the name Subtract. --- .../apache/spark/sql/catalyst/SqlParser.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 5bcba7a6d440..db7c771957f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -119,7 +119,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") - protected val SUBTRACT = Keyword("EXCEPT") + protected val EXCEPT = Keyword("EXCEPT") // Use reflection to find the reserved words defined in this class.
protected val reservedWords = @@ -137,14 +137,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers { } } - protected lazy val query: Parser[LogicalPlan] = ( - select * ( - UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } | - SUBTRACT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Subtract(q1, q2) } - ) - | insert | cache - ) + protected lazy val query: Parser[LogicalPlan] = + select * ( + UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2)} | + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} | + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} + ) | insert protected lazy val select: Parser[LogicalPlan] = SELECT ~> opt(DISTINCT) ~ projections ~ From 7859e56250d2946209402c1d57b3e2a1e3591892 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 15:23:13 +0800 Subject: [PATCH 18/51] Update SparkStrategies.scala --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 578f837e15e8..9ea5d0704d8d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -236,8 +236,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sparkContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil - case Subtract(left,right) => //new subtract operator - execution.Subtract(left,right) :: Nil + case logical.Except(left,right) => //yanjiegaonew + execution.Subtract(planLater(left),planLater(right)) :: Nil //yanjiegaonew case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => From a36eb0a1ec805f33c2f60e0f88fd1133803d2e2e Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 15:30:28 +0800 Subject: [PATCH 19/51] Update basicOperators.scala --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 3e0639867b27..5853c6978382 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -89,6 +89,13 @@ case class Join( } } +case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + // TODO: These aren't really the same attributes as nullability etc might change. 
+ def output = left.output + + def references = Set.empty +} + case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], From 3fe77460029292ab6fdf9d6d18546f3aebac9b89 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 15:32:29 +0800 Subject: [PATCH 20/51] Update SQLQuerySuite.scala --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 0aece34bd345..9384cb8c3d6d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -369,17 +369,19 @@ class SQLQuerySuite extends QueryTest { (3, null))) } - test("subtract") { + test("EXCEPT") { + checkAnswer( - sql("SELECT * FROM lowerCaseData SUBTRACT SELECT * FROM upperCaseData "), + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), (1, "a") :: (2, "b") :: (3, "c") :: (4, "d") :: Nil) checkAnswer( - sql("SELECT * FROM lowerCaseData SUBTRACT SELECT * FROM lowerCaseData "), Nil) + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) + checkAnswer( + sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) } - test("SET commands semantics using sql()") { clear() val testKey = "test.key.0" From 670a1bb1de8f4222019253346ca0420d378c5e88 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Mon, 23 Jun 2014 15:50:23 +0800 Subject: [PATCH 21/51] Update HiveQl.scala --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 979973119081..02fd152869aa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -615,7 +615,7 @@ private[hive] object HiveQl { case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right)) - case Token("TOK_SUBTRACT", left :: right :: Nil) => Subtract(nodeToPlan(left), nodeToPlan(right)) + case Token("TOK_EXCEPT", left :: right :: Nil) => Subtract(nodeToPlan(left), nodeToPlan(right)) case a: ASTNode => throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") From 7f3d6139863ed22dd25cb5e3d323841b4f40d2a9 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 24 Jun 2014 03:27:48 +0800 Subject: [PATCH 22/51] update .map(_.copy()) --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 91b41a9a5245..248e68d22b84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -215,6 +215,6 @@ case class Subtract(left:SparkPlan,right:SparkPlan) extends BinaryNode { override def output = children.head.output override def execute() = { - left.execute().subtract(right.execute()) + left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } } From 7ea9b91ebd681aee9b6a9fb68a5acd425f202418 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 
2014 10:38:55 +0800 Subject: [PATCH 23/51] remove annotation --- .../apache/spark/sql/catalyst/plans/logical/basicOperators.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 5853c6978382..bac5a724647f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -90,7 +90,6 @@ case class Join( } case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - // TODO: These aren't really the same attributes as nullability etc might change. def output = left.output def references = Set.empty From f1ea3f334ed823dbc3dee1dfef59be9fbc4abf36 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 10:40:24 +0800 Subject: [PATCH 24/51] remove comment --- .../org/apache/spark/sql/execution/SparkStrategies.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 9ea5d0704d8d..8b7e26246b0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -236,8 +236,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sparkContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil - case logical.Except(left,right) => //yanjiegaonew - execution.Subtract(planLater(left),planLater(right)) :: Nil //yanjiegaonew + case logical.Except(left,right) => + execution.Subtract(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => From cf232ebd327ea91760077d82a20395c6e09d6ac0 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 10:44:45 +0800 Subject: [PATCH 25/51] update: 1. comment 2. spacing 3. left.output --- .../org/apache/spark/sql/execution/basicOperators.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 248e68d22b84..c90e39d5e215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -207,12 +207,12 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { /** * :: DeveloperApi :: * This operator support the substract function . - * Return an table with the elements from `this` that are not in `other`.
*/ @DeveloperApi -case class Subtract(left:SparkPlan,right:SparkPlan) extends BinaryNode { +case class Subtract(left: SparkPlan, right: SparkPlan) extends BinaryNode { // TODO:The input children:Seq[SparkPlan] should only contain two SparkPlan - override def output = children.head.output + override def output = left.output override def execute() = { left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) From 32a80abce6bfff7450a8f906c324cadd4d0c8d4f Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 10:47:01 +0800 Subject: [PATCH 26/51] remove except in HiveQl --- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 02fd152869aa..ec653efcc8c5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -614,8 +614,6 @@ private[hive] object HiveQl { queries.reduceLeft(Union) case Token("TOK_UNION", left :: right :: Nil) => Union(nodeToPlan(left), nodeToPlan(right)) - - case Token("TOK_EXCEPT", left :: right :: Nil) => Subtract(nodeToPlan(left), nodeToPlan(right)) case a: ASTNode => throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") From ee066b38ea2a9d16e84ca470c9fc795578ce4ff6 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 17:04:34 +0800 Subject: [PATCH 27/51] Update basicOperators.scala --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index c90e39d5e215..d89bad652f7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -211,7 +211,6 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { */ @DeveloperApi case class Subtract(left: SparkPlan, right: SparkPlan) extends BinaryNode { - // TODO:The input children:Seq[SparkPlan] should only contain two SparkPlan override def output = left.output override def execute() = { From 5c8a224cdeae3605a1639406c93bbbe0ff8330da Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Fri, 27 Jun 2014 18:24:36 +0800 Subject: [PATCH 28/51] update the line less than 100c --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index db7c771957f0..4193dd90234a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -140,8 +140,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2)} | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} | - EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) 
=> Distinct(Union(q1, q2))} ) | insert protected lazy val select: Parser[LogicalPlan] = From 76807425bdf13c9eec9969e43e9b7eae7636691a Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 10:25:05 +0800 Subject: [PATCH 29/51] Update SqlParser.scala --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 08f92e2974d4..8429eb0ba4c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -119,12 +119,8 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") -<<<<<<< HEAD protected val EXCEPT = Keyword("EXCEPT") -======= - protected val WHERE = Keyword("EXCEPT") ->>>>>>> origin/master // Use reflection to find the reserved words defined in this class. protected val reservedWords = From 424c5078b8dfe62199059a451d50fe9355110c28 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 10:26:06 +0800 Subject: [PATCH 30/51] Update joins.scala --- .../apache/spark/sql/execution/joins.scala | 104 ------------------ 1 file changed, 104 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 05b6a39331e7..4bcdfe7c2ee9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -342,107 +342,3 @@ case class BroadcastNestedLoopJoin( } } -/** - * :: DeveloperApi :: - *In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key - *rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd - *with the largest key. - *Then,join the two table with the buildtable. - *Finally,union the two result rdd. 
- */ -@DeveloperApi -case class SkewJoin( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - buildSide: BuildSide, - left: SparkPlan, - right: SparkPlan, - @transient sc: SparkContext) extends BinaryNode { - override def outputPartitioning: Partitioning = left.outputPartitioning - - override def requiredChildDistribution = - ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - - val (buildPlan, streamedPlan) = buildSide match { - case BuildLeft => (left, right) - case BuildRight => (right, left) - } - val (buildKeys, streamedKeys) = buildSide match { - case BuildLeft => (leftKeys, rightKeys) - case BuildRight => (rightKeys, leftKeys) - } - - def output = left.output ++ right.output - - @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) - @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output) - - - def execute() = { - val streamedTable = streamedPlan.execute() - //This will later write as configuration - val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect() - val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode()) - var max = 0 - var num = sample.size - 1 - var temp = 0 - var maxrowKey = sortedSample(0) - //find the largest key - if (sortedSample.size > 1) { - for (i <- 1 to num) { - if (sortedSample(i - 1) == sortedSample(i)) temp += 1 - else { - if (temp > max) { - max = temp - maxrowKey = sortedSample(i - 1) - } - temp = 0 - } - } - } - val maxKeyStreamedTable = streamedTable.filter(row => { - streamSideKeyGenerator(row).toString().equals(maxrowKey.toString()) - }) - val mainStreamedTable = streamedTable.filter(row => { - !streamSideKeyGenerator(row).toString().equals(maxrowKey.toString()) - }) - val buildRdd = buildPlan.execute() - val maxKeyJoinedRdd = maxKeyStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map { - case (l: Row, r: Row) => buildRow(l ++ r) - } - val mainJoinedRdd = mainStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map { - case (l: Row, r: Row) => buildRow(l ++ r) - } - sc.union(maxKeyJoinedRdd, mainJoinedRdd) - } -} - - -object SkewJoin extends Strategy with PredicateHelper { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find inner joins where at least some predicates can be evaluated by matching hash keys - // using the HashFilteredJoin pattern. - case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => - val hashJoin = - execution.SkewJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil - case _ => Nil - } -} - -object SkewFilteredJoin extends Logging with PredicateHelper { - /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */ - type ReturnType = - (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan) - - def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { - // All predicates can be evaluated for inner join (i.e., those that are in the ON - // clause and WHERE clause.) 
- case FilteredOperation(predicates, join@Join(left, right, Inner, condition)) => - logger.debug(s"Considering Skew inner join on: ${predicates ++ condition}") - splitPredicates(predicates ++ condition, join) - case join@Join(left, right, joinType, condition) => - logger.debug(s"Considering Skew join on: $condition") - splitPredicates(condition.toSeq, join) - case _ => None - } From d6a460488c487dda6b59d212ea37632a9da92a7a Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 10:27:32 +0800 Subject: [PATCH 31/51] Update joins.scala --- .../src/main/scala/org/apache/spark/sql/execution/joins.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 4bcdfe7c2ee9..8d7a5ba59f96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -341,4 +341,3 @@ case class BroadcastNestedLoopJoin( streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches)) } } - From 9847dcf1389997579a7a1518fc94559de1421b3f Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:18:19 +0800 Subject: [PATCH 32/51] update SqlParser on master branch --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index b16606ab44c4..82262bad8f77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -133,7 +133,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val TRUE = Keyword("TRUE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") - protected val WHERE = Keyword("EXCEPT") + protected val WHERE = Keyword("EXCEPT") // Use reflection to find the reserved words defined in this class.
@@ -170,6 +170,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert From 8dd063f8552865170f552ae70978450cbbc7c41c Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:21:41 +0800 Subject: [PATCH 33/51] Update logical/basicOperators on master branch --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 732708e146b0..2c2dde17af4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -84,6 +84,12 @@ case class Join( def output = left.output ++ right.output } +case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + def output = left.output + + def references = Set.empty +} + case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], From 26f833f711fa2a7e69dbbba21051eb6440979551 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:25:06 +0800 Subject: [PATCH 34/51] update SparkStrategies.scala on master branch [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151 --- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 1faea19e6ea8..24fb885e84a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -182,6 +182,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Limit(limit, planLater(child))(sparkContext) :: Nil case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil + case logical.Except(left,right) => + execution.Subtract(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => From 3bf7def5983ef4519e2bd239c99eb07307a88cdc Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:29:18 +0800 Subject: [PATCH 35/51] update SqlParser on master branch [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151 --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 82262bad8f77..7156bc0a9190 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -133,7 +133,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val TRUE = 
Keyword("TRUE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") - protected val WHERE = Keyword("EXCEPT") + protected val EXCEPT = Keyword("EXCEPT") // Use reflection to find the reserved words defined in this class. From a639935d1167c6038f7b8dea6ab70859f48bdf95 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:31:49 +0800 Subject: [PATCH 36/51] Update SparkStrategies.scala --- .../apache/spark/sql/execution/SparkStrategies.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 544dce3247a9..8a1ec2b250a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -262,14 +262,3 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } -object SkewJoin extends Strategy with PredicateHelper { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find inner joins where at least some predicates can be evaluated by matching hash keys - // using the HashFilteredJoin pattern. - case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => - val hashJoin = - execution.SkewJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil - case _ => Nil - } -} From a28decedc843c067cdd2067fdbf636b7f95824da Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:33:10 +0800 Subject: [PATCH 37/51] Update logical/basicOperators on master branch [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151 --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 2c2dde17af4a..b88a3b51e3ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -90,6 +90,12 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { def references = Set.empty } +case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + def output = left.output + + def references = Set.empty +} + case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], From 7f916b5f4db59f9813b02d75016f99327a0b222e Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:34:24 +0800 Subject: [PATCH 38/51] update execution/basicOperators on master branch [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151 --- .../apache/spark/sql/execution/basicOperators.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 8969794c6993..d89bad652f7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -204,3 +204,16 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { 
override def execute() = rdd } +/** + * :: DeveloperApi :: + * This operator support the substract function . + * Return an table with the elements from `left` that are not in `right`. + */ +@DeveloperApi +case class Subtract(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = left.output + + override def execute() = { + left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) + } +} From 0e72233b18d446f8023bbccf73fd7ef3f87e6142 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 11:35:55 +0800 Subject: [PATCH 39/51] update SQLQuerySuite on master branch [SPARK-2234][SQL]Spark SQL basicOperators add Except operator #1151 --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e966d89c30cf..9903f9db6965 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -267,3 +267,17 @@ class SQLQuerySuite extends QueryTest { (4, "D"))) } } + +test("EXCEPT") { + + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) + checkAnswer( + sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) + } From 60f5ddd1de996e7dfe62858af5df3e2d69d814ab Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Tue, 1 Jul 2014 14:16:45 +0800 Subject: [PATCH 40/51] update less than 100c --- .../scala/org/apache/spark/sql/catalyst/SqlParser.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 8429eb0ba4c7..d4f700c13814 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -140,9 +140,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = select * ( - UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2)} | - EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} + UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2)} | + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} ) | insert protected lazy val select: Parser[LogicalPlan] = From a0d4e73c9280a45db01cf01fd924aec333e931bf Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Wed, 2 Jul 2014 13:24:40 +0800 Subject: [PATCH 41/51] delete skew join --- .../apache/spark/sql/execution/SparkStrategies.scala | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 24fb885e84a7..837f4aa7c082 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
@@ -196,14 +196,4 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } -object SkewJoin extends Strategy with PredicateHelper { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find inner joins where at least some predicates can be evaluated by matching hash keys - // using the HashFilteredJoin pattern. - case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => - val hashJoin = - execution.SkewJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil - case _ => Nil - } -} + From dd9ba5e27b5cc4765b727bfd3249e585b67fa182 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Wed, 2 Jul 2014 13:26:11 +0800 Subject: [PATCH 42/51] Update joins.scala --- .../apache/spark/sql/execution/joins.scala | 102 ------------------ 1 file changed, 102 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 8e3a6758b33b..86a0e6c35c50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -237,107 +237,5 @@ case class BroadcastNestedLoopJoin( } } -/** - * :: DeveloperApi :: - *In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key,then make the largest key - *rows as a table rdd.The streamed rdd will be made as mainstreamedtable rdd without the largest key and the maxkeystreamedtable rdd - *with the largest key. - *Then,join the two table with the buildtable. - *Finally,union the two result rdd. - */ -@DeveloperApi -case class SkewJoin( - leftKeys: Seq[Expression], - rightKeys: Seq[Expression], - buildSide: BuildSide, - left: SparkPlan, - right: SparkPlan, - @transient sc: SparkContext) extends BinaryNode { - override def outputPartitioning: Partitioning = left.outputPartitioning - - override def requiredChildDistribution = - ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - - val (buildPlan, streamedPlan) = buildSide match { - case BuildLeft => (left, right) - case BuildRight => (right, left) - } - val (buildKeys, streamedKeys) = buildSide match { - case BuildLeft => (leftKeys, rightKeys) - case BuildRight => (rightKeys, leftKeys) - } - - def output = left.output ++ right.output - - @transient lazy val buildSideKeyGenerator = new Projection(buildKeys, buildPlan.output) - @transient lazy val streamSideKeyGenerator = new Projection(streamedKeys, streamedPlan.output) - - - def execute() = { - val streamedTable = streamedPlan.execute() - //This will later write as configuration - val sample = streamedTable.sample(false, 0.3, 9).map(row => streamSideKeyGenerator(row)).collect() - val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() > row2.hashCode()) - var max = 0 - var num = sample.size - 1 - var temp = 0 - var maxrowKey = sortedSample(0) - //find the largest key - if (sortedSample.size > 1) { - for (i <- 1 to num) { - if (sortedSample(i - 1) == sortedSample(i)) temp += 1 - else { - if (temp > max) { - max = temp - maxrowKey = sortedSample(i - 1) - } - temp = 0 - } - } - } - val maxKeyStreamedTable = streamedTable.filter(row => { - streamSideKeyGenerator(row).toString().equals(maxrowKey.toString()) - }) - val mainStreamedTable = streamedTable.filter(row => { - !streamSideKeyGenerator(row).toString().equals(maxrowKey.toString()) - }) - val buildRdd = buildPlan.execute() - val 
maxKeyJoinedRdd = maxKeyStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map { - case (l: Row, r: Row) => buildRow(l ++ r) - } - val mainJoinedRdd = mainStreamedTable.map(_.copy()).cartesian(buildRdd.map(_.copy())).map { - case (l: Row, r: Row) => buildRow(l ++ r) - } - sc.union(maxKeyJoinedRdd, mainJoinedRdd) - } -} -object SkewJoin extends Strategy with PredicateHelper { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - // Find inner joins where at least some predicates can be evaluated by matching hash keys - // using the HashFilteredJoin pattern. - case SkewFilteredJoin(Inner, leftKeys, rightKeys, condition, left, right) => - val hashJoin = - execution.SkewJoin(leftKeys, rightKeys, BuildRight, planLater(left), planLater(right), sparkContext) - condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil - case _ => Nil - } -} - -object SkewFilteredJoin extends Logging with PredicateHelper { - /** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */ - type ReturnType = - (JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan) - - def unapply(plan: LogicalPlan): Option[ReturnType] = plan match { - // All predicates can be evaluated for inner join (i.e., those that are in the ON - // clause and WHERE clause.) - case FilteredOperation(predicates, join@Join(left, right, Inner, condition)) => - logger.debug(s"Considering Skew inner join on: ${predicates ++ condition}") - splitPredicates(predicates ++ condition, join) - case join@Join(left, right, joinType, condition) => - logger.debug(s"Considering Skew join on: $condition") - splitPredicates(condition.toSeq, join) - case _ => None - } From 8e6bb0085b8b2f00277aa9e2a3d08433a4628cac Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Wed, 2 Jul 2014 13:27:22 +0800 Subject: [PATCH 43/51] delete conflict except --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index b88a3b51e3ea..2c2dde17af4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -90,12 +90,6 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { def references = Set.empty } -case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - def output = left.output - - def references = Set.empty -} - case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], From fa68a988d4cbdcbfaf00fdbe468f5fbabb4360b8 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Wed, 2 Jul 2014 13:28:12 +0800 Subject: [PATCH 44/51] Update joins.scala --- .../src/main/scala/org/apache/spark/sql/execution/joins.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala index 86a0e6c35c50..31cc26962ad9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala @@ -236,6 +236,3 @@ case class BroadcastNestedLoopJoin( streamedPlusMatches.flatMap(_._1), sc.makeRDD(rightOuterMatches)) } } - - - From 7e7c83fadc736efd9066ccfba31c7ac23fcc1240 Mon Sep 
17 00:00:00 2001 From: Yanjie Gao Date: Wed, 2 Jul 2014 17:54:59 +0800 Subject: [PATCH 45/51] Delete the conflicting Except definition --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 38b079c32ebb..bac5a724647f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -95,12 +95,6 @@ case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { def references = Set.empty } -case class Except(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { - def output = left.output - - def references = Set.empty -} - case class InsertIntoTable( table: LogicalPlan, partition: Map[String, Option[String]], From 7e0ec2988360949033e0acb80d2931a10e2be5b2 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Wed, 2 Jul 2014 17:55:42 +0800 Subject: [PATCH 46/51] Delete the duplicated EXCEPT test --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index fb609fcd30e3..f42e358d49c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -420,17 +420,3 @@ class SQLQuerySuite extends QueryTest { clear() } } - -test("EXCEPT") { - - checkAnswer( - sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM upperCaseData "), - (1, "a") :: - (2, "b") :: - (3, "c") :: - (4, "d") :: Nil) - checkAnswer( - sql("SELECT * FROM lowerCaseData EXCEPT SELECT * FROM lowerCaseData "), Nil) - checkAnswer( - sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) - } From b4c38691969c65be5580d881c563b0b875929e99 Mon Sep 17 00:00:00 2001 From: Yanjie Gao Date: Wed, 2 Jul 2014 19:21:15 +0800 Subject: [PATCH 47/51] Change the SparkStrategies Union operator from SparkContext to SQLContext --- .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 9b6b45442304..78a90acb7ae0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -272,7 +272,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Limit(IntegerLiteral(limit), child) => execution.Limit(limit, planLater(child))(sqlContext) :: Nil case Unions(unionChildren) => - execution.Union(unionChildren.map(planLater))(sparkContext) :: Nil + execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Except(left,right) => execution.Subtract(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation =>
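[Editor's note: patch 47 above and patch 48 below adjust how the planner maps logical set operators onto physical ones. As a rough, self-contained sketch of that planning step — the stand-in types below are invented for this note and are not Spark's real LogicalPlan/SparkPlan/Strategy classes — the pattern looks like this:

object PlannerSketch {
  sealed trait LogicalPlan
  case class UnionL(children: Seq[LogicalPlan]) extends LogicalPlan
  case class ExceptL(left: LogicalPlan, right: LogicalPlan) extends LogicalPlan
  case class TableL(name: String) extends LogicalPlan

  sealed trait PhysicalPlan
  case class UnionP(children: Seq[PhysicalPlan]) extends PhysicalPlan
  case class ExceptP(left: PhysicalPlan, right: PhysicalPlan) extends PhysicalPlan
  case class ScanP(name: String) extends PhysicalPlan

  // Each logical node maps to one physical operator; children are planned
  // recursively here, whereas Spark defers them through planLater placeholders.
  def plan(p: LogicalPlan): PhysicalPlan = p match {
    case UnionL(children)     => UnionP(children.map(plan))
    case ExceptL(left, right) => ExceptP(plan(left), plan(right))
    case TableL(name)         => ScanP(name)
  }
}

The real QueryPlanner differs in that each Strategy returns a Seq of candidate physical plans rather than a single one, but the shape of the match is the same, which is why the two patches are simple one-line case rewrites.]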
From 09c7413009207d599e130d49f51f0023174891bc Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Thu, 3 Jul 2014 09:16:52 +0800 Subject: [PATCH 48/51] PR 1151: add cache to SqlParser, rename the basic operator to Except, and revise its comment --- .../main/scala/org/apache/spark/sql/catalyst/SqlParser.scala | 2 +- .../org/apache/spark/sql/execution/SparkStrategies.scala | 2 +- .../org/apache/spark/sql/execution/basicOperators.scala | 5 ++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 3549570de5cd..5befac38c6f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -142,7 +142,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2)} | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} - ) | insert + ) | insert | cache protected lazy val select: Parser[LogicalPlan] = SELECT ~> opt(DISTINCT) ~ projections ~ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 78a90acb7ae0..c5a7f306bf4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -274,7 +274,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case Unions(unionChildren) => execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Except(left,right) => - execution.Subtract(planLater(left),planLater(right)) :: Nil + execution.Except(planLater(left),planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, outer = outer, planLater(child)) :: Nil case logical.NoRelation => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 95d3b8255ef6..2dabfb099076 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -208,11 +208,10 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { /** * :: DeveloperApi :: - * This operator supports the subtract (set difference) operation. - * Returns a table with the elements from `left` that are not in `right`. + * Returns a table with the elements from left that are not in right using the b * uilt-in spark subtract function. */ @DeveloperApi -case class Subtract(left: SparkPlan, right: SparkPlan) extends BinaryNode { +case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { override def output = left.output override def execute() = {
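[Editor's note: the physical Except operator renamed above delegates to Spark's built-in RDD.subtract. A minimal, runnable sketch of those semantics on a local SparkContext — the application name and the table data are invented for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object ExceptSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("except-sketch").setMaster("local[2]"))
    val left  = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    val right = sc.parallelize(Seq((2, "b"), (3, "c"), (4, "d")))
    // subtract keeps the elements of `left` that do not appear in `right`,
    // which matches the EXCEPT contract for these inputs: only (1,a) prints.
    left.subtract(right).collect().foreach(println)
    sc.stop()
  }
}

The map(_.copy()) calls in the operator itself exist because Spark SQL may reuse mutable Row objects within a partition, so rows must be copied before a shuffle-based operation such as subtract.]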
From 9940d192ed704c9705219841086697d648a47709 Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Thu, 3 Jul 2014 09:41:33 +0800 Subject: [PATCH 49/51] Keep the comment under 100 characters --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 2dabfb099076..4b59e0b4e58e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -208,7 +208,8 @@ case class ExistingRdd(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode { /** * :: DeveloperApi :: - * Returns a table with the elements from left that are not in right using the b * uilt-in spark subtract function. + * Returns a table with the elements from left that are not in right using + * the built-in spark subtract function. */ @DeveloperApi case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { From 2ff7d73bd969a8d38acf36fde4318b7d397da25d Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Fri, 4 Jul 2014 09:35:30 +0800 Subject: [PATCH 50/51] Fix the indentation in SqlParser and SparkStrategies --- .../org/apache/spark/sql/catalyst/SqlParser.scala | 14 ++++++++------ .../spark/sql/execution/SparkStrategies.scala | 2 -- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 5befac38c6f3..ecb11129557b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -137,12 +137,14 @@ class SqlParser extends StandardTokenParsers with PackratParsers { } } - protected lazy val query: Parser[LogicalPlan] = - select * ( - UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2)} | - EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | - UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2))} - ) | insert | cache + protected lazy val query: Parser[LogicalPlan] = ( + select * ( + UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | + UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } + ) + | insert | cache + ) protected lazy val select: Parser[LogicalPlan] = SELECT ~> opt(DISTINCT) ~ projections ~ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index c5a7f306bf4c..9e036e127bd5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -298,5 +298,3 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } } - -
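[Editor's note: patch 50 above only re-indents the query production, but the combinator shape it preserves is worth a gloss. In Scala's parser combinators, p * sep parses one or more p's separated by sep and folds the results left-associatively with whatever function each separator yields; ^^^ replaces a parse result with a constant, here a plan-building function. A self-contained sketch with a made-up miniature AST, not Spark's real parser:

import scala.util.parsing.combinator.syntactical.StandardTokenParsers

object MiniSetOpParser extends StandardTokenParsers {
  lexical.reserved ++= Seq("SELECT", "UNION", "ALL", "EXCEPT")

  sealed trait Plan
  case class Select(table: String) extends Plan
  case class Union(l: Plan, r: Plan) extends Plan
  case class Except(l: Plan, r: Plan) extends Plan

  // Mirrors the structure of SqlParser's query production: each separator
  // keyword yields the binary constructor that combines adjacent selects.
  lazy val query: Parser[Plan] =
    select * (
      "UNION" ~ "ALL" ^^^ { (q1: Plan, q2: Plan) => Union(q1, q2) } |
      "EXCEPT" ^^^ { (q1: Plan, q2: Plan) => Except(q1, q2) }
    )

  lazy val select: Parser[Plan] = "SELECT" ~> ident ^^ Select

  def parse(in: String): ParseResult[Plan] =
    phrase(query)(new lexical.Scanner(in))
}

For example, MiniSetOpParser.parse("SELECT a EXCEPT SELECT b") succeeds with Except(Select("a"), Select("b")), and a chain such as SELECT a UNION ALL SELECT b EXCEPT SELECT c groups left-associatively, just as the production above does for full queries.]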
From f19f89990b335cdd5a372300d76034ceb04f6d20 Mon Sep 17 00:00:00 2001 From: YanjieGao <396154235@qq.com> Date: Fri, 4 Jul 2014 09:41:44 +0800 Subject: [PATCH 51/51] Add a blank line in SQLQuerySuite.scala --- sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6669a9c1ad47..5c6701e203d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -384,6 +384,7 @@ class SQLQuerySuite extends QueryTest { checkAnswer( sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) } + test("SET commands semantics using sql()") { TestSQLContext.settings.synchronized { clear()