
Commit 52a8011

cloud-fan authored and yhuai committed
[SPARK-14554][SQL] disable whole stage codegen if there are too many input columns
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/12047/files#diff-94a1f59bcc9b6758c4ca874652437634R529, we may split the generated field-expression code in `CreateExternalRow` to support wide tables. However, the whole-stage codegen framework doesn't support this, because the input for expressions is not always the input row but can be `CodeGenContext.currentVars`, which doesn't work well with `CodeGenContext.splitExpressions`.

We do have a check to guard against this case, but it's incomplete: it only checks output fields. This PR improves the whole-stage codegen support check to also disable codegen when there are too many input fields, so that we avoid splitting field-expression code in `CreateExternalRow` under whole-stage codegen.

TODO: Would it be a better solution to make `CodeGenContext.currentVars` work with `CodeGenContext.splitExpressions`?

## How was this patch tested?

A new test in DatasetSuite.

Author: Wenchen Fan <[email protected]>

Closes #12322 from cloud-fan/codegen.
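The fix hinges on counting nested fields in a schema and rejecting whole-stage codegen when either the output schema or any child's (input) schema is too wide. A minimal, framework-free sketch of that idea (the `DataType` stand-ins and the names `countNestedFields`/`supportsCodegen` are hypothetical simplifications, not Spark's actual `numOfNestedFields` or `CollapseCodegenStages` API):

```scala
// Minimal stand-in for a SQL schema: a field is either atomic or a struct.
sealed trait DataType
case object AtomicType extends DataType
case class StructType(fields: Seq[DataType]) extends DataType

object CodegenGuard {
  // Count leaf fields recursively, mirroring the idea behind numOfNestedFields:
  // a deeply nested struct contributes all of its leaves, not just one column.
  def countNestedFields(dt: DataType): Int = dt match {
    case AtomicType     => 1
    case StructType(fs) => fs.map(countNestedFields).sum
  }

  // The improved check: codegen is rejected if EITHER the output schema OR any
  // child's input schema exceeds the limit, not just the output as before.
  def supportsCodegen(output: DataType, inputs: Seq[DataType], maxFields: Int): Boolean = {
    val tooManyOutputFields = countNestedFields(output) > maxFields
    val tooManyInputFields  = inputs.exists(countNestedFields(_) > maxFields)
    !tooManyOutputFields && !tooManyInputFields
  }
}
```

With this shape, a plan whose output is narrow (e.g. the single `Long` produced by `Dataset.map`) but whose child is a 1000-column wide table is now correctly excluded from whole-stage codegen.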
1 parent 2d81ba5 commit 52a8011

File tree

2 files changed: +11, −2 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala (5 additions, 2 deletions)

```diff
@@ -446,8 +446,11 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
     case plan: CodegenSupport if plan.supportCodegen =>
       val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
       // the generated code will be huge if there are too many columns
-      val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
-      !willFallback && !haveTooManyFields
+      val hasTooManyOutputFields =
+        numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+      val hasTooManyInputFields =
+        plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
+      !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
     case _ => false
   }
```

sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala (6 additions, 0 deletions)

```diff
@@ -620,6 +620,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val df = streaming.join(static, Seq("b"))
     assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
   }
+
+  test("SPARK-14554: Dataset.map may generate wrong java code for wide table") {
+    val wideDF = sqlContext.range(10).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+    // Make sure the generated code for this plan can compile and execute.
+    wideDF.map(_.getLong(0)).collect()
+  }
 }

 case class OtherTuple(_1: String, _2: Int)
```
