From 2c2b0079d4397cb59a72a8f4631d6870b54ff0c4 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 1 Feb 2016 23:06:46 -0800 Subject: [PATCH] [SPARK-13135][SQL] Don't print expressions recursively in generated code --- .../sql/catalyst/expressions/Expression.scala | 11 ++++------- .../expressions/codegen/CodeGenerator.scala | 10 +++++----- .../codegen/GenerateMutableProjection.scala | 1 + .../expressions/codegen/GeneratePredicate.scala | 1 + .../codegen/GenerateSafeProjection.scala | 1 + .../codegen/GenerateUnsafeProjection.scala | 1 + .../execution/aggregate/TungstenAggregate.scala | 15 ++++++++------- .../spark/sql/execution/basicOperators.scala | 4 +++- 8 files changed, 24 insertions(+), 20 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 353fb92581d3..672c5f1324fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -18,10 +18,9 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{Analyzer, TypeCheckResult, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.trees.TreeNode -import org.apache.spark.sql.catalyst.util.sequenceOption import org.apache.spark.sql.types._ //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -96,15 +95,13 @@ abstract class Expression extends TreeNode[Expression] { ctx.subExprEliminationExprs.get(this).map { subExprState => // This expression is repeated meaning the code to evaluated has already been added // as a function and called in advance. Just use it. - val code = s"/* ${this.toCommentSafeString} */" - ExprCode(code, subExprState.isNull, subExprState.value) + ExprCode("", subExprState.isNull, subExprState.value) }.getOrElse { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val ve = ExprCode("", isNull, value) ve.code = genCode(ctx, ve) - // Add `this` in the comment. - ve.copy(s"/* ${this.toCommentSafeString} */\n" + ve.code.trim) + ve } } @@ -221,7 +218,7 @@ abstract class Expression extends TreeNode[Expression] { * Returns the string representation of this expression that is safe to be put in * code comments of generated code. */ - protected def toCommentSafeString: String = this.toString + def toCommentSafeString: String = this.toString .replace("*/", "\\*\\/") .replace("\\u", "\\\\u") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index a30aba16170a..f8bb9df19a65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -36,11 +36,11 @@ import org.apache.spark.util.Utils /** * Java source for evaluating an [[Expression]] given a [[InternalRow]] of input. * - * @param code The sequence of statements required to evaluate the expression. 
- * @param isNull A term that holds a boolean value representing whether the expression evaluated - * to null. - * @param value A term for a (possibly primitive) value of the result of the evaluation. Not - * valid if `isNull` is set to `true`. + * @param code The Java source code required to evaluate the expression. + * @param isNull Name of the variable that holds the boolean value representing whether the + * expression evaluated to null. + * @param value Name of the variable that holds the (possibly primitive) value of the result + * of the evaluation. Not valid if `isNull` is `true`. */ case class ExprCode(var code: String, var isNull: String, var value: String) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 5b4dc8df8622..87f1da6aa039 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -125,6 +125,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu public java.lang.Object apply(java.lang.Object _i) { InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i; + // project list: ${expressions.map(_.toCommentSafeString).mkString(", ")} $evalSubexpr $allProjections // copy all the results into MutableRow diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala index 58065d956f07..ca608c0dc968 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala @@ -56,6 +56,7 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool } public boolean eval(InternalRow ${ctx.INPUT_ROW}) { + // predicate: ${predicate.toCommentSafeString} ${eval.code} return !${eval.isNull} && ${eval.value}; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala index 4cb6af9d9fe9..9886cf6a0ebd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala @@ -171,6 +171,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection] public java.lang.Object apply(java.lang.Object _i) { InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i; + // project list: ${expressions.map(_.toCommentSafeString).mkString(", ")} $allExpressions return mutableRow; } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala index 6aa9cbf08bdb..8111fd70f5c0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala @@ -332,6 +332,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro val code = s""" + // project list: ${expressions.map(_.toCommentSafeString).mkString("[", ", ", "]")} $resetBufferHolder $evalSubexpr $writeExpressions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 57db7262fdaf..5f5f14434939 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -198,7 +198,8 @@ case class TungstenAggregate( ctx.addNewFunction(doAgg, s""" | private void $doAgg() throws java.io.IOException { - | // initialize aggregation buffer + | // initialize agg buffer: + | // ${aggregateExpressions.map(_.toCommentSafeString).mkString("[", ", ", "]")} | ${bufVars.map(_.code).mkString("\n")} | | ${child.asInstanceOf[CodegenSupport].produce(ctx, this)} @@ -222,7 +223,7 @@ case class TungstenAggregate( // only have DeclarativeAggregate val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate]) val inputAttrs = functions.flatMap(_.aggBufferAttributes) ++ child.output - val updateExpr = aggregateExpressions.flatMap { e => + val updateExpr: Seq[Expression] = aggregateExpressions.flatMap { e => e.mode match { case Partial | Complete => e.aggregateFunction.asInstanceOf[DeclarativeAggregate].updateExpressions @@ -242,9 +243,9 @@ case class TungstenAggregate( } s""" - | // do aggregate + | // aggregate: ${updateExpr.map(_.toCommentSafeString).mkString("[", ", ", "]")} | ${aggVals.map(_.code).mkString("\n")} - | // update aggregation buffer + | // update agg buffer | ${updates.mkString("")} """.stripMargin } @@ -427,7 +428,7 @@ case class TungstenAggregate( } s""" - // generate grouping key + // grouping key: ${groupingExpressions.map(_.toCommentSafeString).mkString("[", ", ", "]")} ${keyCode.code} UnsafeRow $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key); if ($buffer == null) { @@ -435,9 +436,9 @@ case class TungstenAggregate( throw new OutOfMemoryError("No enough memory for aggregation"); } - // evaluate aggregate function + // aggregate: ${updateExpr.map(_.toCommentSafeString).mkString("[", ", ", "]")} ${evals.map(_.code).mkString("\n")} - // update aggregate buffer + // update agg buffer ${updates.mkString("\n")} """ } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index fd81531c9316..ca1f7e92c012 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -51,6 +51,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) ctx.currentVars = input val output = exprs.map(_.gen(ctx)) s""" + | // project list: ${exprs.map(_.toCommentSafeString).mkString("[", ", ", "]")} | ${output.map(_.code).mkString("\n")} | | ${consume(ctx, output)} @@ -89,11 +90,12 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { - val expr = ExpressionCanonicalizer.execute( + val expr: Expression = 
ExpressionCanonicalizer.execute( BindReferences.bindReference(condition, child.output)) ctx.currentVars = input val eval = expr.gen(ctx) s""" + | // predicate: ${expr.toCommentSafeString} | ${eval.code} | if (!${eval.isNull} && ${eval.value}) { | ${consume(ctx, ctx.currentVars)}
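
For illustration, a minimal standalone Scala sketch of the commenting strategy this patch switches to (the object name and the projectListComment helper are hypothetical, not part of the Spark codebase; the escaping mirrors Expression.toCommentSafeString as changed above): the expression's string form is escaped once and emitted in a single top-level comment such as "// project list: [...]", instead of being printed recursively for every sub-expression.

object CommentSafeSketch {
  // Mirrors the escaping in Expression.toCommentSafeString: neutralise "*/" and
  // "\u" so the expression string cannot terminate the generated Java comment
  // or introduce a unicode escape.
  def toCommentSafeString(expr: String): String =
    expr.replace("*/", "\\*\\/").replace("\\u", "\\\\u")

  // Hypothetical helper: build the single "// project list: [...]" header comment
  // that the projection generators above prepend to the generated apply() body.
  def projectListComment(exprs: Seq[String]): String =
    exprs.map(toCommentSafeString).mkString("// project list: [", ", ", "]")

  def main(args: Array[String]): Unit = {
    // Prints: // project list: [(a#0 + 1), upper(b#1) /* x \*\/]
    println(projectListComment(Seq("(a#0 + 1)", "upper(b#1) /* x */")))
  }
}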