From 8d42d0308338d0b584a2a1c1e5e89d6ee2c18938 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Tue, 10 Jun 2014 17:22:57 -0700
Subject: [PATCH 1/5] Implement take() as limit() followed by collect()

---
 sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 8855c4e87691..44c671d05b31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -374,6 +374,9 @@ class SchemaRDD(
 
   override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect()
 
+  override def take(num: Int): Array[Row] =
+    new SchemaRDD(sqlContext, Limit(Literal(num), logicalPlan)).collect()
+
   // =======================================================================
   // Base RDD functions that do NOT change schema
   // =======================================================================

From a0ff7c45d2d92367a36365c59377c3c7e2e730d2 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Tue, 10 Jun 2014 17:26:11 -0700
Subject: [PATCH 2/5] Adding catalyst rule to fold two consecutive limits

Creating a LimitFolding Batch
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e41fd2db7485..6f9ae6f753dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -29,6 +29,8 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
+    Batch("LimitFolding", FixedPoint(100),
+      CombineLimits) ::
     Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
       ConstantFolding,
@@ -362,3 +364,13 @@ object SimplifyCasts extends Rule[LogicalPlan] {
     case Cast(e, dataType) if e.dataType == dataType => e
   }
 }
+
+/**
+ * Combines two adjacent [[catalyst.plans.logical.Limit Limit]] operators into one, merging the
+ * expressions into a single expression.
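+ *
+ * For example, `Limit(5, Limit(10, child))` is rewritten to
+ * `Limit(If(LessThan(10, 5), 10, 5), child)`, which the constant folding
+ * rules can then reduce to `Limit(5, child)`.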
+ */
+object CombineLimits extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case ll @ Limit(le, nl @ Limit(ne, grandChild)) => Limit(If(LessThan(ne, le), ne, le), grandChild)
+  }
+}

From b723ac460ad95bb78bb46719dc765b15411e4e09 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Tue, 10 Jun 2014 23:09:50 -0700
Subject: [PATCH 3/5] Added limit folding tests

---
 .../spark/sql/catalyst/dsl/package.scala      |  2 +
 .../sql/catalyst/optimizer/Optimizer.scala    |  5 +-
 .../optimizer/CombiningLimitsSuite.scala      | 71 +++++++++++++++++++
 3 files changed, 76 insertions(+), 2 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 3cf163f9a9a7..d177339d40ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -175,6 +175,8 @@ package object dsl {
 
     def where(condition: Expression) = Filter(condition, logicalPlan)
 
+    def limit(limitExpr: Expression) = Limit(limitExpr, logicalPlan)
+
     def join(
       otherPlan: LogicalPlan,
       joinType: JoinType = Inner,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6f9ae6f753dc..28d1aa2e3aaf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
-    Batch("LimitFolding", FixedPoint(100),
+    Batch("Combine Limits", FixedPoint(100),
       CombineLimits) ::
     Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
@@ -371,6 +371,7 @@ object SimplifyCasts extends Rule[LogicalPlan] {
  */
 object CombineLimits extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case ll @ Limit(le, nl @ Limit(ne, grandChild)) => Limit(If(LessThan(ne, le), ne, le), grandChild)
+    case ll @ Limit(le, nl @ Limit(ne, grandChild)) =>
+      Limit(If(LessThan(ne, le), ne, le), grandChild)
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
new file mode 100644
index 000000000000..3e053b57158d
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+
+class CombiningLimitsSuite extends OptimizerTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Combine Limit", FixedPoint(2),
+        CombineLimits) ::
+      Batch("Constant Folding", FixedPoint(3),
+        NullPropagation,
+        ConstantFolding,
+        BooleanSimplification) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("limits: combines two limits") {
+    val originalQuery =
+      testRelation
+        .select('a)
+        .limit(10).analyze
+        .limit(5).analyze
+
+    val optimized = Optimize(originalQuery)
+    val correctAnswer =
+      testRelation
+        .select('a)
+        .limit(5).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("limits: combines three limits") {
+    val originalQuery =
+      testRelation
+        .select('a)
+        .limit(2).analyze
+        .limit(7).analyze
+        .limit(5).analyze
+
+    val optimized = Optimize(originalQuery)
+    val correctAnswer =
+      testRelation
+        .select('a)
+        .limit(2).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}

From 1b76ff141e4c9b452599af6c221143875a692eb0 Mon Sep 17 00:00:00 2001
From: Sameer Agarwal
Date: Wed, 11 Jun 2014 10:12:10 -0700
Subject: [PATCH 4/5] Deprecating limit(limitExpr: Expression) in v1.1.0

---
 .../main/scala/org/apache/spark/sql/SchemaRDD.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 44c671d05b31..7ad8edf5a5a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -178,14 +178,18 @@ class SchemaRDD(
   def orderBy(sortExprs: SortOrder*): SchemaRDD =
     new SchemaRDD(sqlContext, Sort(sortExprs, logicalPlan))
 
+  @deprecated("use limit with integer argument", "1.1.0")
+  def limit(limitExpr: Expression): SchemaRDD =
+    new SchemaRDD(sqlContext, Limit(limitExpr, logicalPlan))
+
   /**
-   * Limits the results by the given expressions.
+   * Limits the results by the given integer.
    * {{{
    *   schemaRDD.limit(10)
    * }}}
    */
-  def limit(limitExpr: Expression): SchemaRDD =
-    new SchemaRDD(sqlContext, Limit(limitExpr, logicalPlan))
+  def limit(limitNum: Int): SchemaRDD =
+    new SchemaRDD(sqlContext, Limit(Literal(limitNum), logicalPlan))
 
   /**
   * Performs a grouping followed by an aggregation.
@@ -374,8 +378,7 @@ class SchemaRDD( override def collect(): Array[Row] = queryExecution.executedPlan.executeCollect() - override def take(num: Int): Array[Row] = - new SchemaRDD(sqlContext, Limit(Literal(num), logicalPlan)).collect() + override def take(num: Int): Array[Row] = limit(num).collect() // ======================================================================= // Base RDD functions that do NOT change schema From 3eeb8484dc7200c9263c8afb37d8e3bff75647dc Mon Sep 17 00:00:00 2001 From: Sameer Agarwal Date: Wed, 11 Jun 2014 10:13:11 -0700 Subject: [PATCH 5/5] Fixing Tests --- .../catalyst/plans/logical/basicOperators.scala | 4 ++-- .../catalyst/optimizer/CombiningLimitsSuite.scala | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d3347b622f3d..b777cf424919 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -135,9 +135,9 @@ case class Aggregate( def references = (groupingExpressions ++ aggregateExpressions).flatMap(_.references).toSet } -case class Limit(limit: Expression, child: LogicalPlan) extends UnaryNode { +case class Limit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode { def output = child.output - def references = limit.references + def references = limitExpr.references } case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index 3e053b57158d..714f01843c0f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -40,10 +40,10 @@ class CombiningLimitsSuite extends OptimizerTest { val originalQuery = testRelation .select('a) - .limit(10).analyze - .limit(5).analyze + .limit(10) + .limit(5) - val optimized = Optimize(originalQuery) + val optimized = Optimize(originalQuery.analyze) val correctAnswer = testRelation .select('a) @@ -56,11 +56,11 @@ class CombiningLimitsSuite extends OptimizerTest { val originalQuery = testRelation .select('a) - .limit(2).analyze - .limit(7).analyze - .limit(5).analyze + .limit(2) + .limit(7) + .limit(5) - val optimized = Optimize(originalQuery) + val optimized = Optimize(originalQuery.analyze) val correctAnswer = testRelation .select('a)
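
A quick usage sketch of the API after this series. The snippet is illustrative
rather than part of the patches: the SparkContext `sc` and the registered
table `people` are assumptions; only limit(), take(), and the CombineLimits
folding come from the changes above.

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)  // assumes an existing SparkContext `sc`
    import sqlContext._

    // Assumes a table registered as "people" with name and age columns.
    val people = sql("SELECT name, age FROM people")

    // limit() now takes a plain Int and builds Limit(Literal(5), logicalPlan);
    // the Expression overload still compiles but is deprecated as of 1.1.0.
    val firstFive = people.limit(5)

    // take() is limit() followed by collect(), so the executed plan computes
    // only `num` rows instead of collecting the whole RDD.
    val rows = people.take(5)

    // Consecutive limits are folded by CombineLimits: this plan optimizes to
    // a single Limit(2, ...) before execution.
    val twoRows = people.limit(10).limit(2).collect()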