Skip to content

Commit 4a779bd

Browse files
maropugatorsmile
authored andcommitted
[SPARK-21871][SQL] Check actual bytecode size when compiling generated code
## What changes were proposed in this pull request? This pr added code to check actual bytecode size when compiling generated code. In #18810, we added code to give up code compilation and use interpreter execution in `SparkPlan` if the line number of generated functions goes over `maxLinesPerFunction`. But, we already have code to collect metrics for compiled bytecode size in `CodeGenerator` object. So,we could easily reuse the code for this purpose. ## How was this patch tested? Added tests in `WholeStageCodegenSuite`. Author: Takeshi Yamamuro <[email protected]> Closes #19083 from maropu/SPARK-21871.
1 parent 64df08b commit 4a779bd

File tree

14 files changed

+94
-149
lines changed

14 files changed

+94
-149
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,6 @@ object CodeFormatter {
8989
}
9090
new CodeAndComment(code.result().trim(), map)
9191
}
92-
93-
def stripExtraNewLinesAndComments(input: String): String = {
94-
val commentReg =
95-
("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" + // strip /*comment*/
96-
"""([ |\t]*?\/\/[\s\S]*?\n)""").r // strip //comment
97-
val codeWithoutComment = commentReg.replaceAllIn(input, "")
98-
codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines
99-
}
10092
}
10193

10294
private class CodeFormatter {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -373,20 +373,6 @@ class CodegenContext {
373373
*/
374374
private val placeHolderToComments = new mutable.HashMap[String, String]
375375

376-
/**
377-
* It will count the lines of every Java function generated by whole-stage codegen,
378-
* if there is a function of length greater than spark.sql.codegen.maxLinesPerFunction,
379-
* it will return true.
380-
*/
381-
def isTooLongGeneratedFunction: Boolean = {
382-
classFunctions.values.exists { _.values.exists {
383-
code =>
384-
val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code)
385-
codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
386-
}
387-
}
388-
}
389-
390376
/**
391377
* Returns a term name that is unique within this instance of a `CodegenContext`.
392378
*/
@@ -1020,10 +1006,16 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
10201006
}
10211007

