Commit b31115d (parent: 75dc296)
Author: Davies Liu

    move shouldStop() to end of while loop

File tree: 3 files changed, +11 -7 lines

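Across all three files the change follows one pattern: instead of gating every iteration on shouldStop() in the while condition, the generated loop now processes the current row first and checks shouldStop() at the end of the body. Below is a minimal sketch of the two shapes in plain Scala; rows, consume, and shouldStop are illustrative stand-ins, not Spark's generated code or API.

    object LoopShapes {
      // Old shape: the stop check gates entry to each iteration.
      def before(rows: Iterator[Int], consume: Int => Unit, shouldStop: () => Boolean): Unit = {
        while (!shouldStop() && rows.hasNext) {
          consume(rows.next())
        }
      }

      // New shape: consume the row first, then decide whether to hand
      // control back to the caller.
      def after(rows: Iterator[Int], consume: Int => Unit, shouldStop: () => Boolean): Unit = {
        while (rows.hasNext) {
          consume(rows.next())
          if (shouldStop()) return
        }
      }
    }

With the check at the end of the body, the loop only returns to the caller once the row in hand has actually been passed downstream, instead of re-testing the stop flag before any work is done on each pass.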

sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
Lines changed: 4 additions & 3 deletions

@@ -255,11 +255,11 @@ private[sql] case class DataSourceScan(
        |     $numOutputRows.add(numRows);
        |   }
        |
-       |   while (!shouldStop() && $idx < numRows) {
+       |   while ($idx < numRows) {
        |     int $rowidx = $idx++;
        |     ${consume(ctx, columns1).trim}
+       |     if (shouldStop()) return;
        |   }
-       |   if (shouldStop()) return;
        |
        |   if (!$input.hasNext()) {
        |     $batch = null;
@@ -280,14 +280,15 @@ private[sql] case class DataSourceScan(
       s"""
        | private void $scanRows(InternalRow $row) throws java.io.IOException {
        |   boolean firstRow = true;
-       |   while (!shouldStop() && (firstRow || $input.hasNext())) {
+       |   while (firstRow || $input.hasNext()) {
        |     if (firstRow) {
        |       firstRow = false;
        |     } else {
        |       $row = (InternalRow) $input.next();
        |     }
        |     $numOutputRows.add(1);
        |     ${consume(ctx, columns2, inputRow).trim}
+       |     if (shouldStop()) return;
        |   }
        | }""".stripMargin)

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
Lines changed: 5 additions & 3 deletions

@@ -104,11 +104,12 @@ trait CodegenSupport extends SparkPlan {
    *     # call child.produce()
    *     initialized = true;
    *   }
-   *   while (!shouldStop() && hashmap.hasNext()) {
+   *   while (hashmap.hasNext()) {
    *     row = hashmap.next();
    *     # build the aggregation results
    *     # create variables for results
    *     # call consume(), which will call parent.doConsume()
+   *     if (shouldStop()) return;
    *   }
    */
   protected def doProduce(ctx: CodegenContext): String
@@ -252,9 +253,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
     ctx.currentVars = null
     val columns = exprs.map(_.gen(ctx))
     s"""
-       | while (!shouldStop() && $input.hasNext()) {
+       | while ($input.hasNext()) {
        |   InternalRow $row = (InternalRow) $input.next();
        |   ${consume(ctx, columns, row).trim}
+       |   if (shouldStop()) return;
        | }
      """.stripMargin
   }
@@ -321,7 +323,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
       /** Codegened pipeline for:
        * ${toCommentSafeString(child.treeString.trim)}
        */
-      class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
+      final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {

       private Object[] references;
       ${ctx.declareMutableStates()}
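The doc comment changed above sketches how a generated doProduce() loop cooperates with its caller: produce a row, let consume() push it to the parent, then check shouldStop() to decide whether to return. To make that control flow concrete, here is a toy model in plain Scala; the buffer, append(), and shouldStop() below are assumptions made for illustration, not the actual BufferedRowIterator contract.

    import scala.collection.mutable

    class ToyBufferedIterator(input: Iterator[Long]) {
      private val buffer = mutable.Queue.empty[Long]

      // Stand-ins for the generated iterator's output buffer and stop signal.
      private def append(row: Long): Unit = buffer.enqueue(row)
      private def shouldStop(): Boolean = buffer.nonEmpty

      // Plays the role of the generated processNext(): produce rows until the
      // caller has something to read, then suspend by returning.
      private def processNext(): Unit = {
        while (input.hasNext) {
          append(input.next())       // the row is "consumed" downstream first
          if (shouldStop()) return   // then control goes back to the caller
        }
      }

      def hasNext: Boolean = { if (buffer.isEmpty) processNext(); buffer.nonEmpty }
      def next(): Long = { if (!hasNext) throw new NoSuchElementException; buffer.dequeue() }
    }

In this toy version, each call to processNext() produces at most one buffered element before returning, which is one way to read what the end-of-loop check enables: the caller regains control as soon as there is output to read.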

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
Lines changed: 2 additions & 1 deletion

@@ -282,13 +282,14 @@ case class Range(
      |     }
      |   }
      |
-     |   while (!$overflow && $checkEnd && !shouldStop()) {
+     |   while (!$overflow && $checkEnd) {
      |     long $value = $number;
      |     $number += ${step}L;
      |     if ($number < $value ^ ${step}L < 0) {
      |       $overflow = true;
      |     }
      |     ${consume(ctx, Seq(ev))}
+     |     if (shouldStop()) return;
      |   }
      """.stripMargin
   }
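The Range hunk leaves the arithmetic untouched and only relocates the shouldStop() call; the surrounding lines show how the generated loop detects a wrapped Long when stepping. Below is a stand-alone sketch of that overflow test; steppedWithOverflowCheck and checkEnd are hypothetical names for illustration, not the generated code.

    def steppedWithOverflowCheck(start: Long, step: Long, end: Long): Seq[Long] = {
      require(step != 0, "step must be non-zero")
      val out = scala.collection.mutable.ArrayBuffer.empty[Long]
      var number = start
      var overflow = false
      // checkEnd stands in for the generated range-bound test.
      def checkEnd: Boolean = if (step > 0) number < end else number > end
      while (!overflow && checkEnd) {
        val value = number
        number += step
        // Adding a positive step must not shrink the counter, and adding a
        // negative step must not grow it; if it does, the addition wrapped.
        if ((number < value) ^ (step < 0)) overflow = true
        out += value   // like the generated loop, the value computed before
                       // the step is still emitted even when the step wrapped
      }
      out.toSeq
    }

    // e.g. steppedWithOverflowCheck(Long.MaxValue - 1, 3L, Long.MaxValue)
    // emits Long.MaxValue - 1 and then stops because the addition wrapped.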
