From b31115d63b9a1e354cab730ba4e897fa87b4ebb0 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 22 Mar 2016 23:19:45 -0700 Subject: [PATCH 1/2] move shouldStop() to end of while loop --- .../org/apache/spark/sql/execution/ExistingRDD.scala | 7 ++++--- .../apache/spark/sql/execution/WholeStageCodegen.scala | 8 +++++--- .../org/apache/spark/sql/execution/basicOperators.scala | 3 ++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index b4348d39c2b4..2e5001e0d535 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -255,11 +255,11 @@ private[sql] case class DataSourceScan( | $numOutputRows.add(numRows); | } | - | while (!shouldStop() && $idx < numRows) { + | while ($idx < numRows) { | int $rowidx = $idx++; | ${consume(ctx, columns1).trim} + | if (shouldStop()) return; | } - | if (shouldStop()) return; | | if (!$input.hasNext()) { | $batch = null; @@ -280,7 +280,7 @@ private[sql] case class DataSourceScan( s""" | private void $scanRows(InternalRow $row) throws java.io.IOException { | boolean firstRow = true; - | while (!shouldStop() && (firstRow || $input.hasNext())) { + | while (firstRow || $input.hasNext()) { | if (firstRow) { | firstRow = false; | } else { @@ -288,6 +288,7 @@ private[sql] case class DataSourceScan( | } | $numOutputRows.add(1); | ${consume(ctx, columns2, inputRow).trim} + | if (shouldStop()) return; | } | }""".stripMargin) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index e3c7d7209af1..2f1daed8bf22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -104,11 +104,12 @@ trait CodegenSupport extends SparkPlan { * # call child.produce() * initialized = true; * } - * while (!shouldStop() && hashmap.hasNext()) { + * while (hashmap.hasNext()) { * row = hashmap.next(); * # build the aggregation results * # create variables for results * # call consume(), which will call parent.doConsume() + * if (shouldStop()) return; * } */ protected def doProduce(ctx: CodegenContext): String @@ -252,9 +253,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport ctx.currentVars = null val columns = exprs.map(_.gen(ctx)) s""" - | while (!shouldStop() && $input.hasNext()) { + | while ($input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); | ${consume(ctx, columns, row).trim} + | if (shouldStop()) return; | } """.stripMargin } @@ -321,7 +323,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup /** Codegened pipeline for: * ${toCommentSafeString(child.treeString.trim)} */ - class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { + final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator { private Object[] references; ${ctx.declareMutableStates()} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 6e2a5aa4f97c..ee3f1d70e130 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -282,13 +282,14 @@ case class Range( | } | } | - | while (!$overflow && $checkEnd && !shouldStop()) { + | while (!$overflow && $checkEnd) { | long $value = $number; | $number += ${step}L; | if ($number < $value ^ ${step}L < 0) { | $overflow = true; | } | ${consume(ctx, Seq(ev))} + | if (shouldStop()) return; | } """.stripMargin } From 53ee657d7a9da952afcc1db6ce82ac0ad2054155 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 23 Mar 2016 11:58:03 -0700 Subject: [PATCH 2/2] add comment --- .../main/scala/org/apache/spark/sql/execution/ExistingRDD.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 2e5001e0d535..3e2c7997626f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -255,6 +255,7 @@ private[sql] case class DataSourceScan( | $numOutputRows.add(numRows); | } | + | // this loop is very perf sensitive and changes to it should be measured carefully | while ($idx < numRows) { | int $rowidx = $idx++; | ${consume(ctx, columns1).trim}