diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a51b385399d8..e2d7164d93ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1171,7 +1171,7 @@ object DecimalAggregates extends Rule[LogicalPlan] {
  * Converts local operations (i.e. ones that don't require data exchange) on LocalRelation to
  * another LocalRelation.
  *
- * This is relatively simple as it currently handles only a single case: Project.
+ * This is relatively simple as it currently handles only 2 cases: Project and Limit.
  */
 object ConvertToLocalRelation extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -1180,6 +1180,9 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
       val projection = new InterpretedProjection(projectList, output)
       projection.initialize(0)
       LocalRelation(projectList.map(_.toAttribute), data.map(projection))
+
+    case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
+      LocalRelation(output, data.take(limit))
   }
 
   private def hasUnevaluableExpr(expr: Expression): Boolean = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 2e8ce4541865..c115cb6e80e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -63,29 +63,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object SpecialLimits extends Strategy {
     override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case logical.ReturnAnswer(rootPlan) => rootPlan match {
-        case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
-          execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-        case logical.Limit(
-            IntegerLiteral(limit),
-            logical.Project(projectList, logical.Sort(order, true, child))) =>
-          execution.TakeOrderedAndProjectExec(
-            limit, order, projectList, planLater(child)) :: Nil
-        case logical.Limit(IntegerLiteral(limit), child) =>
-          // Normally wrapping child with `LocalLimitExec` here is a no-op, because
-          // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
-          // calls `child.executeTake`. If child supports whole stage codegen, adding this
-          // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
-          // resource releasing work, after we consume `limit` rows.
-          execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+      case ReturnAnswer(rootPlan) => rootPlan match {
+        case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+          TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
+          TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
+        case Limit(IntegerLiteral(limit), child) =>
+          // With whole stage codegen, Spark releases resources only when all the output data of
+          // the query plan are consumed. It's possible that `CollectLimitExec` only consumes a
+          // little data from the child plan and finishes the query without releasing resources.
+          // Here we wrap the child plan with `LocalLimitExec`, to stop the processing of whole
+          // stage codegen and trigger the resource releasing work, after we consume `limit` rows.
+          CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
         case other => planLater(other) :: Nil
       }
-      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
-        execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
-      case logical.Limit(
-          IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
-        execution.TakeOrderedAndProjectExec(
-          limit, order, projectList, planLater(child)) :: Nil
+      case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
+        TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
+      case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
+        TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
       case _ => Nil
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 7cef5569717a..73a0f8735ed4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -54,14 +54,6 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
 
-  // Do not enable whole stage codegen for a single limit.
-  override def supportCodegen: Boolean = child match {
-    case plan: CodegenSupport => plan.supportCodegen
-    case _ => false
-  }
-
-  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
-
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
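
A minimal sketch of the ConvertToLocalRelation change in action, assuming a Spark build that includes this patch; the `LocalLimitFoldingDemo` object and session setup are illustrative, not part of the change:

import org.apache.spark.sql.SparkSession

// Hypothetical demo, not part of this patch.
object LocalLimitFoldingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    import spark.implicits._

    // A LIMIT directly on top of an in-memory relation: the new optimizer case
    // rewrites Limit(2, LocalRelation(...)) into LocalRelation(output, data.take(2)).
    val df = Seq(1, 2, 3, 4, 5).toDF("id").limit(2)

    // Expect a single two-row LocalRelation, with no Limit node left in the plan.
    println(df.queryExecution.optimizedPlan)

    spark.stop()
  }
}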
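
A usage sketch for the SpecialLimits paths, continuing the hypothetical session above. The sort-plus-limit shape plans to TakeOrderedAndProjectExec; the CollectLimitExec wrapping only shows up on the collect() path, where ReturnAnswer wraps the plan:

// ORDER BY + LIMIT at the top of the query: SpecialLimits plans this as a
// single TakeOrderedAndProjectExec rather than a full sort followed by a limit.
val top3 = spark.range(100).orderBy($"id".desc).limit(3)
println(top3.queryExecution.executedPlan)

// A bare top-most LIMIT is planned as CollectLimitExec when results are
// returned to the driver; after this patch the child is wrapped in
// LocalLimitExec so that whole stage codegen stops and releases resources
// once `limit` rows have been consumed.
spark.range(100).limit(3).collect()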
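
For the limit.scala hunk: with the supportCodegen override removed, BaseLimitExec inherits codegen support from CodegenSupport, so limits can now be compiled into whole stage codegen stages. A sketch on the same hypothetical session (the exact plan rendering depends on the Spark version):

// Expect LocalLimit/GlobalLimit to appear inside WholeStageCodegen stages,
// instead of breaking codegen as the removed supportCodegen override did.
val limited = spark.range(1000).filter($"id" % 2 === 0).limit(5)
println(limited.queryExecution.executedPlan)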