Skip to content

Commit c887cf4

Browse files
committed
Address reviewer comments.
1 parent 87ceb4b commit c887cf4

File tree

3 files changed

+12
-7
lines changed

3 files changed

+12
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/BufferedRowIterator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public abstract class BufferedRowIterator {
3535
// used when there is no column in output
3636
protected UnsafeRow unsafeRow = new UnsafeRow(0);
3737

38-
private boolean stopEarly = false;
38+
protected boolean stopEarly = false;
3939

4040
public boolean hasNext() throws IOException {
4141
if (currentRows.isEmpty()) {
@@ -66,7 +66,7 @@ protected void append(InternalRow row) {
6666
* If it returns true, the caller should exit the loop (return from processNext()).
6767
*/
6868
protected boolean shouldStop() {
69-
return !currentRows.isEmpty() || stopEarly;
69+
return !currentRows.isEmpty();
7070
}
7171

7272
/**

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,18 @@ trait BaseLimit extends UnaryNode with CodegenSupport {
6767
}
6868

6969
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
70+
ctx.addNewFunction("shouldStop", """
71+
@Override
72+
protected boolean shouldStop() {
73+
return !currentRows.isEmpty() || stopEarly;
74+
}
75+
""")
7076
val countTerm = ctx.freshName("count")
7177
ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
72-
ctx.currentVars = input
7378
s"""
7479
| if ($countTerm < $limit) {
7580
| $countTerm += 1;
76-
| ${consume(ctx, ctx.currentVars)}
81+
| ${consume(ctx, input)}
7782
| } else {
7883
| setStopEarly(true);
7984
| }

sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,14 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
7373
ignore("range/limit/sum") {
7474
val N = 500 << 20
7575
runBenchmark("range/limit/sum", N) {
76-
sqlContext.range(N).limit(100).groupBy().sum().collect()
76+
sqlContext.range(N).limit(1000000).groupBy().sum().collect()
7777
}
7878
/*
7979
Westmere E56xx/L56xx/X56xx (Nehalem-C)
8080
range/limit/sum: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
8181
-------------------------------------------------------------------------------------------
82-
range/limit/sum codegen=false 154 / 201 3402.0 0.3 1.0X
83-
range/limit/sum codegen=true 110 / 129 4760.9 0.2 1.4X
82+
range/limit/sum codegen=false 609 / 672 861.6 1.2 1.0X
83+
range/limit/sum codegen=true 561 / 621 935.3 1.1 1.1X
8484
*/
8585
}
8686

0 commit comments

Comments
 (0)