Skip to content

Commit e8ccba5

Browse files
committed
add test cases
rebase master
1 parent e216c6a commit e8ccba5

File tree

6 files changed

+88
-81
lines changed

6 files changed

+88
-81
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,8 +1237,9 @@ object CodeGenerator extends Logging {
12371237
// bytecode instruction
12381238
final val MUTABLESTATEARRAY_SIZE_LIMIT = 32768
12391239

1240-
private lazy val compilerImpl: CompilerBase = {
1241-
val compiler = SparkEnv.get.conf.get("spark.sql.javaCompiler", "jdk").toLowerCase(Locale.ROOT)
1240+
// Deliberately a non-private, non-lazy method so that it can be used in tests
1241+
def _compilerImpl(): CompilerBase = {
1242+
val compiler = SQLConf.get.javaCompiler
12421243
val compilerInstance = compiler match {
12431244
case "janino" => JaninoCompiler
12441245
case "jdk" => if (JdkCompiler.javaCompiler != null) JdkCompiler else JaninoCompiler
@@ -1249,12 +1250,14 @@ object CodeGenerator extends Logging {
12491250
compilerInstance
12501251
}
12511252

1253+
private lazy val compilerImpl = _compilerImpl
1254+
12521255
lazy val janinoCompilerEnabled: Boolean = {
12531256
compilerImpl == JaninoCompiler
12541257
}
12551258

12561259
/**
1257-
* Compile the Java source code into a Java class, using Janino.
1260+
* Compile the Java source code into a Java class, using either Janino or javac.
12581261
*
12591262
* @return a pair of a generated class and the max bytecode size of generated functions.
12601263
*/
@@ -1269,7 +1272,7 @@ object CodeGenerator extends Logging {
12691272
}
12701273

12711274
/**
1272-
* Compile the Java source code into a Java class, using Janino.
1275+
* Compile the Java source code into a Java class, using either Janino or javac.
12731276
*/
12741277
private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
12751278
compilerImpl.compile(code)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/compiler/CompilerBase.scala

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,29 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions.codegen.compiler
1919

20+
import java.io.ByteArrayInputStream
21+
22+
import scala.collection.JavaConverters._
23+
import scala.language.existentials
24+
import scala.util.control.NonFatal
25+
26+
import org.codehaus.janino.util.ClassFile
27+
2028
import org.apache.spark.{TaskContext, TaskKilledException}
2129
import org.apache.spark.executor.InputMetrics
30+
import org.apache.spark.internal.Logging
31+
import org.apache.spark.metrics.source.CodegenMetrics
2232
import org.apache.spark.sql.catalyst.InternalRow
23-
import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData, UnsafeMapData, UnsafeProjection, UnsafeRow}
33+
import org.apache.spark.sql.catalyst.expressions._
2434
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, GeneratedClass}
2535
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
2636
import org.apache.spark.sql.types.Decimal
2737
import org.apache.spark.unsafe.Platform
2838
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
39+
import org.apache.spark.util.Utils
2940

3041

31-
abstract class CompilerBase {
42+
abstract class CompilerBase extends Logging {
3243
protected val className = "org.apache.spark.sql.catalyst.expressions.GeneratedClass"
3344

3445
protected val importClassNames = Seq(
@@ -77,4 +88,35 @@ abstract class CompilerBase {
7788
}
7889

7990
def compile(code: CodeAndComment): (GeneratedClass, Int)
91+
92+
/**
93+
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
94+
* Also, this method updates the metrics information.
95+
*/
96+
protected def updateAndGetBytecodeSize(byteCodes: Iterable[Array[Byte]]): Int = {
97+
// Walk the classes to get at the method bytecode.
98+
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
99+
val codeAttrField = codeAttr.getDeclaredField("code")
100+
codeAttrField.setAccessible(true)
101+
val codeSizes = byteCodes.flatMap { byteCode =>
102+
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(byteCode.size)
103+
try {
104+
val cf = new ClassFile(new ByteArrayInputStream(byteCode))
105+
val stats = cf.methodInfos.asScala.flatMap { method =>
106+
method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
107+
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
108+
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
109+
byteCodeSize
110+
}
111+
}
112+
Some(stats)
113+
} catch {
114+
case NonFatal(e) =>
115+
logWarning("Error calculating stats of compiled class.", e)
116+
None
117+
}
118+
}.flatten
119+
120+
codeSizes.max
121+
}
80122
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/compiler/JaninoCompiler.scala

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,20 @@
1717

1818
package org.apache.spark.sql.catalyst.expressions.codegen.compiler
1919

20-
import java.io.ByteArrayInputStream
2120
import java.util.{Map => JavaMap}
2221

2322
import scala.collection.JavaConverters._
24-
import scala.language.existentials
25-
import scala.util.control.NonFatal
2623

2724
import org.codehaus.commons.compiler.CompileException
28-
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler}
29-
import org.codehaus.janino.util.ClassFile
25+
import org.codehaus.janino._
3026

31-
import org.apache.spark.internal.Logging
32-
import org.apache.spark.metrics.source.CodegenMetrics
33-
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, GeneratedClass}
27+
import org.apache.spark.sql.catalyst.expressions.codegen._
3428
import org.apache.spark.sql.internal.SQLConf
3529
import org.apache.spark.util.{ParentClassLoader, Utils}
3630

3731

38-
object JaninoCompiler extends CompilerBase with Logging {
32+
object JaninoCompiler extends CompilerBase {
3933

40-
/**
41-
* Returns the max bytecode size of the generated functions by inspecting janino private fields.
42-
* Also, this method updates the metrics information.
43-
*/
4434
private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
4535
// First retrieve the generated classes.
4636
val classes = {
@@ -51,31 +41,7 @@ object JaninoCompiler extends CompilerBase with Logging {
5141
classesField.setAccessible(true)
5242
classesField.get(loader).asInstanceOf[JavaMap[String, Array[Byte]]].asScala
5343
}
54-
55-
// Then walk the classes to get at the method bytecode.
56-
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
57-
val codeAttrField = codeAttr.getDeclaredField("code")
58-
codeAttrField.setAccessible(true)
59-
val codeSizes = classes.flatMap { case (_, classBytes) =>
60-
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
61-
try {
62-
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
63-
val stats = cf.methodInfos.asScala.flatMap { method =>
64-
method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
65-
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
66-
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
67-
byteCodeSize
68-
}
69-
}
70-
Some(stats)
71-
} catch {
72-
case NonFatal(e) =>
73-
logWarning("Error calculating stats of compiled class.", e)
74-
None
75-
}
76-
}.flatten
77-
78-
codeSizes.max
44+
updateAndGetBytecodeSize(classes.values)
7945
}
8046

8147
override def compile(code: CodeAndComment): (GeneratedClass, Int) = {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/compiler/JdkCompiler.scala

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ import javax.tools._
2525
import javax.tools.JavaFileManager.Location
2626
import javax.tools.JavaFileObject.Kind
2727

28+
import scala.collection.JavaConverters._
2829
import scala.collection.mutable
30+
import scala.language.existentials
2931

3032
import org.codehaus.commons.compiler.{CompileException, Location => CompilerLocation}
3133

3234
import org.apache.spark.internal.Logging
33-
import org.apache.spark.metrics.source.CodegenMetrics
3435
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeFormatter, GeneratedClass}
35-
import org.apache.spark.util.{ParentClassLoader, Utils}
36+
import org.apache.spark.sql.internal.SQLConf
37+
import org.apache.spark.util.ParentClassLoader
3638

3739

3840
class JavaCodeManager(fileManager: JavaFileManager)
@@ -123,13 +125,11 @@ class JDKDiagnosticListener extends DiagnosticListener[JavaFileObject] {
123125
// Wrap the exception in a RuntimeException, because "report()"
124126
// does not declare checked exceptions.
125127
throw new RuntimeException(new CompileException(message, loc))
126-
// } else if (logger.isTraceEnabled()) {
127-
// logger.trace(diagnostic.toString() + " (" + diagnostic.getCode() + ")")
128128
}
129129
}
130130
}
131131

132-
object JdkCompiler extends CompilerBase with Logging {
132+
object JdkCompiler extends CompilerBase {
133133
val javaCompiler = {
134134
ToolProvider.getSystemJavaCompiler
135135
}
@@ -142,7 +142,12 @@ object JdkCompiler extends CompilerBase with Logging {
142142
if ("-g".equals(debugOption)) {
143143
debugOption.append("none,")
144144
}
145-
Arrays.asList("-classpath", System.getProperty("java.class.path"), debugOption.init.toString())
145+
val compilerOption = SQLConf.get.jdkCompilerOptions
146+
if (compilerOption != null) {
147+
debugOption.append(compilerOption).append(",")
148+
}
149+
150+
Arrays.asList("-classpath", System.getProperty("java.class.path"), debugOption.toString())
146151
}
147152

148153
private val listener = new JDKDiagnosticListener()
@@ -194,39 +199,10 @@ object JdkCompiler extends CompilerBase with Logging {
194199
case _: Throwable => throw new CompileException("Compilation failed", null)
195200
}
196201

197-
// TODO: Needs to get the max bytecode size of generated methods
198-
val maxMethodBytecodeSize = updateAndGetCompilationStats(fileManager.objects.toMap)
202+
val byteCodes = fileManager.objects.toMap.values.map(_.getBytecode)
203+
val maxMethodBytecodeSize = updateAndGetBytecodeSize(byteCodes)
199204

200205
val clazz = fileManager.getClassLoader(null).loadClass(clazzName)
201206
(clazz.newInstance().asInstanceOf[GeneratedClass], maxMethodBytecodeSize)
202207
}
203-
204-
private def updateAndGetCompilationStats(objects: Map[String, JavaCode]): Int = {
205-
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
206-
val codeAttrField = codeAttr.getDeclaredField("code")
207-
codeAttrField.setAccessible(true)
208-
/*
209-
val codeSizes = objects.foreach { case (_, javaCode) =>
210-
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(javaCode.getBytecode.size)
211-
try {
212-
val cf = new ClassFile(new ByteArrayInputStream(javaCode.getBytecode))
213-
val stats = cf.methodInfos.asScala.flatMap { method =>
214-
method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
215-
val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
216-
CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
217-
byteCodeSize
218-
}
219-
}
220-
Some(stats)
221-
} catch {
222-
case NonFatal(e) =>
223-
logWarning("Error calculating stats of compiled class.", e)
224-
None
225-
}
226-
}.flatten
227-
228-
codeSizes.max
229-
*/
230-
0
231-
}
232208
}

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1740,6 +1740,10 @@ class SQLConf extends Serializable with Logging {
17401740

17411741
def codegenCacheMaxEntries: Int = getConf(StaticSQLConf.CODEGEN_CACHE_MAX_ENTRIES)
17421742

1743+
def javaCompiler: String = getConf(StaticSQLConf.CODEGEN_JAVA_COMPILER)
1744+
1745+
def jdkCompilerOptions: String = getConf(StaticSQLConf.CODEGEN_JDK_JAVA_COMPILER_OPTION)
1746+
17431747
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
17441748

17451749
def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,22 @@ object StaticSQLConf {
8282
.booleanConf
8383
.createWithDefault(false)
8484

85+
val CODEGEN_JAVA_COMPILER = buildStaticConf("spark.sql.codegen.javaCompiler")
86+
.internal()
87+
.doc("Sets the Java bytecode compiler for compiling Java methods for DataFrame or Dataset " +
88+
"program. Acceptable values include: jdk or janino")
89+
.stringConf
90+
.checkValues(Set("jdk", "janino"))
91+
.createWithDefault("jdk")
92+
93+
val CODEGEN_JDK_JAVA_COMPILER_OPTION =
94+
buildStaticConf("spark.sql.codegen.javaCompiler.jdkOption")
95+
.internal()
96+
.doc("Sets compiler options for JDK Java bytecode compiler. This is ignored when janino is" +
97+
"is selected")
98+
.stringConf
99+
.createWithDefault("")
100+
85101
// When enabling the debug, Spark SQL internal table properties are not filtered out; however,
86102
// some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
87103
val DEBUG_MODE = buildStaticConf("spark.sql.debug")

0 commit comments

Comments
 (0)