Skip to content

Commit 9c072aa

Browse files
committed
CollectLimit doesn't need wholestage codegen.
1 parent fa38e7c commit 9c072aa

File tree

1 file changed

+1
-23
lines changed
  • sql/core/src/main/scala/org/apache/spark/sql/execution

1 file changed

+1
-23
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
3333
* This operator will be used when a logical `Limit` operation is the final operator in an
3434
* logical plan, which happens when the user is collecting results back to the driver.
3535
*/
36-
case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode with CodegenSupport {
36+
case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
3737
override def output: Seq[Attribute] = child.output
3838
override def outputPartitioning: Partitioning = SinglePartition
3939
override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
@@ -44,28 +44,6 @@ case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode with Cod
4444
child.execute(), child.output, SinglePartition, serializer))
4545
shuffled.mapPartitionsInternal(_.take(limit))
4646
}
47-
48-
override def upstreams(): Seq[RDD[InternalRow]] = {
49-
child.asInstanceOf[CodegenSupport].upstreams()
50-
}
51-
52-
protected override def doProduce(ctx: CodegenContext): String = {
53-
child.asInstanceOf[CodegenSupport].produce(ctx, this)
54-
}
55-
56-
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
57-
val countTerm = ctx.freshName("count")
58-
ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
59-
ctx.currentVars = input
60-
s"""
61-
| if ($countTerm < $limit) {
62-
| $countTerm += 1;
63-
| ${consume(ctx, ctx.currentVars)}
64-
| } else {
65-
| setStopEarly(true);
66-
| }
67-
""".stripMargin
68-
}
6947
}
7048

7149
/**

0 commit comments

Comments
 (0)