Skip to content

Commit c411579

Browse files
mgaido91 authored and cloud-fan committed
[SPARK-28916][SQL] Split subexpression elimination functions code for Generate[Mutable|Unsafe]Projection
### What changes were proposed in this pull request?

The PR proposes to split the code for subexpression elimination before inlining all the function calls in the `apply` method for `Generate[Mutable|Unsafe]Projection`.

### Why are the changes needed?

Before this PR, code generation could fail due to the 64KB code size limit if a lot of subexpression elimination functions were generated. The added UT is a reproducer for the issue (thanks to the JIRA reporter and HyukjinKwon for it).

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a UT.

Closes apache#25642 from mgaido91/SPARK-28916.

Authored-by: Marco Gaido <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 0ed9fae commit c411579

File tree

4 files changed

+27
-5
lines changed

4 files changed

+27
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -403,13 +403,14 @@ class CodegenContext {
403403
* equivalentExpressions will match the tree containing `col1 + col2` and it will only
404404
* be evaluated once.
405405
*/
406-
val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions
406+
private val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions
407407

408408
// For each expression that is participating in subexpression elimination, the state to use.
409-
var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState]
409+
// Visible for testing.
410+
private[expressions] var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState]
410411

411412
// The collection of sub-expression result resetting methods that need to be called on each row.
412-
val subexprFunctions = mutable.ArrayBuffer.empty[String]
413+
private val subexprFunctions = mutable.ArrayBuffer.empty[String]
413414

414415
val outerClassName = "OuterClass"
415416

@@ -993,6 +994,15 @@ class CodegenContext {
993994
}
994995
}
995996

997+
/**
998+
* Returns the code for subexpression elimination after splitting it if necessary.
999+
*/
1000+
def subexprFunctionsCode: String = {
1001+
// Whole-stage codegen's subexpression elimination is handled in another code path
1002+
assert(currentVars == null || subexprFunctions.isEmpty)
1003+
splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW))
1004+
}
1005+
9961006
/**
9971007
* Perform a function which generates a sequence of ExprCodes with a given mapping between
9981008
* expressions and common expressions, instead of using the mapping in current context.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
9292
}
9393

9494
// Evaluate all the subexpressions.
95-
val evalSubexpr = ctx.subexprFunctions.mkString("\n")
95+
val evalSubexpr = ctx.subexprFunctionsCode
9696

9797
val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1))
9898
val allUpdates = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._2))

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
299299
v => s"$v = new $rowWriterClass(${expressions.length}, ${numVarLenFields * 32});")
300300

301301
// Evaluate all the subexpressions.
302-
val evalSubexpr = ctx.subexprFunctions.mkString("\n")
302+
val evalSubexpr = ctx.subexprFunctionsCode
303303

304304
val writeExpressions = writeExpressionsToBuffer(
305305
ctx, ctx.INPUT_ROW, exprEvals, exprSchemas, rowWriter, isTopLevel = true)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,18 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
545545
}
546546
assert(appender.seenMessage)
547547
}
548+
549+
test("SPARK-28916: subexrepssion elimination can cause 64kb code limit on UnsafeProjection") {
550+
val numOfExprs = 10000
551+
val exprs = (0 to numOfExprs).flatMap(colIndex =>
552+
Seq(Add(BoundReference(colIndex, DoubleType, true),
553+
BoundReference(numOfExprs + colIndex, DoubleType, true)),
554+
Add(BoundReference(colIndex, DoubleType, true),
555+
BoundReference(numOfExprs + colIndex, DoubleType, true))))
556+
// these should not fail to compile due to 64K limit
557+
GenerateUnsafeProjection.generate(exprs, true)
558+
GenerateMutableProjection.generate(exprs, true)
559+
}
548560
}
549561

550562
case class HugeCodeIntExpression(value: Int) extends Expression {

0 commit comments

Comments
 (0)