From 77b64dbe8513a55420b83f368063d3d2e4a02767 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sun, 24 Mar 2019 10:47:38 -0700 Subject: [PATCH 1/9] [SPARK-27255] Aggregate functions should not be allowed in WHERE --- .../sql/catalyst/analysis/CheckAnalysis.scala | 11 +++++--- .../analysis/AnalysisErrorSuite.scala | 8 ++++++ .../resources/sql-tests/inputs/group-by.sql | 8 ++++++ .../negative-cases/invalid-correlation.sql | 5 ++-- .../sql-tests/results/group-by.sql.out | 28 ++++++++++++++++++- .../invalid-correlation.sql.out | 5 ++-- 6 files changed, 56 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 18c40b370cb5..fc8943e550fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -80,6 +80,10 @@ trait CheckAnalysis extends PredicateHelper { } } + def isAggregateExpression(expr: Expression): Boolean = { + expr.isInstanceOf[AggregateExpression] || PythonUDF.isGroupedAggPandasUDF(expr) + } + def checkAnalysis(plan: LogicalPlan): Unit = { // We transform up and order the rules so as to catch the first possible failure instead // of the result of cascading resolution failures. @@ -172,16 +176,15 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis("Null-aware predicate sub-queries cannot be used in nested " + s"conditions: $condition") + case Filter(condition, _) if condition.find(isAggregateExpression(_)).isDefined => + failAnalysis("Aggregate expressions are not allowed in where clause of the query") + case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType => failAnalysis( s"join condition '${condition.sql}' " + s"of type ${condition.dataType.catalogString} is not a boolean.") case Aggregate(groupingExprs, aggregateExprs, child) => - def isAggregateExpression(expr: Expression) = { - expr.isInstanceOf[AggregateExpression] || PythonUDF.isGroupedAggPandasUDF(expr) - } - def checkValidAggregateExpression(expr: Expression): Unit = expr match { case expr: Expression if isAggregateExpression(expr) => val aggFunction = expr match { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index ec71c1c93452..7ac0de65a663 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -599,4 +599,12 @@ class AnalysisErrorSuite extends AnalysisTest { assertAnalysisError(plan5, "Accessing outer query column is not allowed in" :: Nil) } + + test("Error on filter condition containing aggregate expressions") { + val a = AttributeReference("a", IntegerType)() + val b = AttributeReference("b", IntegerType)() + val plan = Filter('a === UnresolvedFunction("max", Seq(b), true), LocalRelation(a, b)) + assertAnalysisError(plan, + "Aggregate expressions are not allowed in where clause of the query" :: Nil) + } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql index 7e81ff1aba37..106f93c8858f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql @@ -141,3 +141,11 @@ SELECT every("true"); SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; + +-- Aggrgate expressions can be referenced through an alias +SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1; + +-- Error when aggregate expressions are in where clause directly +SELECT count(*) FROM test_agg WHERE count(*) > 1; +SELECT count(*) FROM test_agg WHERE count(*) + 1 > 1; + diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql index e22cade93679..109ffa77d621 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/negative-cases/invalid-correlation.sql @@ -46,9 +46,10 @@ WHERE t1a IN (SELECT min(t2a) SELECT t1a FROM t1 GROUP BY 1 -HAVING EXISTS (SELECT 1 +HAVING EXISTS (SELECT t2a FROM t2 - WHERE t2a < min(t1a + t2a)); + GROUP BY 1 + HAVING t2a < min(t1a + t2a)); -- TC 01.04 -- Invalid due to mixure of outer and local references under an AggegatedExpression diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index daf47c4d0a39..9ddb4d3bb8c5 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 46 +-- Number of queries: 49 -- !query 0 @@ -459,3 +459,29 @@ struct 1 +-- !query 46 schema +struct +-- !query 46 output +10 + + +-- !query 47 +SELECT count(*) FROM test_agg WHERE count(*) > 1 +-- !query 47 schema +struct<> +-- !query 47 output +org.apache.spark.sql.AnalysisException +Aggregate expressions are not allowed in where clause of the query; + + +-- !query 48 +SELECT count(*) FROM test_agg WHERE count(*) + 1 > 1 +-- !query 48 schema +struct<> +-- !query 48 output +org.apache.spark.sql.AnalysisException +Aggregate expressions are not allowed in where clause of the query; diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out index e49978ddb1ce..7b47a6139f60 100644 --- a/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/subquery/negative-cases/invalid-correlation.sql.out @@ -70,9 +70,10 @@ Resolved attribute(s) t2b#x missing from min(t2a)#x,t2c#x in operator !Filter t2 SELECT t1a FROM t1 GROUP BY 1 -HAVING EXISTS (SELECT 1 +HAVING EXISTS (SELECT t2a FROM t2 - WHERE t2a < min(t1a + t2a)) + GROUP BY 1 + HAVING t2a < min(t1a + t2a)) -- !query 5 schema struct<> -- !query 5 output From 2bc8e3a3caa39be12965163ad7d2a48953e080be Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 27 Mar 2019 15:20:48 -0700 Subject: [PATCH 2/9] Review --- .../sql/catalyst/analysis/CheckAnalysis.scala | 13 ++--- .../sql/catalyst/optimizer/Optimizer.scala | 24 +-------- .../catalyst/plans/logical/PlanHelper.scala | 53 +++++++++++++++++++ .../analysis/AnalysisErrorSuite.scala | 2 +- .../sql-tests/results/group-by.sql.out | 4 +- 5 files changed, 64 insertions(+), 32 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index fc8943e550fa..2a92562e6c64 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -80,10 +80,6 @@ trait CheckAnalysis extends PredicateHelper { } } - def isAggregateExpression(expr: Expression): Boolean = { - expr.isInstanceOf[AggregateExpression] || PythonUDF.isGroupedAggPandasUDF(expr) - } - def checkAnalysis(plan: LogicalPlan): Unit = { // We transform up and order the rules so as to catch the first possible failure instead // of the result of cascading resolution failures. @@ -176,8 +172,9 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis("Null-aware predicate sub-queries cannot be used in nested " + s"conditions: $condition") - case Filter(condition, _) if condition.find(isAggregateExpression(_)).isDefined => - failAnalysis("Aggregate expressions are not allowed in where clause of the query") + case f@ Filter(condition, _) if PlanHelper.specialExpressionInUnsupportedOperator(f) => + failAnalysis("Aggregate/Window/Generate expressions are not allowed in where " + + "clause of the query") case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType => failAnalysis( @@ -185,6 +182,10 @@ trait CheckAnalysis extends PredicateHelper { s"of type ${condition.dataType.catalogString} is not a boolean.") case Aggregate(groupingExprs, aggregateExprs, child) => + def isAggregateExpression(expr: Expression): Boolean = { + expr.isInstanceOf[AggregateExpression] || PythonUDF.isGroupedAggPandasUDF(expr) + } + def checkValidAggregateExpression(expr: Expression): Unit = expr match { case expr: Expression if isAggregateExpression(expr) => val aggFunction = expr match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index d4eb516534f1..dbcb9d4cc5c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -52,29 +52,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) * Returns true when all operators are integral. */ private def checkSpecialExpressionIntegrity(plan: LogicalPlan): Boolean = { - plan.find(specialExpressionInUnsupportedOperator).isEmpty - } - - /** - * Check if there's any expression in this query plan operator that is - * - A WindowExpression but the plan is not Window - * - An AggregateExpresion but the plan is not Aggregate or Window - * - A Generator but the plan is not Generate - * Returns true when this operator breaks structural integrity with one of the cases above. - */ - private def specialExpressionInUnsupportedOperator(plan: LogicalPlan): Boolean = { - val exprs = plan.expressions - exprs.flatMap { root => - root.find { - case e: WindowExpression - if !plan.isInstanceOf[Window] => true - case e: AggregateExpression - if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => true - case e: Generator - if !plan.isInstanceOf[Generate] => true - case _ => false - } - }.nonEmpty + plan.find(PlanHelper.specialExpressionInUnsupportedOperator).isEmpty } protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala new file mode 100644 index 000000000000..06ba7f50223b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Generator, WindowExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression + +/** + * [[PlanHelper]] Contains utility methods that can be used by Analyzer and Optimizer. + * It can also be container of methods that are common across multiple rules in Analyzer + * and optimizer. + */ +object PlanHelper { + /** + * Check if there's any expression in this query plan operator that is + * - A WindowExpression but the plan is not Window + * - An AggregateExpresion but the plan is not Aggregate or Window + * - A Generator but the plan is not Generate + * Returns true when this operator hosts illegal expressions. This can happen when + * 1. The input query from users contain invalid expressions. + * Example : SELECT * FROM tab WHERE max(c1) > 0 + * 2. Query rewrites inadvertently produce plans that are invalid. + */ + def specialExpressionInUnsupportedOperator(plan: LogicalPlan): Boolean = { + val exprs = plan.expressions + exprs.flatMap { root => + root.find { + case e: WindowExpression + if !plan.isInstanceOf[Window] => true + case e: AggregateExpression + if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => true + case e: Generator + if !plan.isInstanceOf[Generate] => true + case _ => false + } + }.nonEmpty + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 7ac0de65a663..273dbf03733e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -605,6 +605,6 @@ class AnalysisErrorSuite extends AnalysisTest { val b = AttributeReference("b", IntegerType)() val plan = Filter('a === UnresolvedFunction("max", Seq(b), true), LocalRelation(a, b)) assertAnalysisError(plan, - "Aggregate expressions are not allowed in where clause of the query" :: Nil) + "Aggregate/Window/Generate expressions are not allowed in where clause of the query" :: Nil) } } diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 9ddb4d3bb8c5..9d7febd3c376 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -475,7 +475,7 @@ SELECT count(*) FROM test_agg WHERE count(*) > 1 struct<> -- !query 47 output org.apache.spark.sql.AnalysisException -Aggregate expressions are not allowed in where clause of the query; +Aggregate/Window/Generate expressions are not allowed in where clause of the query; -- !query 48 @@ -484,4 +484,4 @@ SELECT count(*) FROM test_agg WHERE count(*) + 1 > 1 struct<> -- !query 48 output org.apache.spark.sql.AnalysisException -Aggregate expressions are not allowed in where clause of the query; +Aggregate/Window/Generate expressions are not allowed in where clause of the query; From 2ed1360f415d2b65a6297c0a42058c92c81df83b Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Wed, 27 Mar 2019 15:28:46 -0700 Subject: [PATCH 3/9] minor --- .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 2a92562e6c64..e34ab2e2d0a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -172,7 +172,7 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis("Null-aware predicate sub-queries cannot be used in nested " + s"conditions: $condition") - case f@ Filter(condition, _) if PlanHelper.specialExpressionInUnsupportedOperator(f) => + case f @ Filter(condition, _) if PlanHelper.specialExpressionInUnsupportedOperator(f) => failAnalysis("Aggregate/Window/Generate expressions are not allowed in where " + "clause of the query") From 5fac069790ff3fc3ce01a470b1eb5a789b0dc72e Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Thu, 28 Mar 2019 17:38:52 -0700 Subject: [PATCH 4/9] Code review --- .../sql/catalyst/analysis/CheckAnalysis.scala | 4 +++ .../resources/sql-tests/inputs/group-by.sql | 4 +++ .../sql-tests/results/group-by.sql.out | 34 ++++++++++++++----- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index e34ab2e2d0a3..5f4bfa320232 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -380,6 +380,10 @@ trait CheckAnalysis extends PredicateHelper { throw new IllegalStateException( "Internal error: logical hint operator should have been removed during analysis") + case other if PlanHelper.specialExpressionInUnsupportedOperator(other) => + failAnalysis(s"The query operator `${other.nodeName}` contains " + + "one or more unsupported expression types Aggregate, Window or Generate.") + case _ => // Analysis successful! } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql index 106f93c8858f..941558ad21b6 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql @@ -142,6 +142,10 @@ SELECT k, v, every(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; +-- Having referencing aggregate expressions is ok. +SELECT count(*) FROM test_agg HAVING count(*) > 1; +SELECT k, max(v) FROM test_agg GROUP BY k HAVING max(v) = true; + -- Aggrgate expressions can be referenced through an alias SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1; diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 9d7febd3c376..2ba296c9242f 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 49 +-- Number of queries: 51 -- !query 0 @@ -462,26 +462,44 @@ struct 1 +SELECT count(*) FROM test_agg HAVING count(*) > 1 -- !query 46 schema -struct +struct -- !query 46 output 10 -- !query 47 -SELECT count(*) FROM test_agg WHERE count(*) > 1 +SELECT k, max(v) FROM test_agg GROUP BY k HAVING max(v) = true -- !query 47 schema -struct<> +struct -- !query 47 output +1 true +2 true +5 true + + +-- !query 48 +SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1 +-- !query 48 schema +struct +-- !query 48 output +10 + + +-- !query 49 +SELECT count(*) FROM test_agg WHERE count(*) > 1 +-- !query 49 schema +struct<> +-- !query 49 output org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not allowed in where clause of the query; --- !query 48 +-- !query 50 SELECT count(*) FROM test_agg WHERE count(*) + 1 > 1 --- !query 48 schema +-- !query 50 schema struct<> --- !query 48 output +-- !query 50 output org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not allowed in where clause of the query; From a86114d747001a352bf587970c188ddb046ccc27 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Sat, 30 Mar 2019 00:22:37 -0700 Subject: [PATCH 5/9] Code review --- .../sql/catalyst/analysis/CheckAnalysis.scala | 11 +++++++---- .../catalyst/analysis/AnalysisErrorSuite.scala | 2 +- .../resources/sql-tests/inputs/group-by.sql | 8 ++++---- .../sql-tests/results/group-by.sql.out | 18 ++++++++++++------ 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 5f4bfa320232..385bff482f0a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -172,10 +172,6 @@ trait CheckAnalysis extends PredicateHelper { failAnalysis("Null-aware predicate sub-queries cannot be used in nested " + s"conditions: $condition") - case f @ Filter(condition, _) if PlanHelper.specialExpressionInUnsupportedOperator(f) => - failAnalysis("Aggregate/Window/Generate expressions are not allowed in where " + - "clause of the query") - case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType => failAnalysis( s"join condition '${condition.sql}' " + @@ -380,6 +376,13 @@ trait CheckAnalysis extends PredicateHelper { throw new IllegalStateException( "Internal error: logical hint operator should have been removed during analysis") + case f @ Filter(condition, _) if PlanHelper.specialExpressionInUnsupportedOperator(f) => + failAnalysis( + s""" + |Aggregate/Window/Generate expressions are not valid in where clause of the query. + |Expression in where clause: ${condition.sql} + """.stripMargin) + case other if PlanHelper.specialExpressionInUnsupportedOperator(other) => failAnalysis(s"The query operator `${other.nodeName}` contains " + "one or more unsupported expression types Aggregate, Window or Generate.") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index 273dbf03733e..55ce93ead4a8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -605,6 +605,6 @@ class AnalysisErrorSuite extends AnalysisTest { val b = AttributeReference("b", IntegerType)() val plan = Filter('a === UnresolvedFunction("max", Seq(b), true), LocalRelation(a, b)) assertAnalysisError(plan, - "Aggregate/Window/Generate expressions are not allowed in where clause of the query" :: Nil) + "Aggregate/Window/Generate expressions are not valid in where clause of the query" :: Nil) } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql index 941558ad21b6..3f40bed07922 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql @@ -143,13 +143,13 @@ SELECT k, v, some(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; SELECT k, v, any(v) OVER (PARTITION BY k ORDER BY v) FROM test_agg; -- Having referencing aggregate expressions is ok. -SELECT count(*) FROM test_agg HAVING count(*) > 1; +SELECT count(*) FROM test_agg HAVING count(*) > 1L; SELECT k, max(v) FROM test_agg GROUP BY k HAVING max(v) = true; -- Aggrgate expressions can be referenced through an alias -SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1; +SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1L; -- Error when aggregate expressions are in where clause directly -SELECT count(*) FROM test_agg WHERE count(*) > 1; -SELECT count(*) FROM test_agg WHERE count(*) + 1 > 1; +SELECT count(*) FROM test_agg WHERE count(*) > 1L; +SELECT count(*) FROM test_agg WHERE count(*) + 1L > 1L; diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 2ba296c9242f..884018b319db 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -462,7 +462,7 @@ struct 1 +SELECT count(*) FROM test_agg HAVING count(*) > 1L -- !query 46 schema struct -- !query 46 output @@ -480,7 +480,7 @@ struct -- !query 48 -SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1 +SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1L -- !query 48 schema struct -- !query 48 output @@ -488,18 +488,24 @@ struct -- !query 49 -SELECT count(*) FROM test_agg WHERE count(*) > 1 +SELECT count(*) FROM test_agg WHERE count(*) > 1L -- !query 49 schema struct<> -- !query 49 output org.apache.spark.sql.AnalysisException -Aggregate/Window/Generate expressions are not allowed in where clause of the query; + +Aggregate/Window/Generate expressions are not valid in where clause of the query. +Expression in where clause: (count(1) > 1L) + ; -- !query 50 -SELECT count(*) FROM test_agg WHERE count(*) + 1 > 1 +SELECT count(*) FROM test_agg WHERE count(*) + 1L > 1L -- !query 50 schema struct<> -- !query 50 output org.apache.spark.sql.AnalysisException -Aggregate/Window/Generate expressions are not allowed in where clause of the query; + +Aggregate/Window/Generate expressions are not valid in where clause of the query. +Expression in where clause: ((count(1) + 1L) > 1L) + ; From 3ae6cd0c76b1695aecd14725dd28c5ce0a83908a Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 1 Apr 2019 00:01:29 -0700 Subject: [PATCH 6/9] Code review --- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../spark/sql/catalyst/optimizer/Optimizer.scala | 12 ++---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 385bff482f0a..50023c498cb8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -384,7 +384,7 @@ trait CheckAnalysis extends PredicateHelper { """.stripMargin) case other if PlanHelper.specialExpressionInUnsupportedOperator(other) => - failAnalysis(s"The query operator `${other.nodeName}` contains " + + failAnalysis(s"The query operator `${other.nodeName}` contains " + "one or more unsupported expression types Aggregate, Window or Generate.") case _ => // Analysis successful! diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index dbcb9d4cc5c4..0dd73b05d136 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -43,16 +43,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // - is still resolved // - only host special expressions in supported operators override protected def isPlanIntegral(plan: LogicalPlan): Boolean = { - !Utils.isTesting || (plan.resolved && checkSpecialExpressionIntegrity(plan)) - } - - /** - * Check if all operators in this plan hold structural integrity with regards to hosting special - * expressions. - * Returns true when all operators are integral. - */ - private def checkSpecialExpressionIntegrity(plan: LogicalPlan): Boolean = { - plan.find(PlanHelper.specialExpressionInUnsupportedOperator).isEmpty + !Utils.isTesting || (plan.resolved && + plan.find(PlanHelper.specialExpressionInUnsupportedOperator).isEmpty) } protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) From f2e5174661088dafdeb3e6db8b598059a12db69e Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 1 Apr 2019 08:31:27 -0700 Subject: [PATCH 7/9] Code review --- .../apache/spark/sql/catalyst/plans/logical/PlanHelper.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala index 06ba7f50223b..520bf16d33bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.catalyst.expressions.{Generator, WindowExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression /** - * [[PlanHelper]] Contains utility methods that can be used by Analyzer and Optimizer. + * [[PlanHelper]] contains utility methods that can be used by Analyzer and Optimizer. * It can also be container of methods that are common across multiple rules in Analyzer - * and optimizer. + * and Optimizer. */ object PlanHelper { /** From f664d1ce3c90221be9b56ad7b2210bd60f67af4e Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Mon, 1 Apr 2019 17:19:20 -0700 Subject: [PATCH 8/9] Code review --- .../sql/catalyst/analysis/CheckAnalysis.scala | 21 ++++++++++++++----- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../catalyst/plans/logical/PlanHelper.scala | 20 +++++++++--------- .../resources/sql-tests/inputs/group-by.sql | 1 + .../sql-tests/results/group-by.sql.out | 21 ++++++++++++++++--- 5 files changed, 46 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 50023c498cb8..055c2734bf9c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -376,16 +376,27 @@ trait CheckAnalysis extends PredicateHelper { throw new IllegalStateException( "Internal error: logical hint operator should have been removed during analysis") - case f @ Filter(condition, _) if PlanHelper.specialExpressionInUnsupportedOperator(f) => + case f @ Filter(condition, _) + if PlanHelper.specialExpressionInUnsupportedOperator(f).nonEmpty => + val invalidExprSqls = PlanHelper.specialExpressionInUnsupportedOperator(f).map(_.sql) failAnalysis( s""" |Aggregate/Window/Generate expressions are not valid in where clause of the query. - |Expression in where clause: ${condition.sql} + |Expression in where clause: [${condition.sql}] + |Invalid expressions: [${invalidExprSqls.mkString(", ")}] """.stripMargin) - case other if PlanHelper.specialExpressionInUnsupportedOperator(other) => - failAnalysis(s"The query operator `${other.nodeName}` contains " + - "one or more unsupported expression types Aggregate, Window or Generate.") + case other if PlanHelper.specialExpressionInUnsupportedOperator(other).nonEmpty => + val invalidExprSqls = + PlanHelper.specialExpressionInUnsupportedOperator(other).map(_.sql) + failAnalysis( + s""" + |The query operator `${other.nodeName}` contains one or more unsupported + |expression types Aggregate, Window or Generate. + |Invalid expressions: [${invalidExprSqls.mkString(", ")}] + | + """.stripMargin + ) case _ => // Analysis successful! } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0dd73b05d136..94233eb46866 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -44,7 +44,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // - only host special expressions in supported operators override protected def isPlanIntegral(plan: LogicalPlan): Boolean = { !Utils.isTesting || (plan.resolved && - plan.find(PlanHelper.specialExpressionInUnsupportedOperator).isEmpty) + plan.find(PlanHelper.specialExpressionInUnsupportedOperator(_).nonEmpty).isEmpty) } protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala index 520bf16d33bd..06ff43f11db9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.expressions.{Generator, WindowExpression} +import org.apache.spark.sql.catalyst.expressions.{Expression, Generator, WindowExpression} import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression /** @@ -31,23 +31,23 @@ object PlanHelper { * - A WindowExpression but the plan is not Window * - An AggregateExpresion but the plan is not Aggregate or Window * - A Generator but the plan is not Generate - * Returns true when this operator hosts illegal expressions. This can happen when + * Returns the list of invalid expressions that this operator hosts. This can happen when * 1. The input query from users contain invalid expressions. * Example : SELECT * FROM tab WHERE max(c1) > 0 * 2. Query rewrites inadvertently produce plans that are invalid. */ - def specialExpressionInUnsupportedOperator(plan: LogicalPlan): Boolean = { + def specialExpressionInUnsupportedOperator(plan: LogicalPlan): Seq[Expression] = { val exprs = plan.expressions - exprs.flatMap { root => - root.find { + val invalidExpressions = exprs.flatMap { root => + root.collect { case e: WindowExpression - if !plan.isInstanceOf[Window] => true + if !plan.isInstanceOf[Window] => e case e: AggregateExpression - if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => true + if !(plan.isInstanceOf[Aggregate] || plan.isInstanceOf[Window]) => e case e: Generator - if !plan.isInstanceOf[Generate] => true - case _ => false + if !plan.isInstanceOf[Generate] => e } - }.nonEmpty + } + invalidExpressions } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql index 3f40bed07922..66bc90914e0d 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql @@ -152,4 +152,5 @@ SELECT * FROM (SELECT COUNT(*) AS cnt FROM test_agg) WHERE cnt > 1L; -- Error when aggregate expressions are in where clause directly SELECT count(*) FROM test_agg WHERE count(*) > 1L; SELECT count(*) FROM test_agg WHERE count(*) + 1L > 1L; +SELECT count(*) FROM test_agg WHERE k = 1 or k = 2 or count(*) + 1L > 1L or max(k) > 1; diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 884018b319db..e09487596184 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 51 +-- Number of queries: 52 -- !query 0 @@ -495,7 +495,8 @@ struct<> org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not valid in where clause of the query. -Expression in where clause: (count(1) > 1L) +Expression in where clause: [(count(1) > 1L)] +Invalid expressions: [count(1)] ; @@ -507,5 +508,19 @@ struct<> org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not valid in where clause of the query. -Expression in where clause: ((count(1) + 1L) > 1L) +Expression in where clause: [((count(1) + 1L) > 1L)] +Invalid expressions: [count(1)] + ; + + +-- !query 51 +SELECT count(*) FROM test_agg WHERE k = 1 or k = 2 or count(*) + 1L > 1L or max(k) > 1 +-- !query 51 schema +struct<> +-- !query 51 output +org.apache.spark.sql.AnalysisException + +Aggregate/Window/Generate expressions are not valid in where clause of the query. +Expression in where clause: [(((test_agg.`k` = 1) OR (test_agg.`k` = 2)) OR (((count(1) + 1L) > 1L) OR (max(test_agg.`k`) > 1)))] +Invalid expressions: [count(1), max(test_agg.`k`)] ; From bbab4c198152ebe5a06a99372aaf1445a49f0f18 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 2 Apr 2019 08:29:53 -0700 Subject: [PATCH 9/9] code review --- .../sql/catalyst/analysis/CheckAnalysis.scala | 15 ++++++--------- .../spark/sql/catalyst/optimizer/Optimizer.scala | 2 +- .../sql/catalyst/plans/logical/PlanHelper.scala | 2 +- .../resources/sql-tests/results/group-by.sql.out | 9 +++------ 4 files changed, 11 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 055c2734bf9c..427f196344df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -377,25 +377,22 @@ trait CheckAnalysis extends PredicateHelper { "Internal error: logical hint operator should have been removed during analysis") case f @ Filter(condition, _) - if PlanHelper.specialExpressionInUnsupportedOperator(f).nonEmpty => - val invalidExprSqls = PlanHelper.specialExpressionInUnsupportedOperator(f).map(_.sql) + if PlanHelper.specialExpressionsInUnsupportedOperator(f).nonEmpty => + val invalidExprSqls = PlanHelper.specialExpressionsInUnsupportedOperator(f).map(_.sql) failAnalysis( s""" |Aggregate/Window/Generate expressions are not valid in where clause of the query. |Expression in where clause: [${condition.sql}] - |Invalid expressions: [${invalidExprSqls.mkString(", ")}] - """.stripMargin) + |Invalid expressions: [${invalidExprSqls.mkString(", ")}]""".stripMargin) - case other if PlanHelper.specialExpressionInUnsupportedOperator(other).nonEmpty => + case other if PlanHelper.specialExpressionsInUnsupportedOperator(other).nonEmpty => val invalidExprSqls = - PlanHelper.specialExpressionInUnsupportedOperator(other).map(_.sql) + PlanHelper.specialExpressionsInUnsupportedOperator(other).map(_.sql) failAnalysis( s""" |The query operator `${other.nodeName}` contains one or more unsupported |expression types Aggregate, Window or Generate. - |Invalid expressions: [${invalidExprSqls.mkString(", ")}] - | - """.stripMargin + |Invalid expressions: [${invalidExprSqls.mkString(", ")}]""".stripMargin ) case _ => // Analysis successful! diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 94233eb46866..6319d47c9a0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -44,7 +44,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // - only host special expressions in supported operators override protected def isPlanIntegral(plan: LogicalPlan): Boolean = { !Utils.isTesting || (plan.resolved && - plan.find(PlanHelper.specialExpressionInUnsupportedOperator(_).nonEmpty).isEmpty) + plan.find(PlanHelper.specialExpressionsInUnsupportedOperator(_).nonEmpty).isEmpty) } protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala index 06ff43f11db9..4a28d879d114 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PlanHelper.scala @@ -36,7 +36,7 @@ object PlanHelper { * Example : SELECT * FROM tab WHERE max(c1) > 0 * 2. Query rewrites inadvertently produce plans that are invalid. */ - def specialExpressionInUnsupportedOperator(plan: LogicalPlan): Seq[Expression] = { + def specialExpressionsInUnsupportedOperator(plan: LogicalPlan): Seq[Expression] = { val exprs = plan.expressions val invalidExpressions = exprs.flatMap { root => root.collect { diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index e09487596184..3a5df254f2cd 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -496,8 +496,7 @@ org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not valid in where clause of the query. Expression in where clause: [(count(1) > 1L)] -Invalid expressions: [count(1)] - ; +Invalid expressions: [count(1)]; -- !query 50 @@ -509,8 +508,7 @@ org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not valid in where clause of the query. Expression in where clause: [((count(1) + 1L) > 1L)] -Invalid expressions: [count(1)] - ; +Invalid expressions: [count(1)]; -- !query 51 @@ -522,5 +520,4 @@ org.apache.spark.sql.AnalysisException Aggregate/Window/Generate expressions are not valid in where clause of the query. Expression in where clause: [(((test_agg.`k` = 1) OR (test_agg.`k` = 2)) OR (((count(1) + 1L) > 1L) OR (max(test_agg.`k`) > 1)))] -Invalid expressions: [count(1), max(test_agg.`k`)] - ; +Invalid expressions: [count(1), max(test_agg.`k`)];