Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,10 @@ class CodegenContext {
def initMutableStates(): String = {
// It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in
// `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones.
mutableStates.distinct.map(_._3).mkString("\n")
val initCodes = mutableStates.distinct.map(_._3 + "\n")
// The generated initialization code may exceed 64kb function size limit in JVM if there are too
// many mutable states, so split it into multiple functions.
splitExpressions(initCodes, "init", Nil)
}

/**
Expand Down Expand Up @@ -589,6 +592,11 @@ class CodegenContext {
// Cannot split these expressions because they are not created from a row object.
return expressions.mkString("\n")
}
splitExpressions(expressions, "apply", ("InternalRow", row) :: Nil)
}

private def splitExpressions(
expressions: Seq[String], funcName: String, arguments: Seq[(String, String)]): String = {
Copy link
Member

@kiszk kiszk Oct 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about exposing this API as public? This API looks more flexible and reusable.Is there any reason to declare as private?
(I replaced "non-public" with "public" in the first statement).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't have a special reason.
Should we make it public? @rxin, @davies

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val blocks = new ArrayBuffer[String]()
val blockBuilder = new StringBuilder()
for (code <- expressions) {
Expand All @@ -608,19 +616,19 @@ class CodegenContext {
// inline execution if only one block
blocks.head
} else {
val apply = freshName("apply")
val func = freshName(funcName)
val functions = blocks.zipWithIndex.map { case (body, i) =>
val name = s"${apply}_$i"
val name = s"${func}_$i"
val code = s"""
|private void $name(InternalRow $row) {
|private void $name(${arguments.map { case (t, name) => s"$t $name" }.mkString(", ")}) {
| $body
|}
""".stripMargin
addNewFunction(name, code)
name
}

functions.map(name => s"$name($row);").mkString("\n")
functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")});").mkString("\n")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
private Object[] references;
private MutableRow mutableRow;
${ctx.declareMutableStates()}
${ctx.declareAddedFunctions()}

public SpecificMutableProjection(Object[] references) {
this.references = references;
mutableRow = new $genericMutableRowType(${expressions.size});
${ctx.initMutableStates()}
}

${ctx.declareAddedFunctions()}

public ${classOf[BaseMutableProjection].getName} target(MutableRow row) {
mutableRow = row;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR

private Object[] references;
${ctx.declareMutableStates()}
${ctx.declareAddedFunctions()}

public SpecificOrdering(Object[] references) {
this.references = references;
${ctx.initMutableStates()}
}

${ctx.declareAddedFunctions()}

public int compare(InternalRow a, InternalRow b) {
InternalRow ${ctx.INPUT_ROW} = null; // Holds current row being evaluated.
$comparisons
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
protected def create(predicate: Expression): ((InternalRow) => Boolean) = {
val ctx = newCodeGenContext()
val eval = predicate.genCode(ctx)

val codeBody = s"""
public SpecificPredicate generate(Object[] references) {
return new SpecificPredicate(references);
Expand All @@ -48,13 +49,14 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
class SpecificPredicate extends ${classOf[Predicate].getName} {
private final Object[] references;
${ctx.declareMutableStates()}
${ctx.declareAddedFunctions()}

public SpecificPredicate(Object[] references) {
this.references = references;
${ctx.initMutableStates()}
}

${ctx.declareAddedFunctions()}

public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
${eval.code}
return !${eval.isNull} && ${eval.value};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
"""
}
val allExpressions = ctx.splitExpressions(ctx.INPUT_ROW, expressionCodes)

val codeBody = s"""
public java.lang.Object generate(Object[] references) {
return new SpecificSafeProjection(references);
Expand All @@ -165,14 +166,15 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
private Object[] references;
private MutableRow mutableRow;
${ctx.declareMutableStates()}
${ctx.declareAddedFunctions()}

public SpecificSafeProjection(Object[] references) {
this.references = references;
mutableRow = (MutableRow) references[references.length - 1];
${ctx.initMutableStates()}
}

${ctx.declareAddedFunctions()}

public java.lang.Object apply(java.lang.Object _i) {
InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
$allExpressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,14 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro

private Object[] references;
${ctx.declareMutableStates()}
${ctx.declareAddedFunctions()}

public SpecificUnsafeProjection(Object[] references) {
this.references = references;
${ctx.initMutableStates()}
}

${ctx.declareAddedFunctions()}

// Scala.Function1 need this
public java.lang.Object apply(java.lang.Object row) {
return apply((InternalRow) row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@

package org.apache.spark.sql.catalyst.expressions

import java.sql.Timestamp

import org.apache.spark.SparkFunSuite
import org.apache.spark.metrics.source.CodegenMetrics
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.objects.{CreateExternalRow, GetExternalRowField, ValidateExternalType}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.ThreadUtils
Expand Down Expand Up @@ -164,6 +166,23 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-17702: split wide constructor into blocks due to JVM code size limit") {
val length = 5000
val expressions = Seq.fill(length) {
ToUTCTimestamp(
Literal.create(Timestamp.valueOf("2015-07-24 00:00:00"), TimestampType),
Literal.create("PST", StringType))
}
val plan = GenerateMutableProjection.generate(expressions)
val actual = plan(new GenericMutableRow(length)).toSeq(expressions.map(_.dataType))
val expected = Seq.fill(length)(
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2015-07-24 07:00:00")))

if (!checkResult(actual, expected)) {
fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
}
}

test("test generated safe and unsafe projection") {
val schema = new StructType(Array(
StructField("a", StringType, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,16 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {

private Object[] references;
private scala.collection.Iterator[] inputs;
${ctx.declareMutableStates()}

public GeneratedIterator(Object[] references) {
this.references = references;
}

public void init(int index, scala.collection.Iterator inputs[]) {
public void init(int index, scala.collection.Iterator[] inputs) {
partitionIndex = index;
this.inputs = inputs;
${ctx.initMutableStates()}
}

Expand Down