Skip to content

Commit b1b4727

Browse files
ueshin authored and rxin committed
[SPARK-17702][SQL] Code generation including too many mutable states exceeds JVM size limit.
## What changes were proposed in this pull request? When code generation involves too many mutable states, the constructor code that extracts values from `references` into fields exceeds the JVM method size limit. We should split the generated extractions in the constructor into smaller functions. ## How was this patch tested? I added some tests to check whether the generated code for the expressions exceeds the limit or not. Author: Takuya UESHIN <[email protected]> Closes #15275 from ueshin/issues/SPARK-17702.
1 parent c571cfb commit b1b4727

File tree

8 files changed

+48
-12
lines changed

8 files changed

+48
-12
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 13 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -178,7 +178,10 @@ class CodegenContext {
178178
def initMutableStates(): String = {
179179
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
180180
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
181-
mutableStates.distinct.map(_._3).mkString("\n")
181+
val initCodes = mutableStates.distinct.map(_._3 + "\n")
182+
// The generated initialization code may exceed 64kb function size limit in JVM if there are too
183+
// many mutable states, so split it into multiple functions.
184+
splitExpressions(initCodes, "init", Nil)
182185
}
183186

184187
/**
@@ -604,6 +607,11 @@ class CodegenContext {
604607
// Cannot split these expressions because they are not created from a row object.
605608
return expressions.mkString("\n")
606609
}
610+
splitExpressions(expressions, "apply", ("InternalRow", row) :: Nil)
611+
}
612+
613+
private def splitExpressions(
614+
expressions: Seq[String], funcName: String, arguments: Seq[(String, String)]): String = {
607615
val blocks = new ArrayBuffer[String]()
608616
val blockBuilder = new StringBuilder()
609617
for (code <- expressions) {
@@ -623,19 +631,19 @@ class CodegenContext {
623631
// inline execution if only one block
624632
blocks.head
625633
} else {
626-
val apply = freshName("apply")
634+
val func = freshName(funcName)
627635
val functions = blocks.zipWithIndex.map { case (body, i) =>
628-
val name = s"${apply}_$i"
636+
val name = s"${func}_$i"
629637
val code = s"""
630-
|private void $name(InternalRow $row) {
638+
|private void $name(${arguments.map { case (t, name) => s"$t $name" }.mkString(", ")}) {
631639
| $body
632640
|}
633641
""".stripMargin
634642
addNewFunction(name, code)
635643
name
636644
}
637645

638-
functions.map(name => s"$name($row);").mkString("\n")
646+
functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")});").mkString("\n")
639647
}
640648
}
641649

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -104,14 +104,15 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
104104
private Object[] references;
105105
private MutableRow mutableRow;
106106
${ctx.declareMutableStates()}
107-
${ctx.declareAddedFunctions()}
108107

109108
public SpecificMutableProjection(Object[] references) {
110109
this.references = references;
111110
mutableRow = new $genericMutableRowType(${expressions.size});
112111
${ctx.initMutableStates()}
113112
}
114113

114+
${ctx.declareAddedFunctions()}
115+
115116
public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
116117
mutableRow = row;
117118
return this;

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -133,13 +133,14 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
133133

134134
private Object[] references;
135135
${ctx.declareMutableStates()}
136-
${ctx.declareAddedFunctions()}
137136

138137
public SpecificOrdering(Object[] references) {
139138
this.references = references;
140139
${ctx.initMutableStates()}
141140
}
142141

142+
${ctx.declareAddedFunctions()}
143+
143144
public int compare(InternalRow a, InternalRow b) {
144145
InternalRow ${ctx.INPUT_ROW} = null; // Holds current row being evaluated.
145146
$comparisons

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
4040
protected def create(predicate: Expression): ((InternalRow) => Boolean) = {
4141
val ctx = newCodeGenContext()
4242
val eval = predicate.genCode(ctx)
43+
4344
val codeBody = s"""
4445
public SpecificPredicate generate(Object[] references) {
4546
return new SpecificPredicate(references);
@@ -48,13 +49,14 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
4849
class SpecificPredicate extends ${classOf[Predicate].getName} {
4950
private final Object[] references;
5051
${ctx.declareMutableStates()}
51-
${ctx.declareAddedFunctions()}
5252

5353
public SpecificPredicate(Object[] references) {
5454
this.references = references;
5555
${ctx.initMutableStates()}
5656
}
5757

58+
${ctx.declareAddedFunctions()}
59+
5860
public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
5961
${eval.code}
6062
return !${eval.isNull} && ${eval.value};

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -155,6 +155,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
155155
"""
156156
}
157157
val allExpressions = ctx.splitExpressions(ctx.INPUT_ROW, expressionCodes)
158+
158159
val codeBody = s"""
159160
public java.lang.Object generate(Object[] references) {
160161
return new SpecificSafeProjection(references);
@@ -165,14 +166,15 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
165166
private Object[] references;
166167
private MutableRow mutableRow;
167168
${ctx.declareMutableStates()}
168-
${ctx.declareAddedFunctions()}
169169

170170
public SpecificSafeProjection(Object[] references) {
171171
this.references = references;
172172
mutableRow = (MutableRow) references[references.length - 1];
173173
${ctx.initMutableStates()}
174174
}
175175

176+
${ctx.declareAddedFunctions()}
177+
176178
public java.lang.Object apply(java.lang.Object _i) {
177179
InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
178180
$allExpressions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -374,13 +374,14 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
374374

375375
private Object[] references;
376376
${ctx.declareMutableStates()}
377-
${ctx.declareAddedFunctions()}
378377

379378
public SpecificUnsafeProjection(Object[] references) {
380379
this.references = references;
381380
${ctx.initMutableStates()}
382381
}
383382

383+
${ctx.declareAddedFunctions()}
384+
384385
// Scala.Function1 need this
385386
public java.lang.Object apply(java.lang.Object row) {
386387
return apply((InternalRow) row);

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,14 +17,16 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions
1919

20+
import java.sql.Timestamp
21+
2022
import org.apache.spark.SparkFunSuite
2123
import org.apache.spark.metrics.source.CodegenMetrics
2224
import org.apache.spark.sql.Row
2325
import org.apache.spark.sql.catalyst.InternalRow
2426
import org.apache.spark.sql.catalyst.dsl.expressions._
2527
import org.apache.spark.sql.catalyst.expressions.codegen._
2628
import org.apache.spark.sql.catalyst.expressions.objects.{CreateExternalRow, GetExternalRowField, ValidateExternalType}
27-
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
29+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
2830
import org.apache.spark.sql.types._
2931
import org.apache.spark.unsafe.types.UTF8String
3032
import org.apache.spark.util.ThreadUtils
@@ -164,6 +166,23 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
164166
}
165167
}
166168

169+
test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") {
170+
val length = 5000
171+
val expressions = Seq.fill(length) {
172+
ToUTCTimestamp(
173+
Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
174+
Literal.create("PST", StringType))
175+
}
176+
val plan = GenerateMutableProjection.generate(expressions)
177+
val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
178+
val expected = Seq.fill(length)(
179+
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))
180+
181+
if (!checkResult(actual, expected)) {
182+
fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
183+
}
184+
}
185+
167186
test("test generated safe and unsafe projection") {
168187
val schema = new StructType(Array(
169188
StructField("a", StringType, true),

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -316,14 +316,16 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
316316
final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
317317

318318
private Object[] references;
319+
private scala.collection.Iterator[] inputs;
319320
${ctx.declareMutableStates()}
320321

321322
public GeneratedIterator(Object[] references) {
322323
this.references = references;
323324
}
324325

325-
public void init(int index, scala.collection.Iterator inputs[]) {
326+
public void init(int index, scala.collection.Iterator[] inputs) {
326327
partitionIndex = index;
328+
this.inputs = inputs;
327329
${ctx.initMutableStates()}
328330
}
329331

0 commit comments

Comments
 (0)