Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
case Limit(IntegerLiteral(limit), child) =>
// With whole stage codegen, Spark releases resources only when all the output data of the
// query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
// data from child plan and finishes the query without releasing resources. Here we wrap
// the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
// trigger the resource releasing work, after we consume `limit` rows.
CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
CollectLimitExec(limit, planLater(child)) :: Nil
case other => planLater(other) :: Nil
}
case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2720,11 +2720,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}

test("SPARK-21743: top-most limit should not cause memory leak") {
// In unit test, Spark will fail the query if memory leak detected.
spark.range(100).groupBy("id").count().limit(1).collect()
}

test("SPARK-21652: rule confliction of InferFiltersFromConstraints and ConstantPropagation") {
withTempView("t1", "t2") {
Seq((1, 1)).toDF("col1", "col2").createOrReplaceTempView("t1")
Expand Down