From 86723436ba2b711d0eb6f2de92f3651006e3bff4 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Mon, 5 Dec 2016 19:04:40 +0530 Subject: [PATCH 1/4] Fixed If expression code generation to never generate code large enough to exceed JVM method code size limit --- .../expressions/conditionalExpressions.scala | 78 +++++++++++++++---- .../expressions/CodeGenerationSuite.scala | 20 +++++ 2 files changed, 84 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala index c4e5b844299a6..190c5bc416940 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala @@ -57,20 +57,70 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi val trueEval = trueValue.gen(ctx) val falseEval = falseValue.gen(ctx) - s""" - ${condEval.code} - boolean ${ev.isNull} = false; - ${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; - if (!${condEval.isNull} && ${condEval.value}) { - ${trueEval.code} - ${ev.isNull} = ${trueEval.isNull}; - ${ev.value} = ${trueEval.value}; - } else { - ${falseEval.code} - ${ev.isNull} = ${falseEval.isNull}; - ${ev.value} = ${falseEval.value}; - } - """ + // place generated code of condition, true value and false value in separate methods if + // their code combined is large + val combinedLength = condEval.code.length + trueEval.code.length + falseEval.code.length + if (combinedLength > 1024) { + val (condFuncName, condGlobalIsNull, condGlobalValue) = + createAndAddFunction(ctx, condEval, predicate.dataType, "evalIfCondExpr") + val (trueFuncName, trueGlobalIsNull, trueGlobalValue) = + createAndAddFunction(ctx, trueEval, trueValue.dataType, "evalIfTrueExpr") + val (falseFuncName, falseGlobalIsNull, falseGlobalValue) = + createAndAddFunction(ctx, falseEval, falseValue.dataType, "evalIfFalseExpr") + s""" + $condFuncName(${ctx.INPUT_ROW}); + boolean ${ev.isNull} = false; + ${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; + if (!$condGlobalIsNull && $condGlobalValue) { + $trueFuncName(${ctx.INPUT_ROW}); + ${ev.isNull} = $trueGlobalIsNull; + ${ev.value} = $trueGlobalValue; + } else { + $falseFuncName(${ctx.INPUT_ROW}); + ${ev.isNull} = $falseGlobalIsNull; + ${ev.value} = $falseGlobalValue; + } + """ + } + else { + s""" + ${condEval.code} + boolean ${ev.isNull} = false; + ${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)}; + if (!${condEval.isNull} && ${condEval.value}) { + ${trueEval.code} + ${ev.isNull} = ${trueEval.isNull}; + ${ev.value} = ${trueEval.value}; + } else { + ${falseEval.code} + ${ev.isNull} = ${falseEval.isNull}; + ${ev.value} = ${falseEval.value}; + } + """ + } + } + + private def createAndAddFunction( + ctx: CodeGenContext, + ev: GeneratedExpressionCode, + dataType: DataType, + baseFuncName: String): (String, String, String) = { + val globalIsNull = ctx.freshName("isNull") + ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;") + val globalValue = ctx.freshName("value") + ctx.addMutableState(ctx.javaType(dataType), globalValue, + s"$globalValue = ${ctx.defaultValue(dataType)};") + val funcName = ctx.freshName(baseFuncName) + val funcBody = + s""" + |private void $funcName(InternalRow ${ctx.INPUT_ROW}) { + | ${ev.code.trim} + | $globalIsNull = ${ev.isNull}; + | $globalValue = ${ev.value}; + |} + """.stripMargin + ctx.addNewFunction(funcName, funcBody) + (funcName, globalIsNull, globalValue) } override def toString: String = s"if ($predicate) $trueValue else $falseValue" diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index e35a1b2d7c9a4..fb68eba6c9141 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -80,6 +80,26 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { assert(actual(0) == cases) } + test("SPARK-18091: split large if expressions into blocks due to JVM code size limit") { + val inStr = "StringForTesting" + val row = create_row(inStr) + val inputStrAttr = 'a.string.at(0) + + var strExpr: Expression = inputStrAttr + for (_ <- 1 to 13) { + strExpr = If(EqualTo(Decode(Encode(strExpr, "utf-8"), "utf-8"), inputStrAttr), + strExpr, strExpr) + } + + val expressions = Seq(strExpr) + val plan = GenerateUnsafeProjection.generate(expressions, true) + val actual = plan(row).toSeq(expressions.map(_.dataType)) + val expected = Seq(UTF8String.fromString(inStr)) + + if (!checkResult(actual, expected)) { + fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") + } + } test("test generated safe and unsafe projection") { val schema = new StructType(Array( From caafdc6262ff728c26f1987d711c484e97556138 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 12 Apr 2016 17:26:37 -0700 Subject: [PATCH 2/4] [SPARK-14578] [SQL] [BACKPORT-1.6] Fix codegen for CreateExternalRow with nested wide schema The wide schema, the expression of fields will be splitted into multiple functions, but the variable for loopVar can't be accessed in splitted functions, this PR change them as class member. Added regression test. Author: Davies Liu Closes #12338 from davies/nested_row. Conflicts: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- .../spark/sql/catalyst/expressions/objects.scala | 8 +++++--- .../sql/execution/datasources/json/JsonSuite.scala | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index f4b0cdc4c7b74..aaa5b859d76f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -414,6 +414,8 @@ case class MapObjects private( override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = { val javaType = ctx.javaType(dataType) val elementJavaType = ctx.javaType(loopVar.dataType) + ctx.addMutableState("boolean", loopVar.isNull, "") + ctx.addMutableState(elementJavaType, loopVar.value, "") val genInputData = inputData.gen(ctx) val genFunction = lambdaFunction.gen(ctx) val dataLength = ctx.freshName("dataLength") @@ -434,9 +436,9 @@ case class MapObjects private( } val loopNullCheck = if (primitiveElement) { - s"boolean ${loopVar.isNull} = ${genInputData.value}.isNullAt($loopIndex);" + s"${loopVar.isNull} = ${genInputData.value}.isNullAt($loopIndex);" } else { - s"boolean ${loopVar.isNull} = ${genInputData.isNull} || ${loopVar.value} == null;" + s"${loopVar.isNull} = ${genInputData.isNull} || ${loopVar.value} == null;" } s""" @@ -452,7 +454,7 @@ case class MapObjects private( int $loopIndex = 0; while ($loopIndex < $dataLength) { - $elementJavaType ${loopVar.value} = + ${loopVar.value} = ($elementJavaType)${genInputData.value}${itemAccessor(loopIndex)}; $loopNullCheck diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index baa258ad26152..f376c2b7a4848 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -1464,4 +1464,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { } } + test("wide nested json table") { + val nested = (1 to 100).map { i => + s""" + |"c$i": $i + """.stripMargin + }.mkString(", ") + val json = s""" + |{"a": [{$nested}], "b": [{$nested}]} + """.stripMargin + val rdd = sqlContext.sparkContext.makeRDD(Seq(json)) + val df = sqlContext.read.json(rdd) + assert(df.schema.size === 2) + df.collect() + } } From b60e788d8c584876e0aa36161ac2312609552596 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 11 Dec 2016 09:12:46 +0000 Subject: [PATCH 3/4] [SQL][MINOR] simplify a test to fix the maven tests ## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/15620 , all of the Maven-based 2.0 Jenkins jobs time out consistently. As I pointed out in https://github.com/apache/spark/pull/15620#discussion_r91829129 , it seems that the regression test is an overkill and may hit constants pool size limitation, which is a known issue and hasn't been fixed yet. Since #15620 only fix the code size limitation problem, we can simplify the test to avoid hitting constants pool size limitation. ## How was this patch tested? test only change Author: Wenchen Fan Closes #16244 from cloud-fan/minor. --- .../expressions/CodeGenerationSuite.scala | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index fb68eba6c9141..1317e19da0ace 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -81,20 +81,15 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { } test("SPARK-18091: split large if expressions into blocks due to JVM code size limit") { - val inStr = "StringForTesting" - val row = create_row(inStr) - val inputStrAttr = 'a.string.at(0) - - var strExpr: Expression = inputStrAttr - for (_ <- 1 to 13) { - strExpr = If(EqualTo(Decode(Encode(strExpr, "utf-8"), "utf-8"), inputStrAttr), - strExpr, strExpr) + var strExpr: Expression = Literal("abc") + for (_ <- 1 to 150) { + strExpr = Decode(Encode(strExpr, "utf-8"), "utf-8") } - val expressions = Seq(strExpr) - val plan = GenerateUnsafeProjection.generate(expressions, true) - val actual = plan(row).toSeq(expressions.map(_.dataType)) - val expected = Seq(UTF8String.fromString(inStr)) + val expressions = Seq(If(EqualTo(strExpr, strExpr), strExpr, strExpr)) + val plan = GenerateMutableProjection.generate(expressions) + val actual = plan(null).toSeq(expressions.map(_.dataType)) + val expected = Seq(UTF8String.fromString("abc")) if (!checkResult(actual, expected)) { fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") From 708c847bd86ded2f58ec0ff05424cc437aac99b5 Mon Sep 17 00:00:00 2001 From: Kapil Singh Date: Mon, 12 Dec 2016 15:56:41 +0530 Subject: [PATCH 4/4] [minor] Fixed unit test as per 1.6 --- .../spark/sql/catalyst/expressions/CodeGenerationSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 1317e19da0ace..030c99c5489e3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -82,12 +82,12 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { test("SPARK-18091: split large if expressions into blocks due to JVM code size limit") { var strExpr: Expression = Literal("abc") - for (_ <- 1 to 150) { + for (_ <- 1 to 100) { strExpr = Decode(Encode(strExpr, "utf-8"), "utf-8") } val expressions = Seq(If(EqualTo(strExpr, strExpr), strExpr, strExpr)) - val plan = GenerateMutableProjection.generate(expressions) + val plan = GenerateMutableProjection.generate(expressions)() val actual = plan(null).toSeq(expressions.map(_.dataType)) val expected = Seq(UTF8String.fromString("abc"))