10221008
object CodeGenerator extends Logging {
1009+
1010+
// This is the value of HugeMethodLimit in the OpenJDK JVM settings
1011+
val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
1012+
10231013
/**
10241014
* Compile the Java source code into a Java class, using Janino.
1015+
*
1016+
* @return a pair of a generated class and the max bytecode size of generated functions.
10251017
*/
1026-
def compile(code: CodeAndComment): GeneratedClass = try {
1018+
def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
10271019
cache.get(code)
10281020
} catch {
10291021
// Cache.get() may wrap the original exception. See the following URL
@@ -1036,7 +1028,7 @@ object CodeGenerator extends Logging {
10361028
/**
10371029
* Compile the Java source code into a Java class, using Janino.
10381030
*/
1039-
private[this] def doCompile(code: CodeAndComment): GeneratedClass = {
1031+
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
10401032
val evaluator = new ClassBodyEvaluator()
10411033

10421034
// A special classloader used to wrap the actual parent classloader of
@@ -1075,9 +1067,9 @@ object CodeGenerator extends Logging {
10751067
s"\n${CodeFormatter.format(code)}"
10761068
})
10771069

1078-
try {
1070+
val maxCodeSize = try {
10791071
evaluator.cook("generated.java", code.body)
1080-
recordCompilationStats(evaluator)
1072+
updateAndGetCompilationStats(evaluator)
10811073
} catch {
10821074
case e: JaninoRuntimeException =>
10831075
val msg = s"failed to compile: $e"
@@ -1092,13 +1084,15 @@ object CodeGenerator extends Logging {
10921084
logInfo(s"\n${CodeFormatter.format(code, maxLines)}")
10931085
throw new CompileException(msg, e.getLocation)
10941086
}
1095-
evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
1087+
1088+
(evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
10961089
}
10971090

10981091
/**
1099-
* Records the generated class and method bytecode sizes by inspecting janino private fields.
1092+
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
1093+
* Also, this method updates the metrics information.
11001094
*/
1101-
private def recordCompilationStats(evaluator: ClassBodyEvaluator): Unit = {
1095+
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
11021096
// First retrieve the generated classes.
11031097
val classes = {
11041098
val resultField = classOf[SimpleCompiler].getDeclaredField("result")
@@ -1113,23 +1107,26 @@ object CodeGenerator extends Logging {
11131107
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
11141108
val codeAttrField = codeAttr.getDeclaredField("code")
11151109
codeAttrField.setAccessible(true)
1116-
classes.foreach { case (_, classBytes) =>
1110+
val codeSizes = classes.flatMap { case (_, classBytes) =>
11171111
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
11181112
try {
11191113
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
1120-
cf.methodInfos.asScala.foreach { method =>
1121-
method.getAttributes().foreach { a =>
1122-
if (a.getClass.getName == codeAttr.getName) {
1123-
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
1124-
codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
1125-
}
1114+
val stats = cf.methodInfos.asScala.flatMap { method =>
1115+
method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
1116+
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
1117+
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
1118+
byteCodeSize
11261119
}
11271120
}
1121+
Some(stats)
11281122
} catch {
11291123
case NonFatal(e) =>
11301124
logWarning("Error calculating stats of compiled class.", e)
1125+
None
11311126
}
1132-
}
1127+
}.flatten
1128+
1129+
codeSizes.max
11331130
}
11341131

11351132
/**
@@ -1144,8 +1141,8 @@ object CodeGenerator extends Logging {
11441141
private val cache = CacheBuilder.newBuilder()
11451142
.maximumSize(100)
11461143
.build(
1147-
new CacheLoader[CodeAndComment, GeneratedClass]() {
1148-
override def load(code: CodeAndComment): GeneratedClass = {
1144+
new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
1145+
override def load(code: CodeAndComment): (GeneratedClass, Int) = {
11491146
val startTime = System.nanoTime()
11501147
val result = doCompile(code)
11511148
val endTime = System.nanoTime()

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
142142
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
143143
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
144144

145-
val c = CodeGenerator.compile(code)
146-
c.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
145+
val (clazz, _) = CodeGenerator.compile(code)
146+
clazz.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
147147
}
148148
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
185185
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
186186
logDebug(s"Generated Ordering by ${ordering.mkString(",")}:\n${CodeFormatter.format(code)}")
187187

188-
CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
188+
val (clazz, _) = CodeGenerator.compile(code)
189+
clazz.generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
189190
}
190191
}
191192

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
7878
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
7979
logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}")
8080

81-
CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
81+
val (clazz, _) = CodeGenerator.compile(code)
82+
clazz.generate(ctx.references.toArray).asInstanceOf[Predicate]
8283
}
8384
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,8 +189,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
189189
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
190190
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
191191

192-
val c = CodeGenerator.compile(code)
192+
val (clazz, _) = CodeGenerator.compile(code)
193193
val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
194-
c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
194+
clazz.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
195195
}
196196
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -409,7 +409,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
409409
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
410410
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
411411

412-
val c = CodeGenerator.compile(code)
413-
c.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
412+
val (clazz, _) = CodeGenerator.compile(code)
413+
clazz.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
414414
}
415415
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
196196
val code = CodeFormatter.stripOverlappingComments(new CodeAndComment(codeBody, Map.empty))
197197
logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}")
198198

199-
val c = CodeGenerator.compile(code)
200-
c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
199+
val (clazz, _) = CodeGenerator.compile(code)
200+
clazz.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
201201
}
202202
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
3030
import org.apache.spark.internal.config._
3131
import org.apache.spark.network.util.ByteUnit
3232
import org.apache.spark.sql.catalyst.analysis.Resolver
33+
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
3334
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
3435

3536
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -575,15 +576,15 @@ object SQLConf {
575576
"disable logging or -1 to apply no limit.")
576577
.createWithDefault(1000)
577578

578-
val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction")
579+
val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
579580
.internal()
580-
.doc("The maximum lines of a single Java function generated by whole-stage codegen. " +
581-
"When the generated function exceeds this threshold, " +
581+
.doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
582+
"codegen. When the compiled function exceeds this threshold, " +
582583
"the whole-stage codegen is deactivated for this subtree of the current query plan. " +
583-
"The default value 4000 is the max length of byte code JIT supported " +
584-
"for a single function(8000) divided by 2.")
584+
s"The default value is ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} and " +
585+
"this is a limit in the OpenJDK JVM implementation.")
585586
.intConf
586-
.createWithDefault(4000)
587+
.createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
587588

588589
val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
589590
.doc("The maximum number of bytes to pack into a single partition when reading files.")
@@ -1058,7 +1059,7 @@ class SQLConf extends Serializable with Logging {
10581059

10591060
def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
10601061

1061-
def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION)
1062+
def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
10621063

10631064
def tableRelationCacheSize: Int =
10641065
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -53,38 +53,6 @@ class CodeFormatterSuite extends SparkFunSuite {
5353
assert(reducedCode.body === "/*project_c4*/")
5454
}
5555

56-
test("removing extra new lines and comments") {
57-
val code =
58-
"""
59-
|/*
60-
| * multi
61-
| * line
62-
| * comments
63-
| */
64-
|
65-
|public function() {
66-
|/*comment*/
67-
| /*comment_with_space*/
68-
|code_body
69-
|//comment
70-
|code_body
71-
| //comment_with_space
72-
|
73-
|code_body
74-
|}
75-
""".stripMargin
76-
77-
val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code)
78-
assert(reducedCode ===
79-
"""
80-
|public function() {
81-
|code_body
82-
|code_body
83-
|code_body
84-
|}
85-
""".stripMargin)
86-
}
87-
8856
testCase("basic example") {
8957
"""
9058
|class A {

0 commit comments

Comments
 (0)