From 4a6f903897d28a3038918997e692410259a90ae3 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 19 Jun 2020 10:36:52 +0800 Subject: [PATCH 01/11] Reuse completeNextStageWithFetchFailure --- .../apache/spark/scheduler/DAGSchedulerSuite.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9d412f2dba3ce..762b14e170fcc 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -1796,9 +1796,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // lets say there is a fetch failure in this task set, which makes us go back and // run stage 0, attempt 1 - complete(taskSets(1), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(1, 0, shuffleDep1) scheduler.resubmitFailedStages() // stage 0, attempt 1 should have the properties of job2 @@ -1872,9 +1870,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the second stage complete normally completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC")) // fail the third stage because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // have DAGScheduler try again @@ -1900,9 +1896,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // complete stage 1 completeShuffleMapStageSuccessfully(1, 0, 1) // pretend stage 2 failed because hostA went down - complete(taskSets(2), Seq( - (FetchFailed(makeBlockManagerId("hostA"), - shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null))) + completeNextStageWithFetchFailure(2, 0, shuffleDepTwo) // TODO assert this: // blockManagerMaster.removeExecutor("hostA-exec") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. 
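For context, completeNextStageWithFetchFailure is an existing helper in DAGSchedulerSuite; the three inline FetchFailed completions above collapse into single calls to it. A rough sketch of the helper's shape follows — the body is an approximation reconstructed from the call sites, not the verbatim suite code, and checkStageId is assumed from the surrounding suite:

    private def completeNextStageWithFetchFailure(
        stageId: Int,
        attemptIdx: Int,
        shuffleDep: ShuffleDependency[_, _, _]): Unit = {
      // Grab the most recently submitted stage attempt and sanity-check
      // that it is the (stageId, attemptIdx) the test expects.
      val stageAttempt = taskSets.last
      checkStageId(stageId, attemptIdx, stageAttempt)
      // Fail every task with a FetchFailed against the given shuffle, so the
      // DAGScheduler marks the map output on "hostA" lost and resubmits stages.
      complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (_, idx) =>
        (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, idx, "ignored"),
          null)
      }.toSeq)
    }

Reusing the helper keeps each test focused on the scheduling scenario under test rather than on how a fetch failure is fabricated.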
From 086bba330ffeba1856e90962872b87a710326631 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 2 Dec 2020 14:12:50 +0800 Subject: [PATCH 02/11] Supplement error --- .../spark/sql/QueryCompilationErrors.scala | 177 +++++++++++++++++- .../sql/catalyst/analysis/Analyzer.scala | 124 ++++++------ .../sql/catalyst/analysis/CheckAnalysis.scala | 4 + .../spark/sql/catalyst/analysis/package.scala | 4 + 4 files changed, 237 insertions(+), 72 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala index c680502cb328f..ff6148dcbeeb9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala @@ -18,9 +18,13 @@ package org.apache.spark.sql.errors import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.expressions.{Expression, GroupingID} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.ResolvedView +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GroupingID, NamedExpression, SpecifiedWindowFrame, WindowFrame, WindowFunction, WindowSpecDefinition} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.toPrettySQL +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType} @@ -31,6 +35,7 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType} * org.apache.spark.sql.catalyst.analysis.Analyzer. */ object QueryCompilationErrors { + def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable = { new AnalysisException( s"Columns of grouping_id (${groupingID.groupByExprs.mkString(",")}) " + @@ -159,6 +164,174 @@ object QueryCompilationErrors { s"Couldn't find the reference column for $after at $parentName") } -} + def windowSpecificationNotDefinedError(windowName: String): Throwable = { + new AnalysisException(s"Window specification $windowName is not defined in the WINDOW clause.") + } + + def selectExprNotInGroupByError(expr: Expression, groupByAliases: Seq[Alias]): Throwable = { + new AnalysisException(s"$expr doesn't show up in the GROUP BY list $groupByAliases") + } + + def groupingMustWithGroupingSetsOrCubeOrRollupError(): Throwable = { + new AnalysisException("grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup") + } + + def pandasUDFAggregateNotSupportedInPivotError(): Throwable = { + new AnalysisException("Pandas UDF aggregate expressions are currently not supported in pivot.") + } + + def aggregateExpressionRequiredForPivotError(sql: String): Throwable = { + new AnalysisException(s"Aggregate expression required for pivot, but '$sql' " + + "did not appear in any aggregate function.") + } + + def expectedTableNotTempViewError(quoted: String, cmd: String, t: TreeNode[_]): Throwable = { + new AnalysisException(s"$quoted is a temp view. '$cmd' expects a table", + t.origin.line, t.origin.startPosition) + } + + def expectedTableOrPermanentViewNotTempViewError( + quoted: String, cmd: String, t: TreeNode[_]): Throwable = { + new AnalysisException(s"$quoted is a temp view. 
'$cmd' expects a table or permanent view.", + t.origin.line, t.origin.startPosition) + } + + def viewDepthExceedsMaxResolutionDepthError( + identifier: TableIdentifier, maxNestedViewDepth: Int, t: TreeNode[_]): Throwable = { + new AnalysisException(s"The depth of view $identifier exceeds the maximum " + + s"view resolution depth ($maxNestedViewDepth). Analysis is aborted to " + + s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " + + "around this.", t.origin.line, t.origin.startPosition) + } + + def insertingIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Inserting into a view is not allowed. View: $identifier.", + t.origin.line, t.origin.startPosition) + } + + def writingIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Writing into a view is not allowed. View: $identifier.", + t.origin.line, t.origin.startPosition) + } + + def writingIntoV1TableNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Cannot write into v1 table: $identifier.", + t.origin.line, t.origin.startPosition) + } + + def expectsTableNotViewError(v: ResolvedView, cmd: String, t: TreeNode[_]): Throwable = { + val viewStr = if (v.isTemp) "temp view" else "view" + new AnalysisException(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.", + t.origin.line, t.origin.startPosition) + } + + def starNotAllowedWhenGroupByOrdinalPositionUsedError(): Throwable = { + new AnalysisException( + "Star (*) is not allowed in select list when GROUP BY ordinal position is used") + } + + def starInExplodeOrJsonTupleOrUDTFInvalidError(): Throwable = { + new AnalysisException("Invalid usage of '*' in explode/json_tuple/UDTF") + } + + def starInExpressionInvalidError(prettyName: String): Throwable = { + new AnalysisException(s"Invalid usage of '*' in expression '$prettyName'") + } + + def orderByPositionRangeError(index: Int, size: Int, t: TreeNode[_]): Throwable = { + new AnalysisException(s"ORDER BY position $index is not in select list " + + s"(valid range is [1, $size])", t.origin.line, t.origin.startPosition) + } + + def groupByPositionRangeError(index: Int, size: Int, t: TreeNode[_]): Throwable = { + new AnalysisException(s"GROUP BY position $index is not in select list " + + s"(valid range is [1, $size])", t.origin.line, t.origin.startPosition) + } + + def generatorNotExpectedError(name: FunctionIdentifier, classCanonicalName: String): Throwable = { + new AnalysisException(s"$name is expected to be a generator. However, " + + s"its class is $classCanonicalName, which is not a generator.") + } + + def distinctOrFilterOnlyWithAggregateFunctionError(prettyName: String): Throwable = { + new AnalysisException("DISTINCT or FILTER specified, " + + s"but $prettyName is not an aggregate function") + } + + def filterInAggregateFunctionInvalidError(): Throwable = { + new AnalysisException("FILTER expression is non-deterministic, " + + "it cannot be used in aggregate functions") + } + + def aliasNumberNotMatchColumnNumberError( + columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = { + new AnalysisException("Number of column aliases does not match number of columns. 
" + + s"Number of column aliases: $columnSize; " + + s"number of columns: $outputSize.", t.origin.line, t.origin.startPosition) + } + def distinctOrFilterOnlyWithAggregateFunctionError( + aliasesSize: Int, aliasesNames: String): Throwable = { + new AnalysisException("The number of aliases supplied in the AS clause does not " + + s"match the number of columns output by the UDTF expected $aliasesSize " + + s"aliases but got $aliasesNames ") + } + + def windowAggregateFunctionWithFilterNotSupportedError(): Throwable = { + new AnalysisException("window aggregate function with filter predicate is not supported yet.") + } + + def windowFunctionInsideAggregateFunctionNotAllowedError(): Throwable = { + new AnalysisException("It is not allowed to use a window function inside an aggregate " + + "function. Please use the inner window function in a sub-query.") + } + + def expressionWithoutWindowExpressionError(expr: NamedExpression): Throwable = { + new AnalysisException(s"$expr does not have any WindowExpression.") + } + + def expressionWithMultiWindowExpressionError( + expr: NamedExpression, distinctWindowSpec: Seq[WindowSpecDefinition]): Throwable = { + new AnalysisException(s"$expr has multiple Window Specifications ($distinctWindowSpec)." + + s"Please file a bug report with this error message, stack trace, and the query.") + } + def windowFunctionsNotAllowedInsideWhereClauseError(): Throwable = { + new AnalysisException("It is not allowed to use window functions inside WHERE clause") + } + + def windowFunctionsNotAllowedInsideHavingClauseError(): Throwable = { + new AnalysisException("It is not allowed to use window functions inside HAVING clause") + } + + def specifyWindowFrameForFrameLessOffsetWindowFunctionError(prettyName: String): Throwable = { + new AnalysisException(s"Cannot specify window frame for $prettyName function") + } + + def windowFrameNotMatchRequiredFrameError( + f: SpecifiedWindowFrame, required: WindowFrame): Throwable = { + new AnalysisException(s"Window Frame $f must match the required frame $required") + } + + def windowFunctionWithWindowFrameNotOrderedError(wf: WindowFunction): Throwable = { + new AnalysisException(s"Window function $wf requires window to be ordered, please add " + + s"ORDER BY clause. 
For example SELECT $wf(value_expr) OVER (PARTITION BY window_partition " + + s"ORDER BY window_ordering) from table") + } + + def cannotResolveUserSpecifiedColumnsError(col: String, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Cannot resolve column name $col", t.origin.line, t.origin.startPosition) + } + + def writeTableWithMismatchedColumnsError( + columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = { + new AnalysisException(s"Cannot write to table due to mismatched user specified column " + + s"size($columnSize) and data column size($outputSize)", t.origin.line, t.origin.startPosition) + } + + def multiTimeWindowExpressionsNotSupportedError(t: TreeNode[_]): Throwable = { + new AnalysisException("Multiple time window expressions would result in a cartesian product " + + "of rows, therefore they are currently not supported.", t.origin.line, t.origin.startPosition) + } + +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index abd38f2f9d940..2454881e6ae8d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -349,10 +349,8 @@ class Analyzer(override val catalogManager: CatalogManager) // Lookup WindowSpecDefinitions. This rule works with unresolved children. case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions { case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => - val errorMessage = - s"Window specification $windowName is not defined in the WINDOW clause." - val windowSpecDefinition = - windowDefinitions.getOrElse(windowName, failAnalysis(errorMessage)) + val windowSpecDefinition = windowDefinitions.getOrElse(windowName, + failAnalysis(QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))) WindowExpression(c, windowSpecDefinition) } } @@ -496,7 +494,7 @@ class Analyzer(override val catalogManager: CatalogManager) val groupingSetsAttributes = selectedGroupByExprs.map { groupingSetExprs => groupingSetExprs.map { expr => val alias = groupByAliases.find(_.child.semanticEquals(expr)).getOrElse( - failAnalysis(s"$expr doesn't show up in the GROUP BY list $groupByAliases")) + failAnalysis(QueryCompilationErrors.selectExprNotInGroupByError(expr, groupByAliases))) // Map alias to expanded attribute. expandedAttributes.find(_.semanticEquals(alias.toAttribute)).getOrElse( alias.toAttribute) @@ -600,11 +598,11 @@ class Analyzer(override val catalogManager: CatalogManager) val gid = a.groupingExpressions.last if (!gid.isInstanceOf[AttributeReference] || gid.asInstanceOf[AttributeReference].name != VirtualColumn.groupingIdName) { - failAnalysis(s"grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup") + failAnalysis(QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError()) } a.groupingExpressions.take(a.groupingExpressions.length - 1) }.getOrElse { - failAnalysis(s"grouping()/grouping_id() can only be used with GroupingSets/Cube/Rollup") + failAnalysis(QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError()) } } @@ -814,11 +812,9 @@ class Analyzer(override val catalogManager: CatalogManager) private def checkValidAggregateExpression(expr: Expression): Unit = expr match { case _: AggregateExpression => // OK and leave the argument check to CheckAnalysis. 
case expr: PythonUDF if PythonUDF.isGroupedAggPandasUDF(expr) => - failAnalysis("Pandas UDF aggregate expressions are currently not supported in pivot.") + failAnalysis(QueryCompilationErrors.pandasUDFAggregateNotSupportedInPivotError()) case e: Attribute => - failAnalysis( - s"Aggregate expression required for pivot, but '${e.sql}' " + - s"did not appear in any aggregate function.") + failAnalysis(QueryCompilationErrors.aggregateExpressionRequiredForPivotError(e.sql)) case e => e.children.foreach(checkValidAggregateExpression) } } @@ -864,15 +860,15 @@ class Analyzer(override val catalogManager: CatalogManager) } case u @ UnresolvedTable(ident, cmd) => lookupTempView(ident).foreach { _ => - u.failAnalysis(s"${ident.quoted} is a temp view. '$cmd' expects a table") + u.failAnalysis(QueryCompilationErrors.expectedTableNotTempViewError(ident.quoted, cmd, u)) } u case u @ UnresolvedTableOrView(ident, cmd, allowTempView) => lookupTempView(ident) .map { _ => if (!allowTempView) { - u.failAnalysis( - s"${ident.quoted} is a temp view. '$cmd' expects a table or permanent view.") + u.failAnalysis(QueryCompilationErrors.expectedTableOrPermanentViewNotTempViewError( + ident.quoted, cmd, u)) } ResolvedView(ident.asIdentifier, isTemp = true) } @@ -1030,10 +1026,8 @@ class Analyzer(override val catalogManager: CatalogManager) // Resolve all the UnresolvedRelations and Views in the child. val newChild = AnalysisContext.withAnalysisContext(desc.viewCatalogAndNamespace) { if (AnalysisContext.get.nestedViewDepth > conf.maxNestedViewDepth) { - view.failAnalysis(s"The depth of view ${desc.identifier} exceeds the maximum " + - s"view resolution depth (${conf.maxNestedViewDepth}). Analysis is aborted to " + - s"avoid errors. Increase the value of ${SQLConf.MAX_NESTED_VIEW_DEPTH.key} to work " + - "around this.") + view.failAnalysis(QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError( + desc.identifier, conf.maxNestedViewDepth, view)) } SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs)) { executeSameContext(child) @@ -1055,7 +1049,8 @@ class Analyzer(override val catalogManager: CatalogManager) EliminateSubqueryAliases(relation) match { case v: View => - table.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.") + table.failAnalysis(QueryCompilationErrors.insertingIntoViewNotAllowedError( + v.desc.identifier, table)) case other => i.copy(table = other) } @@ -1067,9 +1062,9 @@ class Analyzer(override val catalogManager: CatalogManager) .map(EliminateSubqueryAliases(_)) .map { case v: View => write.failAnalysis( - s"Writing into a view is not allowed. View: ${v.desc.identifier}.") - case u: UnresolvedCatalogRelation => write.failAnalysis( - "Cannot write into v1 table: " + u.tableMeta.identifier) + QueryCompilationErrors.writingIntoViewNotAllowedError(v.desc.identifier, write)) + case u: UnresolvedCatalogRelation => write.failAnalysis(QueryCompilationErrors. + writingIntoV1TableNotAllowedError(u.tableMeta.identifier, write)) case r: DataSourceV2Relation => write.withNewTable(r) case other => throw new IllegalStateException( "[BUG] unexpected plan returned by `lookupRelation`: " + other) @@ -1084,8 +1079,7 @@ class Analyzer(override val catalogManager: CatalogManager) case u @ UnresolvedTable(identifier, cmd) => lookupTableOrView(identifier).map { case v: ResolvedView => - val viewStr = if (v.isTemp) "temp view" else "view" - u.failAnalysis(s"${v.identifier.quoted} is a $viewStr. 
'$cmd' expects a table.") + u.failAnalysis(QueryCompilationErrors.expectsTableNotViewError(v, cmd, u)) case table => table }.getOrElse(u) @@ -1448,8 +1442,7 @@ class Analyzer(override val catalogManager: CatalogManager) // If the aggregate function argument contains Stars, expand it. case a: Aggregate if containsStar(a.aggregateExpressions) => if (a.groupingExpressions.exists(_.isInstanceOf[UnresolvedOrdinal])) { - failAnalysis( - "Star (*) is not allowed in select list when GROUP BY ordinal position is used") + failAnalysis(QueryCompilationErrors.starNotAllowedWhenGroupByOrdinalPositionUsedError()) } else { a.copy(aggregateExpressions = buildExpandedProjectList(a.aggregateExpressions, a.child)) } @@ -1462,7 +1455,7 @@ class Analyzer(override val catalogManager: CatalogManager) } ) case g: Generate if containsStar(g.generator.children) => - failAnalysis("Invalid usage of '*' in explode/json_tuple/UDTF") + failAnalysis(QueryCompilationErrors.starInExplodeOrJsonTupleOrUDTFInvalidError()) // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, _, _) if !j.duplicateResolved => @@ -1722,7 +1715,7 @@ class Analyzer(override val catalogManager: CatalogManager) }) // count(*) has been replaced by count(1) case o if containsStar(o.children) => - failAnalysis(s"Invalid usage of '*' in expression '${o.prettyName}'") + failAnalysis(QueryCompilationErrors.starInExpressionInvalidError(o.prettyName)) } } } @@ -1824,9 +1817,8 @@ class Analyzer(override val catalogManager: CatalogManager) if (index > 0 && index <= child.output.size) { SortOrder(child.output(index - 1), direction, nullOrdering, Set.empty) } else { - s.failAnalysis( - s"ORDER BY position $index is not in select list " + - s"(valid range is [1, ${child.output.size}])") + s.failAnalysis(QueryCompilationErrors.orderByPositionRangeError( + index, child.output.size, s)) } case o => o } @@ -1839,10 +1831,8 @@ class Analyzer(override val catalogManager: CatalogManager) val newGroups = groups.map { case u @ UnresolvedOrdinal(index) if index > 0 && index <= aggs.size => aggs(index - 1) - case ordinal @ UnresolvedOrdinal(index) => - ordinal.failAnalysis( - s"GROUP BY position $index is not in select list " + - s"(valid range is [1, ${aggs.size}])") + case ordinal @ UnresolvedOrdinal(index) => ordinal.failAnalysis( + QueryCompilationErrors.groupByPositionRangeError(index, aggs.size, ordinal)) case o => o } Aggregate(newGroups, aggs, child) @@ -2050,8 +2040,8 @@ class Analyzer(override val catalogManager: CatalogManager) v1SessionCatalog.lookupFunction(name, children) match { case generator: Generator => generator case other => - failAnalysis(s"$name is expected to be a generator. However, " + - s"its class is ${other.getClass.getCanonicalName}, which is not a generator.") + failAnalysis(QueryCompilationErrors.generatorNotExpectedError( + name, other.getClass.getCanonicalName)) } } case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) => @@ -2062,22 +2052,22 @@ class Analyzer(override val catalogManager: CatalogManager) // AggregateExpression. case wf: AggregateWindowFunction => if (isDistinct || filter.isDefined) { - failAnalysis("DISTINCT or FILTER specified, " + - s"but ${wf.prettyName} is not an aggregate function") + failAnalysis( + QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( + wf.prettyName)) } else { wf } // We get an aggregate function, we need to wrap it in an AggregateExpression. 
case agg: AggregateFunction => if (filter.isDefined && !filter.get.deterministic) { - failAnalysis("FILTER expression is non-deterministic, " + - "it cannot be used in aggregate functions") + failAnalysis(QueryCompilationErrors.filterInAggregateFunctionInvalidError) } AggregateExpression(agg, Complete, isDistinct, filter) // This function is not an aggregate function, just return the resolved one. case other if (isDistinct || filter.isDefined) => - failAnalysis("DISTINCT or FILTER specified, " + - s"but ${other.prettyName} is not an aggregate function") + failAnalysis(QueryCompilationErrors. + distinctOrFilterOnlyWithAggregateFunctionError(other.prettyName)) case e: String2TrimExpression if arguments.size == 2 => if (trimWarningEnabled.get) { log.warn("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated." + @@ -2216,9 +2206,8 @@ class Analyzer(override val catalogManager: CatalogManager) // Checks if the number of the aliases equals to the number of output columns // in the subquery. if (columnNames.size != outputAttrs.size) { - u.failAnalysis("Number of column aliases does not match number of columns. " + - s"Number of column aliases: ${columnNames.size}; " + - s"number of columns: ${outputAttrs.size}.") + u.failAnalysis(QueryCompilationErrors.aliasNumberNotMatchColumnNumberError( + columnNames.size, outputAttrs.size, u)) } val aliases = outputAttrs.zip(columnNames).map { case (attr, aliasName) => Alias(attr, aliasName)() @@ -2609,10 +2598,8 @@ class Analyzer(override val catalogManager: CatalogManager) } else if (names.isEmpty) { elementAttrs } else { - failAnalysis( - "The number of aliases supplied in the AS clause does not match the number of columns " + - s"output by the UDTF expected ${elementAttrs.size} aliases but got " + - s"${names.mkString(",")} ") + failAnalysis(QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( + elementAttrs.size, names.mkString(","))) } } } @@ -2721,8 +2708,7 @@ class Analyzer(override val catalogManager: CatalogManager) wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec) case WindowExpression(ae: AggregateExpression, _) if ae.filter.isDefined => - failAnalysis( - "window aggregate function with filter predicate is not supported yet.") + failAnalysis(QueryCompilationErrors.windowAggregateFunctionWithFilterNotSupportedError) // Extract Windowed AggregateExpression case we @ WindowExpression( @@ -2735,8 +2721,8 @@ class Analyzer(override val catalogManager: CatalogManager) WindowExpression(newAgg, spec) case AggregateExpression(aggFunc, _, _, _, _) if hasWindowFunction(aggFunc.children) => - failAnalysis("It is not allowed to use a window function inside an aggregate " + - "function. Please use the inner window function in a sub-query.") + failAnalysis( + QueryCompilationErrors.windowFunctionInsideAggregateFunctionNotAllowedError) // Extracts AggregateExpression. For example, for SUM(x) - Sum(y) OVER (...), // we need to extract SUM(x). @@ -2800,12 +2786,12 @@ class Analyzer(override val catalogManager: CatalogManager) // We do a final check and see if we only have a single Window Spec defined in an // expressions. if (distinctWindowSpec.isEmpty) { - failAnalysis(s"$expr does not have any WindowExpression.") + failAnalysis(QueryCompilationErrors.expressionWithoutWindowExpressionError(expr)) } else if (distinctWindowSpec.length > 1) { // newExpressionsWithWindowFunctions only have expressions with a single // WindowExpression. If we reach here, we have a bug. 
- failAnalysis(s"$expr has multiple Window Specifications ($distinctWindowSpec)." + - s"Please file a bug report with this error message, stack trace, and the query.") + failAnalysis(QueryCompilationErrors.expressionWithMultiWindowExpressionError( + expr, distinctWindowSpec)) } else { val spec = distinctWindowSpec.head val specKey = (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr)) @@ -2833,10 +2819,10 @@ class Analyzer(override val catalogManager: CatalogManager) def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown { case Filter(condition, _) if hasWindowFunction(condition) => - failAnalysis("It is not allowed to use window functions inside WHERE clause") + failAnalysis(QueryCompilationErrors.windowFunctionsNotAllowedInsideWhereClauseError) case UnresolvedHaving(condition, _) if hasWindowFunction(condition) => - failAnalysis("It is not allowed to use window functions inside HAVING clause") + failAnalysis(QueryCompilationErrors.windowFunctionsNotAllowedInsideHavingClauseError) // Aggregate with Having clause. This rule works with an unresolved Aggregate because // a resolved Aggregate will not have Window Functions. @@ -3036,10 +3022,11 @@ class Analyzer(override val catalogManager: CatalogManager) def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { case WindowExpression(wf: FrameLessOffsetWindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f => - failAnalysis(s"Cannot specify window frame for ${wf.prettyName} function") + failAnalysis(QueryCompilationErrors. + specifyWindowFrameForFrameLessOffsetWindowFunctionError(wf.prettyName)) case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != UnspecifiedFrame && wf.frame != f => - failAnalysis(s"Window Frame $f must match the required frame ${wf.frame}") + failAnalysis(QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, wf.frame)) case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, UnspecifiedFrame)) if wf.frame != UnspecifiedFrame => WindowExpression(wf, s.copy(frameSpecification = wf.frame)) @@ -3060,9 +3047,7 @@ class Analyzer(override val catalogManager: CatalogManager) object ResolveWindowOrder extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions { case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty => - failAnalysis(s"Window function $wf requires window to be ordered, please add ORDER BY " + - s"clause. 
For example SELECT $wf(value_expr) OVER (PARTITION BY window_partition " + - s"ORDER BY window_ordering) from table") + failAnalysis(QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)) case WindowExpression(rank: RankLike, spec) if spec.resolved => val order = spec.orderSpec.map(_.child) WindowExpression(rank.withOrder(order), spec) @@ -3129,7 +3114,8 @@ class Analyzer(override val catalogManager: CatalogManager) i.userSpecifiedCols.map { col => i.table.resolve(Seq(col), resolver) - .getOrElse(i.table.failAnalysis(s"Cannot resolve column name $col")) + .getOrElse(i.table.failAnalysis( + QueryCompilationErrors.cannotResolveUserSpecifiedColumnsError(col, i.table))) } } @@ -3138,9 +3124,8 @@ class Analyzer(override val catalogManager: CatalogManager) cols: Seq[NamedExpression], query: LogicalPlan): LogicalPlan = { if (cols.size != query.output.size) { - query.failAnalysis( - s"Cannot write to table due to mismatched user specified column size(${cols.size}) and" + - s" data column size(${query.output.size})") + query.failAnalysis(QueryCompilationErrors.writeTableWithMismatchedColumnsError( + cols.size, query.output.size, query)) } val nameToQueryExpr = cols.zip(query.output).toMap // Static partition columns in the table output should not appear in the column list @@ -3720,8 +3705,7 @@ object TimeWindowing extends Rule[LogicalPlan] { renamedPlan.withNewChildren(substitutedPlan :: Nil) } } else if (numWindowExpr > 1) { - p.failAnalysis("Multiple time window expressions would result in a cartesian product " + - "of rows, therefore they are currently not supported.") + p.failAnalysis(QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)) } else { p // Return unchanged. Analyzer will throw exception later } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 61ac6346ff944..50d5eea93940e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -50,6 +50,10 @@ trait CheckAnalysis extends PredicateHelper { throw new AnalysisException(msg) } + protected def failAnalysis(t: Throwable): Nothing = { + throw t + } + protected def containsMultipleGenerators(exprs: Seq[Expression]): Boolean = { exprs.flatMap(_.collect { case e: Generator => e diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala index 354a3fa0602a9..cd9d4b15f9ebe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala @@ -42,6 +42,10 @@ package object analysis { throw new AnalysisException(msg, t.origin.line, t.origin.startPosition) } + def failAnalysis(t: Throwable): Nothing = { + throw t + } + /** Fails the analysis at the point where a specific tree node was parsed. 
*/ def failAnalysis(msg: String, cause: Throwable): Nothing = { throw new AnalysisException(msg, t.origin.line, t.origin.startPosition, cause = Some(cause)) From 5c80da13ad8c95c0a328a978e8dd505e614020cd Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Wed, 2 Dec 2020 16:56:56 +0800 Subject: [PATCH 03/11] Update error --- .../scala/org/apache/spark/sql/QueryCompilationErrors.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala index ff6148dcbeeb9..d46ec67316403 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala @@ -215,7 +215,7 @@ object QueryCompilationErrors { } def writingIntoV1TableNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { - new AnalysisException(s"Cannot write into v1 table: $identifier.", + new AnalysisException(s"Cannot write into v1 table: $identifier.", t.origin.line, t.origin.startPosition) } From 85853062ec96f3d06932c5a1ff34932dcd5a2a41 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 4 Dec 2020 14:28:26 +0800 Subject: [PATCH 04/11] Update some name --- .../apache/spark/sql/QueryCompilationErrors.scala | 12 ++++++------ .../spark/sql/catalyst/analysis/Analyzer.scala | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala index d46ec67316403..dba58feba0931 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala @@ -185,12 +185,12 @@ object QueryCompilationErrors { "did not appear in any aggregate function.") } - def expectedTableNotTempViewError(quoted: String, cmd: String, t: TreeNode[_]): Throwable = { + def expectTableNotTempViewError(quoted: String, cmd: String, t: TreeNode[_]): Throwable = { new AnalysisException(s"$quoted is a temp view. '$cmd' expects a table", t.origin.line, t.origin.startPosition) } - def expectedTableOrPermanentViewNotTempViewError( + def expectTableOrPermanentViewNotTempViewError( quoted: String, cmd: String, t: TreeNode[_]): Throwable = { new AnalysisException(s"$quoted is a temp view. '$cmd' expects a table or permanent view.", t.origin.line, t.origin.startPosition) @@ -204,22 +204,22 @@ object QueryCompilationErrors { "around this.", t.origin.line, t.origin.startPosition) } - def insertingIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + def insertIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { new AnalysisException(s"Inserting into a view is not allowed. View: $identifier.", t.origin.line, t.origin.startPosition) } - def writingIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { new AnalysisException(s"Writing into a view is not allowed. 
View: $identifier.", t.origin.line, t.origin.startPosition) } - def writingIntoV1TableNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { + def writeIntoV1TableNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = { new AnalysisException(s"Cannot write into v1 table: $identifier.", t.origin.line, t.origin.startPosition) } - def expectsTableNotViewError(v: ResolvedView, cmd: String, t: TreeNode[_]): Throwable = { + def expectTableNotViewError(v: ResolvedView, cmd: String, t: TreeNode[_]): Throwable = { val viewStr = if (v.isTemp) "temp view" else "view" new AnalysisException(s"${v.identifier.quoted} is a $viewStr. '$cmd' expects a table.", t.origin.line, t.origin.startPosition) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2454881e6ae8d..a524a2da7aae1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -860,14 +860,14 @@ class Analyzer(override val catalogManager: CatalogManager) } case u @ UnresolvedTable(ident, cmd) => lookupTempView(ident).foreach { _ => - u.failAnalysis(QueryCompilationErrors.expectedTableNotTempViewError(ident.quoted, cmd, u)) + u.failAnalysis(QueryCompilationErrors.expectTableNotTempViewError(ident.quoted, cmd, u)) } u case u @ UnresolvedTableOrView(ident, cmd, allowTempView) => lookupTempView(ident) .map { _ => if (!allowTempView) { - u.failAnalysis(QueryCompilationErrors.expectedTableOrPermanentViewNotTempViewError( + u.failAnalysis(QueryCompilationErrors.expectTableOrPermanentViewNotTempViewError( ident.quoted, cmd, u)) } ResolvedView(ident.asIdentifier, isTemp = true) @@ -1049,7 +1049,7 @@ class Analyzer(override val catalogManager: CatalogManager) EliminateSubqueryAliases(relation) match { case v: View => - table.failAnalysis(QueryCompilationErrors.insertingIntoViewNotAllowedError( + table.failAnalysis(QueryCompilationErrors.insertIntoViewNotAllowedError( v.desc.identifier, table)) case other => i.copy(table = other) } @@ -1062,9 +1062,9 @@ class Analyzer(override val catalogManager: CatalogManager) .map(EliminateSubqueryAliases(_)) .map { case v: View => write.failAnalysis( - QueryCompilationErrors.writingIntoViewNotAllowedError(v.desc.identifier, write)) + QueryCompilationErrors.writeIntoViewNotAllowedError(v.desc.identifier, write)) case u: UnresolvedCatalogRelation => write.failAnalysis(QueryCompilationErrors. 
- writingIntoV1TableNotAllowedError(u.tableMeta.identifier, write)) + writeIntoV1TableNotAllowedError(u.tableMeta.identifier, write)) case r: DataSourceV2Relation => write.withNewTable(r) case other => throw new IllegalStateException( "[BUG] unexpected plan returned by `lookupRelation`: " + other) @@ -1079,7 +1079,7 @@ class Analyzer(override val catalogManager: CatalogManager) case u @ UnresolvedTable(identifier, cmd) => lookupTableOrView(identifier).map { case v: ResolvedView => - u.failAnalysis(QueryCompilationErrors.expectsTableNotViewError(v, cmd, u)) + u.failAnalysis(QueryCompilationErrors.expectTableNotViewError(v, cmd, u)) case table => table }.getOrElse(u) From 2249e05d88c16932a2aae5488dd8d875bbdd4dae Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 4 Dec 2020 14:50:08 +0800 Subject: [PATCH 05/11] Adjust name --- .../org/apache/spark/sql/QueryCompilationErrors.scala | 8 ++++---- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 ++++---- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 4 ++-- .../org/apache/spark/sql/catalyst/analysis/package.scala | 4 ++-- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala index dba58feba0931..0aa75a0935be8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala @@ -258,7 +258,7 @@ object QueryCompilationErrors { s"but $prettyName is not an aggregate function") } - def filterInAggregateFunctionInvalidError(): Throwable = { + def nonDeterministicFilterInAggregateError(): Throwable = { new AnalysisException("FILTER expression is non-deterministic, " + "it cannot be used in aggregate functions") } @@ -270,7 +270,7 @@ object QueryCompilationErrors { s"number of columns: $outputSize.", t.origin.line, t.origin.startPosition) } - def distinctOrFilterOnlyWithAggregateFunctionError( + def aliasesNumberNotMatchUDTFOutputError( aliasesSize: Int, aliasesNames: String): Throwable = { new AnalysisException("The number of aliases supplied in the AS clause does not " + s"match the number of columns output by the UDTF expected $aliasesSize " + @@ -290,7 +290,7 @@ object QueryCompilationErrors { new AnalysisException(s"$expr does not have any WindowExpression.") } - def expressionWithMultiWindowExpressionError( + def expressionWithMultiWindowExpressionsError( expr: NamedExpression, distinctWindowSpec: Seq[WindowSpecDefinition]): Throwable = { new AnalysisException(s"$expr has multiple Window Specifications ($distinctWindowSpec)." 
+ s"Please file a bug report with this error message, stack trace, and the query.") @@ -304,7 +304,7 @@ object QueryCompilationErrors { new AnalysisException("It is not allowed to use window functions inside HAVING clause") } - def specifyWindowFrameForFrameLessOffsetWindowFunctionError(prettyName: String): Throwable = { + def cannotSpecifyWindowFrameError(prettyName: String): Throwable = { new AnalysisException(s"Cannot specify window frame for $prettyName function") } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a524a2da7aae1..ba35c5b391e82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2061,7 +2061,7 @@ class Analyzer(override val catalogManager: CatalogManager) // We get an aggregate function, we need to wrap it in an AggregateExpression. case agg: AggregateFunction => if (filter.isDefined && !filter.get.deterministic) { - failAnalysis(QueryCompilationErrors.filterInAggregateFunctionInvalidError) + failAnalysis(QueryCompilationErrors.nonDeterministicFilterInAggregateError) } AggregateExpression(agg, Complete, isDistinct, filter) // This function is not an aggregate function, just return the resolved one. @@ -2598,7 +2598,7 @@ class Analyzer(override val catalogManager: CatalogManager) } else if (names.isEmpty) { elementAttrs } else { - failAnalysis(QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( + failAnalysis(QueryCompilationErrors.aliasesNumberNotMatchUDTFOutputError( elementAttrs.size, names.mkString(","))) } } @@ -2790,7 +2790,7 @@ class Analyzer(override val catalogManager: CatalogManager) } else if (distinctWindowSpec.length > 1) { // newExpressionsWithWindowFunctions only have expressions with a single // WindowExpression. If we reach here, we have a bug. - failAnalysis(QueryCompilationErrors.expressionWithMultiWindowExpressionError( + failAnalysis(QueryCompilationErrors.expressionWithMultiWindowExpressionsError( expr, distinctWindowSpec)) } else { val spec = distinctWindowSpec.head @@ -3023,7 +3023,7 @@ class Analyzer(override val catalogManager: CatalogManager) case WindowExpression(wf: FrameLessOffsetWindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f => failAnalysis(QueryCompilationErrors. 
- specifyWindowFrameForFrameLessOffsetWindowFunctionError(wf.prettyName)) + cannotSpecifyWindowFrameError(wf.prettyName)) case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != UnspecifiedFrame && wf.frame != f => failAnalysis(QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, wf.frame)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 50d5eea93940e..fb6a30904d4fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -50,8 +50,8 @@ trait CheckAnalysis extends PredicateHelper { throw new AnalysisException(msg) } - protected def failAnalysis(t: Throwable): Nothing = { - throw t + protected def failAnalysis(error: Throwable): Nothing = { + throw error } protected def containsMultipleGenerators(exprs: Seq[Expression]): Boolean = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala index cd9d4b15f9ebe..ef7c116dee6e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala @@ -42,8 +42,8 @@ package object analysis { throw new AnalysisException(msg, t.origin.line, t.origin.startPosition) } - def failAnalysis(t: Throwable): Nothing = { - throw t + def failAnalysis(error: Throwable): Nothing = { + throw error } /** Fails the analysis at the point where a specific tree node was parsed. 
*/ From 9fdfffb889c4aced6ba4a68ec3a7bbcb170d94a0 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Fri, 4 Dec 2020 18:27:54 +0800 Subject: [PATCH 06/11] Fix conflict --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 3417f88a060a2..3792d0c713484 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1034,7 +1034,7 @@ class Analyzer(override val catalogManager: CatalogManager) desc.viewCatalogAndNamespace, desc.viewReferredTempViewNames) { if (AnalysisContext.get.nestedViewDepth > conf.maxNestedViewDepth) { view.failAnalysis(QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError( - desc.identifier, conf.maxNestedViewDepth, view)) + desc.identifier, conf.maxNestedViewDepth, view)) } SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs, isTempView)) { executeSameContext(child) From 21812d0e89f3411b1ffb5d54223c8ae4c4e252d2 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Mon, 7 Dec 2020 18:13:18 +0800 Subject: [PATCH 07/11] Optimize code --- .../spark/sql/QueryCompilationErrors.scala | 14 +-- .../sql/catalyst/analysis/Analyzer.scala | 106 +++++++++--------- .../sql/catalyst/analysis/CheckAnalysis.scala | 4 - .../spark/sql/catalyst/analysis/package.scala | 4 - 4 files changed, 53 insertions(+), 75 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala index 0aa75a0935be8..209bd3fcb1b56 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala @@ -230,11 +230,7 @@ object QueryCompilationErrors { "Star (*) is not allowed in select list when GROUP BY ordinal position is used") } - def starInExplodeOrJsonTupleOrUDTFInvalidError(): Throwable = { - new AnalysisException("Invalid usage of '*' in explode/json_tuple/UDTF") - } - - def starInExpressionInvalidError(prettyName: String): Throwable = { + def invalidStarUsageError(prettyName: String): Throwable = { new AnalysisException(s"Invalid usage of '*' in expression '$prettyName'") } @@ -296,12 +292,8 @@ object QueryCompilationErrors { s"Please file a bug report with this error message, stack trace, and the query.") } - def windowFunctionsNotAllowedInsideWhereClauseError(): Throwable = { - new AnalysisException("It is not allowed to use window functions inside WHERE clause") - } - - def windowFunctionsNotAllowedInsideHavingClauseError(): Throwable = { - new AnalysisException("It is not allowed to use window functions inside HAVING clause") + def windowFunctionNotAllowedError(): Throwable = { + new AnalysisException("It is not allowed to use window functions inside WHERE/HAVING clause") } def cannotSpecifyWindowFrameError(prettyName: String): Throwable = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d38c5496a9a7b..f840152fe89e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -369,7 +369,7 @@ class Analyzer(override val catalogManager: CatalogManager) case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions { case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) => val windowSpecDefinition = windowDefinitions.getOrElse(windowName, - failAnalysis(QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))) + throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName)) WindowExpression(c, windowSpecDefinition) } } @@ -513,7 +513,7 @@ class Analyzer(override val catalogManager: CatalogManager) val groupingSetsAttributes = selectedGroupByExprs.map { groupingSetExprs => groupingSetExprs.map { expr => val alias = groupByAliases.find(_.child.semanticEquals(expr)).getOrElse( - failAnalysis(QueryCompilationErrors.selectExprNotInGroupByError(expr, groupByAliases))) + throw QueryCompilationErrors.selectExprNotInGroupByError(expr, groupByAliases)) // Map alias to expanded attribute. expandedAttributes.find(_.semanticEquals(alias.toAttribute)).getOrElse( alias.toAttribute) @@ -617,11 +617,11 @@ class Analyzer(override val catalogManager: CatalogManager) val gid = a.groupingExpressions.last if (!gid.isInstanceOf[AttributeReference] || gid.asInstanceOf[AttributeReference].name != VirtualColumn.groupingIdName) { - failAnalysis(QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError()) + throw QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError() } a.groupingExpressions.take(a.groupingExpressions.length - 1) }.getOrElse { - failAnalysis(QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError()) + throw QueryCompilationErrors.groupingMustWithGroupingSetsOrCubeOrRollupError() } } @@ -831,9 +831,9 @@ class Analyzer(override val catalogManager: CatalogManager) private def checkValidAggregateExpression(expr: Expression): Unit = expr match { case _: AggregateExpression => // OK and leave the argument check to CheckAnalysis. 
case expr: PythonUDF if PythonUDF.isGroupedAggPandasUDF(expr) => - failAnalysis(QueryCompilationErrors.pandasUDFAggregateNotSupportedInPivotError()) + throw QueryCompilationErrors.pandasUDFAggregateNotSupportedInPivotError() case e: Attribute => - failAnalysis(QueryCompilationErrors.aggregateExpressionRequiredForPivotError(e.sql)) + throw QueryCompilationErrors.aggregateExpressionRequiredForPivotError(e.sql) case e => e.children.foreach(checkValidAggregateExpression) } } @@ -880,15 +880,15 @@ class Analyzer(override val catalogManager: CatalogManager) } case u @ UnresolvedTable(ident, cmd) => lookupTempView(ident).foreach { _ => - u.failAnalysis(QueryCompilationErrors.expectTableNotTempViewError(ident.quoted, cmd, u)) + throw QueryCompilationErrors.expectTableNotTempViewError(ident.quoted, cmd, u) } u case u @ UnresolvedTableOrView(ident, cmd, allowTempView) => lookupTempView(ident) .map { _ => if (!allowTempView) { - u.failAnalysis(QueryCompilationErrors.expectTableOrPermanentViewNotTempViewError( - ident.quoted, cmd, u)) + throw QueryCompilationErrors.expectTableOrPermanentViewNotTempViewError( + ident.quoted, cmd, u) } ResolvedView(ident.asIdentifier, isTemp = true) } @@ -1048,8 +1048,8 @@ class Analyzer(override val catalogManager: CatalogManager) val nestedViewDepth = AnalysisContext.get.nestedViewDepth val maxNestedViewDepth = AnalysisContext.get.maxNestedViewDepth if (nestedViewDepth > maxNestedViewDepth) { - view.failAnalysis(QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError( - desc.identifier, conf.maxNestedViewDepth, view)) + throw QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError( + desc.identifier, conf.maxNestedViewDepth, view) } SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs, isTempView)) { executeSameContext(child) @@ -1071,8 +1071,7 @@ class Analyzer(override val catalogManager: CatalogManager) EliminateSubqueryAliases(relation) match { case v: View => - table.failAnalysis(QueryCompilationErrors.insertIntoViewNotAllowedError( - v.desc.identifier, table)) + throw QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table) case other => i.copy(table = other) } @@ -1083,10 +1082,11 @@ class Analyzer(override val catalogManager: CatalogManager) lookupRelation(u.multipartIdentifier, u.options, false) .map(EliminateSubqueryAliases(_)) .map { - case v: View => write.failAnalysis( - QueryCompilationErrors.writeIntoViewNotAllowedError(v.desc.identifier, write)) - case u: UnresolvedCatalogRelation => write.failAnalysis(QueryCompilationErrors. 
- writeIntoV1TableNotAllowedError(u.tableMeta.identifier, write)) + case v: View => throw QueryCompilationErrors.writeIntoViewNotAllowedError( + v.desc.identifier, write) + case u: UnresolvedCatalogRelation => + throw QueryCompilationErrors.writeIntoV1TableNotAllowedError( + u.tableMeta.identifier, write) case r: DataSourceV2Relation => write.withNewTable(r) case other => throw new IllegalStateException( "[BUG] unexpected plan returned by `lookupRelation`: " + other) @@ -1100,8 +1100,7 @@ class Analyzer(override val catalogManager: CatalogManager) case u @ UnresolvedTable(identifier, cmd) => lookupTableOrView(identifier).map { - case v: ResolvedView => - u.failAnalysis(QueryCompilationErrors.expectTableNotViewError(v, cmd, u)) + case v: ResolvedView => throw QueryCompilationErrors.expectTableNotViewError(v, cmd, u) case table => table }.getOrElse(u) @@ -1464,7 +1463,7 @@ class Analyzer(override val catalogManager: CatalogManager) // If the aggregate function argument contains Stars, expand it. case a: Aggregate if containsStar(a.aggregateExpressions) => if (a.groupingExpressions.exists(_.isInstanceOf[UnresolvedOrdinal])) { - failAnalysis(QueryCompilationErrors.starNotAllowedWhenGroupByOrdinalPositionUsedError()) + throw QueryCompilationErrors.starNotAllowedWhenGroupByOrdinalPositionUsedError() } else { a.copy(aggregateExpressions = buildExpandedProjectList(a.aggregateExpressions, a.child)) } @@ -1477,7 +1476,7 @@ class Analyzer(override val catalogManager: CatalogManager) } ) case g: Generate if containsStar(g.generator.children) => - failAnalysis(QueryCompilationErrors.starInExplodeOrJsonTupleOrUDTFInvalidError()) + throw QueryCompilationErrors.invalidStarUsageError(g.generator.prettyName) // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, _, _) if !j.duplicateResolved => @@ -1737,7 +1736,7 @@ class Analyzer(override val catalogManager: CatalogManager) }) // count(*) has been replaced by count(1) case o if containsStar(o.children) => - failAnalysis(QueryCompilationErrors.starInExpressionInvalidError(o.prettyName)) + throw QueryCompilationErrors.invalidStarUsageError(o.prettyName) } } } @@ -1839,8 +1838,7 @@ class Analyzer(override val catalogManager: CatalogManager) if (index > 0 && index <= child.output.size) { SortOrder(child.output(index - 1), direction, nullOrdering, Seq.empty) } else { - s.failAnalysis(QueryCompilationErrors.orderByPositionRangeError( - index, child.output.size, s)) + throw QueryCompilationErrors.orderByPositionRangeError(index, child.output.size, s) } case o => o } @@ -1853,8 +1851,8 @@ class Analyzer(override val catalogManager: CatalogManager) val newGroups = groups.map { case u @ UnresolvedOrdinal(index) if index > 0 && index <= aggs.size => aggs(index - 1) - case ordinal @ UnresolvedOrdinal(index) => ordinal.failAnalysis( - QueryCompilationErrors.groupByPositionRangeError(index, aggs.size, ordinal)) + case ordinal @ UnresolvedOrdinal(index) => + throw QueryCompilationErrors.groupByPositionRangeError(index, aggs.size, ordinal) case o => o } Aggregate(newGroups, aggs, child) @@ -2061,9 +2059,8 @@ class Analyzer(override val catalogManager: CatalogManager) withPosition(u) { v1SessionCatalog.lookupFunction(name, children) match { case generator: Generator => generator - case other => - failAnalysis(QueryCompilationErrors.generatorNotExpectedError( - name, other.getClass.getCanonicalName)) + case other => throw QueryCompilationErrors.generatorNotExpectedError( + name, other.getClass.getCanonicalName) } } case u @ 
UnresolvedFunction(funcId, arguments, isDistinct, filter) => @@ -2074,22 +2071,21 @@ class Analyzer(override val catalogManager: CatalogManager) // AggregateExpression. case wf: AggregateWindowFunction => if (isDistinct || filter.isDefined) { - failAnalysis( - QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( - wf.prettyName)) + throw QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( + wf.prettyName) } else { wf } // We get an aggregate function, we need to wrap it in an AggregateExpression. case agg: AggregateFunction => if (filter.isDefined && !filter.get.deterministic) { - failAnalysis(QueryCompilationErrors.nonDeterministicFilterInAggregateError) + throw QueryCompilationErrors.nonDeterministicFilterInAggregateError } AggregateExpression(agg, Complete, isDistinct, filter) // This function is not an aggregate function, just return the resolved one. case other if (isDistinct || filter.isDefined) => - failAnalysis(QueryCompilationErrors. - distinctOrFilterOnlyWithAggregateFunctionError(other.prettyName)) + throw QueryCompilationErrors.distinctOrFilterOnlyWithAggregateFunctionError( + other.prettyName) case e: String2TrimExpression if arguments.size == 2 => if (trimWarningEnabled.get) { log.warn("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated." + @@ -2228,8 +2224,8 @@ class Analyzer(override val catalogManager: CatalogManager) // Checks if the number of the aliases equals to the number of output columns // in the subquery. if (columnNames.size != outputAttrs.size) { - u.failAnalysis(QueryCompilationErrors.aliasNumberNotMatchColumnNumberError( - columnNames.size, outputAttrs.size, u)) + throw QueryCompilationErrors.aliasNumberNotMatchColumnNumberError( + columnNames.size, outputAttrs.size, u) } val aliases = outputAttrs.zip(columnNames).map { case (attr, aliasName) => Alias(attr, aliasName)() @@ -2620,8 +2616,8 @@ class Analyzer(override val catalogManager: CatalogManager) } else if (names.isEmpty) { elementAttrs } else { - failAnalysis(QueryCompilationErrors.aliasesNumberNotMatchUDTFOutputError( - elementAttrs.size, names.mkString(","))) + throw QueryCompilationErrors.aliasesNumberNotMatchUDTFOutputError( + elementAttrs.size, names.mkString(",")) } } } @@ -2730,7 +2726,7 @@ class Analyzer(override val catalogManager: CatalogManager) wsc.copy(partitionSpec = newPartitionSpec, orderSpec = newOrderSpec) case WindowExpression(ae: AggregateExpression, _) if ae.filter.isDefined => - failAnalysis(QueryCompilationErrors.windowAggregateFunctionWithFilterNotSupportedError) + throw QueryCompilationErrors.windowAggregateFunctionWithFilterNotSupportedError // Extract Windowed AggregateExpression case we @ WindowExpression( @@ -2743,8 +2739,7 @@ class Analyzer(override val catalogManager: CatalogManager) WindowExpression(newAgg, spec) case AggregateExpression(aggFunc, _, _, _, _) if hasWindowFunction(aggFunc.children) => - failAnalysis( - QueryCompilationErrors.windowFunctionInsideAggregateFunctionNotAllowedError) + throw QueryCompilationErrors.windowFunctionInsideAggregateFunctionNotAllowedError // Extracts AggregateExpression. For example, for SUM(x) - Sum(y) OVER (...), // we need to extract SUM(x). @@ -2808,12 +2803,12 @@ class Analyzer(override val catalogManager: CatalogManager) // We do a final check and see if we only have a single Window Spec defined in an // expressions. 
@@ -2808,12 +2803,12 @@ class Analyzer(override val catalogManager: CatalogManager)
           // We do a final check and see if we only have a single Window Spec defined in an
           // expressions.
           if (distinctWindowSpec.isEmpty) {
-            failAnalysis(QueryCompilationErrors.expressionWithoutWindowExpressionError(expr))
+            throw QueryCompilationErrors.expressionWithoutWindowExpressionError(expr)
           } else if (distinctWindowSpec.length > 1) {
             // newExpressionsWithWindowFunctions only have expressions with a single
             // WindowExpression. If we reach here, we have a bug.
-            failAnalysis(QueryCompilationErrors.expressionWithMultiWindowExpressionsError(
-              expr, distinctWindowSpec))
+            throw QueryCompilationErrors.expressionWithMultiWindowExpressionsError(
+              expr, distinctWindowSpec)
           } else {
             val spec = distinctWindowSpec.head
             val specKey = (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
@@ -2841,10 +2836,10 @@ class Analyzer(override val catalogManager: CatalogManager)
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
       case Filter(condition, _) if hasWindowFunction(condition) =>
-        failAnalysis(QueryCompilationErrors.windowFunctionsNotAllowedInsideWhereClauseError)
+        throw QueryCompilationErrors.windowFunctionNotAllowedError

       case UnresolvedHaving(condition, _) if hasWindowFunction(condition) =>
-        failAnalysis(QueryCompilationErrors.windowFunctionsNotAllowedInsideHavingClauseError)
+        throw QueryCompilationErrors.windowFunctionNotAllowedError

       // Aggregate with Having clause. This rule works with an unresolved Aggregate because
       // a resolved Aggregate will not have Window Functions.
@@ -3044,11 +3039,10 @@ class Analyzer(override val catalogManager: CatalogManager)
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case WindowExpression(wf: FrameLessOffsetWindowFunction,
         WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f =>
-        failAnalysis(QueryCompilationErrors.
-          cannotSpecifyWindowFrameError(wf.prettyName))
+        throw QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
       case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f: SpecifiedWindowFrame))
           if wf.frame != UnspecifiedFrame && wf.frame != f =>
-        failAnalysis(QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, wf.frame))
+        throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f, wf.frame)
       case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _, UnspecifiedFrame))
           if wf.frame != UnspecifiedFrame =>
         WindowExpression(wf, s.copy(frameSpecification = wf.frame))
@@ -3069,7 +3063,7 @@ class Analyzer(override val catalogManager: CatalogManager)
   object ResolveWindowOrder extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveExpressions {
       case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty =>
-        failAnalysis(QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf))
+        throw QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
       case WindowExpression(rank: RankLike, spec) if spec.resolved =>
         val order = spec.orderSpec.map(_.child)
         WindowExpression(rank.withOrder(order), spec)
@@ -3136,8 +3130,8 @@ class Analyzer(override val catalogManager: CatalogManager)
       i.userSpecifiedCols.map { col =>
         i.table.resolve(Seq(col), resolver)
-          .getOrElse(i.table.failAnalysis(
-            QueryCompilationErrors.cannotResolveUserSpecifiedColumnsError(col, i.table)))
+          .getOrElse(throw QueryCompilationErrors.cannotResolveUserSpecifiedColumnsError(
+            col, i.table))
       }
     }
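The .getOrElse(throw ...) rewrite in the last hunk leans on a Scala detail worth spelling out: throw is an expression of type Nothing, and Nothing conforms to every type, so a throw can sit directly in the default position of getOrElse. A self-contained illustration (standalone example, not from the patch):

    // `throw` has type Nothing, so it satisfies the Int expected by getOrElse.
    def lookup(table: Map[String, Int], key: String): Int =
      table.get(key).getOrElse(throw new NoSuchElementException(s"no column named $key"))

    // lookup(Map("a" -> 1), "a")  // returns 1
    // lookup(Map("a" -> 1), "b")  // throws NoSuchElementException

This is why the old i.table.failAnalysis(...) call, whose declared result type was Nothing, and the new bare throw both typecheck in the same position.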
@@ -3146,8 +3140,8 @@ class Analyzer(override val catalogManager: CatalogManager)
       cols: Seq[NamedExpression],
       query: LogicalPlan): LogicalPlan = {
     if (cols.size != query.output.size) {
-      query.failAnalysis(QueryCompilationErrors.writeTableWithMismatchedColumnsError(
-        cols.size, query.output.size, query))
+      throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
+        cols.size, query.output.size, query)
     }
     val nameToQueryExpr = cols.zip(query.output).toMap
     // Static partition columns in the table output should not appear in the column list
@@ -3727,7 +3721,7 @@ object TimeWindowing extends Rule[LogicalPlan] {
         renamedPlan.withNewChildren(substitutedPlan :: Nil)
       }
     } else if (numWindowExpr > 1) {
-      p.failAnalysis(QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p))
+      throw QueryCompilationErrors.multiTimeWindowExpressionsNotSupportedError(p)
     } else {
       p // Return unchanged. Analyzer will throw exception later
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 26da6c56a1d05..11c4883992560 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -50,10 +50,6 @@ trait CheckAnalysis extends PredicateHelper {
     throw new AnalysisException(msg)
   }

-  protected def failAnalysis(error: Throwable): Nothing = {
-    throw error
-  }
-
   protected def containsMultipleGenerators(exprs: Seq[Expression]): Boolean = {
     exprs.flatMap(_.collect {
       case e: Generator => e
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
index ef7c116dee6e0..354a3fa0602a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -42,10 +42,6 @@ package object analysis {
      throw new AnalysisException(msg, t.origin.line, t.origin.startPosition)
    }

-   def failAnalysis(error: Throwable): Nothing = {
-     throw error
-   }
-
    /** Fails the analysis at the point where a specific tree node was parsed. */
    def failAnalysis(msg: String, cause: Throwable): Nothing = {
      throw new AnalysisException(msg, t.origin.line, t.origin.startPosition, cause = Some(cause))
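With every call site converted to an explicit throw, the failAnalysis(error: Throwable) overloads deleted above become dead code: each was a one-line wrapper that re-threw its argument. The replacement is behaviorally identical (SomeErrors below is hypothetical, for illustration only):

    // The deleted helper was equivalent to this trivial wrapper:
    def failAnalysis(error: Throwable): Nothing = throw error

    // so at any call site,
    //   failAnalysis(SomeErrors.badPlanError(plan))
    // and
    //   throw SomeErrors.badPlanError(plan)
    // do the same thing; the direct throw drops one frame of indirection
    // and reads unambiguously as a non-returning branch.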
From 4b4780a2a11da225cb5f2e7e0631bbe6168d2114 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Mon, 7 Dec 2020 18:50:03 +0800
Subject: [PATCH 08/11] Fix conflict

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f840152fe89e6..dc8c14855e26e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1049,7 +1049,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     val maxNestedViewDepth = AnalysisContext.get.maxNestedViewDepth
     if (nestedViewDepth > maxNestedViewDepth) {
       throw QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError(
-        desc.identifier, conf.maxNestedViewDepth, view)
+        desc.identifier, maxNestedViewDepth , view)
     }
     SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs, isTempView)) {
       executeSameContext(child)

From 88c7eb47f343f9c8cffa2067cd1b46c24461060b Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Mon, 7 Dec 2020 19:00:57 +0800
Subject: [PATCH 09/11] Fix conflict

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index dc8c14855e26e..cb85d63df03cf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1049,7 +1049,7 @@ class Analyzer(override val catalogManager: CatalogManager)
     val maxNestedViewDepth = AnalysisContext.get.maxNestedViewDepth
     if (nestedViewDepth > maxNestedViewDepth) {
       throw QueryCompilationErrors.viewDepthExceedsMaxResolutionDepthError(
-        desc.identifier, maxNestedViewDepth , view)
+        desc.identifier, maxNestedViewDepth, view)
     }
     SQLConf.withExistingConf(View.effectiveSQLConf(desc.viewSQLConfigs, isTempView)) {
       executeSameContext(child)
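Patches 08 and 09 look like rebase fixups, but the first carries a real correction: the depth check compares against AnalysisContext.get.maxNestedViewDepth, the limit captured when analysis started, so the error message should report that same value. conf.maxNestedViewDepth reads the current SQLConf, which nested view resolution swaps out via SQLConf.withExistingConf, so the two values can plausibly disagree. A schematic of the mismatch (values illustrative):

    val enforcedLimit = AnalysisContext.get.maxNestedViewDepth // captured at analysis start
    val reportedLimit = conf.maxNestedViewDepth                // current conf; may differ

    if (nestedViewDepth > enforcedLimit) {
      // Before the first fixup the message quoted reportedLimit; afterwards
      // it quotes enforcedLimit, the value the check actually enforced.
    }

The stray space before the comma that the first fixup introduces is what the second fixup then removes.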
From d151497fccb565c4456b3d06f97a5ad0f02df346 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Tue, 8 Dec 2020 10:35:41 +0800
Subject: [PATCH 10/11] Fix breaking change

---
 .../org/apache/spark/sql/QueryCompilationErrors.scala     | 6 +++---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 ++++----
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
index 209bd3fcb1b56..c3ad8e4f60a2a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
@@ -231,7 +231,7 @@ object QueryCompilationErrors {
   }

   def invalidStarUsageError(prettyName: String): Throwable = {
-    new AnalysisException(s"Invalid usage of '*' in expression '$prettyName'")
+    new AnalysisException(s"Invalid usage of '*' in $prettyName")
   }

   def orderByPositionRangeError(index: Int, size: Int, t: TreeNode[_]): Throwable = {
@@ -292,8 +292,8 @@ object QueryCompilationErrors {
     s"Please file a bug report with this error message, stack trace, and the query.")
   }

-  def windowFunctionNotAllowedError(): Throwable = {
-    new AnalysisException("It is not allowed to use window functions inside WHERE/HAVING clause")
+  def windowFunctionNotAllowedError(clauseName: String): Throwable = {
+    new AnalysisException(s"It is not allowed to use window functions inside $clauseName clause")
   }

   def cannotSpecifyWindowFrameError(prettyName: String): Throwable = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index cb85d63df03cf..f9a009190eafd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1476,7 +1476,7 @@ class Analyzer(override val catalogManager: CatalogManager)
           }
         )
       case g: Generate if containsStar(g.generator.children) =>
-        throw QueryCompilationErrors.invalidStarUsageError(g.generator.prettyName)
+        throw QueryCompilationErrors.invalidStarUsageError("explode/json_tuple/UDTF")

       // To resolve duplicate expression IDs for Join and Intersect
       case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
@@ -1736,7 +1736,7 @@ class Analyzer(override val catalogManager: CatalogManager)
           })
         // count(*) has been replaced by count(1)
         case o if containsStar(o.children) =>
-          throw QueryCompilationErrors.invalidStarUsageError(o.prettyName)
+          throw QueryCompilationErrors.invalidStarUsageError(s"expression '${o.prettyName}'")
      }
    }
  }
@@ -2836,10 +2836,10 @@ class Analyzer(override val catalogManager: CatalogManager)
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsDown {
       case Filter(condition, _) if hasWindowFunction(condition) =>
-        throw QueryCompilationErrors.windowFunctionNotAllowedError
+        throw QueryCompilationErrors.windowFunctionNotAllowedError("WHERE")

       case UnresolvedHaving(condition, _) if hasWindowFunction(condition) =>
-        throw QueryCompilationErrors.windowFunctionNotAllowedError
+        throw QueryCompilationErrors.windowFunctionNotAllowedError("HAVING")

       // Aggregate with Having clause. This rule works with an unresolved Aggregate because
       // a resolved Aggregate will not have Window Functions.
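This patch restores detail that the earlier consolidation had flattened: instead of one fixed "WHERE/HAVING" message, windowFunctionNotAllowedError now takes the clause name and each call site passes the clause it actually checks; invalidStarUsageError gets the same treatment. The technique generalizes to any pair of messages that differ only in one fragment. Usage as in the patch (assuming AnalysisException is imported as elsewhere in the file):

    def windowFunctionNotAllowedError(clauseName: String): Throwable =
      new AnalysisException(
        s"It is not allowed to use window functions inside $clauseName clause")

    // throw windowFunctionNotAllowedError("WHERE")   // from the Filter case
    // throw windowFunctionNotAllowedError("HAVING")  // from the UnresolvedHaving case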
From 7440dad6639a5c9c005b5e90b0fb9cabd05913db Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Wed, 9 Dec 2020 14:50:28 +0800
Subject: [PATCH 11/11] Remove s

---
 .../scala/org/apache/spark/sql/QueryCompilationErrors.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
index c3ad8e4f60a2a..87387b18dbab4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
@@ -289,7 +289,7 @@ object QueryCompilationErrors {
   def expressionWithMultiWindowExpressionsError(
       expr: NamedExpression, distinctWindowSpec: Seq[WindowSpecDefinition]): Throwable = {
     new AnalysisException(s"$expr has multiple Window Specifications ($distinctWindowSpec)." +
-      s"Please file a bug report with this error message, stack trace, and the query.")
+      "Please file a bug report with this error message, stack trace, and the query.")
   }

   def windowFunctionNotAllowedError(clauseName: String): Throwable = {
@@ -308,7 +308,7 @@ object QueryCompilationErrors {
   def windowFunctionWithWindowFrameNotOrderedError(wf: WindowFunction): Throwable = {
     new AnalysisException(s"Window function $wf requires window to be ordered, please add " +
       s"ORDER BY clause. For example SELECT $wf(value_expr) OVER (PARTITION BY window_partition " +
-      s"ORDER BY window_ordering) from table")
+      "ORDER BY window_ordering) from table")
   }

   def cannotResolveUserSpecifiedColumnsError(col: String, t: TreeNode[_]): Throwable = {
@@ -317,7 +317,7 @@

   def writeTableWithMismatchedColumnsError(
       columnSize: Int, outputSize: Int, t: TreeNode[_]): Throwable = {
-    new AnalysisException(s"Cannot write to table due to mismatched user specified column " +
+    new AnalysisException("Cannot write to table due to mismatched user specified column " +
       s"size($columnSize) and data column size($outputSize)",
       t.origin.line, t.origin.startPosition)
   }
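This final patch is pure cleanup: Scala's s interpolator is only needed on literals that actually contain a $ substitution. On the fragments above that interpolate nothing, the prefix desugars to a pointless StringContext call and is flagged by some linters. The rule in brief (standalone illustration, not from the patch):

    val columnSize = 3
    val withSubst = s"size($columnSize) mismatch" // `s` required: interpolates $columnSize
    val plain = "Please file a bug report"        // plain literal: no `s` needed
    // s"Please file a bug report" also compiles, but the prefix buys nothing here.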