From 6197cd53aebfe4105478b834968397f20e1c0fe0 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Wed, 11 Feb 2015 23:29:16 -0800 Subject: [PATCH 1/8] [SQL] Better error messages for analysis failures --- .../sql/catalyst/analysis/Analyzer.scala | 112 ++++++++++-------- .../catalyst/expressions/AttributeSet.scala | 6 +- .../spark/sql/catalyst/trees/TreeNode.scala | 9 ++ .../sql/catalyst/analysis/AnalysisSuite.scala | 68 +++++++---- 4 files changed, 121 insertions(+), 74 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2d1fa106a2aa9..df2f10de68463 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.types.{ArrayType, StructField, StructType, IntegerType} +import org.apache.spark.sql.types._ /** * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing @@ -66,9 +66,7 @@ class Analyzer(catalog: Catalog, typeCoercionRules ++ extendedRules : _*), Batch("Check Analysis", Once, - CheckResolution :: - CheckAggregation :: - Nil: _*), + CheckResolution), Batch("AnalysisOperators", fixedPoint, EliminateAnalysisOperators) ) @@ -77,21 +75,70 @@ class Analyzer(catalog: Catalog, * Makes sure all attributes and logical plans have been resolved. */ object CheckResolution extends Rule[LogicalPlan] { + def failAnalysis(msg: String) = { throw new AnalysisException(msg) } + def apply(plan: LogicalPlan): LogicalPlan = { - plan.transformUp { - case p if p.expressions.exists(!_.resolved) => - val missing = p.expressions.filterNot(_.resolved).map(_.prettyString).mkString(",") - val from = p.inputSet.map(_.name).mkString("{", ", ", "}") - - throw new AnalysisException(s"Cannot resolve '$missing' given input columns $from") - case p if !p.resolved && p.childrenResolved => - throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}") - } match { - // As a backstop, use the root node to check that the entire plan tree is resolved. 
- case p if !p.resolved => - throw new AnalysisException(s"Unresolved operator in the query plan ${p.simpleString}") - case p => p + plan.foreachUp { + case operator: LogicalPlan => + operator transformAllExpressions { + case a: Attribute if !a.resolved => + val from = operator.inputSet.map(_.name).mkString("{", ", ", "}") + failAnalysis(s"cannot resolve '$a' given input columns $from") + + case c: Cast if !c.resolved => + failAnalysis( + s"invalid cast from ${c.child.dataType.simpleString} to ${c.dataType.simpleString}") + + case b: BinaryExpression if !b.resolved => + failAnalysis( + s"invalid expression ${b.prettyString} " + + s"between ${b.left.simpleString} and ${b.right.simpleString}") + + + } + + operator match { + case f: Filter if f.condition.dataType != BooleanType => + failAnalysis(s"filter expression '${f.condition.prettyString}' is not a boolean.") + + case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) => + def isValidAggregateExpression(expr: Expression): Boolean = expr match { + case _: AggregateExpression => true + case e: Attribute => groupingExprs.contains(e) + case e if groupingExprs.contains(e) => true + case e if e.references.isEmpty => true + case e => e.children.forall(isValidAggregateExpression) + } + + aggregateExprs.find { e => + !isValidAggregateExpression(e.transform { + // Should trim aliases around `GetField`s. These aliases are introduced while + // resolving struct field accesses, because `GetField` is not a `NamedExpression`. + // (Should we just turn `GetField` into a `NamedExpression`?) + case Alias(g: GetField, _) => g + }) + }.foreach { e => + failAnalysis(s"expression must be aggregates or be in group by $e") + } + + aggregatePlan + + case o if o.children.nonEmpty && !o.references.subsetOf(o.inputSet) => + val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",") + val input = o.inputSet.map(_.prettyString).mkString(",") + + failAnalysis(s"resolved attributes $missingAttributes missing from $input") + + // Catch all + case o if !o.resolved => + failAnalysis( + s"unresolved operator ${operator.simpleString}") + + case _ => // Analysis successful! + } } + + plan } } @@ -192,37 +239,6 @@ class Analyzer(catalog: Catalog, } } - /** - * Checks for non-aggregated attributes with aggregation - */ - object CheckAggregation extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = { - plan.transform { - case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) => - def isValidAggregateExpression(expr: Expression): Boolean = expr match { - case _: AggregateExpression => true - case e: Attribute => groupingExprs.contains(e) - case e if groupingExprs.contains(e) => true - case e if e.references.isEmpty => true - case e => e.children.forall(isValidAggregateExpression) - } - - aggregateExprs.find { e => - !isValidAggregateExpression(e.transform { - // Should trim aliases around `GetField`s. These aliases are introduced while - // resolving struct field accesses, because `GetField` is not a `NamedExpression`. - // (Should we just turn `GetField` into a `NamedExpression`?) - case Alias(g: GetField, _) => g - }) - }.foreach { e => - throw new TreeNodeException(plan, s"Expression not in GROUP BY: $e") - } - - aggregatePlan - } - } - } - /** * Replaces [[UnresolvedRelation]]s with concrete relations from the catalog. 
 */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index 171845ad14e3e..a9ba0be596349 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -20,7 +20,11 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.analysis.Star
 
 protected class AttributeEquals(val a: Attribute) {
-  override def hashCode() = a.exprId.hashCode()
+  override def hashCode() = a match {
+    case ar: AttributeReference => ar.exprId.hashCode()
+    case a => a.hashCode()
+  }
+
   override def equals(other: Any) = (a, other.asInstanceOf[AttributeEquals].a) match {
     case (a1: AttributeReference, a2: AttributeReference) => a1.exprId == a2.exprId
     case (a1, a2) => a1 == a2
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 2013ae4f7bd13..e0930b056d5fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -46,6 +46,15 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
     children.foreach(_.foreach(f))
   }
 
+  /**
+   * Runs the given function recursively on [[children]] then on this node.
+   * @param f the function to be applied to each node in the tree.
+   */
+  def foreachUp(f: BaseType => Unit): Unit = {
+    children.foreach(_.foreachUp(f))
+    f(this)
+  }
+
   /**
    * Returns a Seq containing the result of applying the given function to each
    * node in this tree in a preorder traversal.
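The new foreachUp is what lets CheckResolution (which calls plan.foreachUp above) report the
deepest failure first: children are visited before their parent, so an unresolved attribute is
blamed before the operators that are unresolved only as a consequence. A minimal standalone
sketch of the two traversal orders, using a toy tree rather than Spark's actual TreeNode:

    // Illustration only: `Node` is a stand-in for TreeNode, not Spark code.
    case class Node(name: String, children: Seq[Node] = Nil) {
      // Pre-order, like TreeNode.foreach: this node first, then its subtrees.
      def foreach(f: Node => Unit): Unit = { f(this); children.foreach(_.foreach(f)) }
      // Post-order, like TreeNode.foreachUp: subtrees first, then this node.
      def foreachUp(f: Node => Unit): Unit = { children.foreach(_.foreachUp(f)); f(this) }
    }

    object TraversalDemo extends App {
      val plan = Node("Project", Seq(Node("Filter", Seq(Node("Relation")))))
      plan.foreach(n => print(n.name + " "))    // Project Filter Relation
      println()
      plan.foreachUp(n => print(n.name + " "))  // Relation Filter Project
    }
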
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index f011a5ff15ea9..99083d35ad235 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Literal, Alias, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ @@ -108,24 +108,45 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { testRelation) } - test("throw errors for unresolved attributes during analysis") { - val e = intercept[AnalysisException] { - caseSensitiveAnalyze(Project(Seq(UnresolvedAttribute("abcd")), testRelation)) + def errorTest( + name: String, + plan: LogicalPlan, + errorMessages: Seq[String], + caseSensitive: Boolean = true) = { + test(name) { + val error = intercept[AnalysisException] { + if(caseSensitive) { + caseSensitiveAnalyze(plan) + } else { + caseInsensitiveAnalyze(plan) + } + } + + errorMessages.foreach(m => assert(error.getMessage contains m)) } - assert(e.getMessage().toLowerCase.contains("cannot resolve")) } - test("throw errors for unresolved plans during analysis") { - case class UnresolvedTestPlan() extends LeafNode { - override lazy val resolved = false - override def output = Nil - } - val e = intercept[AnalysisException] { - caseSensitiveAnalyze(UnresolvedTestPlan()) - } - assert(e.getMessage().toLowerCase.contains("unresolved")) + errorTest( + "unresolved attributes", + testRelation.select('abcd), + "cannot resolve" :: "abcd" :: Nil) + + errorTest( + "bad casts", + testRelation.select(Literal(1).cast(BinaryType).as('badCast)), + "invalid cast" :: Literal(1).dataType.simpleString :: BinaryType.simpleString :: Nil) + + case class UnresolvedTestPlan() extends LeafNode { + override lazy val resolved = false + override def output = Nil } + errorTest( + "catch all unresolved plan", + UnresolvedTestPlan(), + "unresolved" :: Nil) + + test("divide should be casted into fractional types") { val testRelation2 = LocalRelation( AttributeReference("a", StringType)(), @@ -134,18 +155,15 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { AttributeReference("d", DecimalType.Unlimited)(), AttributeReference("e", ShortType)()) - val expr0 = 'a / 2 - val expr1 = 'a / 'b - val expr2 = 'a / 'c - val expr3 = 'a / 'd - val expr4 = 'e / 'e - val plan = caseInsensitiveAnalyze(Project( - Alias(expr0, s"Analyzer($expr0)")() :: - Alias(expr1, s"Analyzer($expr1)")() :: - Alias(expr2, s"Analyzer($expr2)")() :: - Alias(expr3, s"Analyzer($expr3)")() :: - Alias(expr4, s"Analyzer($expr4)")() :: Nil, testRelation2)) + val plan = caseInsensitiveAnalyze( + testRelation2.select( + 'a / Literal(2) as 'div1, + 'a / 'b as 'div2, + 'a / 'c as 'div3, + 'a / 'd as 'div4, + 'e / 'e as 'div5)) val pl = plan.asInstanceOf[Project].projectList + assert(pl(0).dataType == DoubleType) assert(pl(1).dataType == DoubleType) assert(pl(2).dataType == DoubleType) From 34eb3a46efe497c34f417b1701e6ef722d57f077 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 12 Feb 2015 00:01:11 -0800 Subject: [PATCH 2/8] more work --- 
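Note: the errorTest helper introduced in the previous patch collapses each negative analysis
test into a name, a plan, and the fragments that must appear in the error message. The same
pattern is easy to reuse outside Catalyst; here is a minimal self-contained ScalaTest sketch
(illustrative names only, not Spark's API):

    import org.scalatest.FunSuite

    class ErrorMessageSuite extends FunSuite {
      // Registers a test that expects `body` to throw, then checks that every
      // expected fragment occurs somewhere in the exception message.
      def errorTest(name: String, fragments: Seq[String])(body: => Unit): Unit =
        test(name) {
          val error = intercept[Exception](body)
          fragments.foreach(f => assert(error.getMessage contains f))
        }

      errorTest("integer division by zero", "/ by zero" :: Nil) {
        val zero = 0
        1 / zero
      }
    }
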
.../spark/sql/catalyst/analysis/Analyzer.scala | 13 ++++++------- .../sql/catalyst/expressions/namedExpressions.scala | 2 +- .../spark/sql/catalyst/analysis/AnalysisSuite.scala | 11 +++++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 6 ++---- 4 files changed, 20 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index df2f10de68463..105282c22aa7a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -83,7 +83,7 @@ class Analyzer(catalog: Catalog, operator transformAllExpressions { case a: Attribute if !a.resolved => val from = operator.inputSet.map(_.name).mkString("{", ", ", "}") - failAnalysis(s"cannot resolve '$a' given input columns $from") + failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from") case c: Cast if !c.resolved => failAnalysis( @@ -93,13 +93,13 @@ class Analyzer(catalog: Catalog, failAnalysis( s"invalid expression ${b.prettyString} " + s"between ${b.left.simpleString} and ${b.right.simpleString}") - - } operator match { case f: Filter if f.condition.dataType != BooleanType => - failAnalysis(s"filter expression '${f.condition.prettyString}' is not a boolean.") + failAnalysis( + s"filter expression '${f.condition.prettyString}' " + + s"of type ${f.condition.dataType.simpleString} is not a boolean.") case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) => def isValidAggregateExpression(expr: Expression): Boolean = expr match { @@ -118,11 +118,10 @@ class Analyzer(catalog: Catalog, case Alias(g: GetField, _) => g }) }.foreach { e => - failAnalysis(s"expression must be aggregates or be in group by $e") + failAnalysis( + s"expression '${e.prettyString}' is not an aggregate function or in the group by") } - aggregatePlan - case o if o.children.nonEmpty && !o.references.subsetOf(o.inputSet) => val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",") val input = o.inputSet.map(_.prettyString).mkString(",") diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index f77c56311cc8c..62c062be6d820 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -218,7 +218,7 @@ case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[E override def exprId: ExprId = ??? override def eval(input: Row): EvaluatedType = ??? override def nullable: Boolean = ??? - override def dataType: DataType = ??? 
+ override def dataType: DataType = NullType } object VirtualColumn { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 99083d35ad235..e70c651e1486e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -136,6 +136,17 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { testRelation.select(Literal(1).cast(BinaryType).as('badCast)), "invalid cast" :: Literal(1).dataType.simpleString :: BinaryType.simpleString :: Nil) + errorTest( + "non-boolean filters", + testRelation.where(Literal(1)), + "filter" :: "'1'" :: "not a boolean" :: Literal(1).dataType.simpleString :: Nil) + + errorTest( + "missing group by", + testRelation2.groupBy('a)('b), + "'b'" :: "group by" :: Nil + ) + case class UnresolvedTestPlan() extends LeafNode { override lazy val resolved = false override def output = Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bba8899651259..a1c8cf58f2357 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -806,10 +806,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { test("throw errors for non-aggregate attributes with aggregation") { def checkAggregation(query: String, isInvalidQuery: Boolean = true) { if (isInvalidQuery) { - val e = intercept[TreeNodeException[LogicalPlan]](sql(query).queryExecution.analyzed) - assert( - e.getMessage.startsWith("Expression not in GROUP BY"), - "Non-aggregate attribute(s) not detected\n") + val e = intercept[AnalysisException](sql(query).queryExecution.analyzed) + assert(e.getMessage contains "group by") } else { // Should not throw sql(query).queryExecution.analyzed From af9e66803eb408f7dbc02c1627141051580ce7b2 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 12 Feb 2015 00:05:59 -0800 Subject: [PATCH 3/8] no braces --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 105282c22aa7a..fa315e389f3b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -82,7 +82,7 @@ class Analyzer(catalog: Catalog, case operator: LogicalPlan => operator transformAllExpressions { case a: Attribute if !a.resolved => - val from = operator.inputSet.map(_.name).mkString("{", ", ", "}") + val from = operator.inputSet.map(_.name).mkString(", ") failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from") case c: Cast if !c.resolved => From d4e9015a11922d5e25f824c0839817840073ec35 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 12 Feb 2015 00:10:09 -0800 Subject: [PATCH 4/8] add comment --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index fa315e389f3b1..e5314d74ed8df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -78,6 +78,8 @@ class Analyzer(catalog: Catalog, def failAnalysis(msg: String) = { throw new AnalysisException(msg) } def apply(plan: LogicalPlan): LogicalPlan = { + // We transform up and order the rules so as to catch the first possible failure instead + // of the result of cascading resolution failures. plan.foreachUp { case operator: LogicalPlan => operator transformAllExpressions { From 1a797b4199584b70855d20e878423bc7755f0007 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 12 Feb 2015 00:29:57 -0800 Subject: [PATCH 5/8] comments --- .../sql/catalyst/analysis/Analyzer.scala | 44 ++++++++++--------- .../BooleanSimplificationSuite.scala | 4 +- .../optimizer/ConstantFoldingSuite.scala | 4 +- .../optimizer/FilterPushdownSuite.scala | 12 ++--- .../catalyst/optimizer/OptimizeInSuite.scala | 4 +- .../optimizer/UnionPushdownSuite.scala | 4 +- .../apache/spark/sql/hive/HiveContext.scala | 4 +- .../spark/sql/hive/execution/commands.scala | 4 +- 8 files changed, 41 insertions(+), 39 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e5314d74ed8df..1a02939895486 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -67,8 +67,8 @@ class Analyzer(catalog: Catalog, extendedRules : _*), Batch("Check Analysis", Once, CheckResolution), - Batch("AnalysisOperators", fixedPoint, - EliminateAnalysisOperators) + Batch("Remove SubQueries", fixedPoint, + EliminateSubQueries) ) /** @@ -104,25 +104,27 @@ class Analyzer(catalog: Catalog, s"of type ${f.condition.dataType.simpleString} is not a boolean.") case aggregatePlan @ Aggregate(groupingExprs, aggregateExprs, child) => - def isValidAggregateExpression(expr: Expression): Boolean = expr match { - case _: AggregateExpression => true - case e: Attribute => groupingExprs.contains(e) - case e if groupingExprs.contains(e) => true - case e if e.references.isEmpty => true - case e => e.children.forall(isValidAggregateExpression) + def checkValidAggregateExpression(expr: Expression): Unit = expr match { + case _: AggregateExpression => // OK + case e: Attribute if !groupingExprs.contains(e) => + failAnalysis( + s"expression '${e.prettyString}' is neither present in the group by, " + + s"nor is it an aggregate function. " + + "Add to group by or wrap in first() if you don't care which value you get.") + case e: Attribute => // OK + case e if groupingExprs.contains(e) => // OK + case e if e.references.isEmpty => // OK + case e => e.children.foreach(checkValidAggregateExpression) } - aggregateExprs.find { e => - !isValidAggregateExpression(e.transform { - // Should trim aliases around `GetField`s. These aliases are introduced while - // resolving struct field accesses, because `GetField` is not a `NamedExpression`. - // (Should we just turn `GetField` into a `NamedExpression`?) 
- case Alias(g: GetField, _) => g - }) - }.foreach { e => - failAnalysis( - s"expression '${e.prettyString}' is not an aggregate function or in the group by") - } + val cleaned = aggregateExprs.map(_.transform { + // Should trim aliases around `GetField`s. These aliases are introduced while + // resolving struct field accesses, because `GetField` is not a `NamedExpression`. + // (Should we just turn `GetField` into a `NamedExpression`?) + case Alias(g, _) => g + }) + + cleaned.foreach(checkValidAggregateExpression) case o if o.children.nonEmpty && !o.references.subsetOf(o.inputSet) => val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",") @@ -247,7 +249,7 @@ class Analyzer(catalog: Catalog, def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ InsertIntoTable(UnresolvedRelation(tableIdentifier, alias), _, _, _) => i.copy( - table = EliminateAnalysisOperators(catalog.lookupRelation(tableIdentifier, alias))) + table = EliminateSubQueries(catalog.lookupRelation(tableIdentifier, alias))) case UnresolvedRelation(tableIdentifier, alias) => catalog.lookupRelation(tableIdentifier, alias) } @@ -494,7 +496,7 @@ class Analyzer(catalog: Catalog, * only required to provide scoping information for attributes and can be removed once analysis is * complete. */ -object EliminateAnalysisOperators extends Rule[LogicalPlan] { +object EliminateSubQueries extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case Subquery(_, child) => child } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 264a0eff37d34..72f06e26e05f1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.PlanTest @@ -30,7 +30,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("AnalysisNodes", Once, - EliminateAnalysisOperators) :: + EliminateSubQueries) :: Batch("Constant Folding", FixedPoint(50), NullPropagation, ConstantFolding, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala index e22c62505860a..ef10c0aece716 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, EliminateAnalysisOperators} +import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, EliminateSubQueries} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import 
org.apache.spark.sql.catalyst.plans.PlanTest @@ -33,7 +33,7 @@ class ConstantFoldingSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("AnalysisNodes", Once, - EliminateAnalysisOperators) :: + EliminateSubQueries) :: Batch("ConstantFolding", Once, ConstantFolding, BooleanSimplification) :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 1158b5dfc6147..0b74bacb18f4b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.expressions.Explode import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter} @@ -32,7 +32,7 @@ class FilterPushdownSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Subqueries", Once, - EliminateAnalysisOperators) :: + EliminateSubQueries) :: Batch("Filter Pushdown", Once, CombineFilters, PushPredicateThroughProject, @@ -351,7 +351,7 @@ class FilterPushdownSuite extends PlanTest { } val optimized = Optimize(originalQuery.analyze) - comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized) + comparePlans(analysis.EliminateSubQueries(originalQuery.analyze), optimized) } test("joins: conjunctive predicates") { @@ -370,7 +370,7 @@ class FilterPushdownSuite extends PlanTest { left.join(right, condition = Some("x.b".attr === "y.b".attr)) .analyze - comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer)) + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) } test("joins: conjunctive predicates #2") { @@ -389,7 +389,7 @@ class FilterPushdownSuite extends PlanTest { left.join(right, condition = Some("x.b".attr === "y.b".attr)) .analyze - comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer)) + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) } test("joins: conjunctive predicates #3") { @@ -412,7 +412,7 @@ class FilterPushdownSuite extends PlanTest { condition = Some("z.a".attr === "x.b".attr)) .analyze - comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer)) + comparePlans(optimized, analysis.EliminateSubQueries(correctAnswer)) } val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index da912ab382179..233e329cb2038 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.immutable.HashSet -import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, UnresolvedAttribute} +import 
org.apache.spark.sql.catalyst.analysis.{EliminateSubQueries, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.plans.PlanTest @@ -34,7 +34,7 @@ class OptimizeInSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("AnalysisNodes", Once, - EliminateAnalysisOperators) :: + EliminateSubQueries) :: Batch("ConstantFolding", Once, ConstantFolding, BooleanSimplification, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala index dfef87bd9133d..a54751dfa9a12 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/UnionPushdownSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{PlanTest, LeftOuter, RightOuter} import org.apache.spark.sql.catalyst.rules._ @@ -29,7 +29,7 @@ class UnionPushdownSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Subqueries", Once, - EliminateAnalysisOperators) :: + EliminateSubQueries) :: Batch("Union Pushdown", Once, UnionPushdown) :: Nil } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 7ae6ed6f841bf..ddc7b181d4d46 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -37,7 +37,7 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.Experimental import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ScalaReflection -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperators, OverrideCatalog, OverrideFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateSubQueries, OverrideCatalog, OverrideFunctionRegistry} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException} import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand} @@ -104,7 +104,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { */ @Experimental def analyze(tableName: String) { - val relation = EliminateAnalysisOperators(catalog.lookupRelation(Seq(tableName))) + val relation = EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) relation match { case relation: MetastoreRelation => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index f6bea1c6a6fe1..ce0db1125c27f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.annotation.DeveloperApi -import 
org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators +import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.{DataFrame, SQLContext} @@ -175,7 +175,7 @@ case class CreateMetastoreDataSourceAsSelect( val resolved = ResolvedDataSource(sqlContext, Some(query.schema), provider, optionsWithPath) val createdRelation = LogicalRelation(resolved.relation) - EliminateAnalysisOperators(sqlContext.table(tableName).logicalPlan) match { + EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match { case l @ LogicalRelation(i: InsertableRelation) => if (l.schema != createdRelation.schema) { val errorDescription = From d29fbde07fa52e78b1236a1fa236484010ff6dbc Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Thu, 12 Feb 2015 00:31:35 -0800 Subject: [PATCH 6/8] extra case --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1a02939895486..a17989aa62282 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -111,7 +111,6 @@ class Analyzer(catalog: Catalog, s"expression '${e.prettyString}' is neither present in the group by, " + s"nor is it an aggregate function. " + "Add to group by or wrap in first() if you don't care which value you get.") - case e: Attribute => // OK case e if groupingExprs.contains(e) => // OK case e if e.references.isEmpty => // OK case e => e.children.foreach(checkValidAggregateExpression) From f279a71008b18cdaeea8bbf029f8dae8154db76b Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 12 Feb 2015 20:01:11 +0800 Subject: [PATCH 7/8] make right references for ScriptTransformation --- .../sql/catalyst/plans/logical/ScriptTransformation.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala index cfe2c7a39a17c..ccf5291219add 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Attribute, Expression} /** * Transforms the input by forking and running the specified script. 
@@ -32,7 +32,9 @@ case class ScriptTransformation( script: String, output: Seq[Attribute], child: LogicalPlan, - ioschema: ScriptInputOutputSchema) extends UnaryNode + ioschema: ScriptInputOutputSchema) extends UnaryNode { + override def references: AttributeSet = AttributeSet(input.flatMap(_.references)) +} /** * A placeholder for implementation specific input and output properties when passing data From fa38881a81f169fe46c726168a49f0ec8bba73d1 Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 12 Feb 2015 20:01:37 +0800 Subject: [PATCH 8/8] fix for grouping__id --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a17989aa62282..58a7003977c93 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -82,7 +82,7 @@ class Analyzer(catalog: Catalog, // of the result of cascading resolution failures. plan.foreachUp { case operator: LogicalPlan => - operator transformAllExpressions { + operator transformExpressionsUp { case a: Attribute if !a.resolved => val from = operator.inputSet.map(_.name).mkString(", ") failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from") @@ -125,7 +125,8 @@ class Analyzer(catalog: Catalog, cleaned.foreach(checkValidAggregateExpression) - case o if o.children.nonEmpty && !o.references.subsetOf(o.inputSet) => + case o if o.children.nonEmpty && + !o.references.filter(_.name != "grouping__id").subsetOf(o.inputSet) => val missingAttributes = (o.references -- o.inputSet).map(_.prettyString).mkString(",") val input = o.inputSet.map(_.prettyString).mkString(",")
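Taken together, the aggregate check that CheckResolution ends up with after this series walks
each aggregate expression top-down and blames the first attribute that is neither grouped on
nor under an aggregate function. A standalone sketch of that recursion, with toy expression
types standing in for Catalyst's Expression hierarchy (illustration only):

    sealed trait Expr { def children: Seq[Expr] = Nil }
    case class Attr(name: String) extends Expr                 // an input column
    case class Count(child: Expr) extends Expr {               // stands in for AggregateExpression
      override def children = Seq(child)
    }
    case class Add(left: Expr, right: Expr) extends Expr {
      override def children = Seq(left, right)
    }
    case class Lit(value: Int) extends Expr                    // references no input columns

    object GroupByCheckDemo extends App {
      def checkValidAggregateExpression(groupingExprs: Seq[Expr])(expr: Expr): Unit = expr match {
        case _: Count => // OK: aggregate functions are allowed anywhere, so stop descending
        case a: Attr if !groupingExprs.contains(a) =>
          sys.error(s"expression '${a.name}' is neither present in the group by, " +
            "nor is it an aggregate function.")
        case e if groupingExprs.contains(e) => // OK: matches a grouping expression exactly
        case _: Lit => // OK: no references to the input
        case e => e.children.foreach(checkValidAggregateExpression(groupingExprs))
      }

      val groupBy = Seq(Attr("a"))
      checkValidAggregateExpression(groupBy)(Add(Attr("a"), Count(Attr("b"))))  // passes
      // checkValidAggregateExpression(groupBy)(Add(Attr("b"), Lit(1)))         // would fail on 'b'
    }

Stopping the descent at aggregate functions is the design point: count('b) is legal without
grouping on b, while a bare 'b outside any aggregate is reported with a suggestion to add it
to the group by (or wrap it in first()).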