From 0ad119490ac5fba06d6cdeda1d4a9e9e0f8c0c8e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 12 Apr 2016 11:43:08 +0800 Subject: [PATCH 1/2] Dataset.map may generate wrong java code for wide table --- .../spark/sql/catalyst/expressions/objects.scala | 12 +++++++++++- .../scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index 28b6b2adf80a..b3494c03e4bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -536,7 +536,17 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType) } """ } - val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes) + + val childrenCode = if (ctx.currentVars == null) { + ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes) + } else { + // If ctx.currentVars != null, which means we are in whole stage codegen context and the input + // to children expressions maybe not the input row but some local variables, so we should not + // split children expressions codes here. + // TODO: figure out a way to integrate ctx.currentVars and ctx.splitExpressions. + childrenCodes.mkString("\n") + } + val schemaField = ctx.addReferenceObj("schema", schema) s""" boolean ${ev.isNull} = false; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index e8e801084ffa..47251681e307 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -620,6 +620,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext { val df = streaming.join(static, Seq("b")) assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.") } + + test("SPARK-14554: Dataset.map may generate wrong java code for wide table") { + val wideDF = sqlContext.range(10).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*) + // Make sure the generated code for this plan can compile and execute. + wideDF.map(_.getLong(0)).collect() + } } case class OtherTuple(_1: String, _2: Int) From 4d63e15188b2477a21ea5fd8ba9d5a285dad46a1 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 12 Apr 2016 12:19:56 +0800 Subject: [PATCH 2/2] better fix --- .../spark/sql/catalyst/expressions/objects.scala | 12 +----------- .../spark/sql/execution/WholeStageCodegen.scala | 7 +++++-- 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index b3494c03e4bd..28b6b2adf80a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -536,17 +536,7 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType) } """ } - - val childrenCode = if (ctx.currentVars == null) { - ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes) - } else { - // If ctx.currentVars != null, which means we are in whole stage codegen context and the input - // to children expressions maybe not the input row but some local variables, so we should not - // split children expressions codes here. - // TODO: figure out a way to integrate ctx.currentVars and ctx.splitExpressions. - childrenCodes.mkString("\n") - } - + val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes) val schemaField = ctx.addReferenceObj("schema", schema) s""" boolean ${ev.isNull} = false; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index c4594f0480e7..447dbe701815 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -446,8 +446,11 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { case plan: CodegenSupport if plan.supportCodegen => val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined) // the generated code will be huge if there are too many columns - val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields - !willFallback && !haveTooManyFields + val hasTooManyOutputFields = + numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields + val hasTooManyInputFields = + plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields) + !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields case _ => false }