From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 16 Nov 2015 18:30:26 -0800
Subject: [PATCH 1/9] SPARK-11633

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2f4670b55bdb..5a5b71e52dd7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -425,7 +425,8 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
+          val attributeRewrites =
+            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 3427152b2da0..5e00546a74c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,6 +51,8 @@ case class Order(
     state: String,
     month: Int)
 
+case class Individual(F1: Integer, F2: Integer)
+
 case class WindowData(
     month: Int,
     area: String,
@@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
+
+  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
+    val rdd1 = sparkContext.parallelize(Seq(Individual(1, 3), Individual(2, 1)))
+    val df = hiveContext.createDataFrame(rdd1)
+    df.registerTempTable("foo")
+    val df2 = sql("select f1, F2 as F2 from foo")
+    df2.registerTempTable("foo2")
+    df2.registerTempTable("foo3")
+
+    checkAnswer(sql(
+      """
+        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
+      """.stripMargin), Row(2) :: Row(1) :: Nil)
+  }
 }
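The change above narrows self-join de-duplication: attribute pairs that map to themselves are filtered out of the rewrite map, so only references that genuinely conflict between the two join sides get rewritten. The new regression test pins down the failure mode; roughly, the scenario is the following (a sketch assuming a Spark 1.6-era session where `sparkContext`, `hiveContext`, and `registerTempTable` are in scope, as they are in the Hive test harness):

    // Two temp tables backed by the same analyzed plan are self-joined.
    // Before the fix, re-aliasing the right side produced a rewrite map full
    // of self-mappings, and copying the plan failed with
    // "TreeNodeException: Failed to Copy Node" (SPARK-11633).
    case class Individual(F1: Integer, F2: Integer)

    val rdd = sparkContext.parallelize(Seq(Individual(1, 3), Individual(2, 1)))
    val df = hiveContext.createDataFrame(rdd)
    df.registerTempTable("foo")

    // "F2 as F2" keeps the attribute name but forces a Project over the same
    // relation on both sides of the coming join.
    val df2 = hiveContext.sql("select f1, F2 as F2 from foo")
    df2.registerTempTable("foo2")
    df2.registerTempTable("foo3")

    // Expected answer after the fix: Row(1) and Row(2).
    hiveContext.sql("SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2 = b.F2").collect()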
From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:56:45 -0800
Subject: [PATCH 2/9] converge

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +--
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 15 ---------------
 2 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7c9512fbd00a..47962ebe6ef8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -417,8 +417,7 @@ class Analyzer(
            */
           j
         case Some((oldRelation, newRelation)) =>
-          val attributeRewrites =
-            AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2))
+          val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output))
           val newRight = right transformUp {
             case r if r == oldRelation => newRelation
           } transformUp {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 5e00546a74c0..61d9dcd37572 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -51,8 +51,6 @@ case class Order(
     state: String,
     month: Int)
 
-case class Individual(F1: Integer, F2: Integer)
-
 case class WindowData(
     month: Int,
     area: String,
@@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-
-  test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") {
-    val rdd1 = sparkContext.parallelize(Seq(Individual(1, 3), Individual(2, 1)))
-    val df = hiveContext.createDataFrame(rdd1)
-    df.registerTempTable("foo")
-    val df2 = sql("select f1, F2 as F2 from foo")
-    df2.registerTempTable("foo2")
-    df2.registerTempTable("foo3")
-
-    checkAnswer(sql(
-      """
-        SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2
-      """.stripMargin), Row(2) :: Row(1) :: Nil)
   }
 }

From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Fri, 20 Nov 2015 10:58:37 -0800
Subject: [PATCH 3/9] converge

---
 .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 61d9dcd37572..3427152b2da0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test
       """.stripMargin), Row("value1", "12", 3.14, "hello"))
   }
-  }
 }
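Patches 2 and 3 back the analyzer change and its regression test out again, restoring both files to their original state. From patch 4 onward the series takes a different tack: instead of special-casing one rewrite, it makes a rule batch that fails to converge within its iteration budget fail loudly while testing. The two execution strategies involved are easiest to see side by side in a runnable sketch (the toy rule here is illustrative, not part of the patch):

    import org.apache.spark.sql.catalyst.expressions.{Add, Expression, IntegerLiteral, Literal}
    import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

    // A toy rewrite that strips one "+ 0" layer per pass.
    object RemoveAddZero extends Rule[Expression] {
      def apply(expr: Expression): Expression = expr transform {
        case Add(left, IntegerLiteral(0)) => left
      }
    }

    object ToyExecutor extends RuleExecutor[Expression] {
      val batches =
        Batch("one pass", Once, RemoveAddZero) ::                        // runs exactly once
        Batch("to convergence", FixedPoint(100), RemoveAddZero) :: Nil   // iterates until stable
    }

    // Nested "+ 0" terms need several passes; FixedPoint keeps going until a
    // pass changes nothing or the budget of 100 iterations runs out.
    ToyExecutor.execute(Add(Add(Literal(1), Literal(0)), Literal(0)))  // => Literal(1)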
From 797aabb275af8e7d1208341bcd78bcc41a1d2191 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Mon, 14 Mar 2016 22:34:33 -0700
Subject: [PATCH 4/9] When doing the test, issue an exception if hitting the
 max iteration.

---
 .../sql/catalyst/rules/RuleExecutor.scala          | 22 ++++++++++++++-----
 .../optimizer/AggregateOptimizeSuite.scala         |  1 +
 .../BooleanSimplificationSuite.scala               |  1 +
 .../optimizer/ColumnPruningSuite.scala             |  1 +
 .../optimizer/CombiningLimitsSuite.scala           |  1 +
 .../optimizer/ComputeCurrentTimeSuite.scala        |  1 +
 .../optimizer/SimplifyConditionalSuite.scala       |  4 +++-
 .../catalyst/trees/RuleExecutorSuite.scala         |  8 +++++--
 .../apache/spark/sql/test/SQLTestUtils.scala       |  1 +
 9 files changed, 31 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 9ebacb4680dc..9a52e5554191 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -21,9 +21,10 @@ import scala.collection.JavaConverters._
 
 import com.google.common.util.concurrent.AtomicLongMap
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.util.Utils
 
 object RuleExecutor {
   protected val timeMap = AtomicLongMap.create[String]()
@@ -46,15 +47,23 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   /**
    * An execution strategy for rules that indicates the maximum number of executions. If the
    * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
+   * If throws is equal to true, it will issue an exception AnalysisExceptions
    */
-  abstract class Strategy { def maxIterations: Int }
+  abstract class Strategy {
+    def maxIterations: Int
+    def throws: Boolean
+  }
 
   /** A strategy that only runs once. */
-  case object Once extends Strategy { val maxIterations = 1 }
+  case object Once extends Strategy {
+    val maxIterations = 1
+    val throws = false
+  }
 
   /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
-  case class FixedPoint(maxIterations: Int) extends Strategy
-
+  case class FixedPoint(maxIterations: Int) extends Strategy {
+    override val throws: Boolean = if (Utils.isTesting) true else false
+  }
 
   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
@@ -98,7 +107,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
         if (iteration > batch.strategy.maxIterations) {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
-            logInfo(s"Max iterations (${iteration - 1}) reached for batch ${batch.name}")
+            val msg = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
+            if (batch.strategy.throws) throw new SparkException(msg) else logTrace(msg)
           }
           continue = false
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
index e458eb8a1d36..6ecaec5b8fcb 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 class AggregateOptimizeSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
+    System.setProperty("spark.testing", "true")
     val batches = Batch("Aggregate", FixedPoint(100),
       RemoveLiteralFromGroupExpressions) :: Nil
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index da43751b0a31..e23855dd037f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.rules._
 
 class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
   object Optimize extends RuleExecutor[LogicalPlan] {
+    System.setProperty("spark.testing", "true")
     val batches =
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index dd7d65ddc9e9..9c61ee8375b7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.types.StringType
 
 class ColumnPruningSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
+    System.setProperty("spark.testing", "true")
     val batches = Batch("Column pruning", FixedPoint(100),
       ColumnPruning,
       CollapseProject) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 87ad81db11b6..128dd64f4866 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.rules._
 
 class CombiningLimitsSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
+    System.setProperty("spark.testing", "true")
    val batches =
       Batch("Filter Pushdown", FixedPoint(100),
         ColumnPruning) ::
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 10ed4e46ddd1..4cdf0fbe10f6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 class ComputeCurrentTimeSuite extends PlanTest {
+  System.setProperty("spark.testing", "true")
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Seq(Batch("ComputeCurrentTime", Once, ComputeCurrentTime))
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
index d436b627f6bd..dc22b9556c9e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
@@ -29,7 +29,9 @@ import org.apache.spark.sql.types.IntegerType
 
 class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches = Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
+    System.setProperty("spark.testing", "true")
+    val batches =
+      Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
   }
 
   protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
index a7de7b052bdc..1fa343e9751f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.trees
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 
@@ -46,9 +46,13 @@ class RuleExecutorSuite extends SparkFunSuite {
 
   test("to maxIterations") {
     object ToFixedPoint extends RuleExecutor[Expression] {
+      System.setProperty("spark.testing", "true")
       val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
     }
 
-    assert(ToFixedPoint.execute(Literal(100)) === Literal(90))
+    val message = intercept[SparkException] {
+      ToFixedPoint.execute(Literal(100))
+    }.getMessage
+    assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 2bce74571d02..9097a5134d85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -84,6 +84,7 @@ private[sql] trait SQLTestUtils
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
+    System.setProperty("spark.testing", "true")
     if (loadTestDataBeforeTests) {
       loadTestData()
     }
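With this patch in place, any FixedPoint batch that exhausts its budget while `spark.testing` is set aborts the run instead of quietly logging. The reworked "to maxIterations" test above states the contract; expanded into a standalone sketch (the `DecrementLiterals` body is reconstructed from RuleExecutorSuite and should be treated as an approximation):

    import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
    import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

    // Each pass lowers a positive integer literal by one, so Literal(100)
    // needs 100 passes to converge -- far more than the budget of 10 below.
    object DecrementLiterals extends Rule[Expression] {
      def apply(expr: Expression): Expression = expr transform {
        case IntegerLiteral(i) if i > 0 => Literal(i - 1)
      }
    }

    object ToFixedPoint extends RuleExecutor[Expression] {
      val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
    }

    System.setProperty("spark.testing", "true")
    // Throws "Max iterations (10) reached for batch fixedPoint"; without the
    // property it would stop at Literal(90) and only log the message.
    ToFixedPoint.execute(Literal(100))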
From f351362475f4fd9382f36eb60150cad93aa8e864 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 15 Mar 2016 00:20:38 -0700
Subject: [PATCH 5/9] address comments.

---
 .../spark/sql/catalyst/rules/RuleExecutor.scala    | 17 +++++++++++------
 .../catalyst/optimizer/ColumnPruningSuite.scala    |  1 -
 .../optimizer/CombiningLimitsSuite.scala           |  1 -
 .../optimizer/ComputeCurrentTimeSuite.scala        |  1 -
 .../optimizer/SimplifyConditionalSuite.scala       |  1 -
 .../spark/sql/catalyst/plans/PlanTest.scala        |  3 +++
 .../sql/catalyst/trees/RuleExecutorSuite.scala     |  6 ++++--
 7 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 9a52e5554191..f2adefbd65e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -21,7 +21,8 @@ import scala.collection.JavaConverters._
 
 import com.google.common.util.concurrent.AtomicLongMap
 
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.util.Utils
@@ -51,18 +52,18 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    */
   abstract class Strategy {
     def maxIterations: Int
-    def throws: Boolean
+    def throwsExceptionUponMaxIterations: Boolean
   }
 
   /** A strategy that only runs once. */
   case object Once extends Strategy {
-    val maxIterations = 1
-    val throws = false
+    override val maxIterations = 1
+    override val throwsExceptionUponMaxIterations = false
   }
 
   /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
   case class FixedPoint(maxIterations: Int) extends Strategy {
-    override val throws: Boolean = if (Utils.isTesting) true else false
+    override val throwsExceptionUponMaxIterations: Boolean = if (Utils.isTesting) true else false
   }
 
   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
@@ -108,7 +109,11 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
             val msg = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
-            if (batch.strategy.throws) throw new SparkException(msg) else logTrace(msg)
+            if (batch.strategy.throwsExceptionUponMaxIterations) {
+              throw new TreeNodeException(curPlan, msg, null)
+            } else {
+              logTrace(msg)
+            }
           }
           continue = false
         }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index 9c61ee8375b7..dd7d65ddc9e9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.types.StringType
 
 class ColumnPruningSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
-    System.setProperty("spark.testing", "true")
     val batches = Batch("Column pruning", FixedPoint(100),
       ColumnPruning,
       CollapseProject) :: Nil
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
index 128dd64f4866..87ad81db11b6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.rules._
 
 class CombiningLimitsSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
-    System.setProperty("spark.testing", "true")
     val batches =
       Batch("Filter Pushdown", FixedPoint(100),
         ColumnPruning) ::
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
index 4cdf0fbe10f6..10ed4e46ddd1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ComputeCurrentTimeSuite.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 class ComputeCurrentTimeSuite extends PlanTest {
-  System.setProperty("spark.testing", "true")
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Seq(Batch("ComputeCurrentTime", Once, ComputeCurrentTime))
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
index dc22b9556c9e..aac2b016fc88 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.types.IntegerType
 
 class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
   object Optimize extends RuleExecutor[LogicalPlan] {
-    System.setProperty("spark.testing", "true")
     val batches =
       Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 0541844e0bfc..6472011a7c90 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -26,6 +26,9 @@ import org.apache.spark.sql.catalyst.util._
  * Provides helper methods for comparing plans.
  */
 abstract class PlanTest extends SparkFunSuite with PredicateHelper {
+
+  System.setProperty("spark.testing", "true")
+
   /**
    * Since attribute references are given globally unique ids during analysis,
    * we must normalize them to check if two different queries are identical.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
index 1fa343e9751f..d14e24acdf74 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.catalyst.trees
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 
 class RuleExecutorSuite extends SparkFunSuite {
@@ -50,7 +52,7 @@ class RuleExecutorSuite extends SparkFunSuite {
       val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
     }
 
-    val message = intercept[SparkException] {
+    val message = intercept[TreeNodeException[LogicalPlan]] {
       ToFixedPoint.execute(Literal(100))
     }.getMessage
     assert(message.contains("Max iterations (10) reached for batch fixedPoint"))

From 1281f36f624bd23f095e62708fe3516907dbde5d Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 15 Mar 2016 00:22:17 -0700
Subject: [PATCH 6/9] code clean

---
 .../sql/catalyst/optimizer/BooleanSimplificationSuite.scala | 1 -
 .../sql/catalyst/optimizer/SimplifyConditionalSuite.scala   | 3 +--
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index e23855dd037f..da43751b0a31 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.rules._
 
 class BooleanSimplificationSuite extends PlanTest with PredicateHelper {
   object Optimize extends RuleExecutor[LogicalPlan] {
-    System.setProperty("spark.testing", "true")
     val batches =
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
index aac2b016fc88..d436b627f6bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala
@@ -29,8 +29,7 @@ import org.apache.spark.sql.types.IntegerType
 
 class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
   object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches =
-      Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
+    val batches = Batch("SimplifyConditionals", FixedPoint(50), SimplifyConditionals) :: Nil
   }
 
   protected def assertEquivalent(e1: Expression, e2: Expression): Unit = {

From 23663fe8750979af00ba200a498119aeb8618a60 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 15 Mar 2016 00:27:24 -0700
Subject: [PATCH 7/9] update comments.

---
 .../org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index f2adefbd65e6..9742ff5192e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -48,7 +48,8 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   /**
    * An execution strategy for rules that indicates the maximum number of executions. If the
    * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
-   * If throws is equal to true, it will issue an exception AnalysisExceptions
+   * If throwsExceptionUponMaxIterations is equal to true, it will issue an exception
+   * TreeNodeException.
    */
   abstract class Strategy {
     def maxIterations: Int
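The final two patches tidy this up. `Utils.isTesting` already yields the Boolean that the `if (...) true else false` expression was computing, and once the flag comes from one place the per-suite `System.setProperty` calls can go too. For reference, Spark's `Utils.isTesting` amounts to the following check (paraphrased from `org.apache.spark.util.Utils`; treat the exact wording as an assumption of this note, not part of the series):

    // True when running under Spark's test harness, which sets either the
    // SPARK_TESTING environment variable or the spark.testing system property.
    def isTesting: Boolean = {
      sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
    }

Presumably the build already sets `spark.testing` for test JVMs, which is what lets patch 8 delete the explicit property setup from PlanTest and SQLTestUtils without changing behavior.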
From 48b2c329bb57eef7ebb1b24e0f8ed37fb03a4cd3 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 23 Mar 2016 06:28:43 -0700
Subject: [PATCH 8/9] address comments.

---
 .../org/apache/spark/sql/catalyst/rules/RuleExecutor.scala      | 2 +-
 .../scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala    | 2 --
 .../src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala | 1 -
 3 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 7c26599edd95..1eee09e60ea1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -64,7 +64,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
   /** A strategy that runs until fix point or maxIterations times, whichever comes first. */
   case class FixedPoint(maxIterations: Int) extends Strategy {
-    override val throwsExceptionUponMaxIterations: Boolean = if (Utils.isTesting) true else false
+    override val throwsExceptionUponMaxIterations: Boolean = Utils.isTesting
   }
 
   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 6472011a7c90..5dc93e354f6e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -27,8 +27,6 @@ import org.apache.spark.sql.catalyst.util._
  */
 abstract class PlanTest extends SparkFunSuite with PredicateHelper {
 
-  System.setProperty("spark.testing", "true")
-
   /**
    * Since attribute references are given globally unique ids during analysis,
    * we must normalize them to check if two different queries are identical.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 50ac7bd6e1b0..ab3876728bea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -78,7 +78,6 @@ private[sql] trait SQLTestUtils
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
-    System.setProperty("spark.testing", "true")
     if (loadTestDataBeforeTests) {
       loadTestData()
     }

From 1a59fcf7013e2a4d7da85bfae37159f87adb3977 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 23 Mar 2016 06:31:10 -0700
Subject: [PATCH 9/9] clean code.

---
 .../scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 5dc93e354f6e..0541844e0bfc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.util._
  * Provides helper methods for comparing plans.
  */
 abstract class PlanTest extends SparkFunSuite with PredicateHelper {
-
   /**
    * Since attribute references are given globally unique ids during analysis,
    * we must normalize them to check if two different queries are identical.
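End state of the series: a FixedPoint batch that hits its iteration cap throws a `TreeNodeException` carrying the current plan when `Utils.isTesting` holds, and logs the message at trace level otherwise. A suite whose rewrite legitimately needs more passes should widen the budget rather than swallow the exception; a sketch reusing the `DecrementLiterals` rule from the earlier note:

    // Budget sized for the deepest expected rewrite chain: 100 decrements
    // plus one extra pass to observe the fixed point.
    object DeepFixedPoint extends RuleExecutor[Expression] {
      val batches = Batch("fixedPoint", FixedPoint(200), DecrementLiterals) :: Nil
    }

    assert(DeepFixedPoint.execute(Literal(100)) === Literal(0))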