File tree Expand file tree Collapse file tree 3 files changed +8
-6
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution Expand file tree Collapse file tree 3 files changed +8
-6
lines changed Original file line number Diff line number Diff line change @@ -75,8 +75,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
7575 // Normally wrapping child with `LocalLimitExec` here is a no-op, because
7676 // `CollectLimitExec.executeCollect` will call `LogicalLimitExec.executeTake`, which
7777 // calls `child.executeTake`. If child supports whole stage codegen, adding this
78- // `LocalLimitExec` can break the input consuming loop inside whole stage codegen and
79- // trigger the resource releasing work, after we consume `limit` rows.
78+ // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
79+ // resource releasing work, after we consume `limit` rows.
8080 execution.CollectLimitExec (limit, LocalLimitExec (limit, planLater(child))) :: Nil
8181 case other => planLater(other) :: Nil
8282 }
Original file line number Diff line number Diff line change @@ -474,10 +474,6 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
474474 }
475475
476476 private def supportCodegen (plan : SparkPlan ): Boolean = plan match {
477- // Do not enable whole stage codegen for a single limit.
478- case limit : BaseLimitExec if ! limit.child.isInstanceOf [CodegenSupport ] ||
479- ! limit.child.asInstanceOf [CodegenSupport ].supportCodegen =>
480- false
481477 case plan : CodegenSupport if plan.supportCodegen =>
482478 val willFallback = plan.expressions.exists(_.find(e => ! supportCodegen(e)).isDefined)
483479 // the generated code will be huge if there are too many columns
Original file line number Diff line number Diff line change @@ -54,6 +54,12 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
5454 val limit : Int
5555 override def output : Seq [Attribute ] = child.output
5656
57+ // Do not enable whole stage codegen for a single limit.
58+ override def supportCodegen : Boolean = child match {
59+ case plan : CodegenSupport => plan.supportCodegen
60+ case _ => false
61+ }
62+
5763 override def executeTake (n : Int ): Array [InternalRow ] = child.executeTake(math.min(n, limit))
5864
5965 override def executeCollect (): Array [InternalRow ] = child.executeTake(limit)
You can’t perform that action at this time.
0 commit comments