Commit d3255a5

cloud-fan authored and hvanhovell committed
revert [SPARK-21743][SQL] top-most limit should not cause memory leak
## What changes were proposed in this pull request?

There is a performance regression in Spark 2.3. When we read a big compressed text file that is un-splittable (e.g. gz) and then take the first record, Spark scans all the data in the text file, which is very slow. For example, with `spark.read.text("/tmp/test.csv.gz").head(1)` we can check the SQL UI and see that the file is fully scanned.

![image](https://user-images.githubusercontent.com/3182036/41445252-264b1e5a-6ffd-11e8-9a67-4c31d129a314.png)

This was introduced by #18955, which adds a LocalLimit to the query when executing `Dataset.head`. The fundamental problem is that `Limit` is not handled well by whole-stage codegen: it keeps consuming the input even after the limit has been hit. However, if we just fix LIMIT whole-stage codegen, the memory leak test will fail, because we would no longer fully consume the inputs, which is what triggers the resource cleanup. To fix it completely, we should do the following (a simplified sketch of item 1 appears below):

1. Fix LIMIT whole-stage codegen so it stops consuming inputs after hitting the limit.
2. In whole-stage codegen, provide a way to release the resources of the parent operator, and apply it in LIMIT.
3. Automatically release resources when the task ends.

However, this is a non-trivial change and is risky to backport to Spark 2.3. This PR proposes to revert #18955 in Spark 2.3. The memory leak is not a big issue: when a task ends, Spark releases all the pages allocated by that task, which covers most of the resources. I'll submit an exhaustive fix to master later.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #21573 from cloud-fan/limit.
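A minimal, self-contained sketch of item 1 above (stop consuming input once the limit is reached). This is not Spark's generated code: the plain `Iterator` merely stands in for the child operator's output, and the `collected.size < limit` guard is the short circuit that the current whole-stage-codegened LIMIT lacks.

```scala
object LimitShortCircuitSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the child plan's output: a large, lazily produced stream of rows.
    val input = Iterator.tabulate(1000000)(i => s"row-$i")
    val limit = 1
    val collected = scala.collection.mutable.ArrayBuffer.empty[String]
    // Without the `collected.size < limit` guard the loop would drain the whole
    // iterator, which is the "scan the entire gz file for head(1)" behaviour
    // described in the commit message.
    while (input.hasNext && collected.size < limit) {
      collected += input.next()
    }
    println(collected.mkString(", "))
  }
}
```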
1 parent 7f1708a commit d3255a5

File tree

2 files changed: +1 −11 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 6 deletions
@@ -70,12 +70,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         case Limit(IntegerLiteral(limit), Project(projectList, Sort(order, true, child))) =>
           TakeOrderedAndProjectExec(limit, order, projectList, planLater(child)) :: Nil
         case Limit(IntegerLiteral(limit), child) =>
-          // With whole stage codegen, Spark releases resources only when all the output data of the
-          // query plan are consumed. It's possible that `CollectLimitExec` only consumes a little
-          // data from child plan and finishes the query without releasing resources. Here we wrap
-          // the child plan with `LocalLimitExec`, to stop the processing of whole stage codegen and
-          // trigger the resource releasing work, after we consume `limit` rows.
-          CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
+          CollectLimitExec(limit, planLater(child)) :: Nil
         case other => planLater(other) :: Nil
       }
       case Limit(IntegerLiteral(limit), Sort(order, true, child)) =>
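One way to observe this planning change is in spark-shell, where a SparkSession named `spark` is predefined. This is a sketch rather than a guaranteed output: after the revert, a top-most LIMIT should plan to CollectLimit directly over its child, with no LocalLimit node in between, though the exact plan text varies by Spark version.

```scala
// Run in spark-shell (against a build that includes this change); expect
// CollectLimit directly above the Range scan, without an intermediate LocalLimit.
spark.range(1000).limit(5).explain()
```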

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 0 additions & 5 deletions
@@ -2720,11 +2720,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-21743: top-most limit should not cause memory leak") {
-    // In unit test, Spark will fail the query if memory leak detected.
-    spark.range(100).groupBy("id").count().limit(1).collect()
-  }
-
   test("SPARK-21652: rule confliction of InferFiltersFromConstraints and ConstantPropagation") {
     withTempView("t1", "t2") {
       Seq((1, 1)).toDF("col1", "col2").createOrReplaceTempView("t1")
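For reference, the query exercised by the removed test can still be run as a standalone program; a minimal sketch, assuming Spark is on the classpath (the memory-leak check mentioned in the removed comment only applies inside Spark's own test harness, not to a normal application).

```scala
import org.apache.spark.sql.SparkSession

object TopMostLimitRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("top-most-limit-repro")
      .getOrCreate()
    try {
      // Same query as the removed test: aggregate, then keep only the first row.
      val rows = spark.range(100).groupBy("id").count().limit(1).collect()
      println(rows.mkString(", "))
    } finally {
      spark.stop()
    }
  }
}
```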
