From a213ed141fe313ef43de0722d52966b5eda1a8ce Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 26 Feb 2016 10:27:01 +0000 Subject: [PATCH 1/8] Add wholestage codegen for limit. --- .../apache/spark/sql/execution/limit.scala | 49 +++++++++++++++++-- .../BenchmarkWholeStageCodegen.scala | 14 ++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index cd543d419528..0aaac34bb95c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -21,9 +21,10 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.exchange.ShuffleExchange +import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -32,7 +33,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange * This operator will be used when a logical `Limit` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. */ -case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { +case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode with CodegenSupport { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) @@ -43,12 +44,33 @@ case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { child.execute(), child.output, SinglePartition, serializer)) shuffled.mapPartitionsInternal(_.take(limit)) } + + override def upstreams(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].upstreams() + } + + protected override def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + val countTerm = ctx.freshName("count") + ctx.addMutableState("int", countTerm, s"$countTerm = 0;") + ctx.currentVars = input + s""" + | while ($countTerm < $limit) { + | $countTerm += 1; + | ${consume(ctx, ctx.currentVars)} + | } + | if (true) return; + """.stripMargin + } } /** * Helper trait which defines methods that are shared by both [[LocalLimit]] and [[GlobalLimit]]. 
*/ -trait BaseLimit extends UnaryNode { +trait BaseLimit extends UnaryNode with CodegenSupport { val limit: Int override def output: Seq[Attribute] = child.output override def outputOrdering: Seq[SortOrder] = child.outputOrdering @@ -56,6 +78,27 @@ trait BaseLimit extends UnaryNode { protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) } + + override def upstreams(): Seq[RDD[InternalRow]] = { + child.asInstanceOf[CodegenSupport].upstreams() + } + + protected override def doProduce(ctx: CodegenContext): String = { + child.asInstanceOf[CodegenSupport].produce(ctx, this) + } + + override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + val countTerm = ctx.freshName("count") + ctx.addMutableState("int", countTerm, s"$countTerm = 0;") + ctx.currentVars = input + s""" + | while ($countTerm < $limit) { + | $countTerm += 1; + | ${consume(ctx, ctx.currentVars)} + | } + | if (true) return; + """.stripMargin + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 6d6cc0186a96..bff88e6b2cdb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -70,6 +70,20 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { */ } + ignore("range/limit/sum") { + val N = 500 << 20 + runBenchmark("range/limit/sum", N) { + sqlContext.range(N).limit(100000).groupBy().sum().collect() + } + /* + Westmere E56xx/L56xx/X56xx (Nehalem-C) + range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + ------------------------------------------------------------------------------------------- + range/limit/sum codegen=false 191 / 248 2748.0 0.4 1.0X + range/limit/sum codegen=true 140 / 153 3747.5 0.3 1.4X + */ + } + ignore("stat functions") { val N = 100 << 20 From fa38e7c5138ad3424fd5b30e3ee6f7ef4d2088e4 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 26 Feb 2016 14:46:55 +0000 Subject: [PATCH 2/8] Fix implementation. --- .../spark/sql/execution/BufferedRowIterator.java | 13 ++++++++++++- .../org/apache/spark/sql/execution/limit.scala | 10 ++++++---- .../sql/execution/BenchmarkWholeStageCodegen.scala | 6 +++--- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java index 1d1d7edb240d..51a2f71d1151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -35,6 +35,8 @@ public abstract class BufferedRowIterator { // used when there is no column in output protected UnsafeRow unsafeRow = new UnsafeRow(0); + private boolean stopEarly = false; + public boolean hasNext() throws IOException { if (currentRows.isEmpty()) { processNext(); @@ -64,7 +66,16 @@ protected void append(InternalRow row) { * If it returns true, the caller should exit the loop (return from processNext()). */ protected boolean shouldStop() { - return !currentRows.isEmpty(); + return !currentRows.isEmpty() || stopEarly; + } + + /** + * Set the value of stopEarly which indicates whether `processNext()` should stop processing + * next row from `input` or not. 
+ * + */ + protected void setStopEarly(boolean value) { + stopEarly = value; } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 0aaac34bb95c..33c6176ae2ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -58,11 +58,12 @@ case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode with Cod ctx.addMutableState("int", countTerm, s"$countTerm = 0;") ctx.currentVars = input s""" - | while ($countTerm < $limit) { + | if ($countTerm < $limit) { | $countTerm += 1; | ${consume(ctx, ctx.currentVars)} + | } else { + | setStopEarly(true); | } - | if (true) return; """.stripMargin } } @@ -92,11 +93,12 @@ trait BaseLimit extends UnaryNode with CodegenSupport { ctx.addMutableState("int", countTerm, s"$countTerm = 0;") ctx.currentVars = input s""" - | while ($countTerm < $limit) { + | if ($countTerm < $limit) { | $countTerm += 1; | ${consume(ctx, ctx.currentVars)} + | } else { + | setStopEarly(true); | } - | if (true) return; """.stripMargin } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index bff88e6b2cdb..022402b6c7fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -73,14 +73,14 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { ignore("range/limit/sum") { val N = 500 << 20 runBenchmark("range/limit/sum", N) { - sqlContext.range(N).limit(100000).groupBy().sum().collect() + sqlContext.range(N).limit(100).groupBy().sum().collect() } /* Westmere E56xx/L56xx/X56xx (Nehalem-C) range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/limit/sum codegen=false 191 / 248 2748.0 0.4 1.0X - range/limit/sum codegen=true 140 / 153 3747.5 0.3 1.4X + range/limit/sum codegen=false 154 / 201 3402.0 0.3 1.0X + range/limit/sum codegen=true 110 / 129 4760.9 0.2 1.4X */ } From 9c072aa3d02460ffb55691a6b0aa82fff8dc0e4c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 27 Feb 2016 07:32:58 +0000 Subject: [PATCH 3/8] CollectLimit doesn't need wholestage codegen. --- .../apache/spark/sql/execution/limit.scala | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 33c6176ae2ad..d2795d4d9cd3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics * This operator will be used when a logical `Limit` operation is the final operator in an * logical plan, which happens when the user is collecting results back to the driver. 
*/ -case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode with CodegenSupport { +case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[InternalRow] = child.executeTake(limit) @@ -44,28 +44,6 @@ case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode with Cod child.execute(), child.output, SinglePartition, serializer)) shuffled.mapPartitionsInternal(_.take(limit)) } - - override def upstreams(): Seq[RDD[InternalRow]] = { - child.asInstanceOf[CodegenSupport].upstreams() - } - - protected override def doProduce(ctx: CodegenContext): String = { - child.asInstanceOf[CodegenSupport].produce(ctx, this) - } - - override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val countTerm = ctx.freshName("count") - ctx.addMutableState("int", countTerm, s"$countTerm = 0;") - ctx.currentVars = input - s""" - | if ($countTerm < $limit) { - | $countTerm += 1; - | ${consume(ctx, ctx.currentVars)} - | } else { - | setStopEarly(true); - | } - """.stripMargin - } } /** From 8e69d8d3d8e29cddb56b05efab6c4b90f8c8ffd6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 27 Feb 2016 10:52:11 +0000 Subject: [PATCH 4/8] Fix test. --- .../parquet/UnsafeRowParquetRecordReader.java | 3 ++ .../parquet/VectorizedRleValuesReader.java | 34 ++++++++++++++++++- 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java index 9d50cfab3bd3..e7f0ec2e7789 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java @@ -765,6 +765,9 @@ private void readIntBatch(int rowId, int num, ColumnVector column) throws IOExce } else if (DecimalType.is64BitDecimalType(column.dataType())) { defColumn.readIntsAsLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); + } else if (column.dataType() == DataTypes.ShortType) { + defColumn.readShorts( + num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); } else { throw new NotImplementedException("Unimplemented type: " + column.dataType()); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java index b2048c0e397e..8613fcae0b80 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java @@ -301,6 +301,38 @@ public void readBytes(int total, ColumnVector c, } } + public void readShorts(int total, ColumnVector c, + int rowId, int level, VectorizedValuesReader data) { + int left = total; + while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + switch (mode) { + case RLE: + if (currentValue == level) { + for (int i = 0; i < n; i++) { + c.putShort(rowId + i, (short)data.readInteger()); + } + } else { + c.putNulls(rowId, n); + } + break; + case PACKED: + for 
(int i = 0; i < n; ++i) { + if (currentBuffer[currentBufferIdx++] == level) { + c.putShort(rowId + i, (short)data.readInteger()); + } else { + c.putNull(rowId + i); + } + } + break; + } + rowId += n; + left -= n; + currentCount -= n; + } + } + public void readLongs(int total, ColumnVector c, int rowId, int level, VectorizedValuesReader data) { int left = total; @@ -611,4 +643,4 @@ private void readNextGroup() { throw new ParquetDecodingException("not a valid mode " + this.mode); } } -} \ No newline at end of file +} From c887cf47a36da8d34e33afeca273f415df629fbb Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 1 Mar 2016 03:37:57 +0000 Subject: [PATCH 5/8] Address reviewer comments. --- .../apache/spark/sql/execution/BufferedRowIterator.java | 4 ++-- .../scala/org/apache/spark/sql/execution/limit.scala | 9 +++++++-- .../spark/sql/execution/BenchmarkWholeStageCodegen.scala | 6 +++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java index 51a2f71d1151..df27e0a43c35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -35,7 +35,7 @@ public abstract class BufferedRowIterator { // used when there is no column in output protected UnsafeRow unsafeRow = new UnsafeRow(0); - private boolean stopEarly = false; + protected boolean stopEarly = false; public boolean hasNext() throws IOException { if (currentRows.isEmpty()) { @@ -66,7 +66,7 @@ protected void append(InternalRow row) { * If it returns true, the caller should exit the loop (return from processNext()). 
*/ protected boolean shouldStop() { - return !currentRows.isEmpty() || stopEarly; + return !currentRows.isEmpty(); } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index d2795d4d9cd3..e01c9c50590f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -67,13 +67,18 @@ trait BaseLimit extends UnaryNode with CodegenSupport { } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { + ctx.addNewFunction("shouldStop", """ + @Override + protected boolean shouldStop() { + return !currentRows.isEmpty() || stopEarly; + } + """) val countTerm = ctx.freshName("count") ctx.addMutableState("int", countTerm, s"$countTerm = 0;") - ctx.currentVars = input s""" | if ($countTerm < $limit) { | $countTerm += 1; - | ${consume(ctx, ctx.currentVars)} + | ${consume(ctx, input)} | } else { | setStopEarly(true); | } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 022402b6c7fd..2d3e34d0e129 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -73,14 +73,14 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { ignore("range/limit/sum") { val N = 500 << 20 runBenchmark("range/limit/sum", N) { - sqlContext.range(N).limit(100).groupBy().sum().collect() + sqlContext.range(N).limit(1000000).groupBy().sum().collect() } /* Westmere E56xx/L56xx/X56xx (Nehalem-C) range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- - range/limit/sum codegen=false 154 / 201 3402.0 0.3 1.0X - range/limit/sum codegen=true 110 / 129 4760.9 0.2 1.4X + range/limit/sum codegen=false 609 / 672 861.6 1.2 1.0X + range/limit/sum codegen=true 561 / 621 935.3 1.1 1.1X */ } From 8d254d206686dd9c6edd053d4abcd184799fcc2a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 1 Mar 2016 06:17:57 +0000 Subject: [PATCH 6/8] Remove stopEarly from BufferedRowIterator. --- .../spark/sql/execution/BufferedRowIterator.java | 11 ----------- .../scala/org/apache/spark/sql/execution/limit.scala | 9 ++++++--- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java index df27e0a43c35..1d1d7edb240d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java @@ -35,8 +35,6 @@ public abstract class BufferedRowIterator { // used when there is no column in output protected UnsafeRow unsafeRow = new UnsafeRow(0); - protected boolean stopEarly = false; - public boolean hasNext() throws IOException { if (currentRows.isEmpty()) { processNext(); @@ -69,15 +67,6 @@ protected boolean shouldStop() { return !currentRows.isEmpty(); } - /** - * Set the value of stopEarly which indicates whether `processNext()` should stop processing - * next row from `input` or not. 
- * - */ - protected void setStopEarly(boolean value) { - stopEarly = value; - } - /** * Increase the peak execution memory for current task. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index e01c9c50590f..45175d36d5c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -67,10 +67,13 @@ trait BaseLimit extends UnaryNode with CodegenSupport { } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { - ctx.addNewFunction("shouldStop", """ + val stopEarly = ctx.freshName("stopEarly") + ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;") + + ctx.addNewFunction("shouldStop", s""" @Override protected boolean shouldStop() { - return !currentRows.isEmpty() || stopEarly; + return !currentRows.isEmpty() || $stopEarly; } """) val countTerm = ctx.freshName("count") @@ -80,7 +83,7 @@ trait BaseLimit extends UnaryNode with CodegenSupport { | $countTerm += 1; | ${consume(ctx, input)} | } else { - | setStopEarly(true); + | $stopEarly = true; | } """.stripMargin } From b64e52d189e5041cc1af2ebb0d656c5f5c12c82d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 1 Mar 2016 06:37:19 +0000 Subject: [PATCH 7/8] Add unit test. --- .../sql/execution/WholeStageCodegenSuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index de371d85d9fd..fc1a534bc695 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -78,4 +78,21 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[Sort]).isDefined) assert(df.collect() === Array(Row(1), Row(2), Row(3))) } + + test("Limit should be included in WholeStageCodegen") { + val df = sqlContext.range(10000).limit(100).sort(col("id")) + val plan = df.queryExecution.executedPlan + + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegen] && + p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[Sort] && + p.asInstanceOf[WholeStageCodegen].plan.asInstanceOf[Sort] + .child.isInstanceOf[GlobalLimit]).isDefined) + + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegen] && + p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[LocalLimit]).isDefined) + + assert(df.collect().size === 100) + } } From 3d1e3972d65ef17e02c21affab97346ebafc156b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 1 Mar 2016 11:32:20 +0000 Subject: [PATCH 8/8] Remove unnecessary test. 
--- .../sql/execution/WholeStageCodegenSuite.scala | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index fc1a534bc695..de371d85d9fd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -78,21 +78,4 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[Sort]).isDefined) assert(df.collect() === Array(Row(1), Row(2), Row(3))) } - - test("Limit should be included in WholeStageCodegen") { - val df = sqlContext.range(10000).limit(100).sort(col("id")) - val plan = df.queryExecution.executedPlan - - assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[Sort] && - p.asInstanceOf[WholeStageCodegen].plan.asInstanceOf[Sort] - .child.isInstanceOf[GlobalLimit]).isDefined) - - assert(plan.find(p => - p.isInstanceOf[WholeStageCodegen] && - p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[LocalLimit]).isDefined) - - assert(df.collect().size === 100) - } }
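Taken together, the series settles on a simple shape: BaseLimit.doConsume registers a per-operator counter and a per-operator stopEarly flag through ctx.addMutableState (both named via ctx.freshName, so two limits fused into one stage cannot collide), and installs a shouldStop() override so the producer's loop exits as soon as the limit is reached. This replaces both patch 1's while-loop version, which patch 2 corrects, and the stopEarly field that patches 2 and 5 kept on BufferedRowIterator itself, which patch 6 removes in favor of generated state. What follows is a minimal, self-contained Java model of that control flow, not the code Spark actually emits: the class and every name in it are invented for illustration, and a trivial in-memory range stands in for the real child operator.

// LimitCodegenSketch.java -- illustrative only; NOT Spark's generated code.
// Models the fused produce/consume loop of BufferedRowIterator for something
// like sqlContext.range(5000).limit(3), using the per-operator count and
// stopEarly state that BaseLimit.doConsume emits after patch 6.
import java.util.ArrayDeque;
import java.util.Deque;

public class LimitCodegenSketch {
  private final Deque<Long> currentRows = new ArrayDeque<>();

  // Mutable state the generated doConsume() would declare via addMutableState:
  private int count = 0;              // the freshName("count") counter
  private boolean stopEarly = false;  // the freshName("stopEarly") flag
  private final int limit = 3;

  // Produce side: a trivial stand-in for the real range child operator.
  private long rangeNext = 0;
  private final long rangeEnd = 5000;

  // The override BaseLimit adds through ctx.addNewFunction("shouldStop", ...):
  private boolean shouldStop() {
    return !currentRows.isEmpty() || stopEarly;
  }

  // Roughly what processNext() looks like once doProduce/doConsume are fused:
  private void processNext() {
    while (rangeNext < rangeEnd && !shouldStop()) {
      long value = rangeNext++;
      // --- inlined consume from BaseLimit.doConsume ---
      if (count < limit) {
        count += 1;
        currentRows.add(value);  // append(row) in the real iterator
      } else {
        stopEarly = true;        // flips shouldStop(), ending the scan
      }
    }
  }

  public boolean hasNext() {
    if (currentRows.isEmpty()) processNext();
    return !currentRows.isEmpty();
  }

  public Long next() {
    return currentRows.poll();
  }

  public static void main(String[] args) {
    LimitCodegenSketch it = new LimitCodegenSketch();
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}

Running the sketch prints 0, 1 and 2 and leaves the range cursor at 4 rather than 5000; that early exit from the scan, instead of draining the whole input through take(limit), is the effect the range/limit/sum benchmark added in patch 1 and retuned in patches 2 and 5 is measuring.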