@@ -72,7 +72,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.TakeOrderedAndProjectExec(
           limit, order, projectList, planLater(child)) :: Nil
       case logical.Limit(IntegerLiteral(limit), child) =>
-        execution.CollectLimitExec(limit, planLater(child)) :: Nil
+        // Normally wrapping child with `LocalLimitExec` here is a no-op, because
+        // `CollectLimitExec.executeCollect` will call `LocalLimitExec.executeTake`, which
+        // calls `child.executeTake`. If child supports whole stage codegen, adding this
+        // `LocalLimitExec` can stop the processing of whole stage codegen and trigger the
+        // resource releasing work, after we consume `limit` rows.
+        execution.CollectLimitExec(limit, LocalLimitExec(limit, planLater(child))) :: Nil
       case other => planLater(other) :: Nil
     }
   case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
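The comment in the hunk above compresses a chain of calls that is easy to lose track of. Below is a minimal, self-contained sketch of that chain, using plain Scala with hypothetical stand-in classes (not the real Spark operators), to show that wrapping the child in LocalLimitExec leaves the collected result unchanged; the wrapper only moves the whole stage codegen boundary so the child can release its resources once limit rows are consumed.

// Hypothetical stand-ins that mirror the Spark operator names; everything
// here is simplified illustration, not Spark code.
object LimitCallChainSketch {
  trait PlanNode {
    def executeTake(n: Int): Array[Int]
  }

  // Stands in for an arbitrary child stage producing rows.
  final class ChildStage(rows: Range) extends PlanNode {
    def executeTake(n: Int): Array[Int] = rows.take(n).toArray
  }

  // Mirrors BaseLimitExec.executeTake from the next hunk: forward to the
  // child, capped at this operator's limit.
  final class LocalLimit(limit: Int, child: PlanNode) extends PlanNode {
    def executeTake(n: Int): Array[Int] = child.executeTake(math.min(n, limit))
  }

  // Mirrors CollectLimitExec.executeCollect: take `limit` rows from the child.
  final class CollectLimit(limit: Int, child: PlanNode) {
    def executeCollect(): Array[Int] = child.executeTake(limit)
  }

  def main(args: Array[String]): Unit = {
    val child = new ChildStage(0 until 100)
    val before = new CollectLimit(1, child).executeCollect()
    val after = new CollectLimit(1, new LocalLimit(1, child)).executeCollect()
    // Same rows either way: the extra LocalLimit is a no-op for the result.
    assert(before.sameElements(after))
    println(after.mkString("[", ", ", "]")) // prints [0]
  }
}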
@@ -54,6 +54,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
 
+  // Do not enable whole stage codegen for a single limit.
+  override def supportCodegen: Boolean = child match {
+    case plan: CodegenSupport => plan.supportCodegen
+    case _ => false
+  }
+
+  override def executeTake(n: Int): Array[InternalRow] = child.executeTake(math.min(n, limit))
+
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
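A detail worth flagging in doExecute above: iter.take(limit) runs once per partition, so BaseLimitExec on its own caps each partition independently rather than the whole result; global semantics come from planning (e.g. CollectLimitExec on top, or the global limit variant requiring its input in a single partition). A tiny sketch with plain Scala collections and made-up data:

// Made-up data standing in for the partitions of a child RDD.
object PerPartitionLimitSketch {
  def main(args: Array[String]): Unit = {
    val limit = 2
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6))
    // Analogue of child.execute().mapPartitions { iter => iter.take(limit) }:
    // each partition is capped on its own, so up to limit * numPartitions
    // rows can survive a local limit.
    val locallyLimited = partitions.map(_.iterator.take(limit).toList)
    println(locallyLimited) // List(List(1, 2), List(4, 5), List(6))
  }
}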
@@ -2658,4 +2658,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       checkAnswer(sql("SELECT __auto_generated_subquery_name.i from (SELECT i FROM v)"), Row(1))
     }
   }
+
+  test("SPARK-21743: top-most limit should not cause memory leak") {
+    // In unit test, Spark will fail the query if memory leak detected.
gatorsmile (Member) commented on Aug 16, 2017:

Without the fix, the test did not fail, but I saw the warning message:

22:05:07.455 WARN org.apache.spark.executor.Executor: Managed memory leak detected; size = 33554432 bytes, TID = 2

With the fix, the warning message is gone.

The PR author (Contributor) replied:

Did you try this test in the spark shell? We only throw an exception for a memory leak if spark.unsafe.exceptionOnMemoryLeak is true, but that config is false by default; it is set to true in unit tests.

A reviewer (Member) replied:

When this is executed in IntelliJ, the test does not fail either. How about this?

class SQLQuerySparkContextSuite extends QueryTest with LocalSparkContext {
  val spark = SparkSession
    .builder()
    .config("spark.unsafe.exceptionOnMemoryLeak", "true")
    .master("local[1]")
    .getOrCreate()
  test("SPARK-21743: top-most limit should not cause memory leak") {
    spark.range(100).groupBy("id").count().limit(1).collect()
  }
}

Member replied:

That should be fine, as long as our test framework can capture it. : )

Member replied:

This issue is fixed in #18967

I think we need to move this test case to DataFrameSuite

sameeragarwal (Member) commented on Aug 16, 2017:

How about just adding it to TestSparkSession instead? Ah, it seems like Xiao already did that.

+    spark.range(100).groupBy("id").count().limit(1).collect()
+  }
 }