From 9295731f24591db188bd252e1ef557804111f2c6 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 1 Sep 2019 08:55:54 +0200 Subject: [PATCH 1/5] [SPARK-28916][SQL] Split subexpression elimination functions code to avoid 64KB limit --- .../expressions/codegen/CodeGenerator.scala | 15 ++++++++++++--- .../codegen/GenerateMutableProjection.scala | 2 +- .../codegen/GenerateUnsafeProjection.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 8 ++++++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 95fad412002e2..f02845010480f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -403,13 +403,14 @@ class CodegenContext { * equivalentExpressions will match the tree containing `col1 + col2` and it will only * be evaluated once. */ - val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions + private val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions // Foreach expression that is participating in subexpression elimination, the state to use. - var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState] + // Visible for testing. + private[expressions] var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState] // The collection of sub-expression result resetting methods that need to be called on each row. - val subexprFunctions = mutable.ArrayBuffer.empty[String] + private val subexprFunctions = mutable.ArrayBuffer.empty[String] val outerClassName = "OuterClass" @@ -993,6 +994,14 @@ class CodegenContext { } } + /** + * Returns the code for subexpression elimination after splitting it if necessary. + */ + def subexprFunctionsCode: String = { + // Wholestage codegen does not allow subexpression elimination + splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW)) + } + /** * Perform a function which generates a sequence of ExprCodes with a given mapping between * expressions and common expressions, instead of using the mapping in current context. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 838bd1c679e4d..2e018de07101e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -92,7 +92,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP } // Evaluate all the subexpressions. - val evalSubexpr = ctx.subexprFunctions.mkString("\n") + val evalSubexpr = ctx.subexprFunctionsCode val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1)) val allUpdates = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._2)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index fb1d8a3c8e739..8da7f65bdeee6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -299,7 +299,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro v => s"$v = new $rowWriterClass(${expressions.length}, ${numVarLenFields * 32});") // Evaluate all the subexpression. - val evalSubexpr = ctx.subexprFunctions.mkString("\n") + val evalSubexpr = ctx.subexprFunctionsCode val writeExpressions = writeExpressionsToBuffer( ctx, ctx.INPUT_ROW, exprEvals, exprSchemas, rowWriter, isTopLevel = true) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index b4ddfecaee469..0cdcc4b087ed9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2202,4 +2202,12 @@ class DataFrameSuite extends QueryTest with SharedSparkSession { |*(1) Range (0, 10, step=1, splits=2)""".stripMargin)) } } + + test("SPARK-28916: subexrepssion elimination can cause 64kb code limit") { + val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*) + df.createOrReplaceTempView("spark64kb") + val data = spark.sql("select * from spark64kb limit 10") + // This fails if 64Kb limit is reached in code generation + data.describe() + } } From 0167929ce3fe2543fef95cddfa576d00008e7661 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 2 Sep 2019 10:37:54 +0200 Subject: [PATCH 2/5] address comment --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index f02845010480f..3756e72f8fe43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -999,6 +999,7 @@ class CodegenContext { */ def subexprFunctionsCode: String = { // Wholestage codegen does not allow subexpression elimination + assert(currentVars == null) splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW)) } From 2c6b64edbb3e7b0ff261baef5ffcca0773cea183 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 2 Sep 2019 13:57:12 +0200 Subject: [PATCH 3/5] revert --- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 3756e72f8fe43..9fdaa9c86b660 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -998,8 +998,8 @@ class CodegenContext { * Returns the code for subexpression elimination after splitting it if necessary. */ def subexprFunctionsCode: String = { - // Wholestage codegen does not allow subexpression elimination - assert(currentVars == null) + // Wholestage codegen does not allow subexpression elimination: in that case, subexprFunctions + // is empty and an empty string is returned splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW)) } From 6f4c524b4c554b35eb7b1e8b6204f76d39347ac6 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 2 Sep 2019 16:53:26 +0200 Subject: [PATCH 4/5] reword comment --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 9fdaa9c86b660..0d2c1c0e3341e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -998,8 +998,7 @@ class CodegenContext { * Returns the code for subexpression elimination after splitting it if necessary. */ def subexprFunctionsCode: String = { - // Wholestage codegen does not allow subexpression elimination: in that case, subexprFunctions - // is empty and an empty string is returned + // Whole-stage codegen's subexpression elimination is handled in another code path splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW)) } From 2d4b8f8ab952fd41cab7c8d782944dddc2d78dc1 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 6 Sep 2019 21:44:27 +0200 Subject: [PATCH 5/5] change uts --- .../catalyst/expressions/codegen/CodeGenerator.scala | 1 + .../catalyst/expressions/CodeGenerationSuite.scala | 12 ++++++++++++ .../scala/org/apache/spark/sql/DataFrameSuite.scala | 8 -------- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 0d2c1c0e3341e..349eda8035441 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -999,6 +999,7 @@ class CodegenContext { */ def subexprFunctionsCode: String = { // Whole-stage codegen's subexpression elimination is handled in another code path + assert(currentVars == null || subexprFunctions.isEmpty) splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 4e64313da136b..28d2607e6e43e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -545,6 +545,18 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { } assert(appender.seenMessage) } + + test("SPARK-28916: subexrepssion elimination can cause 64kb code limit on UnsafeProjection") { + val numOfExprs = 10000 + val exprs = (0 to numOfExprs).flatMap(colIndex => + Seq(Add(BoundReference(colIndex, DoubleType, true), + BoundReference(numOfExprs + colIndex, DoubleType, true)), + Add(BoundReference(colIndex, DoubleType, true), + BoundReference(numOfExprs + colIndex, DoubleType, true)))) + // these should not fail to compile due to 64K limit + GenerateUnsafeProjection.generate(exprs, true) + GenerateMutableProjection.generate(exprs, true) + } } case class HugeCodeIntExpression(value: Int) extends Expression { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 0cdcc4b087ed9..b4ddfecaee469 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2202,12 +2202,4 @@ class DataFrameSuite extends QueryTest with SharedSparkSession { |*(1) Range (0, 10, step=1, splits=2)""".stripMargin)) } } - - test("SPARK-28916: subexrepssion elimination can cause 64kb code limit") { - val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*) - df.createOrReplaceTempView("spark64kb") - val data = spark.sql("select * from spark64kb limit 10") - // This fails if 64Kb limit is reached in code generation - data.describe() - } }