Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{Analyzer, TypeCheckResult, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.sequenceOption
import org.apache.spark.sql.types._

////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -96,15 +95,13 @@ abstract class Expression extends TreeNode[Expression] {
ctx.subExprEliminationExprs.get(this).map { subExprState =>
// This expression is repeated meaning the code to evaluated has already been added
// as a function and called in advance. Just use it.
val code = s"/* ${this.toCommentSafeString} */"
ExprCode(code, subExprState.isNull, subExprState.value)
ExprCode("", subExprState.isNull, subExprState.value)
}.getOrElse {
val isNull = ctx.freshName("isNull")
val value = ctx.freshName("value")
val ve = ExprCode("", isNull, value)
ve.code = genCode(ctx, ve)
// Add `this` in the comment.
ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code.trim)
ve
}
}

Expand Down Expand Up @@ -221,7 +218,7 @@ abstract class Expression extends TreeNode[Expression] {
* Returns the string representation of this expression that is safe to be put in
* code comments of generated code.
*/
protected def toCommentSafeString: String = this.toString
def toCommentSafeString: String = this.toString
.replace("*/", "\\*\\/")
.replace("\\u", "\\\\u")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import org.apache.spark.util.Utils
/**
* Java source for evaluating an [[Expression]] given a [[InternalRow]] of input.
*
* @param code The sequence of statements required to evaluate the expression.
* @param isNull A term that holds a boolean value representing whether the expression evaluated
* to null.
* @param value A term for a (possibly primitive) value of the result of the evaluation. Not
* valid if `isNull` is set to `true`.
* @param code The Java source code required to evaluate the expression.
* @param isNull Name of the variable that holds the boolean value representing whether the
* expression evaluated to null.
* @param value Name of the variable that holds the (possibly primitive) value of the result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note that value could be a constant too

* of the evaluation. Not valid if `isNull` is `true`.
*/
case class ExprCode(var code: String, var isNull: String, var value: String)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu

public java.lang.Object apply(java.lang.Object _i) {
InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
// project list: ${expressions.map(_.toCommentSafeString).mkString(", ")}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use /* ...... */ in case the comments cross multiple lines

$evalSubexpr
$allProjections
// copy all the results into MutableRow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
}

public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
// predicate: ${predicate.toCommentSafeString}
${eval.code}
return !${eval.isNull} && ${eval.value};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]

public java.lang.Object apply(java.lang.Object _i) {
InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
// project list: ${expressions.map(_.toCommentSafeString).mkString(", ")}
$allExpressions
return mutableRow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro

val code =
s"""
// project list: ${expressions.map(_.toCommentSafeString).mkString("[", ", ", "]")}
$resetBufferHolder
$evalSubexpr
$writeExpressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ case class TungstenAggregate(
ctx.addNewFunction(doAgg,
s"""
| private void $doAgg() throws java.io.IOException {
| // initialize aggregation buffer
| // initialize agg buffer:
| // ${aggregateExpressions.map(_.toCommentSafeString).mkString("[", ", ", "]")}
| ${bufVars.map(_.code).mkString("\n")}
|
| ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
Expand All @@ -222,7 +223,7 @@ case class TungstenAggregate(
// only have DeclarativeAggregate
val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
val inputAttrs = functions.flatMap(_.aggBufferAttributes) ++ child.output
val updateExpr = aggregateExpressions.flatMap { e =>
val updateExpr: Seq[Expression] = aggregateExpressions.flatMap { e =>
e.mode match {
case Partial | Complete =>
e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions
Expand All @@ -242,9 +243,9 @@ case class TungstenAggregate(
}

s"""
| // do aggregate
| // aggregate: ${updateExpr.map(_.toCommentSafeString).mkString("[", ", ", "]")}
| ${aggVals.map(_.code).mkString("\n")}
| // update aggregation buffer
| // update agg buffer
| ${updates.mkString("")}
""".stripMargin
}
Expand Down Expand Up @@ -427,17 +428,17 @@ case class TungstenAggregate(
}

s"""
// generate grouping key
// grouping key: ${groupingExpressions.map(_.toCommentSafeString).mkString("[", ", ", "]")}
${keyCode.code}
UnsafeRow $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key);
if ($buffer == null) {
// failed to allocate the first page
throw new OutOfMemoryError("No enough memory for aggregation");
}

// evaluate aggregate function
// aggregate: ${updateExpr.map(_.toCommentSafeString).mkString("[", ", ", "]")}
${evals.map(_.code).mkString("\n")}
// update aggregate buffer
// update agg buffer
${updates.mkString("\n")}
"""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
ctx.currentVars = input
val output = exprs.map(_.gen(ctx))
s"""
| // project list: ${exprs.map(_.toCommentSafeString).mkString("[", ", ", "]")}
| ${output.map(_.code).mkString("\n")}
|
| ${consume(ctx, output)}
Expand Down Expand Up @@ -89,11 +90,12 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
}

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val expr = ExpressionCanonicalizer.execute(
val expr: Expression = ExpressionCanonicalizer.execute(
BindReferences.bindReference(condition, child.output))
ctx.currentVars = input
val eval = expr.gen(ctx)
s"""
| // predicate: ${expr.toCommentSafeString}
| ${eval.code}
| if (!${eval.isNull} && ${eval.value}) {
| ${consume(ctx, ctx.currentVars)}
Expand Down