From 3f99740c1151e6aa0bab7eaf6a9f197cde6bace6 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 22 Feb 2016 18:54:04 +0900
Subject: [PATCH 01/29] add the source file name and line into generated Java
 code

---
 .../sql/catalyst/parser/ParseDriver.scala          |  0
 .../spark/sql/catalyst/trees/TreeNode.scala        | 11 ++++----
 .../sql/catalyst/trees/TreeNodeSuite.scala         | 25 ++++++++++++++++++-
 3 files changed, 30 insertions(+), 6 deletions(-)
 mode change 100644 => 100755 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
old mode 100644
new mode 100755
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 3ebd815dce32c..5dd480d24e19d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -41,6 +41,7 @@ import org.apache.spark.util.Utils
 private class MutableInt(var i: Int)
 
 case class Origin(
+  callSite: Option[String] = None,
   line: Option[Int] = None,
   startPosition: Option[Int] = None)
 
@@ -58,21 +59,21 @@ object CurrentOrigin {
 
   def reset(): Unit = value.set(Origin())
 
-  def setPosition(line: Int, start: Int): Unit = {
+  def setPosition(callSite: String, line: Int, start: Int): Unit = {
     value.set(
-      value.get.copy(line = Some(line), startPosition = Some(start)))
+      value.get.copy(callSite = Some(callSite), line = Some(line), startPosition = Some(start)))
   }
 
   def withOrigin[A](o: Origin)(f: => A): A = {
+    val current = get
     set(o)
-    val ret = try f finally { reset() }
-    reset()
+    val ret = try f finally { set(current) }
     ret
   }
 }
 
 // scalastyle:off
-abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
+abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Serializable {
 // scalastyle:on
   self: BaseType =>
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 6a188e7e55126..83a6ae9e3fb74 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.trees
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.SparkContext
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
@@ -117,7 +119,7 @@ class TreeNodeSuite extends SparkFunSuite {
   }
 
   test("preserves origin") {
-    CurrentOrigin.setPosition(1, 1)
+    CurrentOrigin.setPosition("TreeNodeSuite.scala:120", 1, 1)
     val add = Add(Literal(1), Literal(1))
     CurrentOrigin.reset()
 
@@ -125,10 +127,31 @@ class TreeNodeSuite extends SparkFunSuite {
       case Literal(1, _) => Literal(2)
     }
 
+    assert(transformed.origin.callSite.isDefined)
     assert(transformed.origin.line.isDefined)
     assert(transformed.origin.startPosition.isDefined)
   }
 
+  test("preserves origin thru SerDe") {
+    val sc = new SparkContext("local", "test")
+    val callSite = "TreeNodeSuite.scala:137"
+
+    val line = 1
+    val startPosition = 2
+    CurrentOrigin.setPosition(callSite, line, startPosition)
+    val add = Add(Literal(1), Literal(2))
+
+    val ser = sc.env.closureSerializer.newInstance()
+    val serBinary = ser.serialize(add)
+    val deadd = ser.deserialize[Expression](serBinary, Thread.currentThread.getContextClassLoader)
+
+    assert(deadd.origin.callSite.isDefined &&
+      deadd.origin.callSite.get == callSite)
+    assert(deadd.origin.line.isDefined &&
+      deadd.origin.line.get == line)
+    assert(deadd.origin.startPosition.isDefined &&
+      deadd.origin.startPosition.get == startPosition)
+  }
+
   test("foreach up") {
     val actual = new ArrayBuffer[String]()
     val expected = Seq("1", "2", "3", "4", "-", "*", "+")
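The withOrigin change in PATCH 01 is easy to miss: the old implementation reset CurrentOrigin to an empty Origin on exit, which clobbered any enclosing origin. A minimal sketch of the behavior the new save-and-restore version enables (illustrative only, not part of the patch; the file names in the call sites are made up):

    import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}

    val outer = Origin(callSite = Some("outer.scala:1"))
    val inner = Origin(callSite = Some("inner.scala:2"))

    CurrentOrigin.withOrigin(outer) {
      CurrentOrigin.withOrigin(inner) {
        assert(CurrentOrigin.get.callSite.contains("inner.scala:2"))
      }
      // set(current) in the finally block restores the outer origin here;
      // the old reset()-based version would have left an empty Origin() instead.
      assert(CurrentOrigin.get.callSite.contains("outer.scala:1"))
    }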
From ec0c1a90de988bb19171f616cffd85a6bef4c8e0 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 22 Feb 2016 23:32:50 +0900
Subject: [PATCH 02/29] commit Expression.scala

---
 .../spark/sql/catalyst/util/package.scala | 23 +++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 4005087dad05a..53f3cfe078915 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -155,6 +155,29 @@ package object util {
 
   def toPrettySQL(e: Expression): String = usePrettyExpression(e).sql
 
+  /**
+   * Returns the string representation of this expression that is safe to be put in
+   * code comments of generated code. The length is capped at 128 characters.
+   */
+  def toCommentSafeString(str: String): String = {
+    val len = math.min(str.length, 128)
+    val suffix = if (str.length > len) "..." else ""
+    str.substring(0, len).replace("*/", "\\*\\/").replace("\\u", "\\\\u") + suffix
+  }
+
+  /**
+   * Returns the string representation of this expression with origin that is safe to be put in
+   * code comments of generated code.
+   */
+  def toCommentSafeString(expr: Expression): String = {
+    val str = if (expr.origin.callSite.isDefined && !expr.isInstanceOf[BoundReference]) {
+      expr.toString + " @ " + expr.origin.callSite.get
+    } else {
+      expr.toString
+    }
+    str.replace("*/", "\\*\\/").replace("\\u", "\\\\u")
+  }
+
   /* FIX ME
   implicit class debugLogging(a: Any) {
     def debugLogging() {
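Why the two replace calls in toCommentSafeString matter: the result ends up inside a /* ... */ comment of generated Java code, where a literal "*/" would terminate the comment early and a "\u" sequence would be decoded as a unicode escape by the compiler even inside a comment. A standalone sketch of the same escaping (illustrative only, not part of the patch):

    // the same escaping as toCommentSafeString, isolated for demonstration
    def commentSafe(s: String): String =
      s.replace("*/", "\\*\\/").replace("\\u", "\\\\u")

    // "*/" can no longer close the surrounding comment block:
    assert(commentSafe("a */ b") == "a \\*\\/ b")
    // a literal backslash-u pair is doubled so it is no longer a unicode escape:
    assert(commentSafe("prefix \\u0041") == "prefix \\\\u0041")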
From 899bc64a0d49543223e5ebae8dc19eb0d8175dbb Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 26 Feb 2016 01:23:46 +0900
Subject: [PATCH 03/29] add a message to show the origin of a generated method
 where possible when an exception occurs

---
 .../codegen/GenerateMutableProjection.scala        | 20 ++++++++++++------
 .../codegen/GenerateOrdering.scala                 | 14 ++++++++++---
 .../codegen/GeneratePredicate.scala                | 11 ++++++++--
 .../codegen/GenerateSafeProjection.scala           | 21 ++++++++++++++-----
 .../codegen/GenerateUnsafeProjection.scala         | 17 +++++++++++++--
 .../sql/execution/WholeStageCodegenExec.scala      |  9 +++++++-
 6 files changed, 73 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 0f82d2e613c73..2bfe1e5882732 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -94,6 +94,8 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
     val allProjections = ctx.splitExpressions(ctx.INPUT_ROW, projectionCodes)
     val allUpdates = ctx.splitExpressions(ctx.INPUT_ROW, updates)
 
+    val callSite =
+      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public java.lang.Object generate(Object[] references) {
         return new SpecificMutableProjection(references);
@@ -123,12 +125,18 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
       }
 
       public java.lang.Object apply(java.lang.Object _i) {
-        InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
-        $evalSubexpr
-        $allProjections
-        // copy all the results into MutableRow
-        $allUpdates
-        return mutableRow;
+        try {
+          InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
+          $evalSubexpr
+          $allProjections
+          // copy all the results into MutableRow
+          $allUpdates
+          return mutableRow;
+        } catch (final Throwable e) {
+          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+          logger.error("The method apply() is generated for ${callSite}");
+          throw e;
+        }
       }
     }
   """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index c10829d4f14f3..4a5ef55536a01 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -113,6 +113,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
   protected def create(ordering: Seq[SortOrder]): BaseOrdering = {
     val ctx = newCodeGenContext()
     val comparisons = genComparisons(ctx, ordering)
+    val callSite =
+      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public SpecificOrdering generate(Object[] references) {
         return new SpecificOrdering(references);
@@ -130,9 +132,15 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
       }
 
       public int compare(InternalRow a, InternalRow b) {
-        InternalRow ${ctx.INPUT_ROW} = null;  // Holds current row being evaluated.
-        $comparisons
-        return 0;
+        try {
+          InternalRow ${ctx.INPUT_ROW} = null;  // Holds current row being evaluated.
+          $comparisons
+          return 0;
+        } catch (final Throwable e) {
+          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+          logger.error("The method compare() is generated for ${callSite}");
+          throw e;
+        }
       }
     }"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 106bb27964cab..8cedaa368407b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -56,8 +56,15 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
       }
 
       public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
-        ${eval.code}
-        return !${eval.isNull} && ${eval.value};
+        try {
+          ${eval.code}
+          return !${eval.isNull} && ${eval.value};
+        } catch (final Throwable e) {
+          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+          logger.error("The method eval() is generated for " +
+            "${predicate.origin.callSite.getOrElse("unknown")}");
+          throw e;
+        }
       }
     }"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index b891f94673752..e7aabfd5a2ef9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -139,8 +139,11 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
 
   protected def create(expressions: Seq[Expression]): Projection = {
     val ctx = newCodeGenContext()
-    val expressionCodes = expressions.zipWithIndex.map {
-      case (NoOp, _) => ""
+    val validExpr = expressions.filter {
+      case NoOp => false
+      case _ => true
+    }
+    val expressionCodes = validExpr.zipWithIndex.map {
       case (e, i) =>
         val evaluationCode = e.genCode(ctx)
         val converter = convertToSafe(ctx, evaluationCode.value, e.dataType)
@@ -155,6 +158,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
     """
     }
     val allExpressions = ctx.splitExpressions(ctx.INPUT_ROW, expressionCodes)
+    val callSite =
+      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public java.lang.Object generate(Object[] references) {
         return new SpecificSafeProjection(references);
@@ -174,9 +179,15 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
       }
 
       public java.lang.Object apply(java.lang.Object _i) {
-        InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
-        $allExpressions
-        return mutableRow;
+        try {
+          InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
+          $allExpressions
+          return mutableRow;
+        } catch (final Throwable e) {
+          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+          logger.error("The method apply() is generated for ${callSite}");
+          throw e;
+        }
       }
     }
   """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index 5efba4b3a6087..e0000ce9f6a21 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 import org.apache.spark.sql.types._
 
 /**
@@ -359,9 +360,15 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
   private def create(
       expressions: Seq[Expression],
       subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
+    val validExpr = expressions.filter {
+      case NoOp => false
+      case _ => true
+    }
     val ctx = newCodeGenContext()
     val eval = createCode(ctx, expressions, subexpressionEliminationEnabled)
 
+    val callSite =
+      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public java.lang.Object generate(Object[] references) {
         return new SpecificUnsafeProjection(references);
@@ -384,8 +391,14 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       }
 
       public UnsafeRow apply(InternalRow ${ctx.INPUT_ROW}) {
-        ${eval.code.trim}
-        return ${eval.value};
+        try {
+          ${eval.code.trim}
+          return ${eval.value};
+        } catch (final Throwable e) {
+          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+          logger.error("The method apply() is generated for ${callSite}");
+          throw e;
+        }
       }
     }
   """
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index cd9ba7c75b91d..ad251776cc3b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -328,7 +328,14 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
       ${ctx.declareAddedFunctions()}
 
       protected void processNext() throws java.io.IOException {
-        ${code.trim}
+        try {
+          ${code.trim}
+        } catch (final Throwable e) {
+          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
+          logger.error("The method processNext() is generated for " +
+            "${plan.origin.callSite.getOrElse("unknown")}");
+          throw e;
+        }
       }
     }
   """.trim
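The pattern PATCH 03 injects into every generated method is catch-log-rethrow: the exception is logged together with the user-facing call site of the expression the method was generated for, then rethrown unchanged so behavior is unaffected. A minimal Scala rendering of the same pattern (a sketch, not the generated Java itself; the names are illustrative):

    import org.slf4j.LoggerFactory

    object CallSiteLogging {
      private val logger = LoggerFactory.getLogger(getClass)

      // Run `body`; on failure, report which user call site the generated
      // method came from, then rethrow the original exception unchanged.
      def withCallSiteLogging[T](methodName: String, callSite: String)(body: => T): T =
        try {
          body
        } catch {
          case e: Throwable =>
            logger.error(s"The method $methodName() is generated for $callSite")
            throw e
        }
    }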
From bf72c139a3b2bc40e7ba4445bcb99fb36a66a600 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sat, 27 Feb 2016 22:21:22 +0900
Subject: [PATCH 04/29] make ExpressionSet serializable

---
 .../apache/spark/sql/catalyst/expressions/ExpressionSet.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
index 644a5b28a2151..43b0ec1880803 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionSet.scala
@@ -50,7 +50,7 @@ object ExpressionSet {
 class ExpressionSet protected(
     protected val baseSet: mutable.Set[Expression] = new mutable.HashSet,
     protected val originals: mutable.Buffer[Expression] = new ArrayBuffer)
-  extends Set[Expression] {
+  extends Set[Expression] with Serializable {
 
   protected def add(e: Expression): Unit = {
     if (!baseSet.contains(e.canonicalized)) {

From 27b13d2f0ee0f45597c71eae0f41e8fd5b665171 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Mar 2016 17:25:14 +0900
Subject: [PATCH 05/29] revert changes to create another PR

---
 .../codegen/GenerateMutableProjection.scala        | 20 ++++++------------
 .../codegen/GenerateOrdering.scala                 | 14 +++-----------
 .../codegen/GeneratePredicate.scala                | 11 ++--------
 .../codegen/GenerateSafeProjection.scala           | 21 +++++--------------
 .../codegen/GenerateUnsafeProjection.scala         | 17 ++-------------
 5 files changed, 18 insertions(+), 65 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 2bfe1e5882732..0f82d2e613c73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -94,8 +94,6 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
     val allProjections = ctx.splitExpressions(ctx.INPUT_ROW, projectionCodes)
     val allUpdates = ctx.splitExpressions(ctx.INPUT_ROW, updates)
 
-    val callSite =
-      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public java.lang.Object generate(Object[] references) {
         return new SpecificMutableProjection(references);
@@ -125,18 +123,12 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
       }
 
       public java.lang.Object apply(java.lang.Object _i) {
-        try {
-          InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
-          $evalSubexpr
-          $allProjections
-          // copy all the results into MutableRow
-          $allUpdates
-          return mutableRow;
-        } catch (final Throwable e) {
-          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-          logger.error("The method apply() is generated for ${callSite}");
-          throw e;
-        }
+        InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
+        $evalSubexpr
+        $allProjections
+        // copy all the results into MutableRow
+        $allUpdates
+        return mutableRow;
       }
     }
   """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 4a5ef55536a01..c10829d4f14f3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -113,8 +113,6 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
   protected def create(ordering: Seq[SortOrder]): BaseOrdering = {
     val ctx = newCodeGenContext()
     val comparisons = genComparisons(ctx, ordering)
-    val callSite =
-      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public SpecificOrdering generate(Object[] references) {
         return new SpecificOrdering(references);
@@ -132,15 +130,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
       }
 
       public int compare(InternalRow a, InternalRow b) {
-        try {
-          InternalRow ${ctx.INPUT_ROW} = null;  // Holds current row being evaluated.
-          $comparisons
-          return 0;
-        } catch (final Throwable e) {
-          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-          logger.error("The method compare() is generated for ${callSite}");
-          throw e;
-        }
+        InternalRow ${ctx.INPUT_ROW} = null;  // Holds current row being evaluated.
+        $comparisons
+        return 0;
       }
     }"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index 8cedaa368407b..106bb27964cab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -56,15 +56,8 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Bool
       }
 
      public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
-        try {
-          ${eval.code}
-          return !${eval.isNull} && ${eval.value};
-        } catch (final Throwable e) {
-          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-          logger.error("The method eval() is generated for " +
-            "${predicate.origin.callSite.getOrElse("unknown")}");
-          throw e;
-        }
+        ${eval.code}
+        return !${eval.isNull} && ${eval.value};
       }
     }"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index e7aabfd5a2ef9..b891f94673752 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -139,11 +139,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
 
   protected def create(expressions: Seq[Expression]): Projection = {
     val ctx = newCodeGenContext()
-    val validExpr = expressions.filter {
-      case NoOp => false
-      case _ => true
-    }
-    val expressionCodes = validExpr.zipWithIndex.map {
+    val expressionCodes = expressions.zipWithIndex.map {
+      case (NoOp, _) => ""
       case (e, i) =>
         val evaluationCode = e.genCode(ctx)
         val converter = convertToSafe(ctx, evaluationCode.value, e.dataType)
@@ -158,8 +155,6 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
     """
     }
     val allExpressions = ctx.splitExpressions(ctx.INPUT_ROW, expressionCodes)
-    val callSite =
-      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public java.lang.Object generate(Object[] references) {
         return new SpecificSafeProjection(references);
@@ -179,15 +174,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
       }
 
       public java.lang.Object apply(java.lang.Object _i) {
-        try {
-          InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
-          $allExpressions
-          return mutableRow;
-        } catch (final Throwable e) {
-          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-          logger.error("The method apply() is generated for ${callSite}");
-          throw e;
-        }
+        InternalRow ${ctx.INPUT_ROW} = (InternalRow) _i;
+        $allExpressions
+        return mutableRow;
       }
     }
   """
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index e0000ce9f6a21..5efba4b3a6087 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.expressions.codegen
 
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
 import org.apache.spark.sql.types._
 
 /**
@@ -360,15 +359,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
   private def create(
       expressions: Seq[Expression],
       subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
-    val validExpr = expressions.filter {
-      case NoOp => false
-      case _ => true
-    }
     val ctx = newCodeGenContext()
     val eval = createCode(ctx, expressions, subexpressionEliminationEnabled)
 
-    val callSite =
-      if (validExpr.isEmpty) "unknown" else validExpr(0).origin.callSite.getOrElse("unknown")
     val codeBody = s"""
       public java.lang.Object generate(Object[] references) {
         return new SpecificUnsafeProjection(references);
@@ -391,14 +384,8 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       }
 
      public UnsafeRow apply(InternalRow ${ctx.INPUT_ROW}) {
-        try {
-          ${eval.code.trim}
-          return ${eval.value};
-        } catch (final Throwable e) {
-          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-          logger.error("The method apply() is generated for ${callSite}");
-          throw e;
-        }
+        ${eval.code.trim}
+        return ${eval.value};
       }
     }
   """

From 09245eb443662df871adbc227ba74281be60cdcb Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Mar 2016 17:33:40 +0900
Subject: [PATCH 06/29] revert changes to create another PR:
 sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala

---
 .../spark/sql/execution/WholeStageCodegenExec.scala | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ad251776cc3b8..cd9ba7c75b91d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -328,14 +328,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
       ${ctx.declareAddedFunctions()}
 
       protected void processNext() throws java.io.IOException {
-        try {
-          ${code.trim}
-        } catch (final Throwable e) {
-          org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-          logger.error("The method processNext() is generated for " +
-            "${plan.origin.callSite.getOrElse("unknown")}");
-          throw e;
-        }
+        ${code.trim}
       }
     }
   """.trim

From 9731e4e1bbadbd426c67f146b1d829cb5f73cc7f Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Mar 2016 17:36:19 +0900
Subject: [PATCH 07/29] revert

---
 .../apache/spark/sql/catalyst/util/package.scala | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 53f3cfe078915..3d2a624ba3b30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -165,19 +165,6 @@ package object util {
     str.substring(0, len).replace("*/", "\\*\\/").replace("\\u", "\\\\u") + suffix
   }
 
-  /**
-   * Returns the string representation of this expression with origin that is safe to be put in
-   * code comments of generated code.
-   */
-  def toCommentSafeString(expr: Expression): String = {
-    val str = if (expr.origin.callSite.isDefined && !expr.isInstanceOf[BoundReference]) {
-      expr.toString + " @ " + expr.origin.callSite.get
-    } else {
-      expr.toString
-    }
-    str.replace("*/", "\\*\\/").replace("\\u", "\\\\u")
-  }
-
   /* FIX ME
   implicit class debugLogging(a: Any) {
     def debugLogging() {

From 305852a8cc2a323c54f4813728783f7c6736527f Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Mar 2016 17:38:57 +0900
Subject: [PATCH 08/29] Add toOriginString()

---
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 5dd480d24e19d..da421256fea5b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -73,7 +73,7 @@ object CurrentOrigin {
 }
 
 // scalastyle:off
-abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Serializable {
+abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 // scalastyle:on
   self: BaseType =>
 
@@ -443,6 +443,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Ser
 
   override def toString: String = treeString
 
+  def toOriginString: String =
+    if (this.origin.callSite.isDefined && !this.isInstanceOf[BoundReference]) {
+      this.toString + " @ " + this.origin.callSite.get
+    } else {
+      this.toString
+    }
+
   /** Returns a string representation of the nodes in this tree */
   def treeString: String = generateTreeString(0, Nil, new StringBuilder).toString
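toOriginString (PATCH 08) is what later surfaces the call site in generated-code comments. A sketch of the intended behavior (illustrative only, not part of the patch; the call-site string is made up, and it relies on TreeNode capturing CurrentOrigin at construction time):

    import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
    import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}

    val e = CurrentOrigin.withOrigin(Origin(callSite = Some("select at MyApp.scala:42"))) {
      Add(Literal(1), Literal(2))
    }
    // With a call site recorded, toOriginString appends it after " @ ":
    assert(e.toOriginString == e.toString + " @ select at MyApp.scala:42")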
From 10de448f04c13b6da8ac856db880f7c87d0b159a Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Mar 2016 17:39:50 +0900
Subject: [PATCH 09/29] call toOriginString() in toCommentSafeString()

---
 .../apache/spark/sql/catalyst/expressions/Expression.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 2ec46216e1cdb..b66dfeec83b4b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -96,14 +96,14 @@ abstract class Expression extends TreeNode[Expression] {
     ctx.subExprEliminationExprs.get(this).map { subExprState =>
       // This expression is repeated which means that the code to evaluate it has already been added
      // as a function before. In that case, we just re-use it.
-      ExprCode(ctx.registerComment(this.toString), subExprState.isNull, subExprState.value)
+      ExprCode(ctx.registerComment(this.toOriginString), subExprState.isNull, subExprState.value)
     }.getOrElse {
       val isNull = ctx.freshName("isNull")
       val value = ctx.freshName("value")
       val ve = doGenCode(ctx, ExprCode("", isNull, value))
       if (ve.code.nonEmpty) {
         // Add `this` in the comment.
-        ve.copy(code = s"${ctx.registerComment(this.toString)}\n" + ve.code.trim)
+        ve.copy(code = s"${ctx.registerComment(this.toOriginString)}\n" + ve.code.trim)
       } else {
         ve
       }

From e68f551e3cbb633162e79994a5d50d9a0ee4613d Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 3 Mar 2016 17:40:14 +0900
Subject: [PATCH 10/29] call toOriginString() in toCommentSafeString()


From 458db22d60e2fe5b06ae7e4d79a54f613fdfe83b Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 10 Mar 2016 10:34:26 +0900
Subject: [PATCH 11/29] add CurrentOrigin when a Column object is created

---
 .../src/main/scala/org/apache/spark/sql/Column.scala | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 713f7941beeb2..86d96f01932f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -26,10 +26,12 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils.getCallSite
 
 private[sql] object Column {
 
@@ -139,6 +141,16 @@ class Column(protected[sql] val expr: Expression) extends Logging {
 
   override def hashCode: Int = this.expr.hashCode()
 
+  {
+    // set current call stack to CurrentOrigin
+    val exprWithCallSite = expr.children.find(_.origin.callSite.isDefined)
+    val callSite = exprWithCallSite match {
+      case Some(expr) => expr.origin.callSite.get
+      case _ => org.apache.spark.util.Utils.getCallSite().shortForm
+    }
+    org.apache.spark.sql.catalyst.trees.CurrentOrigin.setPosition(callSite, 0, 0)
+  }
+
   /** Creates a column based on the given expression. */
   private def withExpr(newExpr: Expression): Column = new Column(newExpr)

From dfbe2df037c8e12e2c46befd73f61f770bb2e62a Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 10 Mar 2016 10:48:44 +0900
Subject: [PATCH 12/29] rebase

---
 .../scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 mode change 100644 => 100755 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
old mode 100644
new mode 100755

From 7b606ab8f426a78113accc60d45f44c6ee1bcbce Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 17 Mar 2016 00:23:55 +0900
Subject: [PATCH 13/29] replace Origin.callSite in a Column's Expressions with
 the DataFrame API's call site

---
 .../spark/sql/catalyst/trees/TreeNode.scala |  2 +-
 .../scala/org/apache/spark/sql/Column.scala |  8 +++++++-
 .../scala/org/apache/spark/sql/Dataset.scala | 19 +++++++++++++++++--
 3 files changed, 25 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index da421256fea5b..c08cc4f0d731e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.Utils
 private class MutableInt(var i: Int)
 
 case class Origin(
-  callSite: Option[String] = None,
+  var callSite: Option[String] = None,
   line: Option[Int] = None,
   startPosition: Option[Int] = None)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 86d96f01932f3..49e3d480d03f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.functions.lit
@@ -48,6 +48,12 @@ private[sql] object Column {
     case expr => usePrettyExpression(expr).sql
   }
   }
+
+  @scala.annotation.varargs
+  def updateExpressionsOrigin(cols: Column*): Unit = {
+    val callSite = org.apache.spark.util.Utils.getCallSite().shortForm
+    cols.map(col => col.expr.foreach(e => e.origin.callSite = Some(callSite)))
+  }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 369b772d322c0..c59e96dd0c538 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -683,6 +683,7 @@ class Dataset[T] private[sql](
    * @since 2.0.0
    */
  def join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame = {
+    Column.updateExpressionsOrigin(joinExprs)
     // Note that in this function, we introduce a hack in the case of self-join to automatically
     // resolve ambiguous join conditions into ones that might make sense [SPARK-6231].
     // Consider this case: df.join(df, df("key") === df("key"))
@@ -967,6 +968,7 @@ class Dataset[T] private[sql](
    */
   @scala.annotation.varargs
   def select(cols: Column*): DataFrame = withPlan {
+    Column.updateExpressionsOrigin(cols : _*)
     Project(cols.map(_.named), logicalPlan)
   }
 
@@ -1111,6 +1113,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
   */
   def filter(condition: Column): Dataset[T] = withTypedPlan {
+    Column.updateExpressionsOrigin(condition)
     Filter(condition.expr, logicalPlan)
   }
 
@@ -1173,6 +1176,7 @@ class Dataset[T] private[sql](
   */
   @scala.annotation.varargs
   def groupBy(cols: Column*): RelationalGroupedDataset = {
+    Column.updateExpressionsOrigin(cols : _*)
     RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
   }
 
@@ -1197,6 +1201,7 @@ class Dataset[T] private[sql](
   */
   @scala.annotation.varargs
   def rollup(cols: Column*): RelationalGroupedDataset = {
+    Column.updateExpressionsOrigin(cols : _*)
     RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.RollupType)
   }
 
@@ -1221,6 +1226,7 @@ class Dataset[T] private[sql](
   */
   @scala.annotation.varargs
   def cube(cols: Column*): RelationalGroupedDataset = {
+    Column.updateExpressionsOrigin(cols : _*)
     RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.CubeType)
   }
 
@@ -1419,7 +1425,10 @@ class Dataset[T] private[sql](
   * @since 2.0.0
   */
   @scala.annotation.varargs
-  def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs : _*)
+  def agg(expr: Column, exprs: Column*): DataFrame = {
+    Column.updateExpressionsOrigin(exprs : _*)
+    groupBy().agg(expr, exprs : _*)
+  }
 
@@ -1608,6 +1617,7 @@ class Dataset[T] private[sql](
   @deprecated("use flatMap() or select() with functions.explode() instead", "2.0.0")
   def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame = {
+    Column.updateExpressionsOrigin(input : _*)
     val elementSchema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
 
     val convert = CatalystTypeConverters.createToCatalystConverter(elementSchema)
@@ -1671,7 +1681,7 @@ class Dataset[T] private[sql](
   * @since 2.0.0
   */
   def withColumn(colName: String, col: Column): DataFrame = {
-    val resolver = sparkSession.sessionState.analyzer.resolver
+    Column.updateExpressionsOrigin(col)
     val output = queryExecution.analyzed.output
     val shouldReplace = output.exists(f => resolver(f.name, colName))
     if (shouldReplace) {
@@ -1692,6 +1702,7 @@ class Dataset[T] private[sql](
   * Returns a new [[Dataset]] by adding a column with metadata.
   */
  private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
+    Column.updateExpressionsOrigin(col)
     val resolver = sparkSession.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldReplace = output.exists(f => resolver(f.name, colName))
@@ -1782,6 +1793,7 @@ class Dataset[T] private[sql](
   * @since 2.0.0
   */
   def drop(col: Column): DataFrame = {
+    Column.updateExpressionsOrigin(col)
     val expression = col match {
       case Column(u: UnresolvedAttribute) =>
         queryExecution.analyzed.resolveQuoted(
@@ -2218,6 +2230,7 @@ class Dataset[T] private[sql](
   */
   @scala.annotation.varargs
   def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    Column.updateExpressionsOrigin(partitionExprs : _*)
     RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, Some(numPartitions))
   }
 
@@ -2233,6 +2246,7 @@ class Dataset[T] private[sql](
   @scala.annotation.varargs
   def repartition(partitionExprs: Column*): Dataset[T] = withTypedPlan {
+    Column.updateExpressionsOrigin(partitionExprs : _*)
     RepartitionByExpression(partitionExprs.map(_.expr), logicalPlan, numPartitions = None)
   }
 
@@ -2528,6 +2542,7 @@ class Dataset[T] private[sql](
   }
 
   private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
+    Column.updateExpressionsOrigin(sortExprs : _*)
     val sortOrder: Seq[SortOrder] = sortExprs.map { col =>
       col.expr match {
         case expr: SortOrder =>

From d9536d4f16409f80bafe66c68623831cb54a9ada Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 18 Mar 2016 16:20:55 +0900
Subject: [PATCH 14/29] add callSite in toOriginString() for all subclasses of
 a TreeNode class

---
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index c08cc4f0d731e..9b3181dd2cfbf 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -444,7 +444,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   override def toString: String = treeString
 
   def toOriginString: String =
-    if (this.origin.callSite.isDefined && !this.isInstanceOf[BoundReference]) {
+    if (this.origin.callSite.isDefined) {
       this.toString + " @ " + this.origin.callSite.get
     } else {
       this.toString
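End-to-end, the Dataset hooks from PATCH 13 are meant to make user code visible in generated code. A hedged sketch of the expected effect, assuming an application file MyApp.scala (the comment text shown is what the surrounding patches aim for, not verified output):

    import org.apache.spark.sql.SparkSession

    object MyApp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local").appName("origin-demo").getOrCreate()
        import spark.implicits._

        val df = Seq((1, -1), (2, -2)).toDF("k", "v")
        val df2 = df.filter($"k" > 0) // suppose this is MyApp.scala:10
        df2.collect()
        // Dataset.filter calls Column.updateExpressionsOrigin(condition), so the
        // comparison's Origin.callSite should read "filter at MyApp.scala:10",
        // and a codegen dump should show a comment like:
        //   /* (k#3 > 0) @ filter at MyApp.scala:10 */
        spark.stop()
      }
    }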
From 9d70b36afb1033208f3574825131f6fbaa9e1fb9 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 11 Apr 2016 04:16:49 +0900
Subject: [PATCH 15/29] rebase

---
 .../org/apache/spark/sql/catalyst/parser/ParseDriver.scala | 6 +++---
 .../org/apache/spark/sql/catalyst/parser/ParserUtils.scala | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index d687a85c18b63..d3f2eb5363fe1 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -98,7 +98,7 @@ abstract class AbstractSqlParser extends ParserInterface with Logging {
       case e: ParseException =>
         throw e.withCommand(command)
       case e: AnalysisException =>
-        val position = Origin(e.line, e.startPosition)
+        val position = Origin(None, e.line, e.startPosition)
         throw new ParseException(Option(command), e.message, position, position)
     }
   }
@@ -150,7 +150,7 @@ case object ParseErrorListener extends BaseErrorListener {
       charPositionInLine: Int,
       msg: String,
       e: RecognitionException): Unit = {
-    val position = Origin(Some(line), Some(charPositionInLine))
+    val position = Origin(None, Some(line), Some(charPositionInLine))
     throw new ParseException(None, msg, position, position)
   }
 }
@@ -176,7 +176,7 @@ class ParseException(
     val builder = new StringBuilder
     builder ++= "\n" ++= message
     start match {
-      case Origin(Some(l), Some(p)) =>
+      case Origin(None, Some(l), Some(p)) =>
         builder ++= s"(line $l, pos $p)\n"
         command.foreach { cmd =>
           val (above, below) = cmd.split("\n").splitAt(l)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
index 9619884edeafe..48d4b10b0145c 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
@@ -74,7 +74,8 @@ object ParserUtils {
 
   /** Get the origin (line and position) of the token. */
   def position(token: Token): Origin = {
-    Origin(Option(token.getLine), Option(token.getCharPositionInLine))
+    val callSite = org.apache.spark.util.Utils.getCallSite().shortForm
+    Origin(Option(callSite), Option(token.getLine), Option(token.getCharPositionInLine))
   }
 
   /** Assert if a condition holds. If it doesn't throw a parse exception. */

From 05332ed3cf7df7f7400450973e73aabe26d5073a Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Mon, 11 Apr 2016 10:57:22 +0900
Subject: [PATCH 16/29] fix test failure of 'semantic errors' in
 ErrorParserSuite

---
 .../org/apache/spark/sql/catalyst/parser/ParseDriver.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index d3f2eb5363fe1..d2b8330223d69 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -176,7 +176,7 @@ class ParseException(
     val builder = new StringBuilder
     builder ++= "\n" ++= message
     start match {
-      case Origin(None, Some(l), Some(p)) =>
+      case Origin(_, Some(l), Some(p)) =>
         builder ++= s"(line $l, pos $p)\n"
         command.foreach { cmd =>
           val (above, below) = cmd.split("\n").splitAt(l)

From 330521c951cd6395604bcf412640213a67c15f53 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 12 Apr 2016 11:36:08 +0900
Subject: [PATCH 17/29] Fix issues by avoiding setting Origin in the Column
 constructor

---
 .../src/main/scala/org/apache/spark/sql/Column.scala | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)
 mode change 100644 => 100755 sql/core/src/main/scala/org/apache/spark/sql/Column.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
old mode 100644
new mode 100755
index 49e3d480d03f8..bc89a9e379e24
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -52,6 +52,8 @@ private[sql] object Column {
   @scala.annotation.varargs
   def updateExpressionsOrigin(cols: Column*): Unit = {
     val callSite = org.apache.spark.util.Utils.getCallSite().shortForm
+    print(s"callSite: $callSite\n")
+    Thread.dumpStack()
     cols.map(col => col.expr.foreach(e => e.origin.callSite = Some(callSite)))
   }
 }
@@ -147,16 +149,6 @@ class Column(protected[sql] val expr: Expression) extends Logging {
 
   override def hashCode: Int = this.expr.hashCode()
 
-  {
-    // set current call stack to CurrentOrigin
-    val exprWithCallSite = expr.children.find(_.origin.callSite.isDefined)
-    val callSite = exprWithCallSite match {
-      case Some(expr) => expr.origin.callSite.get
-      case _ => org.apache.spark.util.Utils.getCallSite().shortForm
-    }
-    org.apache.spark.sql.catalyst.trees.CurrentOrigin.setPosition(callSite, 0, 0)
-  }
-
   /** Creates a column based on the given expression. */
   private def withExpr(newExpr: Expression): Column = new Column(newExpr)

From 607e67840ebd265256c759da94d93fd3987f7bb2 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 12 Apr 2016 11:36:39 +0900
Subject: [PATCH 18/29] addressed a comment by making TreeNode serializable

---
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
 mode change 100644 => 100755 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
old mode 100644
new mode 100755
index 9b3181dd2cfbf..3e2066bb6ebdb
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -73,7 +73,7 @@ object CurrentOrigin {
 }
 
 // scalastyle:off
-abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
+abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Serializable {
 // scalastyle:on
   self: BaseType =>
From f0b958773be5426c0aaf26638f4f6d02c071bb82 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 12 Apr 2016 16:05:45 +0900
Subject: [PATCH 19/29] Revert a change. Now, TreeNode is non-serializable

---
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 3e2066bb6ebdb..9b3181dd2cfbf 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -73,7 +73,7 @@ object CurrentOrigin {
 }
 
 // scalastyle:off
-abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product with Serializable {
+abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 // scalastyle:on
   self: BaseType =>

From 0bf3586bda547aac6464ecbb220ac9ec0bc743aa Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Tue, 12 Apr 2016 19:12:47 +0900
Subject: [PATCH 20/29] Update CurrentOrigin to set origin to LogicalPlan

---
 sql/core/src/main/scala/org/apache/spark/sql/Column.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index bc89a9e379e24..898acb2f6da51 100755
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -51,10 +51,12 @@ private[sql] object Column {
 
   @scala.annotation.varargs
   def updateExpressionsOrigin(cols: Column*): Unit = {
+    // Update Expression.origin using the callSite of an operation
     val callSite = org.apache.spark.util.Utils.getCallSite().shortForm
-    print(s"callSite: $callSite\n")
-    Thread.dumpStack()
     cols.map(col => col.expr.foreach(e => e.origin.callSite = Some(callSite)))
+    // Update CurrentOrigin for setting origin for LogicalPlan node
+    CurrentOrigin.set(
+      Origin(Some(callSite), CurrentOrigin.get.line, CurrentOrigin.get.startPosition))
   }
 }
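After PATCH 20, a single updateExpressionsOrigin call covers both levels of the tree: every Expression under the Column gets origin.callSite set directly, and CurrentOrigin is primed so the LogicalPlan node constructed next captures the same call site. A sketch of the second half in isolation (illustrative only, not part of the patch; the call-site string is made up):

    import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}

    val callSite = "filter at MyApp.scala:10"
    // Prime CurrentOrigin the way updateExpressionsOrigin now does:
    CurrentOrigin.set(
      Origin(Some(callSite), CurrentOrigin.get.line, CurrentOrigin.get.startPosition))
    // Any TreeNode built from here on (e.g. the Filter logical plan node
    // created by Dataset.filter) records this origin at construction.
    assert(CurrentOrigin.get.callSite.contains(callSite))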
*/ From 7c68e0708dd275158d02ebef4ec7d84370d3c62c Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 13 Apr 2016 04:31:37 +0900 Subject: [PATCH 22/29] add test suites --- .../scala/org/apache/spark/util/Utils.scala | 12 +- .../execution/CodegenEmbedFileLineSuite.scala | 139 ++++++++++++++++++ 2 files changed, 147 insertions(+), 4 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1a9dbcae8c083..33c61f69bef63 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1307,7 +1307,8 @@ private[spark] object Utils extends Logging { } /** Default filtering function for finding call sites using `getCallSite`. */ - private def sparkInternalExclusionFunction(className: String): Boolean = { + private def sparkInternalExclusionFunction(className: String, testClassName: String): + Boolean = { // A regular expression to match classes of the internal Spark API's // that we want to skip when finding the call site of a method. val SPARK_CORE_CLASS_REGEX = @@ -1317,8 +1318,9 @@ private[spark] object Utils extends Logging { val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined || SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX) + val isSparkTestSuiteClass = (testClassName != null) && className.startsWith(testClassName) // If the class is a Spark internal class or a Scala class, then exclude. - isSparkClass || isScalaClass + (isSparkClass || isScalaClass) && !isSparkTestSuiteClass } /** @@ -1328,7 +1330,8 @@ private[spark] object Utils extends Logging { * * @param skipClass Function that is used to exclude non-user-code classes. */ - def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = { + def getCallSite(skipClass: (String, String) => Boolean = sparkInternalExclusionFunction): + CallSite = { // Keep crawling up the stack trace until we find the first function not inside of the spark // package. We track the last (shallowest) contiguous Spark method. This might be an RDD // transformation, a SparkContext function (such as parallelize), or anything else that leads @@ -1343,10 +1346,11 @@ private[spark] object Utils extends Logging { // When running under some profilers, the current stack trace might contain some bogus // frames. This is intended to ensure that we don't crash in these situations by // ignoring any frames that we can't examine. 
+ val testClassName = System.getProperty("spark.callstack.testClass") if (ste != null && ste.getMethodName != null && !ste.getMethodName.contains("getStackTrace")) { if (insideSpark) { - if (skipClass(ste.getClassName)) { + if (skipClass(ste.getClassName, testClassName)) { lastSparkMethod = if (ste.getMethodName == "") { // Spark method is a constructor; get its class name ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala new file mode 100644 index 0000000000000..fb108e512ff59 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.core.expressions.codegen + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.trees.CurrentOrigin +import org.apache.spark.sql.execution.debug.codegenString +import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.util.ResetSystemProperties + + + +class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext + with ResetSystemProperties { + import testImplicits._ + + test("filter String") { + val df = sparkContext.parallelize(1 to 1, 1).map(i => (i, -i)).toDF("k", "v") + .filter("k > 0") + validate(df, Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:35")) + } + + test("select Column") { + val df = sparkContext.parallelize(1 to 1, 1).toDF + .select($"value" + 1) + validate(df, Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:41")) + } + + test("selectExpr String") { + val df = sparkContext.parallelize(1 to 1, 1).toDF + .selectExpr("value + 2") + validate(df, Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:47")) + } + + test("filter Strings (two filters are combined into one plan") { + val df = sparkContext.parallelize(1 to 1, 1).map(i => (i, -i)).toDF("k", "v") + .filter("k > 0") + .filter("v > 1") + validate(df, + Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:53", + " > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:54"), + Array(" > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:53", + " > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:54")) + } + + test("selectExpr Strings") { + val df = sparkContext.parallelize(1 to 1, 1).map(i => (i, -i)).toDF("k", "v") + .selectExpr("k + 2", "v - 2") + validate(df, + Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:64", + " - 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:64")) + } + + test("select and selectExpr") { + val df = 
+      sparkContext.parallelize(1 to 1, 1).toDF
+    val df1 = df.select($"value" + 1)
+    val df2 = df.selectExpr("value + 2")
+    validate(df1,
+      Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:72"),
+      Array(" \\+ 2\\) @ select at CodegenEmbedFileLineSuite.scala:73"))
+    validate(df2,
+      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:73"),
+      Array(" \\+ 1\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:72"))
+  }
+
+  test("filter and select") {
+    val df = sparkContext.parallelize(1 to 1, 1).toDF
+    val df1 = df.filter("value > 0")
+    val df2 = df1.select($"value" * 2)
+    validate(df2,
+      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:84",
+        " \\* 2\\) @ select at CodegenEmbedFileLineSuite.scala:85"))
+  }
+
+  test("no transformation") {
+    val df = sparkContext.parallelize(1 to 1, 1).toDF
+    validate(df,
+      Array.empty,
+      Array("CodegenEmbedFileLineSuite.scala"))
+  }
+
+
+  def validate(df: DataFrame,
+    expected: Array[String] = Array.empty, unexpected: Array[String] = Array.empty): Unit = {
+    val logicalPlan = df.logicalPlan
+    // As LogicalPlan.resolveOperators does,
+    // this routine also updates CurrentOrigin with logicalPlan.origin
+    val cg = CurrentOrigin.withOrigin(logicalPlan.origin) {
+      val queryExecution = sqlContext.executePlan(logicalPlan)
+      codegenString(queryExecution.executedPlan)
+    }
+
+    if (cg.contains("Found 0 WholeStageCodegen subtrees")) {
+      return
+    }
+
+    expected.foreach { string =>
+      if (!string.r.findFirstIn(cg).isDefined) {
+        fail(
+          s"""
+             |=== FAIL: generated code must include: "$string" ===
+             |$cg
+           """.stripMargin
+        )
+      }
+    }
+    unexpected.foreach { string =>
+      if (string.r.findFirstIn(cg).isDefined) {
+        fail(
+          s"""
+             |=== FAIL: generated code must not include: "$string" ===
+             |$cg
+           """.stripMargin
+        )
+      }
+    }
+  }
+
+  override def beforeEach() {
+    super.beforeEach()
+    System.setProperty("spark.callstack.testClass", this.getClass.getName)
+  }
+}

From 0091eb59c22589a94ad990a20151bd813f9191c8 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 13 Apr 2016 13:35:08 +0900
Subject: [PATCH 23/29] fix compilation error

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 33c61f69bef63..6698493f789c1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1307,7 +1307,7 @@ private[spark] object Utils extends Logging {
   }
 
   /** Default filtering function for finding call sites using `getCallSite`. */
-  private def sparkInternalExclusionFunction(className: String, testClassName: String):
+  private def sparkInternalExclusionFunction(className: String):
     Boolean = {
     // A regular expression to match classes of the internal Spark API's
     // that we want to skip when finding the call site of a method.
     val SPARK_CORE_CLASS_REGEX =
@@ -1318,6 +1318,7 @@ private[spark] object Utils extends Logging {
     val isSparkClass = SPARK_CORE_CLASS_REGEX.findFirstIn(className).isDefined ||
       SPARK_SQL_CLASS_REGEX.findFirstIn(className).isDefined
     val isScalaClass = className.startsWith(SCALA_CORE_CLASS_PREFIX)
+    val testClassName = System.getProperty("spark.callstack.testClass")
     val isSparkTestSuiteClass = (testClassName != null) && className.startsWith(testClassName)
     // If the class is a Spark internal class or a Scala class, then exclude.
     (isSparkClass || isScalaClass) && !isSparkTestSuiteClass
@@ -1330,7 +1331,7 @@ private[spark] object Utils extends Logging {
    *
    * @param skipClass Function that is used to exclude non-user-code classes.
    */
-  def getCallSite(skipClass: (String, String) => Boolean = sparkInternalExclusionFunction):
+  def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction):
     CallSite = {
     // Keep crawling up the stack trace until we find the first function not inside of the spark
     // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
@@ -1346,11 +1347,10 @@ private[spark] object Utils extends Logging {
       // When running under some profilers, the current stack trace might contain some bogus
       // frames. This is intended to ensure that we don't crash in these situations by
       // ignoring any frames that we can't examine.
-      val testClassName = System.getProperty("spark.callstack.testClass")
       if (ste != null && ste.getMethodName != null
         && !ste.getMethodName.contains("getStackTrace")) {
         if (insideSpark) {
-          if (skipClass(ste.getClassName, testClassName)) {
+          if (skipClass(ste.getClassName)) {
             lastSparkMethod = if (ste.getMethodName == "<init>") {
               // Spark method is a constructor; get its class name
               ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1)

From bacfcc6496d1184ef5ad67a19a5bb7f448f700e9 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 21 Apr 2016 20:36:13 +0900
Subject: [PATCH 24/29] resolved conflicts

---
 .../sql/catalyst/expressions/codegen/CodegenFallback.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
index 6a5a3e7933eea..e5662b073123c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodegenFallback.scala
@@ -35,7 +35,7 @@ trait CodegenFallback extends Expression {
     val idx = ctx.references.length
     ctx.references += this
     val objectTerm = ctx.freshName("obj")
-    val placeHolder = ctx.registerComment(this.toString)
+    val placeHolder = ctx.registerComment(this.toOriginString)
     if (nullable) {
       ev.copy(code = s"""
         $placeHolder

From 4f8772b4f1d09d151feca36f3a2726b486e848ae Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Wed, 27 Apr 2016 14:14:42 -0400
Subject: [PATCH 25/29] fix build error

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index c59e96dd0c538..762e42217e40f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1682,6 +1682,7 @@ class Dataset[T] private[sql](
    */
   def withColumn(colName: String, col: Column): DataFrame = {
     Column.updateExpressionsOrigin(col)
+    val resolver = sparkSession.sessionState.analyzer.resolver
     val output = queryExecution.analyzed.output
     val shouldReplace = output.exists(f => resolver(f.name, colName))
     if (shouldReplace) {

From ccdf12d705cf799f1da7811672d9d1108788bb74 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 5 May 2016 20:01:04 -0400
Subject: [PATCH 26/29] resolved conflicts

From 7279489034314a2fce45b818c8f45dbb58a82efa Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 8 May 2016 13:55:17 -0400
Subject: [PATCH 27/29] addressed minor comments

---
 core/src/main/scala/org/apache/spark/util/Utils.scala          | 3 +--
 .../org/apache/spark/sql/catalyst/parser/ParseDriver.scala     | 0
 .../org/apache/spark/sql/catalyst/parser/ParserUtils.scala     | 0
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala   | 0
 sql/core/src/main/scala/org/apache/spark/sql/Column.scala      | 0
 .../apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala | 2 --
 6 files changed, 1 insertion(+), 4 deletions(-)
 mode change 100755 => 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
 mode change 100755 => 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
 mode change 100755 => 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 mode change 100755 => 100644 sql/core/src/main/scala/org/apache/spark/sql/Column.scala

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6698493f789c1..522c8ccb3e960 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1307,8 +1307,7 @@ private[spark] object Utils extends Logging {
   }
 
   /** Default filtering function for finding call sites using `getCallSite`. */
-  private def sparkInternalExclusionFunction(className: String):
-    Boolean = {
+  private def sparkInternalExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the internal Spark API's
     // that we want to skip when finding the call site of a method.
     val SPARK_CORE_CLASS_REGEX =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
old mode 100755
new mode 100644
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserUtils.scala
old mode 100755
new mode 100644
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
old mode 100755
new mode 100644
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
old mode 100755
new mode 100644
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
index fb108e512ff59..6005a08833557 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
@@ -24,8 +24,6 @@ import org.apache.spark.sql.execution.debug.codegenString
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.ResetSystemProperties
 
-
-
 class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
   with ResetSystemProperties {
   import testImplicits._

From a96bc4811e927cf27a28aec32303966b05b0f026 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 8 May 2016 15:57:43 -0400
Subject: [PATCH 28/29] update expected line numbers in test suite

---
 .../execution/CodegenEmbedFileLineSuite.scala | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
index 6005a08833557..d9893cfc21186 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
@@ -31,19 +31,19 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
   test("filter String") {
     val df = sparkContext.parallelize(1 to 1, 1).map(i => (i, -i)).toDF("k", "v")
       .filter("k > 0")
-    validate(df, Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:35"))
+    validate(df, Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:33"))
   }
 
   test("select Column") {
     val df = sparkContext.parallelize(1 to 1, 1).toDF
       .select($"value" + 1)
-    validate(df, Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:41"))
+    validate(df, Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:39"))
   }
 
   test("selectExpr String") {
     val df = sparkContext.parallelize(1 to 1, 1).toDF
       .selectExpr("value + 2")
-    validate(df, Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:47"))
+    validate(df, Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:45"))
   }
 
   test("filter Strings (two filters are combined into one plan)") {
@@ -51,18 +51,18 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
       .filter("k > 0")
       .filter("v > 1")
     validate(df,
-      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:53",
-        " > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:54"),
-      Array(" > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:53",
-        " > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:54"))
+      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:51",
+        " > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:52"),
+      Array(" > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:51",
+        " > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:52"))
   }
 
   test("selectExpr Strings") {
     val df = sparkContext.parallelize(1 to 1, 1).map(i => (i, -i)).toDF("k", "v")
       .selectExpr("k + 2", "v - 2")
     validate(df,
-      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:64",
-        " - 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:64"))
+      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:62",
+        " - 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:62"))
   }
 
   test("select and selectExpr") {
@@ -70,11 +70,11 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
     val df1 = df.select($"value" + 1)
     val df2 = df.selectExpr("value + 2")
     validate(df1,
-      Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:72"),
-      Array(" \\+ 2\\) @ select at CodegenEmbedFileLineSuite.scala:73"))
+      Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:70"),
+      Array(" \\+ 2\\) @ select at CodegenEmbedFileLineSuite.scala:71"))
     validate(df2,
-      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:73"),
-      Array(" \\+ 1\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:72"))
+      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:71"),
+      Array(" \\+ 1\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:70"))
   }
 
   test("filter and select") {
@@ -82,8 +82,8 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
     val df1 = df.filter("value > 0")
     val df2 = df1.select($"value" * 2)
     validate(df2,
-      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:84",
-        " \\* 2\\) @ select at CodegenEmbedFileLineSuite.scala:85"))
+      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:82",
+        " \\* 2\\) @ select at CodegenEmbedFileLineSuite.scala:83"))
   }
 
   test("no transformation") {

From 597b7328554e9ee97615b4992002c16ef0322355 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 3 Jun 2016 01:26:25 +0900
Subject: [PATCH 29/29] update testsuite

---
 .../execution/CodegenEmbedFileLineSuite.scala | 33 ++++++++++---------
 1 file changed, 17 insertions(+), 16 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
index d9893cfc21186..c6fee0f49fb59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/CodegenEmbedFileLineSuite.scala
@@ -27,23 +27,24 @@
 class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
   with ResetSystemProperties {
   import testImplicits._
+  sparkConf.set("spark.sql.codegen.comments", "true")
 
   test("filter String") {
     val df = sparkContext.parallelize(1 to 1, 1).map(i => (i, -i)).toDF("k", "v")
       .filter("k > 0")
-    validate(df, Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:33"))
+    validate(df, Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:34"))
   }
 
   test("select Column") {
     val df = sparkContext.parallelize(1 to 1, 1).toDF
       .select($"value" + 1)
-    validate(df, Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:39"))
+    validate(df, Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:40"))
  }
 
   test("selectExpr String") {
     val df = sparkContext.parallelize(1 to 1, 1).toDF
       .selectExpr("value + 2")
-    validate(df, Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:45"))
+    validate(df, Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:46"))
   }
 
   test("filter Strings (two filters are combined into one plan)") {
@@ -51,18 +52,18 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
       .filter("k > 0")
       .filter("v > 1")
     validate(df,
-      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:51",
-        " > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:52"),
-      Array(" > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:51",
-        " > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:52"))
+      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:52",
+        " > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:53"),
+      Array(" > 1\\) @ filter at CodegenEmbedFileLineSuite.scala:52",
+        " > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:53"))
   }
 
   test("selectExpr Strings") {
     val df = sparkContext.parallelize(1 to 1, 1).map(i => (i, -i)).toDF("k", "v")
       .selectExpr("k + 2", "v - 2")
     validate(df,
-      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:62",
-        " - 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:62"))
+      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:63",
+        " - 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:63"))
   }
 
   test("select and selectExpr") {
@@ -70,11 +71,11 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
     val df1 = df.select($"value" + 1)
     val df2 = df.selectExpr("value + 2")
     validate(df1,
-      Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:70"),
-      Array(" \\+ 2\\) @ select at CodegenEmbedFileLineSuite.scala:71"))
+      Array(" \\+ 1\\) @ select at CodegenEmbedFileLineSuite.scala:71"),
+      Array(" \\+ 2\\) @ select at CodegenEmbedFileLineSuite.scala:72"))
     validate(df2,
-      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:71"),
-      Array(" \\+ 1\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:70"))
+      Array(" \\+ 2\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:72"),
+      Array(" \\+ 1\\) @ selectExpr at CodegenEmbedFileLineSuite.scala:71"))
   }
 
   test("filter and select") {
@@ -82,8 +83,8 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
     val df1 = df.filter("value > 0")
     val df2 = df1.select($"value" * 2)
     validate(df2,
-      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:82",
-        " \\* 2\\) @ select at CodegenEmbedFileLineSuite.scala:83"))
+      Array(" > 0\\) @ filter at CodegenEmbedFileLineSuite.scala:83",
+        " \\* 2\\) @ select at CodegenEmbedFileLineSuite.scala:84"))
   }
 
   test("no transformation") {
@@ -100,7 +101,7 @@ class CodegenEmbedFileLineSuite extends PlanTest with SharedSQLContext
     // As LogicalPlan.resolveOperators does,
     // this routine also updates CurrentOrigin with logicalPlan.origin
     val cg = CurrentOrigin.withOrigin(logicalPlan.origin) {
-      val queryExecution = sqlContext.executePlan(logicalPlan)
+      val queryExecution = sqlContext.sessionState.executePlan(logicalPlan)
       codegenString(queryExecution.executedPlan)
     }
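
The call-site machinery exercised by the patches above can be summarized in a
short, self-contained sketch. This is illustrative only, not the Spark
implementation: the object name CallSiteSketch, the internalPrefixes list, and
the output format are invented here, and a simple prefix check stands in for
Spark's SPARK_CORE_CLASS_REGEX / SPARK_SQL_CLASS_REGEX patterns. The
"spark.callstack.testClass" escape hatch mirrors what patch 23 reads inside
sparkInternalExclusionFunction and what CodegenEmbedFileLineSuite.beforeEach
sets.

    object CallSiteSketch {
      // Stand-ins for internal-class patterns (an assumption, not Spark's real list).
      // CallSiteSketch itself is listed so its own frames are skipped too.
      private val internalPrefixes =
        Seq("org.apache.spark.", "scala.", "java.", "CallSiteSketch")

      // A frame is skipped when it looks internal, unless its class matches the
      // test class registered via the "spark.callstack.testClass" property.
      private def skip(className: String): Boolean = {
        val testClass = System.getProperty("spark.callstack.testClass")
        val isRegisteredTest = testClass != null && className.startsWith(testClass)
        internalPrefixes.exists(className.startsWith) && !isRegisteredTest
      }

      // Walks the current stack: remembers the shallowest "internal" method name
      // and reports the first user frame as "method at File.scala:line".
      def callSite(): String = {
        val stack = Thread.currentThread.getStackTrace.toSeq
        val (internal, user) = stack.span(ste =>
          ste.getMethodName.contains("getStackTrace") || skip(ste.getClassName))
        val method = internal.lastOption.map(_.getMethodName).getOrElse("(unknown)")
        user.headOption
          .map(ste => s"$method at ${ste.getFileName}:${ste.getLineNumber}")
          .getOrElse("(unknown)")
      }
    }

Calling CallSiteSketch.callSite() from user code yields a string shaped like the
"filter at CodegenEmbedFileLineSuite.scala:34" fragments that the suite greps
for in the codegenString output. A test suite that wants its own frames to count
as user code registers itself first, exactly as beforeEach does with
System.setProperty("spark.callstack.testClass", this.getClass.getName).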