From d7da496d88296f20585c354b2b53b086daa49fc3 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 13 Aug 2015 14:55:56 -0700 Subject: [PATCH 1/4] Fix TakeOrderedAndProject's output. --- .../scala/org/apache/spark/sql/SQLConf.scala | 2 +- .../spark/sql/execution/basicOperators.scala | 12 ++++++++++- .../org/apache/spark/sql/SQLQuerySuite.scala | 17 ++++++++++++++++ .../spark/sql/execution/PlannerSuite.scala | 20 ++++++++++++++++--- 4 files changed, 46 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index e9de14f02550..4b36acfd60fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -228,7 +228,7 @@ private[spark] object SQLConf { doc = "When true, use the optimized Tungsten physical execution backend which explicitly " + "manages memory and dynamically generates bytecode for expression evaluation.") - val CODEGEN_ENABLED = booleanConf("spark.sql.codegen", + val CODEGEN_ENABLED = booleanConf("spark..codegen", defaultValue = Some(true), // use TUNGSTEN_ENABLED as default doc = "When true, code will be dynamically generated at runtime for expression evaluation in" + " a specific query.", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 247c900baae9..77b98064a9e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -237,7 +237,10 @@ case class TakeOrderedAndProject( projectList: Option[Seq[NamedExpression]], child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output + override def output: Seq[Attribute] = { + val projectOutput = projectList.map(_.map(_.toAttribute)) + projectOutput.getOrElse(child.output) + } override def outputPartitioning: Partitioning = SinglePartition @@ -263,6 +266,13 @@ case class TakeOrderedAndProject( protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1) override def outputOrdering: Seq[SortOrder] = sortOrder + + override def simpleString: String = { + val orderByString = sortOrder.mkString("[", ",", "]") + val outputString = output.mkString("[", ",", "]") + + s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 8c2c328f8191..f781819d2896 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1626,4 +1626,21 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sqlContext.dropTempTable("1one") } + + test("SPARK-9949: Make sure TakeOrderedAndProject returns correct result") { + // We add a dummy limit (i.e. limit(count)) to trigger TakeOrderedAndProject. + val count = testData.count().toInt + val df = + testData + .select('key, 'value) + .sort('key) + .limit(count) + .select('value + 1 as "added") + .groupBy() + .sum("added") + checkAnswer( + df, + testData.select('value + 1 as "added").groupBy().sum("added") + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 937a10854353..8f7dc580481b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -162,9 +162,23 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext { } test("efficient limit -> project -> sort") { - val query = testData.sort('key).select('value).limit(2).logicalPlan - val planned = ctx.planner.TakeOrderedAndProject(query) - assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) + { + val query = + testData.select('key, 'value).sort('key).limit(2).logicalPlan + val planned = ctx.planner.TakeOrderedAndProject(query) + assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) + assert(planned.head.output === testData.select('key, 'value).logicalPlan.output) + } + + { + // We need to make sure TakeOrderedAndProject's is correct when we push a projec + // into it. + val query = + testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan + val planned = ctx.planner.TakeOrderedAndProject(query) + assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) + assert(planned.head.output === testData.select('value, 'key).logicalPlan.output) + } } test("PartitioningCollection") { From f5da743c60998a2eb790d2fa24f09af08cf4d048 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 14 Aug 2015 12:56:54 -0700 Subject: [PATCH 2/4] Remove unnecessary change. --- sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 4b36acfd60fc..e9de14f02550 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -228,7 +228,7 @@ private[spark] object SQLConf { doc = "When true, use the optimized Tungsten physical execution backend which explicitly " + "manages memory and dynamically generates bytecode for expression evaluation.") - val CODEGEN_ENABLED = booleanConf("spark..codegen", + val CODEGEN_ENABLED = booleanConf("spark.sql.codegen", defaultValue = Some(true), // use TUNGSTEN_ENABLED as default doc = "When true, code will be dynamically generated at runtime for expression evaluation in" + " a specific query.", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f781819d2896..6c3adea3f509 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1627,7 +1627,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sqlContext.dropTempTable("1one") } - test("SPARK-9949: Make sure TakeOrderedAndProject returns correct result") { + test("SPARK-9949: check TakeOrderedAndProject's results") { // We add a dummy limit (i.e. limit(count)) to trigger TakeOrderedAndProject. val count = testData.count().toInt val df = From ba48d88b75d1d6d4041bc12b7c7af6272986b6f6 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 14 Aug 2015 12:57:39 -0700 Subject: [PATCH 3/4] Remove test. --- .../org/apache/spark/sql/SQLQuerySuite.scala | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6c3adea3f509..8c2c328f8191 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1626,21 +1626,4 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sqlContext.dropTempTable("1one") } - - test("SPARK-9949: check TakeOrderedAndProject's results") { - // We add a dummy limit (i.e. limit(count)) to trigger TakeOrderedAndProject. - val count = testData.count().toInt - val df = - testData - .select('key, 'value) - .sort('key) - .limit(count) - .select('value + 1 as "added") - .groupBy() - .sum("added") - checkAnswer( - df, - testData.select('value + 1 as "added").groupBy().sum("added") - ) - } } From a8a59e2061bf8be4f13a28054259f65a216d4337 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 14 Aug 2015 12:58:42 -0700 Subject: [PATCH 4/4] Comment. --- .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 8f7dc580481b..fad93b014c23 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -171,7 +171,7 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext { } { - // We need to make sure TakeOrderedAndProject's is correct when we push a projec + // We need to make sure TakeOrderedAndProject's output is correct when we push a project // into it. val query = testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan