From 029f9bd574e5c76ed594a95fe0825d2e8eb0b7be Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 22 Mar 2015 21:51:48 +0800 Subject: [PATCH 1/2] Checks for missing attributes and unresolved operator for all types of operator --- .../sql/catalyst/analysis/CheckAnalysis.scala | 18 ++++++++++++------ .../sql/catalyst/analysis/AnalysisSuite.scala | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fb975ee5e7296..51a7961de1082 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -63,7 +63,7 @@ class CheckAnalysis { s"filter expression '${f.condition.prettyString}' " + s"of type ${f.condition.dataType.simpleString} is not a boolean.") - case aggregatePlan@Aggregate(groupingExprs, aggregateExprs, child) => + case Aggregate(groupingExprs, aggregateExprs, child) => def checkValidAggregateExpression(expr: Expression): Unit = expr match { case _: AggregateExpression => // OK case e: Attribute if !groupingExprs.contains(e) => @@ -85,13 +85,19 @@ class CheckAnalysis { cleaned.foreach(checkValidAggregateExpression) - case o if o.children.nonEmpty && o.missingInput.nonEmpty => - val missingAttributes = o.missingInput.map(_.prettyString).mkString(",") - val input = o.inputSet.map(_.prettyString).mkString(",") + case _ => // Fallbacks to the following checks + } + + operator match { + case o if o.children.nonEmpty && + !o.references.filter(_.name != "grouping__id").subsetOf(o.inputSet) => + val missingAttributes = (o.references -- o.inputSet).mkString(",") + val input = o.inputSet.mkString(",") - failAnalysis(s"resolved attributes $missingAttributes missing from $input") + failAnalysis( + s"resolved attribute(s) $missingAttributes missing from $input " + + s"in operator ${operator.simpleString}") - // Catch all case o if !o.resolved => failAnalysis( s"unresolved operator ${operator.simpleString}") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index c1dd5aa913ddc..6a5d9fcb13008 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -199,4 +199,21 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { assert(pl(3).dataType == DecimalType.Unlimited) assert(pl(4).dataType == DoubleType) } + + test("SPARK-6452: CheckAnalysis should throw when Aggregate contains missing attributes") { + val plan = + Aggregate( + Nil, + Alias(Sum(AttributeReference("a", StringType)(exprId = ExprId(1))), "b")() :: Nil, + LocalRelation( + AttributeReference("a", StringType)(exprId = ExprId(2)))) + + assert(plan.resolved) + + val message = intercept[AnalysisException] { + caseSensitiveAnalyze(plan) + }.getMessage + + assert(message.contains("resolved attribute(s) a#1 missing from a#2")) + } } From 52cdc69fcbf40968628b62366891fd5e43b80299 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 23 Mar 2015 15:16:35 +0800 Subject: [PATCH 2/2] Addresses comments --- .../apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 5 ++--- .../org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 7 +++++-- .../apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 3 ++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 51a7961de1082..425e1e41cbf21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -89,9 +89,8 @@ class CheckAnalysis { } operator match { - case o if o.children.nonEmpty && - !o.references.filter(_.name != "grouping__id").subsetOf(o.inputSet) => - val missingAttributes = (o.references -- o.inputSet).mkString(",") + case o if o.children.nonEmpty && o.missingInput.nonEmpty => + val missingAttributes = o.missingInput.mkString(",") val input = o.inputSet.mkString(",") failAnalysis( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 400a6b2825c10..48191f31198f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -47,9 +47,12 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy * Attributes that are referenced by expressions but not provided by this nodes children. * Subclasses should override this method if they produce attributes internally as it is used by * assertions designed to prevent the construction of invalid plans. + * + * Note that virtual columns should be excluded. Currently, we only support the grouping ID + * virtual column. */ - def missingInput: AttributeSet = (references -- inputSet) - .filter(_.name != VirtualColumn.groupingIdName) + def missingInput: AttributeSet = + (references -- inputSet).filter(_.name != VirtualColumn.groupingIdName) /** * Runs [[transform]] with `rule` on all expressions present in this query operator. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 6a5d9fcb13008..359aec4a7b5ab 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -200,7 +200,8 @@ class AnalysisSuite extends FunSuite with BeforeAndAfter { assert(pl(4).dataType == DoubleType) } - test("SPARK-6452: CheckAnalysis should throw when Aggregate contains missing attributes") { + test("SPARK-6452 regression test") { + // CheckAnalysis should throw AnalysisException when Aggregate contains missing attribute(s) val plan = Aggregate( Nil,