Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,7 @@ object DecimalAggregates extends Rule[LogicalPlan] {
* Converts local operations (i.e. ones that don't require data exchange) on LocalRelation to
* another LocalRelation.
*
* This is relatively simple as it currently handles only a single case: Project.
This is relatively simple as it currently handles only two cases: Project and Limit.
*/
object ConvertToLocalRelation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Expand All @@ -1180,6 +1180,9 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
val projection = new InterpretedProjection(projectList, output)
projection.initialize(0)
LocalRelation(projectList.map(_.toAttribute), data.map(projection))

case Limit(IntegerLiteral(limit), LocalRelation(output, data)) =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to fix the test "SQLQuerySuite.SPARK-19650: An action on a Command should not trigger a Spark job": a limit over a local relation should not trigger a Spark job.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kinda violates the idea that we shouldn't rely on optimization for correctness, but I suppose this is ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically this is not about correctness; "An action on a Command should not trigger a Spark job" is also a kind of optimization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right about that.

LocalRelation(output, data.take(limit))
}

private def hasUnevaluableExpr(expr: Expression): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,29 +63,24 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
*/
object SpecialLimits extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ReturnAnswer(rootPlan) => rootPlan match {
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of unrelated: remove these `logical.` and `execution.` prefixes to shorten the code.

case logical.Limit(
IntegerLiteral(limit),
logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child)) :: Nil
case logical.Limit(IntegerLiteral(limit), child) =>
// Normally wrapping child with `LocalLimitExec` here is a no-op, because
// `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
// calls `child.executeTake`. If child supports whole stage codegen, adding this
// `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
// resource releasing work, after we consume `limit` rows.
execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
case ReturnAnswer(rootPlan) => rootPlan match {
case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), child) =>
// With whole stage codegen, Spark releases resources only when all the output data of the
// query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
// data from child plan and finishes the query without releasing resources. Here we wrap
// the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
// trigger the resource releasing work, after we consume `limit` rows.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comments updated.

CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
case other => planLater(other) :: Nil
}
case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
case logical.Limit(
IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
execution.TakeOrderedAndProjectExec(
limit, order, projectList, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
case _ => Nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,6 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
val limit: Int
override def output: Seq[Attribute] = child.output

// Do not enable whole stage codegen for a single limit.
override def supportCodegen: Boolean = child match {
case plan: CodegenSupport => plan.supportCodegen
case _ => false
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong: we may have more operators above the Limit, so it is not necessarily a single Limit.

}

override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))

protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
iter.take(limit)
}
Expand Down