From c2cc295fd85a7a0e42debc954311ff74f5b52962 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 6 Oct 2017 17:33:08 +0200 Subject: [PATCH 01/21] add a method for each inner class and use it in the superclass --- .../expressions/codegen/CodeGenerator.scala | 61 +++++++++++++++++-- 1 file changed, 56 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index f9c5ef8439085..2a8f7803160cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -77,6 +77,20 @@ case class SubExprEliminationState(isNull: String, value: String) */ case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprEliminationState]) +/** + * The main information about a new added function. + * + * @param functionName String representing the name of the function + * @param subclassName Optional value which is empty if the function is added to + * the superclass, otherwise it contains the name of the + * inner class in which the function has been added. + * @param subclassInstance Optional value which is empty if the function is added to + * the superclass, otherwise it contains the name of the + * instance of the inner class in the outer class. + */ +private[codegen] case class NewFunction(functionName: String, subclassName: Option[String], + subclassInstance: Option[String]) + /** * A context for codegen, tracking a list of objects that could be passed into generated Java * function. @@ -277,6 +291,18 @@ class CodegenContext { funcName: String, funcCode: String, inlineToOuterClass: Boolean = false): String = { + val newFunction = addNewFunctionInternal(funcName, funcCode, inlineToOuterClass) + newFunction match { + case NewFunction(functionName, None, None) => functionName + case NewFunction(functionName, Some(_), Some(subclassInstance)) => + subclassInstance + "." + functionName + } + } + + private[this] def addNewFunctionInternal( + funcName: String, + funcCode: String, + inlineToOuterClass: Boolean): NewFunction = { // The number of named constants that can exist in the class is limited by the Constant Pool // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a // threshold of 1600k bytes to determine when a function should be inlined to a private, nested @@ -298,10 +324,9 @@ class CodegenContext { classFunctions(className) += funcName -> funcCode if (className == outerClassName) { - funcName + NewFunction(funcName, None, None) } else { - - s"$classInstance.$funcName" + NewFunction(funcName, Some(className), Some(classInstance)) } } @@ -798,10 +823,36 @@ class CodegenContext { | ${makeSplitFunction(body)} |} """.stripMargin - addNewFunction(name, code) + addNewFunctionInternal(name, code, inlineToOuterClass = false) } - foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})")) + val outerClassFunctions = functions + .filter(_.subclassName.isEmpty) + .map(_.functionName) + + val innerClassFunctions = functions + .filter(_.subclassName.isDefined) + .foldLeft(Map.empty[(String, String), Seq[String]]) { case (acc, f) => + val key = (f.subclassName.get, f.subclassInstance.get) + acc.updated(key, acc.getOrElse(key, Seq.empty[String]) ++ Seq(f.functionName)) + } + .map { case ((subclassName, subclassInstance), subclassFunctions) => + // Adding a new function to each subclass which contains + // the invocation of all the ones which have been added to + // that subclass + val code = s""" + |private $returnType $func($argString) { + | ${makeSplitFunction(foldFunctions(subclassFunctions.map(name => + s"$name(${arguments.map(_._2).mkString(", ")})")))} + |} + """.stripMargin + classSize(subclassName) += code.length + classFunctions(subclassName) += func -> code + s"$subclassInstance.$func" + } + + foldFunctions((outerClassFunctions ++ innerClassFunctions).map( + name => s"$name(${arguments.map(_._2).mkString(", ")})")) } } From d3a5b872e5446e1205a91498d977af6e6259e58b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 9 Oct 2017 16:21:45 +0200 Subject: [PATCH 02/21] Adding UT and modifying class size limit --- .../expressions/codegen/CodeGenerator.scala | 18 ++++++++++++------ .../expressions/CodeGenerationSuite.scala | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 2a8f7803160cc..a45a3daae987f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -305,11 +305,11 @@ class CodegenContext { inlineToOuterClass: Boolean): NewFunction = { // The number of named constants that can exist in the class is limited by the Constant Pool // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a - // threshold of 1600k bytes to determine when a function should be inlined to a private, nested + // threshold of 1000k bytes to determine when a function should be inlined to a private, nested // sub-class. val (className, classInstance) = if (inlineToOuterClass) { outerClassName -> "" - } else if (currClassSize > 1600000) { + } else if (currClassSize > 1000000) { val className = freshName("NestedClass") val classInstance = freshName("nestedClassInstance") @@ -320,8 +320,7 @@ class CodegenContext { currClass() } - classSize(className) += funcCode.length - classFunctions(className) += funcName -> funcCode + addNewFunctionToClass(funcName, funcCode, className) if (className == outerClassName) { NewFunction(funcName, None, None) @@ -330,6 +329,14 @@ class CodegenContext { } } + private[this] def addNewFunctionToClass( + funcName: String, + funcCode: String, + className: String) = { + classSize(className) += funcCode.length + classFunctions(className) += funcName -> funcCode + } + /** * Declares all function code. If the added functions are too many, split them into nested * sub-classes to avoid hitting Java compiler constant pool limitation. @@ -846,8 +853,7 @@ class CodegenContext { s"$name(${arguments.map(_._2).mkString(", ")})")))} |} """.stripMargin - classSize(subclassName) += code.length - classFunctions(subclassName) += func -> code + addNewFunctionToClass(func, code, subclassName) s"$subclassInstance.$func" } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 7ea0bec145481..1e6f7b65e7e72 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -201,6 +201,23 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("SPARK-22226: group splitted expressions into one method per nested class") { + val length = 10000 + val expressions = Seq.fill(length) { + ToUTCTimestamp( + Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType), + Literal.create("PST", StringType)) + } + val plan = GenerateMutableProjection.generate(expressions) + val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType)) + val expected = Seq.fill(length)( + DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00"))) + + if (actual != expected) { + fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") + } + } + test("test generated safe and unsafe projection") { val schema = new StructType(Array( StructField("a", StringType, true), From e63264be8bc17c00d5d16d3ea7251ba0f3c8b2c8 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 12 Oct 2017 12:10:47 +0200 Subject: [PATCH 03/21] fix naming error in the comment --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index a45a3daae987f..f8fb5666756a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -82,7 +82,7 @@ case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprElimi * * @param functionName String representing the name of the function * @param subclassName Optional value which is empty if the function is added to - * the superclass, otherwise it contains the name of the + * the outer class, otherwise it contains the name of the * inner class in which the function has been added. * @param subclassInstance Optional value which is empty if the function is added to * the superclass, otherwise it contains the name of the From bdc2fdb18688b86a1b30d6ea8a692a3e95858049 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 12 Oct 2017 15:16:51 +0200 Subject: [PATCH 04/21] fix naming error in comment --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index f8fb5666756a0..bf5dd349f297a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -85,7 +85,7 @@ case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprElimi * the outer class, otherwise it contains the name of the * inner class in which the function has been added. * @param subclassInstance Optional value which is empty if the function is added to - * the superclass, otherwise it contains the name of the + * the outer class, otherwise it contains the name of the * instance of the inner class in the outer class. */ private[codegen] case class NewFunction(functionName: String, subclassName: Option[String], From b9ffc52122cf5e692006d162e2a788f13b2f5c3d Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 12 Oct 2017 16:38:00 +0200 Subject: [PATCH 05/21] set threshold to merge the methods, preserve order and add comments --- .../expressions/codegen/CodeGenerator.scala | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index a45a3daae987f..38b168d3c733d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -21,6 +21,7 @@ import java.io.ByteArrayInputStream import java.util.{Map => JavaMap} import scala.collection.JavaConverters._ +import scala.collection.immutable.ListMap import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.existentials @@ -833,28 +834,39 @@ class CodegenContext { addNewFunctionInternal(name, code, inlineToOuterClass = false) } + // Here we store all the methods which have been added to the outer class. val outerClassFunctions = functions .filter(_.subclassName.isEmpty) .map(_.functionName) + // Here we handle all the methods which have been added to the nested subclasses and + // not to the outer class. + // Since they can be many, their direct invocation in the outer class adds many entries + // to the outer class' constant pool. This can cause the constant pool to past JVM limit. + // To avoid this problem, we group them and we call only the grouping methods in the + // outer class. val innerClassFunctions = functions .filter(_.subclassName.isDefined) - .foldLeft(Map.empty[(String, String), Seq[String]]) { case (acc, f) => + .foldLeft(ListMap.empty[(String, String), Seq[String]]) { case (acc, f) => val key = (f.subclassName.get, f.subclassInstance.get) acc.updated(key, acc.getOrElse(key, Seq.empty[String]) ++ Seq(f.functionName)) } - .map { case ((subclassName, subclassInstance), subclassFunctions) => - // Adding a new function to each subclass which contains - // the invocation of all the ones which have been added to - // that subclass - val code = s""" - |private $returnType $func($argString) { - | ${makeSplitFunction(foldFunctions(subclassFunctions.map(name => - s"$name(${arguments.map(_._2).mkString(", ")})")))} - |} - """.stripMargin - addNewFunctionToClass(func, code, subclassName) - s"$subclassInstance.$func" + .flatMap { case ((subclassName, subclassInstance), subclassFunctions) => + if (subclassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { + // Adding a new function to each subclass which contains + // the invocation of all the ones which have been added to + // that subclass + val code = s""" + |private $returnType $func($argString) { + | ${makeSplitFunction(foldFunctions(subclassFunctions.map(name => + s"$name(${arguments.map(_._2).mkString(", ")})")))} + |} + """.stripMargin + addNewFunctionToClass(func, code, subclassName) + Seq(s"$subclassInstance.$func") + } else { + subclassFunctions + } } foldFunctions((outerClassFunctions ++ innerClassFunctions).map( @@ -1067,6 +1079,8 @@ object CodeGenerator extends Logging { // This is the value of HugeMethodLimit in the OpenJDK JVM settings val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + val MERGE_SPLIT_METHODS_THRESHOLD = 3 + /** * Compile the Java source code into a Java class, using Janino. * From 76b5489d908f5a979280859021d61ed91cd106a7 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 12 Oct 2017 18:58:43 +0200 Subject: [PATCH 06/21] fix bug when the merge split methods threshold is not satisfied --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index c748f08558f62..d9d8253d2a3a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -865,7 +865,7 @@ class CodegenContext { addNewFunctionToClass(func, code, subclassName) Seq(s"$subclassInstance.$func") } else { - subclassFunctions + subclassFunctions.map(f => s"$subclassInstance.$f") } } From 20626b46ad62e32d1a77f9cf13446bd9a970fada Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 13 Oct 2017 09:40:20 +0200 Subject: [PATCH 07/21] fix indent --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index d9d8253d2a3a1..3974c6cc162ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -90,7 +90,7 @@ case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprElimi * instance of the inner class in the outer class. */ private[codegen] case class NewFunction(functionName: String, subclassName: Option[String], - subclassInstance: Option[String]) + subclassInstance: Option[String]) /** * A context for codegen, tracking a list of objects that could be passed into generated Java From 5f3950003444bd78560e63f7325f4cd22e49b2a8 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 13 Oct 2017 14:37:10 +0200 Subject: [PATCH 08/21] Adding end-to-end UTs --- .../org/apache/spark/sql/DataFrameSuite.scala | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index dd8f54b690f64..54886e50c7177 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2103,4 +2103,35 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { testData2.select(lit(7), 'a, 'b).orderBy(lit(1), lit(2), lit(3)), Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) } + + test("SPARK-22226: splitExpressions should not cause \"Code of method grows beyond 64 KB\"") { + val colNumber = 10000 + val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) + val df = sqlContext.createDataFrame(input, StructType( + (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false)))) + val newCols = (1 to colNumber).flatMap { colIndex => + Seq(expr(s"if(1000 < _$colIndex, 1000, _$colIndex)"), + expr(s"sqrt(_$colIndex)")) + } + df.select(newCols: _*).collect() + } + + test("SPARK-22226: too many splitted expressions should not exceed constant pool limit") { + val colNumber = 1000 + val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) + val df = sqlContext.createDataFrame(input, StructType( + (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false)))) + + val funcs = (1 to colNumber).flatMap { colIndex => + val colName = s"_$colIndex" + val tsFormat = "yyyy-MM-dd HH:mm:ss.SSS" + Seq(expr(s"unix_timestamp(concat('2017-10-13 10:26:59.', $colName),'$tsFormat')"), + expr(s"unix_timestamp(concat('2017-10-13 11:26:59.', $colName),'$tsFormat')"), + expr(s"unix_timestamp(concat('2017-10-13 12:26:59.', $colName),'$tsFormat')"), + expr(s"unix_timestamp(concat('2017-10-13 13:26:59.', $colName),'$tsFormat')"), + expr(s"unix_timestamp(concat('2017-10-13 14:26:59.', $colName),'$tsFormat')"), + expr(s"unix_timestamp(concat('2017-10-13 15:26:59.', $colName),'$tsFormat')").as(colName)) + } + df.select(funcs: _*).dropDuplicates((1 to 5).map(colIndex => s"_$colIndex")).collect() + } } From c4601b4ae1f6ba618e1a92e75b06faaf8d14bee3 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 13 Oct 2017 14:39:22 +0200 Subject: [PATCH 09/21] Adding comment to the threshold --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 3974c6cc162ba..d9304b3958f55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1079,6 +1079,8 @@ object CodeGenerator extends Logging { // This is the value of HugeMethodLimit in the OpenJDK JVM settings val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + // This is the threshold over which the methods in a inner class are grouped in a single + // method which is going to be called by the outer class instead of the many small ones val MERGE_SPLIT_METHODS_THRESHOLD = 3 /** From 61cc4458613722bba99fd92e9b30c1d797916846 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 13 Oct 2017 16:05:01 +0200 Subject: [PATCH 10/21] improve test description --- .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 54886e50c7177..9cabae06cfb20 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2104,7 +2104,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2))) } - test("SPARK-22226: splitExpressions should not cause \"Code of method grows beyond 64 KB\"") { + test("SPARK-22226: splitExpressions should not generate codes beyond 64KB") { val colNumber = 10000 val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) val df = sqlContext.createDataFrame(input, StructType( From 37506dcc380cf5c14ea929b33f9e8e26efdbcb8d Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sat, 14 Oct 2017 11:56:09 +0200 Subject: [PATCH 11/21] remove useless UT --- .../org/apache/spark/sql/DataFrameSuite.scala | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 9cabae06cfb20..84461200f2445 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2115,23 +2115,4 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } df.select(newCols: _*).collect() } - - test("SPARK-22226: too many splitted expressions should not exceed constant pool limit") { - val colNumber = 1000 - val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*)) - val df = sqlContext.createDataFrame(input, StructType( - (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false)))) - - val funcs = (1 to colNumber).flatMap { colIndex => - val colName = s"_$colIndex" - val tsFormat = "yyyy-MM-dd HH:mm:ss.SSS" - Seq(expr(s"unix_timestamp(concat('2017-10-13 10:26:59.', $colName),'$tsFormat')"), - expr(s"unix_timestamp(concat('2017-10-13 11:26:59.', $colName),'$tsFormat')"), - expr(s"unix_timestamp(concat('2017-10-13 12:26:59.', $colName),'$tsFormat')"), - expr(s"unix_timestamp(concat('2017-10-13 13:26:59.', $colName),'$tsFormat')"), - expr(s"unix_timestamp(concat('2017-10-13 14:26:59.', $colName),'$tsFormat')"), - expr(s"unix_timestamp(concat('2017-10-13 15:26:59.', $colName),'$tsFormat')").as(colName)) - } - df.select(funcs: _*).dropDuplicates((1 to 5).map(colIndex => s"_$colIndex")).collect() - } } From bce3616cccfb5c1b1dc7fca32d17764fda142e7d Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 18 Oct 2017 00:16:59 +0200 Subject: [PATCH 12/21] fix grammar error in comment --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index a3a9128f11d88..21c1c86037c47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1082,7 +1082,7 @@ object CodeGenerator extends Logging { // This is the value of HugeMethodLimit in the OpenJDK JVM settings val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 - // This is the threshold over which the methods in a inner class are grouped in a single + // This is the threshold over which the methods in an inner class are grouped in a single // method which is going to be called by the outer class instead of the many small ones val MERGE_SPLIT_METHODS_THRESHOLD = 3 From c4ab587696a1b9a752f30a491b454e1cea0133f7 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 25 Oct 2017 16:11:19 +0200 Subject: [PATCH 13/21] code refactor --- .../expressions/codegen/CodeGenerator.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 21c1c86037c47..83329bae64985 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -842,27 +842,30 @@ class CodegenContext { .filter(_.subclassName.isEmpty) .map(_.functionName) + val argsString = arguments.map(_._2).mkString(", ") + // Here we handle all the methods which have been added to the nested subclasses and // not to the outer class. // Since they can be many, their direct invocation in the outer class adds many entries // to the outer class' constant pool. This can cause the constant pool to past JVM limit. // To avoid this problem, we group them and we call only the grouping methods in the // outer class. - val innerClassFunctions = functions + val innerClassToFunctions = functions .filter(_.subclassName.isDefined) .foldLeft(ListMap.empty[(String, String), Seq[String]]) { case (acc, f) => val key = (f.subclassName.get, f.subclassInstance.get) acc.updated(key, acc.getOrElse(key, Seq.empty[String]) ++ Seq(f.functionName)) } - .flatMap { case ((subclassName, subclassInstance), subclassFunctions) => + val innerClassFunctions = innerClassToFunctions.flatMap { + case ((subclassName, subclassInstance), subclassFunctions) => if (subclassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { // Adding a new function to each subclass which contains // the invocation of all the ones which have been added to // that subclass + val body = foldFunctions(subclassFunctions.map(name => s"$name($argsString)")) val code = s""" |private $returnType $func($argString) { - | ${makeSplitFunction(foldFunctions(subclassFunctions.map(name => - s"$name(${arguments.map(_._2).mkString(", ")})")))} + | ${makeSplitFunction(body)} |} """.stripMargin addNewFunctionToClass(func, code, subclassName) @@ -873,7 +876,7 @@ class CodegenContext { } foldFunctions((outerClassFunctions ++ innerClassFunctions).map( - name => s"$name(${arguments.map(_._2).mkString(", ")})")) + name => s"$name($argsString)")) } } From 02151392fedca69c784c039a2e39c648dfd1b894 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 25 Oct 2017 17:01:54 +0200 Subject: [PATCH 14/21] create a constant for class size threshold --- .../catalyst/expressions/codegen/CodeGenerator.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 83329bae64985..5472c55dfb6f7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -304,13 +304,9 @@ class CodegenContext { funcName: String, funcCode: String, inlineToOuterClass: Boolean): NewFunction = { - // The number of named constants that can exist in the class is limited by the Constant Pool - // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a - // threshold of 1000k bytes to determine when a function should be inlined to a private, nested - // sub-class. val (className, classInstance) = if (inlineToOuterClass) { outerClassName -> "" - } else if (currClassSize > 1000000) { + } else if (currClassSize > CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD) { val className = freshName("NestedClass") val classInstance = freshName("nestedClassInstance") @@ -1089,6 +1085,12 @@ object CodeGenerator extends Logging { // method which is going to be called by the outer class instead of the many small ones val MERGE_SPLIT_METHODS_THRESHOLD = 3 + // The number of named constants that can exist in the class is limited by the Constant Pool + // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a + // threshold of 1000k bytes to determine when a function should be inlined to a private, nested + // sub-class. + val GENERATED_CLASS_SIZE_THRESHOLD = 1000000 + /** * Compile the Java source code into a Java class, using Janino. * From 41c0b2ba6a4e0afe9acd804d9d417f4882e519f2 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 25 Oct 2017 20:11:57 +0200 Subject: [PATCH 15/21] fix indentation --- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 5472c55dfb6f7..5655de018f1d0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -89,7 +89,9 @@ case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprElimi * the outer class, otherwise it contains the name of the * instance of the inner class in the outer class. */ -private[codegen] case class NewFunction(functionName: String, subclassName: Option[String], +private[codegen] case class NewFunction( + functionName: String, + subclassName: Option[String], subclassInstance: Option[String]) /** From 6a1eeca5bbe5ed927b1f9c02a4db02185adda84c Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 26 Oct 2017 10:09:35 +0200 Subject: [PATCH 16/21] rename to inner class consistently --- .../expressions/codegen/CodeGenerator.scala | 70 +++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 5655de018f1d0..f97b705eec777 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -82,17 +82,17 @@ case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprElimi * The main information about a new added function. * * @param functionName String representing the name of the function - * @param subclassName Optional value which is empty if the function is added to - * the outer class, otherwise it contains the name of the - * inner class in which the function has been added. - * @param subclassInstance Optional value which is empty if the function is added to - * the outer class, otherwise it contains the name of the - * instance of the inner class in the outer class. + * @param innerClassName Optional value which is empty if the function is added to + * the outer class, otherwise it contains the name of the + * inner class in which the function has been added. + * @param innerClassInstance Optional value which is empty if the function is added to + * the outer class, otherwise it contains the name of the + * instance of the inner class in the outer class. */ -private[codegen] case class NewFunction( +private[codegen] case class NewFunctionSpec( functionName: String, - subclassName: Option[String], - subclassInstance: Option[String]) + innerClassName: Option[String], + innerClassInstance: Option[String]) /** * A context for codegen, tracking a list of objects that could be passed into generated Java @@ -245,8 +245,8 @@ class CodegenContext { /** * Holds the class and instance names to be generated, where `OuterClass` is a placeholder * standing for whichever class is generated as the outermost class and which will contain any - * nested sub-classes. All other classes and instance names in this list will represent private, - * nested sub-classes. + * inner sub-classes. All other classes and instance names in this list will represent private, + * inner sub-classes. */ private val classes: mutable.ListBuffer[(String, String)] = mutable.ListBuffer[(String, String)](outerClassName -> null) @@ -277,8 +277,8 @@ class CodegenContext { /** * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the - * function will be inlined into a new private, nested class, and a class-qualified name for the - * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * function will be inlined into a new private, inner class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inlined to the `OuterClass` the * simple `funcName` will be returned. * * @param funcName the class-unqualified name of the function @@ -288,7 +288,7 @@ class CodegenContext { * it is eventually referenced and a returned qualified function name * cannot otherwise be accessed. * @return the name of the function, qualified by class if it will be inlined to a private, - * nested sub-class + * inner class */ def addNewFunction( funcName: String, @@ -296,16 +296,16 @@ class CodegenContext { inlineToOuterClass: Boolean = false): String = { val newFunction = addNewFunctionInternal(funcName, funcCode, inlineToOuterClass) newFunction match { - case NewFunction(functionName, None, None) => functionName - case NewFunction(functionName, Some(_), Some(subclassInstance)) => - subclassInstance + "." + functionName + case NewFunctionSpec(functionName, None, None) => functionName + case NewFunctionSpec(functionName, Some(_), Some(innerClassInstance)) => + innerClassInstance + "." + functionName } } private[this] def addNewFunctionInternal( funcName: String, funcCode: String, - inlineToOuterClass: Boolean): NewFunction = { + inlineToOuterClass: Boolean): NewFunctionSpec = { val (className, classInstance) = if (inlineToOuterClass) { outerClassName -> "" } else if (currClassSize > CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD) { @@ -322,9 +322,9 @@ class CodegenContext { addNewFunctionToClass(funcName, funcCode, className) if (className == outerClassName) { - NewFunction(funcName, None, None) + NewFunctionSpec(funcName, None, None) } else { - NewFunction(funcName, Some(className), Some(classInstance)) + NewFunctionSpec(funcName, Some(className), Some(classInstance)) } } @@ -769,7 +769,7 @@ class CodegenContext { /** * Splits the generated code of expressions into multiple functions, because function has * 64kb code size limit in JVM. If the class to which the function would be inlined would grow - * beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it + * beyond 1000kb, we declare a private, inner sub-class, and the function is inlined to it * instead, because classes have a constant pool limit of 65,536 named values. * * @param row the variable name of row that is used by expressions @@ -837,39 +837,39 @@ class CodegenContext { // Here we store all the methods which have been added to the outer class. val outerClassFunctions = functions - .filter(_.subclassName.isEmpty) + .filter(_.innerClassName.isEmpty) .map(_.functionName) val argsString = arguments.map(_._2).mkString(", ") - // Here we handle all the methods which have been added to the nested subclasses and + // Here we handle all the methods which have been added to the inner classes and // not to the outer class. // Since they can be many, their direct invocation in the outer class adds many entries // to the outer class' constant pool. This can cause the constant pool to past JVM limit. // To avoid this problem, we group them and we call only the grouping methods in the // outer class. val innerClassToFunctions = functions - .filter(_.subclassName.isDefined) + .filter(_.innerClassName.isDefined) .foldLeft(ListMap.empty[(String, String), Seq[String]]) { case (acc, f) => - val key = (f.subclassName.get, f.subclassInstance.get) + val key = (f.innerClassName.get, f.innerClassInstance.get) acc.updated(key, acc.getOrElse(key, Seq.empty[String]) ++ Seq(f.functionName)) } val innerClassFunctions = innerClassToFunctions.flatMap { - case ((subclassName, subclassInstance), subclassFunctions) => - if (subclassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { - // Adding a new function to each subclass which contains + case ((innerClassName, innerClassInstance), innerClassFunctions) => + if (innerClassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { + // Adding a new function to each inner class which contains // the invocation of all the ones which have been added to - // that subclass - val body = foldFunctions(subclassFunctions.map(name => s"$name($argsString)")) + // that inner class + val body = foldFunctions(innerClassFunctions.map(name => s"$name($argsString)")) val code = s""" |private $returnType $func($argString) { | ${makeSplitFunction(body)} |} """.stripMargin - addNewFunctionToClass(func, code, subclassName) - Seq(s"$subclassInstance.$func") + addNewFunctionToClass(func, code, innerClassName) + Seq(s"$innerClassInstance.$func") } else { - subclassFunctions.map(f => s"$subclassInstance.$f") + innerClassFunctions.map(f => s"$innerClassInstance.$f") } } @@ -1089,8 +1089,8 @@ object CodeGenerator extends Logging { // The number of named constants that can exist in the class is limited by the Constant Pool // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a - // threshold of 1000k bytes to determine when a function should be inlined to a private, nested - // sub-class. + // threshold of 1000k bytes to determine when a function should be inlined to a private, inner + // class. val GENERATED_CLASS_SIZE_THRESHOLD = 1000000 /** From be84c4dc3fe40a939e9722f4ddb94e309aa7c841 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 26 Oct 2017 10:44:31 +0200 Subject: [PATCH 17/21] refactor splitExpressions --- .../expressions/codegen/CodeGenerator.scala | 97 ++++++++++++------- 1 file changed, 64 insertions(+), 33 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index f97b705eec777..9e69baa5292fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -21,7 +21,6 @@ import java.io.ByteArrayInputStream import java.util.{Map => JavaMap} import scala.collection.JavaConverters._ -import scala.collection.immutable.ListMap import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.existentials @@ -840,44 +839,76 @@ class CodegenContext { .filter(_.innerClassName.isEmpty) .map(_.functionName) - val argsString = arguments.map(_._2).mkString(", ") - - // Here we handle all the methods which have been added to the inner classes and - // not to the outer class. - // Since they can be many, their direct invocation in the outer class adds many entries - // to the outer class' constant pool. This can cause the constant pool to past JVM limit. - // To avoid this problem, we group them and we call only the grouping methods in the - // outer class. - val innerClassToFunctions = functions - .filter(_.innerClassName.isDefined) - .foldLeft(ListMap.empty[(String, String), Seq[String]]) { case (acc, f) => - val key = (f.innerClassName.get, f.innerClassInstance.get) - acc.updated(key, acc.getOrElse(key, Seq.empty[String]) ++ Seq(f.functionName)) - } - val innerClassFunctions = innerClassToFunctions.flatMap { - case ((innerClassName, innerClassInstance), innerClassFunctions) => - if (innerClassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { - // Adding a new function to each inner class which contains - // the invocation of all the ones which have been added to - // that inner class - val body = foldFunctions(innerClassFunctions.map(name => s"$name($argsString)")) - val code = s""" - |private $returnType $func($argString) { - | ${makeSplitFunction(body)} - |} - """.stripMargin - addNewFunctionToClass(func, code, innerClassName) - Seq(s"$innerClassInstance.$func") - } else { - innerClassFunctions.map(f => s"$innerClassInstance.$f") - } - } + val innerClassFunctions = generateInnerClassesMethodsCalls( + functions.filter(_.innerClassName.nonEmpty), + func, + arguments, + returnType, + makeSplitFunction, + foldFunctions) + val argsString = arguments.map(_._2).mkString(", ") foldFunctions((outerClassFunctions ++ innerClassFunctions).map( name => s"$name($argsString)")) } } + /** + * Here we handle all the methods which have been added to the inner classes and + * not to the outer class. + * Since they can be many, their direct invocation in the outer class adds many entries + * to the outer class' constant pool. This can cause the constant pool to past JVM limit. + * Moreover, this can cause also the outer class method where all the invocations are + * performed to grow beyond the 64k limit. + * To avoid these problems, we group them and we call only the grouping methods in the + * outer class. + * + * @param functions a [[Seq]] of [[NewFunctionSpec]] defined in the inner classes + * @param funcName the split function name base. + * @param arguments the list of (type, name) of the arguments of the split function. + * @param returnType the return type of the split function. + * @param makeSplitFunction makes split function body, e.g. add preparation or cleanup. + * @param foldFunctions folds the split function calls. + * @return an [[Iterable]] containing the methods' invocations + */ + private def generateInnerClassesMethodsCalls( + functions: Seq[NewFunctionSpec], + funcName: String, + arguments: Seq[(String, String)], + returnType: String, + makeSplitFunction: String => String, + foldFunctions: Seq[String] => String): Iterable[String] = { + val innerClassToFunctions = mutable.ListMap.empty[(String, String), Seq[String]] + functions.foreach(f => { + val key = (f.innerClassName.get, f.innerClassInstance.get) + innerClassToFunctions.update(key, f.functionName +: + innerClassToFunctions.getOrElse(key, Seq.empty[String])) + }) + + val argDefinitionString = arguments.map { case (t, name) => s"$t $name" }.mkString(", ") + val argInvocationString = arguments.map(_._2).mkString(", ") + + innerClassToFunctions.flatMap { + case ((innerClassName, innerClassInstance), innerClassFunctions) => + if (innerClassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { + // Adding a new function to each inner class which contains + // the invocation of all the ones which have been added to + // that inner class + val body = foldFunctions(innerClassFunctions.map(name => + s"$name($argInvocationString)")) + val code = s""" + |private $returnType $funcName($argDefinitionString) { + | ${makeSplitFunction(body)} + |} + """.stripMargin + addNewFunctionToClass(funcName, code, innerClassName) + Seq(s"$innerClassInstance.$funcName") + } else { + innerClassFunctions.map(f => s"$innerClassInstance.$f") + } + } + } + /** * Perform a function which generates a sequence of ExprCodes with a given mapping between * expressions and common expressions, instead of using the mapping in current context. From a39098dc0319cbd207e132aed7214f1c02245ec6 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 26 Oct 2017 15:37:45 +0200 Subject: [PATCH 18/21] fix ordering error --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 9e69baa5292fc..a5266d4cbf34b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -884,6 +884,9 @@ class CodegenContext { innerClassToFunctions.update(key, f.functionName +: innerClassToFunctions.getOrElse(key, Seq.empty[String])) }) + // for performance reasons, the functions are prepended, instead of appended, + // thus they are in reversed order + innerClassToFunctions.transform { case (_, functions) => functions.reverse } val argDefinitionString = arguments.map { case (t, name) => s"$t $name" }.mkString(", ") val argInvocationString = arguments.map(_._2).mkString(", ") From 6e8cd00e6eab9fb64faae65214547457a86bbfcb Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 27 Oct 2017 11:14:24 +0200 Subject: [PATCH 19/21] further refactor --- .../expressions/codegen/CodeGenerator.scala | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index a5266d4cbf34b..c8ed77421450f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -834,22 +834,20 @@ class CodegenContext { addNewFunctionInternal(name, code, inlineToOuterClass = false) } - // Here we store all the methods which have been added to the outer class. - val outerClassFunctions = functions - .filter(_.innerClassName.isEmpty) - .map(_.functionName) + val (outerClassFunctions, innerClassFunctions) = functions.partition(_.innerClassName.isEmpty) - val innerClassFunctions = generateInnerClassesMethodsCalls( - functions.filter(_.innerClassName.nonEmpty), + val argsString = arguments.map(_._2).mkString(", ") + val outerClassFunctionCalls = outerClassFunctions.map(f => s"${f.functionName}($argsString)") + + val innerClassFunctionCalls = generateInnerClassesFunctionCalls( + innerClassFunctions, func, arguments, returnType, makeSplitFunction, foldFunctions) - val argsString = arguments.map(_._2).mkString(", ") - foldFunctions((outerClassFunctions ++ innerClassFunctions).map( - name => s"$name($argsString)")) + foldFunctions(outerClassFunctionCalls ++ innerClassFunctionCalls) } } @@ -871,33 +869,33 @@ class CodegenContext { * @param foldFunctions folds the split function calls. * @return an [[Iterable]] containing the methods' invocations */ - private def generateInnerClassesMethodsCalls( + private def generateInnerClassesFunctionCalls( functions: Seq[NewFunctionSpec], funcName: String, arguments: Seq[(String, String)], returnType: String, makeSplitFunction: String => String, foldFunctions: Seq[String] => String): Iterable[String] = { - val innerClassToFunctions = mutable.ListMap.empty[(String, String), Seq[String]] + val innerClassToFunctions = mutable.LinkedHashMap.empty[(String, String), Seq[String]] functions.foreach(f => { val key = (f.innerClassName.get, f.innerClassInstance.get) innerClassToFunctions.update(key, f.functionName +: innerClassToFunctions.getOrElse(key, Seq.empty[String])) }) - // for performance reasons, the functions are prepended, instead of appended, - // thus they are in reversed order - innerClassToFunctions.transform { case (_, functions) => functions.reverse } val argDefinitionString = arguments.map { case (t, name) => s"$t $name" }.mkString(", ") val argInvocationString = arguments.map(_._2).mkString(", ") innerClassToFunctions.flatMap { case ((innerClassName, innerClassInstance), innerClassFunctions) => - if (innerClassFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { + // for performance reasons, the functions are prepended, instead of appended, + // thus here they are in reversed order + val orderedFunctions = innerClassFunctions.reverse + if (orderedFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { // Adding a new function to each inner class which contains // the invocation of all the ones which have been added to // that inner class - val body = foldFunctions(innerClassFunctions.map(name => + val body = foldFunctions(orderedFunctions.map(name => s"$name($argInvocationString)")) val code = s""" |private $returnType $funcName($argDefinitionString) { @@ -905,9 +903,9 @@ class CodegenContext { |} """.stripMargin addNewFunctionToClass(funcName, code, innerClassName) - Seq(s"$innerClassInstance.$funcName") + Seq(s"$innerClassInstance.$funcName($argInvocationString)") } else { - innerClassFunctions.map(f => s"$innerClassInstance.$f") + orderedFunctions.map(f => s"$innerClassInstance.$f($argInvocationString)") } } } From 36d8e2e1596266f6f7552e2b21105d167e55efc1 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 27 Oct 2017 18:06:57 +0200 Subject: [PATCH 20/21] addressing review comments --- .../expressions/codegen/CodeGenerator.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index c8ed77421450f..96f756dca4020 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -879,8 +879,8 @@ class CodegenContext { val innerClassToFunctions = mutable.LinkedHashMap.empty[(String, String), Seq[String]] functions.foreach(f => { val key = (f.innerClassName.get, f.innerClassInstance.get) - innerClassToFunctions.update(key, f.functionName +: - innerClassToFunctions.getOrElse(key, Seq.empty[String])) + val value = f.functionName +: innerClassToFunctions.getOrElse(key, Seq.empty[String]) + innerClassToFunctions.update(key, value) }) val argDefinitionString = arguments.map { case (t, name) => s"$t $name" }.mkString(", ") @@ -892,9 +892,18 @@ class CodegenContext { // thus here they are in reversed order val orderedFunctions = innerClassFunctions.reverse if (orderedFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) { - // Adding a new function to each inner class which contains - // the invocation of all the ones which have been added to - // that inner class + // Adding a new function to each inner class which contains the invocation of all the + // ones which have been added to that inner class. For example, + // private class NestedClass { + // private void apply_862(InternalRow i) { ... } + // private void apply_863(InternalRow i) { ... } + // ... + // private void apply(InternalRow i) { + // apply_862(i); + // apply_863(i); + // ... + // } + // } val body = foldFunctions(orderedFunctions.map(name => s"$name($argInvocationString)")) val code = s""" From 4952880dcb9830616fde1c2c62f148347bbb0b55 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 27 Oct 2017 18:18:17 +0200 Subject: [PATCH 21/21] code style fix --- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 96f756dca4020..58738b52b299f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -880,7 +880,7 @@ class CodegenContext { functions.foreach(f => { val key = (f.innerClassName.get, f.innerClassInstance.get) val value = f.functionName +: innerClassToFunctions.getOrElse(key, Seq.empty[String]) - innerClassToFunctions.update(key, value) + innerClassToFunctions.put(key, value) }) val argDefinitionString = arguments.map { case (t, name) => s"$t $name" }.mkString(", ") @@ -904,8 +904,7 @@ class CodegenContext { // ... // } // } - val body = foldFunctions(orderedFunctions.map(name => - s"$name($argInvocationString)")) + val body = foldFunctions(orderedFunctions.map(name => s"$name($argInvocationString)")) val code = s""" |private $returnType $funcName($argDefinitionString) { | ${makeSplitFunction(body)}