Skip to content

Commit fe2fb7f

Browse files
Davies Liu authored and rxin committed
[SPARK-9620] [SQL] generated UnsafeProjection should support many columns or large expressions
Currently, the generated UnsafeProjection can hit the JVM's 64KB bytecode limit for a single method. This patch splits the generated expressions into multiple functions to avoid that limitation. After this patch, we can work well with tables that have up to 64k columns (at which point we hit Java's limit on the maximum number of constants), which should be enough in practice. cc rxin Author: Davies Liu <[email protected]> Closes apache#8044 from davies/wider_table and squashes the following commits: 9192e6c [Davies Liu] fix generated safe projection d1ef81a [Davies Liu] fix failed tests 737b3d3 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table ffcd132 [Davies Liu] address comments 1b95be4 [Davies Liu] put the generated class into sql package 77ed72d [Davies Liu] address comments 4518e17 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table 75ccd01 [Davies Liu] Merge branch 'master' of github.com:apache/spark into wider_table 495e932 [Davies Liu] support wider table with more than 1k columns for generated projections
1 parent 40ed2af commit fe2fb7f

File tree

6 files changed

+207
-142
lines changed

6 files changed

+207
-142
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 45 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.catalyst.expressions.codegen
1919

2020
import scala.collection.mutable
21+
import scala.collection.mutable.ArrayBuffer
2122
import scala.language.existentials
2223

2324
import com.google.common.cache.{CacheBuilder, CacheLoader}
@@ -265,6 +266,45 @@ class CodeGenContext {
265266
def isPrimitiveType(jt: String): Boolean = primitiveTypes.contains(jt)
266267

267268
def isPrimitiveType(dt: DataType): Boolean = isPrimitiveType(javaType(dt))
269+
270+
/**
271+
* Splits the generated code of expressions into multiple functions, because a function has a
272+
* 64KB code size limit in the JVM.
273+
*
274+
* @param row the variable name of row that is used by expressions
* @param expressions the generated code snippets; they are concatenated in the given order
* @return the single block itself when everything fits in one block (for inline use), otherwise
* one call statement per generated function, joined by newlines
275+
*/
276+
def splitExpressions(row: String, expressions: Seq[String]): String = {
277+
val blocks = new ArrayBuffer[String]()
278+
val blockBuilder = new StringBuilder()
279+
for (code <- expressions) {
280+
// We can't know how much bytecode will be generated, so the length of the source text is used as the limit.
// The check runs before appending, so a block can exceed the threshold by the last snippet added to it.
281+
if (blockBuilder.length > 64 * 1000) {
282+
blocks.append(blockBuilder.toString())
283+
blockBuilder.clear()
284+
}
285+
blockBuilder.append(code)
286+
}
// Flush whatever is left in the builder; this also yields one (possibly empty) block for empty input.
287+
blocks.append(blockBuilder.toString())
288+
289+
if (blocks.length == 1) {
290+
// Only one block: return its code so the caller can inline it directly.
291+
blocks.head
292+
} else {
// Several blocks: wrap each in its own private function taking the row as its only parameter.
// NOTE(review): freshName presumably returns a unique identifier and addNewFunction presumably
// registers the function on this context for later emission - confirm against their definitions.
293+
val apply = freshName("apply")
294+
val functions = blocks.zipWithIndex.map { case (body, i) =>
295+
val name = s"${apply}_$i"
296+
val code = s"""
297+
|private void $name(InternalRow $row) {
298+
| $body
299+
|}
300+
""".stripMargin
301+
addNewFunction(name, code)
302+
name
303+
}
304+
305+
functions.map(name => s"$name($row);").mkString("\n")
306+
}
307+
}
268308
}
269309

270310
/**
@@ -289,15 +329,15 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
289329
protected def declareMutableStates(ctx: CodeGenContext): String = {
290330
ctx.mutableStates.map { case (javaType, variableName, _) =>
291331
s"private $javaType $variableName;"
292-
}.mkString
332+
}.mkString("\n")
293333
}
294334

295335
protected def initMutableStates(ctx: CodeGenContext): String = {
296-
ctx.mutableStates.map(_._3).mkString
336+
ctx.mutableStates.map(_._3).mkString("\n")
297337
}
298338

299339
protected def declareAddedFunctions(ctx: CodeGenContext): String = {
300-
ctx.addedFuntions.map { case (funcName, funcCode) => funcCode }.mkString
340+
ctx.addedFuntions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
301341
}
302342

303343
/**
@@ -328,6 +368,8 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
328368
private[this] def doCompile(code: String): GeneratedClass = {
329369
val evaluator = new ClassBodyEvaluator()
330370
evaluator.setParentClassLoader(getClass.getClassLoader)
371+
// The generated class cannot be under the codegen package, or it fails with java.lang.InstantiationException
372+
evaluator.setClassName("org.apache.spark.sql.catalyst.expressions.GeneratedClass")
331373
evaluator.setDefaultImports(Array(
332374
classOf[PlatformDependent].getName,
333375
classOf[InternalRow].getName,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 6 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
4040

4141
protected def create(expressions: Seq[Expression]): (() => MutableProjection) = {
4242
val ctx = newCodeGenContext()
43-
val projectionCode = expressions.zipWithIndex.map {
43+
val projectionCodes = expressions.zipWithIndex.map {
4444
case (NoOp, _) => ""
4545
case (e, i) =>
4646
val evaluationCode = e.gen(ctx)
@@ -65,49 +65,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
6565
"""
6666
}
6767
}
68-
// collect projections into blocks as function has 64kb codesize limit in JVM
69-
val projectionBlocks = new ArrayBuffer[String]()
70-
val blockBuilder = new StringBuilder()
71-
for (projection <- projectionCode) {
72-
if (blockBuilder.length > 16 * 1000) {
73-
projectionBlocks.append(blockBuilder.toString())
74-
blockBuilder.clear()
75-
}
76-
blockBuilder.append(projection)
77-
}
78-
projectionBlocks.append(blockBuilder.toString())
79-
80-
val (projectionFuns, projectionCalls) = {
81-
// inline execution if codesize limit was not broken
82-
if (projectionBlocks.length == 1) {
83-
("", projectionBlocks.head)
84-
} else {
85-
(
86-
projectionBlocks.zipWithIndex.map { case (body, i) =>
87-
s"""
88-
|private void apply$i(InternalRow i) {
89-
| $body
90-
|}
91-
""".stripMargin
92-
}.mkString,
93-
projectionBlocks.indices.map(i => s"apply$i(i);").mkString("\n")
94-
)
95-
}
96-
}
68+
val allProjections = ctx.splitExpressions("i", projectionCodes)
9769

9870
val code = s"""
9971
public Object generate($exprType[] expr) {
100-
return new SpecificProjection(expr);
72+
return new SpecificMutableProjection(expr);
10173
}
10274

103-
class SpecificProjection extends ${classOf[BaseMutableProjection].getName} {
75+
class SpecificMutableProjection extends ${classOf[BaseMutableProjection].getName} {
10476

10577
private $exprType[] expressions;
10678
private $mutableRowType mutableRow;
10779
${declareMutableStates(ctx)}
10880
${declareAddedFunctions(ctx)}
10981

110-
public SpecificProjection($exprType[] expr) {
82+
public SpecificMutableProjection($exprType[] expr) {
11183
expressions = expr;
11284
mutableRow = new $genericMutableRowType(${expressions.size});
11385
${initMutableStates(ctx)}
@@ -123,12 +95,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
12395
return (InternalRow) mutableRow;
12496
}
12597

126-
$projectionFuns
127-
12898
public Object apply(Object _i) {
12999
InternalRow i = (InternalRow) _i;
130-
$projectionCalls
131-
100+
$allProjections
132101
return mutableRow;
133102
}
134103
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala

Lines changed: 11 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions.codegen
1919

20-
import scala.collection.mutable.ArrayBuffer
21-
2220
import org.apache.spark.sql.catalyst.expressions._
2321
import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
2422
import org.apache.spark.sql.types._
@@ -43,6 +41,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
4341
val tmp = ctx.freshName("tmp")
4442
val output = ctx.freshName("safeRow")
4543
val values = ctx.freshName("values")
44+
// These expressions could be split into multiple functions, so keep the values array as mutable state instead of a local variable
45+
ctx.addMutableState("Object[]", values, s"this.$values = null;")
46+
4647
val rowClass = classOf[GenericInternalRow].getName
4748

4849
val fieldWriters = schema.map(_.dataType).zipWithIndex.map { case (dt, i) =>
@@ -53,12 +54,12 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
5354
$values[$i] = ${converter.primitive};
5455
}
5556
"""
56-
}.mkString("\n")
57-
57+
}
58+
val allFields = ctx.splitExpressions(tmp, fieldWriters)
5859
val code = s"""
5960
final InternalRow $tmp = $input;
60-
final Object[] $values = new Object[${schema.length}];
61-
$fieldWriters
61+
this.$values = new Object[${schema.length}];
62+
$allFields
6263
final InternalRow $output = new $rowClass($values);
6364
"""
6465

@@ -128,7 +129,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
128129

129130
protected def create(expressions: Seq[Expression]): Projection = {
130131
val ctx = newCodeGenContext()
131-
val projectionCode = expressions.zipWithIndex.map {
132+
val expressionCodes = expressions.zipWithIndex.map {
132133
case (NoOp, _) => ""
133134
case (e, i) =>
134135
val evaluationCode = e.gen(ctx)
@@ -143,36 +144,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
143144
}
144145
"""
145146
}
146-
// collect projections into blocks as function has 64kb codesize limit in JVM
147-
val projectionBlocks = new ArrayBuffer[String]()
148-
val blockBuilder = new StringBuilder()
149-
for (projection <- projectionCode) {
150-
if (blockBuilder.length > 16 * 1000) {
151-
projectionBlocks.append(blockBuilder.toString())
152-
blockBuilder.clear()
153-
}
154-
blockBuilder.append(projection)
155-
}
156-
projectionBlocks.append(blockBuilder.toString())
157-
158-
val (projectionFuns, projectionCalls) = {
159-
// inline it if we have only one block
160-
if (projectionBlocks.length == 1) {
161-
("", projectionBlocks.head)
162-
} else {
163-
(
164-
projectionBlocks.zipWithIndex.map { case (body, i) =>
165-
s"""
166-
|private void apply$i(InternalRow i) {
167-
| $body
168-
|}
169-
""".stripMargin
170-
}.mkString,
171-
projectionBlocks.indices.map(i => s"apply$i(i);").mkString("\n")
172-
)
173-
}
174-
}
175-
147+
val allExpressions = ctx.splitExpressions("i", expressionCodes)
176148
val code = s"""
177149
public Object generate($exprType[] expr) {
178150
return new SpecificSafeProjection(expr);
@@ -183,19 +155,17 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
183155
private $exprType[] expressions;
184156
private $mutableRowType mutableRow;
185157
${declareMutableStates(ctx)}
158+
${declareAddedFunctions(ctx)}
186159

187160
public SpecificSafeProjection($exprType[] expr) {
188161
expressions = expr;
189162
mutableRow = new $genericMutableRowType(${expressions.size});
190163
${initMutableStates(ctx)}
191164
}
192165

193-
$projectionFuns
194-
195166
public Object apply(Object _i) {
196167
InternalRow i = (InternalRow) _i;
197-
$projectionCalls
198-
168+
$allExpressions
199169
return mutableRow;
200170
}
201171
}

0 commit comments

Comments
 (0)