From 65ff151744457bf1561ef651b87453f204cc3548 Mon Sep 17 00:00:00 2001
From: anchovYu
Date: Thu, 20 Aug 2020 11:48:02 -0700
Subject: [PATCH 1/6] initial commit

---
 .../org/apache/spark/sql/CatalystErrors.scala | 167 ++++++++++++++++++
 .../sql/catalyst/analysis/Analyzer.scala      |  85 +++------
 2 files changed, 195 insertions(+), 57 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
new file mode 100644
index 0000000000000..81780178fb38d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, GroupingID}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.toPrettySQL
+import org.apache.spark.sql.connector.catalog.TableChange
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType}
+
+/**
+ * Object for grouping all error messages in catalyst.
+ * Currently it includes all AnalysisExceptions created and thrown directly in
+ * org.apache.spark.sql.catalyst.analysis.Analyzer.
+ */
+object CatalystErrors {
+  def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable = {
+    new AnalysisException(
+      s"Columns of grouping_id (${groupingID.groupByExprs.mkString(",")}) " +
+        s"does not match grouping columns (${groupByExprs.mkString(",")})")
+  }
+
+  def groupingColInvalidError(groupingCol: Expression, groupByExprs: Seq[Expression]): Throwable = {
+    new AnalysisException(
+      s"Column of grouping ($groupingCol) can't be found " +
+        s"in grouping columns ${groupByExprs.mkString(",")}")
+  }
+
+  def groupingSizeTooLargeError(sizeLimit: Int): Throwable = {
+    new AnalysisException(
+      s"Grouping sets size cannot be greater than $sizeLimit")
+  }
+
+  def unorderablePivotColError(pivotCol: Expression): Throwable = {
+    new AnalysisException(
+      s"Invalid pivot column '$pivotCol'. Pivot columns must be comparable."
+    )
+  }
+
+  def nonliteralPivotValError(pivotVal: Expression): Throwable = {
+    new AnalysisException(
+      s"Literal expressions required for pivot values, found '$pivotVal'")
+  }
+
+  def pivotValDataTypeMismatchError(pivotVal: Expression, pivotCol: Expression): Throwable = {
+    new AnalysisException(
+      s"Invalid pivot value '$pivotVal': " +
+        s"value data type ${pivotVal.dataType.simpleString} does not match " +
+        s"pivot column data type ${pivotCol.dataType.catalogString}")
+  }
+
+  def unresolvedRelationForTimeTravelError(): Throwable = {
+    new AnalysisException("Cannot specify time travel in multiple formats.")
+  }
+
+  def unsupportedIfNotExistsError(tableName: String): Throwable = {
+    new AnalysisException(
+      s"Cannot write, IF NOT EXISTS is not supported for table: $tableName")
+  }
+
+  def nonPartitionColError(partitionName: String): Throwable = {
+    new AnalysisException(
+      s"PARTITION clause cannot contain a non-partition column name: $partitionName")
+  }
+
+  def addStaticValToUnknownColError(staticName: String): Throwable = {
+    new AnalysisException(
+      s"Cannot add static value for unknown column: $staticName")
+  }
+
+  def unknownStaticPartitionColError(name: String): Throwable = {
+    new AnalysisException(s"Unknown static partition column: $name")
+  }
+
+  def nestedGeneratorError(trimmedNestedGenerator: Expression): Throwable = {
+    new AnalysisException(
+      "Generators are not supported when it's nested in " +
+        "expressions, but got: " + toPrettySQL(trimmedNestedGenerator))
+  }
+
+  def moreThanOneGeneratorError(generators: Seq[Expression], clause: String): Throwable = {
+    new AnalysisException(
+      s"Only one generator allowed per $clause clause but found " +
+        generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
+  }
+
+  def generatorOutsideSelectError(plan: LogicalPlan): Throwable = {
+    new AnalysisException(
+      "Generators are not supported outside the SELECT clause, but " +
+        "got: " + plan.simpleString(SQLConf.get.maxToStringFields))
+  }
+
+  def legacyStoreAssignmentPolicyError(): Throwable = {
+    val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
+    new AnalysisException(
+      s"LEGACY store assignment policy is disallowed in Spark data source V2. " +
+        s"Please set the configuration $configKey to other values.")
+  }
+
+  def unresolvedUsingColForJoinError(
+      colName: String, plan: LogicalPlan, side: String): Throwable = {
+    new AnalysisException(
+      s"USING column `$colName` cannot be resolved on the $side " +
+        s"side of the join. The $side-side columns: [${plan.output.map(_.name).mkString(", ")}]")
+  }
+
+  def dataTypeMismatchForDeserializerError(
+      dataType: DataType, desiredType: String): Throwable = {
+    val quantifier = if (desiredType.equals("array")) "an" else "a"
+    new AnalysisException(
+      s"need $quantifier $desiredType field but got " + dataType.catalogString)
+  }
+
+  def fieldNumberMismatchForDeserializerError(
+      schema: StructType, maxOrdinal: Int): Throwable = {
+    new AnalysisException(
+      s"Try to map ${schema.catalogString} to Tuple${maxOrdinal + 1}, " +
+        s"but failed as the number of fields does not line up.")
+  }
+
+  def upCastFailureError(
+      fromStr: String, from: Expression, to: DataType, walkedTypePath: Seq[String]): Throwable = {
+    new AnalysisException(
+      s"Cannot up cast $fromStr from " +
+        s"${from.dataType.catalogString} to ${to.catalogString}.\n" +
+        s"The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") +
+        "You can either add an explicit cast to the input data or choose a higher precision " +
+        "type of the field in the target object")
+  }
+
+  def unsupportedAbstractDataTypeForUpCastError(gotType: AbstractDataType): Throwable = {
+    new AnalysisException(
+      s"UpCast only support DecimalType as AbstractDataType yet, but got: $gotType")
+  }
+
+  def outerScopeFailureForNewInstanceError(className: String): Throwable = {
+    new AnalysisException(
+      s"Unable to generate an encoder for inner class `$className` without " +
+        "access to the scope that this class was defined in.\n" +
+        "Try moving this class out of its parent class.")
+  }
+
+  def referenceColNotFoundForAlterTableChangesError(
+      after: TableChange.After, parentName: String): Throwable = {
+    new AnalysisException(
+      s"Couldn't find the reference column for $after at $parentName")
+  }
+
+}
+
+
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4516c71bbc514..d5088e07955bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, CatalystErrors}
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -443,9 +443,7 @@ class Analyzer(
           e.groupByExprs.map(_.canonicalized) == groupByExprs.map(_.canonicalized)) {
           Alias(gid, toPrettySQL(e))()
         } else {
-          throw new AnalysisException(
-            s"Columns of grouping_id (${e.groupByExprs.mkString(",")}) does not match " +
-              s"grouping columns (${groupByExprs.mkString(",")})")
+          throw CatalystErrors.groupingIDMismatchError(e, groupByExprs)
         }
       case e @ Grouping(col: Expression) =>
         val idx = groupByExprs.indexWhere(_.semanticEquals(col))
@@ -453,8 +451,7 @@ class Analyzer(
           Alias(Cast(BitwiseAnd(ShiftRight(gid, Literal(groupByExprs.length - 1 - idx)),
             Literal(1L)), ByteType), toPrettySQL(e))()
         } else {
-          throw new AnalysisException(s"Column of grouping ($col) can't be found " +
-            s"in grouping columns ${groupByExprs.mkString(",")}")
+          throw CatalystErrors.groupingColInvalidError(col, groupByExprs)
         }
       }
     }
@@ -570,8 +567,7 @@ class Analyzer(
       val finalGroupByExpressions = getFinalGroupByExpressions(selectedGroupByExprs,
         groupByExprs)
       if (finalGroupByExpressions.size > GroupingID.dataType.defaultSize * 8) {
-        throw new AnalysisException(
-          s"Grouping sets size cannot be greater than ${GroupingID.dataType.defaultSize * 8}")
+        throw CatalystErrors.groupingSizeTooLargeError(GroupingID.dataType.defaultSize * 8)
       }
 
       // Expand works by setting grouping expressions to null as determined by the
@@ -707,8 +703,7 @@ class Analyzer(
           || !p.pivotColumn.resolved || !p.pivotValues.forall(_.resolved) => p
       case Pivot(groupByExprsOpt, pivotColumn, pivotValues, aggregates, child) =>
         if (!RowOrdering.isOrderable(pivotColumn.dataType)) {
-          throw new AnalysisException(
-            s"Invalid pivot column '${pivotColumn}'. Pivot columns must be comparable.")
+          throw CatalystErrors.unorderablePivotColError(pivotColumn)
         }
         // Check all aggregate expressions.
         aggregates.foreach(checkValidAggregateExpression)
@@ -719,13 +714,10 @@ class Analyzer(
             case _ => value.foldable
           }
           if (!foldable) {
-            throw new AnalysisException(
-              s"Literal expressions required for pivot values, found '$value'")
+            throw CatalystErrors.nonliteralPivotValError(value)
           }
           if (!Cast.canCast(value.dataType, pivotColumn.dataType)) {
-            throw new AnalysisException(s"Invalid pivot value '$value': " +
-              s"value data type ${value.dataType.simpleString} does not match " +
-              s"pivot column data type ${pivotColumn.dataType.catalogString}")
+            throw CatalystErrors.pivotValDataTypeMismatchError(value, pivotColumn)
           }
           Cast(value, pivotColumn.dataType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow)
         }
@@ -1048,8 +1040,7 @@ class Analyzer(
       case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) if i.query.resolved =>
         // ifPartitionNotExists is append with validation, but validation is not supported
         if (i.ifPartitionNotExists) {
-          throw new AnalysisException(
-            s"Cannot write, IF NOT EXISTS is not supported for table: ${r.table.name}")
+          throw CatalystErrors.unsupportedIfNotExistsError(r.table.name)
         }
 
         val partCols = partitionColumnNames(r.table)
@@ -1086,8 +1077,7 @@ class Analyzer(
         partitionColumnNames.find(name => conf.resolver(name, partitionName)) match {
           case Some(_) =>
           case None =>
-            throw new AnalysisException(
-              s"PARTITION clause cannot contain a non-partition column name: $partitionName")
+            throw CatalystErrors.nonPartitionColError(partitionName)
         }
       }
     }
@@ -1109,8 +1099,7 @@ class Analyzer(
           case Some(attr) =>
             attr.name -> staticName
          case _ =>
-            throw new AnalysisException(
-              s"Cannot add static value for unknown column: $staticName")
+            throw CatalystErrors.addStaticValToUnknownColError(staticName)
         }).toMap
 
         val queryColumns = query.output.iterator
@@ -1152,7 +1141,7 @@ class Analyzer(
             // an UnresolvedAttribute.
             EqualTo(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
           case None =>
-            throw new AnalysisException(s"Unknown static partition column: $name")
+            throw CatalystErrors.unknownStaticPartitionColError(name)
         }
       }.reduce(And)
     }
@@ -2365,23 +2354,19 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
         val nestedGenerator = projectList.find(hasNestedGenerator).get
-        throw new AnalysisException("Generators are not supported when it's nested in " +
-          "expressions, but got: " + toPrettySQL(trimAlias(nestedGenerator)))
+        throw CatalystErrors.nestedGeneratorError(trimAlias(nestedGenerator))
 
       case Project(projectList, _) if projectList.count(hasGenerator) > 1 =>
         val generators = projectList.filter(hasGenerator).map(trimAlias)
-        throw new AnalysisException("Only one generator allowed per select clause but found " +
-          generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
+        throw CatalystErrors.moreThanOneGeneratorError(generators, "select")
 
       case Aggregate(_, aggList, _) if aggList.exists(hasNestedGenerator) =>
         val nestedGenerator = aggList.find(hasNestedGenerator).get
-        throw new AnalysisException("Generators are not supported when it's nested in " +
-          "expressions, but got: " + toPrettySQL(trimAlias(nestedGenerator)))
+        throw CatalystErrors.nestedGeneratorError(trimAlias(nestedGenerator))
 
       case Aggregate(_, aggList, _) if aggList.count(hasGenerator) > 1 =>
         val generators = aggList.filter(hasGenerator).map(trimAlias)
-        throw new AnalysisException("Only one generator allowed per aggregate clause but found " +
-          generators.size + ": " + generators.map(toPrettySQL).mkString(", "))
+        throw CatalystErrors.moreThanOneGeneratorError(generators, "aggregate")
 
       case agg @ Aggregate(groupList, aggList, child) if aggList.forall {
           case AliasedGenerator(_, _, _) => true
@@ -2464,8 +2449,7 @@ class Analyzer(
       case g: Generate => g
 
       case p if p.expressions.exists(hasGenerator) =>
-        throw new AnalysisException("Generators are not supported outside the SELECT clause, but " +
-          "got: " + p.simpleString(SQLConf.get.maxToStringFields))
+        throw CatalystErrors.generatorOutsideSelectError(p)
     }
   }
 
@@ -3026,10 +3010,7 @@ class Analyzer(
   private def validateStoreAssignmentPolicy(): Unit = {
     // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
     if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
-      val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
-      throw new AnalysisException(s"""
-        |"LEGACY" store assignment policy is disallowed in Spark data source V2.
-        |Please set the configuration $configKey to other values.""".stripMargin)
+      throw CatalystErrors.legacyStoreAssignmentPolicyError()
     }
   }
 
@@ -3042,14 +3023,12 @@ class Analyzer(
       hint: JoinHint) = {
     val leftKeys = joinNames.map { keyName =>
       left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw new AnalysisException(s"USING column `$keyName` cannot be resolved on the left " +
-          s"side of the join. The left-side columns: [${left.output.map(_.name).mkString(", ")}]")
+        throw CatalystErrors.unresolvedUsingColForJoinError(keyName, left, "left")
       }
     }
     val rightKeys = joinNames.map { keyName =>
       right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw new AnalysisException(s"USING column `$keyName` cannot be resolved on the right " +
-          s"side of the join. The right-side columns: [${right.output.map(_.name).mkString(", ")}]")
+        throw CatalystErrors.unresolvedUsingColForJoinError(keyName, right, "right")
       }
     }
     val joinPairs = leftKeys.zip(rightKeys)
@@ -3112,7 +3091,8 @@ class Analyzer(
             ExtractValue(child, fieldName, resolver)
           }
         case other =>
-          throw new AnalysisException("need an array field but got " + other.catalogString)
+          throw CatalystErrors.dataTypeMismatchForDeserializerError(other,
+            "array")
       }
     case u: UnresolvedCatalystToExternalMap if u.child.resolved =>
       u.child.dataType match {
@@ -3122,7 +3102,7 @@ class Analyzer(
            ExtractValue(child, fieldName, resolver)
          }
        case other =>
-          throw new AnalysisException("need a map field but got " + other.catalogString)
+          throw CatalystErrors.dataTypeMismatchForDeserializerError(other, "map")
       }
     }
     validateNestedTupleFields(result)
@@ -3131,8 +3111,7 @@ class Analyzer(
   }
 
   private def fail(schema: StructType, maxOrdinal: Int): Unit = {
-    throw new AnalysisException(s"Try to map ${schema.catalogString} to Tuple${maxOrdinal + 1}" +
-      ", but failed as the number of fields does not line up.")
+    throw CatalystErrors.fieldNumberMismatchForDeserializerError(schema, maxOrdinal)
   }
 
   /**
@@ -3191,10 +3170,7 @@ class Analyzer(
       case n: NewInstance if n.childrenResolved && !n.resolved =>
         val outer = OuterScopes.getOuterScope(n.cls)
         if (outer == null) {
-          throw new AnalysisException(
-            s"Unable to generate an encoder for inner class `${n.cls.getName}` without " +
-              "access to the scope that this class was defined in.\n" +
-              "Try moving this class out of its parent class.")
+          throw CatalystErrors.outerScopeFailureForNewInstanceError(n.cls.getName)
         }
         n.copy(outerPointer = Some(outer))
     }
@@ -3210,11 +3186,7 @@ class Analyzer(
        case l: LambdaVariable => "array element"
        case e => e.sql
      }
-      throw new AnalysisException(s"Cannot up cast $fromStr from " +
-        s"${from.dataType.catalogString} to ${to.catalogString}.\n" +
-        "The type path of the target object is:\n" + walkedTypePath.mkString("", "\n", "\n") +
-        "You can either add an explicit cast to the input data or choose a higher precision " +
-        "type of the field in the target object")
+      throw CatalystErrors.upCastFailureError(fromStr, from, to, walkedTypePath)
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
@@ -3225,8 +3197,7 @@ class Analyzer(
       case u @ UpCast(child, _, _) if !child.resolved => u
 
       case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
-        throw new AnalysisException(
-          s"UpCast only support DecimalType as AbstractDataType yet, but got: $target")
+        throw CatalystErrors.unsupportedAbstractDataTypeForUpCastError(target)
 
       case UpCast(child, target, walkedTypePath) if target == DecimalType &&
         child.dataType.isInstanceOf[DecimalType] =>
@@ -3405,8 +3376,8 @@ class Analyzer(
             case Some(colName) =>
               ColumnPosition.after(colName)
             case None =>
-              throw new AnalysisException("Couldn't find the reference column for " +
-                s"$after at $parentName")
+              throw CatalystErrors.referenceColNotFoundForAlterTableChangesError(after,
+                parentName)
           }
         case other => other

From 2d76356f87f4bb5f3a76644bb51d20f876acc04e Mon Sep 17 00:00:00 2001
From: anchovYu
Date: Thu, 20 Aug 2020 14:14:10 -0700
Subject: [PATCH 2/6] remove unnecessary function

---
 .../src/main/scala/org/apache/spark/sql/CatalystErrors.scala | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
index 81780178fb38d..62ee112a88f46 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
@@ -65,10 +65,6 @@ object CatalystErrors {
       s"pivot column data type ${pivotCol.dataType.catalogString}")
   }
 
-  def unresolvedRelationForTimeTravelError(): Throwable = {
-    new AnalysisException("Cannot specify time travel in multiple formats.")
-  }
-
   def unsupportedIfNotExistsError(tableName: String): Throwable = {
     new AnalysisException(
       s"Cannot write, IF NOT EXISTS is not supported for table: $tableName")

From 5877edf048312734e5320965bf76657c2b04e27a Mon Sep 17 00:00:00 2001
From: anchovYu
Date: Wed, 9 Sep 2020 23:12:56 -0700
Subject: [PATCH 3/6] update string prefix

---
 .../src/main/scala/org/apache/spark/sql/CatalystErrors.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
index 62ee112a88f46..7e8c21e4877b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
@@ -105,7 +105,7 @@ object CatalystErrors {
   def legacyStoreAssignmentPolicyError(): Throwable = {
     val configKey = SQLConf.STORE_ASSIGNMENT_POLICY.key
     new AnalysisException(
-      s"LEGACY store assignment policy is disallowed in Spark data source V2. " +
+      "LEGACY store assignment policy is disallowed in Spark data source V2. " +
         s"Please set the configuration $configKey to other values.")
   }
 
@@ -127,7 +127,7 @@ object CatalystErrors {
       schema: StructType, maxOrdinal: Int): Throwable = {
     new AnalysisException(
       s"Try to map ${schema.catalogString} to Tuple${maxOrdinal + 1}, " +
-        s"but failed as the number of fields does not line up.")
+        "but failed as the number of fields does not line up.")
   }
 
   def upCastFailureError(

From 032c9160b3cbaefa2be2a3ba7c4c57530a3e4d6c Mon Sep 17 00:00:00 2001
From: anchovYu
Date: Sun, 15 Nov 2020 21:10:43 -0800
Subject: [PATCH 4/6] update package and object name

---
 .../src/main/scala/org/apache/spark/sql/CatalystErrors.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
index 7e8c21e4877b0..ace8ccd01d779 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.errors
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, GroupingID}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType}
 /**
  * Object for grouping all error messages in catalyst.
  * Currently it includes all AnalysisExceptions created and thrown directly in
 * org.apache.spark.sql.catalyst.analysis.Analyzer.
  */
-object CatalystErrors {
+object QueryCompilationErrors {
   def groupingIDMismatchError(groupingID: GroupingID, groupByExprs: Seq[Expression]): Throwable = {
     new AnalysisException(
       s"Columns of grouping_id (${groupingID.groupByExprs.mkString(",")}) " +

From f39172149a266072128f5adf8308cad55efe2d45 Mon Sep 17 00:00:00 2001
From: anchovYu
Date: Sun, 15 Nov 2020 21:27:52 -0800
Subject: [PATCH 5/6] update object name and package include

---
 ...ors.scala => QueryCompilationErrors.scala} |  1 +
 .../sql/catalyst/analysis/Analyzer.scala      | 53 ++++++++++---------
 2 files changed, 28 insertions(+), 26 deletions(-)
 rename sql/catalyst/src/main/scala/org/apache/spark/sql/{CatalystErrors.scala => QueryCompilationErrors.scala} (99%)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
similarity index 99%
rename from sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
rename to sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
index ace8ccd01d779..3c1dbc7dc9650 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/CatalystErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.errors
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, GroupingID}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.toPrettySQL
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d5088e07955bd..9a912b92ee5d7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
-import org.apache.spark.sql.{AnalysisException, CatalystErrors}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -42,6 +42,7 @@ import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.{PartitionOverwriteMode, StoreAssignmentPolicy}
@@ -443,7 +444,7 @@ class Analyzer(
           e.groupByExprs.map(_.canonicalized) == groupByExprs.map(_.canonicalized)) {
           Alias(gid, toPrettySQL(e))()
         } else {
-          throw CatalystErrors.groupingIDMismatchError(e, groupByExprs)
+          throw QueryCompilationErrors.groupingIDMismatchError(e, groupByExprs)
         }
       case e @ Grouping(col: Expression) =>
         val idx = groupByExprs.indexWhere(_.semanticEquals(col))
@@ -451,7 +452,7 @@ class Analyzer(
           Alias(Cast(BitwiseAnd(ShiftRight(gid, Literal(groupByExprs.length - 1 - idx)),
             Literal(1L)), ByteType), toPrettySQL(e))()
         } else {
-          throw CatalystErrors.groupingColInvalidError(col, groupByExprs)
+          throw QueryCompilationErrors.groupingColInvalidError(col, groupByExprs)
         }
       }
     }
@@ -567,7 +568,7 @@ class Analyzer(
       val finalGroupByExpressions = getFinalGroupByExpressions(selectedGroupByExprs,
         groupByExprs)
       if (finalGroupByExpressions.size > GroupingID.dataType.defaultSize * 8) {
-        throw CatalystErrors.groupingSizeTooLargeError(GroupingID.dataType.defaultSize * 8)
+        throw QueryCompilationErrors.groupingSizeTooLargeError(GroupingID.dataType.defaultSize * 8)
       }
 
       // Expand works by setting grouping expressions to null as determined by the
@@ -703,7 +704,7 @@ class Analyzer(
           || !p.pivotColumn.resolved || !p.pivotValues.forall(_.resolved) => p
       case Pivot(groupByExprsOpt, pivotColumn, pivotValues, aggregates, child) =>
         if (!RowOrdering.isOrderable(pivotColumn.dataType)) {
-          throw CatalystErrors.unorderablePivotColError(pivotColumn)
+          throw QueryCompilationErrors.unorderablePivotColError(pivotColumn)
         }
         // Check all aggregate expressions.
         aggregates.foreach(checkValidAggregateExpression)
@@ -714,10 +715,10 @@ class Analyzer(
             case _ => value.foldable
           }
           if (!foldable) {
-            throw CatalystErrors.nonliteralPivotValError(value)
+            throw QueryCompilationErrors.nonliteralPivotValError(value)
           }
           if (!Cast.canCast(value.dataType, pivotColumn.dataType)) {
-            throw CatalystErrors.pivotValDataTypeMismatchError(value, pivotColumn)
+            throw QueryCompilationErrors.pivotValDataTypeMismatchError(value, pivotColumn)
           }
           Cast(value, pivotColumn.dataType, Some(conf.sessionLocalTimeZone)).eval(EmptyRow)
         }
@@ -1040,7 +1041,7 @@ class Analyzer(
       case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _) if i.query.resolved =>
         // ifPartitionNotExists is append with validation, but validation is not supported
         if (i.ifPartitionNotExists) {
-          throw CatalystErrors.unsupportedIfNotExistsError(r.table.name)
+          throw QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name)
         }
 
         val partCols = partitionColumnNames(r.table)
@@ -1077,7 +1078,7 @@ class Analyzer(
         partitionColumnNames.find(name => conf.resolver(name, partitionName)) match {
           case Some(_) =>
           case None =>
-            throw CatalystErrors.nonPartitionColError(partitionName)
+            throw QueryCompilationErrors.nonPartitionColError(partitionName)
         }
       }
     }
@@ -1099,7 +1100,7 @@ class Analyzer(
           case Some(attr) =>
             attr.name -> staticName
           case _ =>
-            throw CatalystErrors.addStaticValToUnknownColError(staticName)
+            throw QueryCompilationErrors.addStaticValToUnknownColError(staticName)
         }).toMap
 
         val queryColumns = query.output.iterator
@@ -1141,7 +1142,7 @@ class Analyzer(
             // an UnresolvedAttribute.
             EqualTo(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
           case None =>
-            throw CatalystErrors.unknownStaticPartitionColError(name)
+            throw QueryCompilationErrors.unknownStaticPartitionColError(name)
         }
       }.reduce(And)
     }
@@ -2354,19 +2355,19 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case Project(projectList, _) if projectList.exists(hasNestedGenerator) =>
         val nestedGenerator = projectList.find(hasNestedGenerator).get
-        throw CatalystErrors.nestedGeneratorError(trimAlias(nestedGenerator))
+        throw QueryCompilationErrors.nestedGeneratorError(trimAlias(nestedGenerator))
 
       case Project(projectList, _) if projectList.count(hasGenerator) > 1 =>
         val generators = projectList.filter(hasGenerator).map(trimAlias)
-        throw CatalystErrors.moreThanOneGeneratorError(generators, "select")
+        throw QueryCompilationErrors.moreThanOneGeneratorError(generators, "select")
 
       case Aggregate(_, aggList, _) if aggList.exists(hasNestedGenerator) =>
         val nestedGenerator = aggList.find(hasNestedGenerator).get
-        throw CatalystErrors.nestedGeneratorError(trimAlias(nestedGenerator))
+        throw QueryCompilationErrors.nestedGeneratorError(trimAlias(nestedGenerator))
 
       case Aggregate(_, aggList, _) if aggList.count(hasGenerator) > 1 =>
         val generators = aggList.filter(hasGenerator).map(trimAlias)
-        throw CatalystErrors.moreThanOneGeneratorError(generators, "aggregate")
+        throw QueryCompilationErrors.moreThanOneGeneratorError(generators, "aggregate")
 
       case agg @ Aggregate(groupList, aggList, child) if aggList.forall {
           case AliasedGenerator(_, _, _) => true
@@ -2449,7 +2450,7 @@ class Analyzer(
       case g: Generate => g
 
       case p if p.expressions.exists(hasGenerator) =>
-        throw CatalystErrors.generatorOutsideSelectError(p)
+        throw QueryCompilationErrors.generatorOutsideSelectError(p)
     }
   }
 
@@ -3010,7 +3011,7 @@ class Analyzer(
   private def validateStoreAssignmentPolicy(): Unit = {
     // SPARK-28730: LEGACY store assignment policy is disallowed in data source v2.
     if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
-      throw CatalystErrors.legacyStoreAssignmentPolicyError()
+      throw QueryCompilationErrors.legacyStoreAssignmentPolicyError()
     }
   }
 
@@ -3023,12 +3024,12 @@ class Analyzer(
       hint: JoinHint) = {
     val leftKeys = joinNames.map { keyName =>
       left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw CatalystErrors.unresolvedUsingColForJoinError(keyName, left, "left")
+        throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left")
       }
     }
     val rightKeys = joinNames.map { keyName =>
       right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
-        throw CatalystErrors.unresolvedUsingColForJoinError(keyName, right, "right")
+        throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, right, "right")
       }
     }
     val joinPairs = leftKeys.zip(rightKeys)
@@ -3091,7 +3092,7 @@ class Analyzer(
             ExtractValue(child, fieldName, resolver)
           }
         case other =>
-          throw CatalystErrors.dataTypeMismatchForDeserializerError(other,
+          throw QueryCompilationErrors.dataTypeMismatchForDeserializerError(other,
             "array")
       }
     case u: UnresolvedCatalystToExternalMap if u.child.resolved =>
@@ -3102,7 +3103,7 @@ class Analyzer(
            ExtractValue(child, fieldName, resolver)
          }
        case other =>
-          throw CatalystErrors.dataTypeMismatchForDeserializerError(other, "map")
+          throw QueryCompilationErrors.dataTypeMismatchForDeserializerError(other, "map")
       }
     }
     validateNestedTupleFields(result)
@@ -3111,7 +3112,7 @@ class Analyzer(
   }
 
   private def fail(schema: StructType, maxOrdinal: Int): Unit = {
-    throw CatalystErrors.fieldNumberMismatchForDeserializerError(schema, maxOrdinal)
+    throw QueryCompilationErrors.fieldNumberMismatchForDeserializerError(schema, maxOrdinal)
   }
 
   /**
@@ -3170,7 +3171,7 @@ class Analyzer(
       case n: NewInstance if n.childrenResolved && !n.resolved =>
         val outer = OuterScopes.getOuterScope(n.cls)
         if (outer == null) {
-          throw CatalystErrors.outerScopeFailureForNewInstanceError(n.cls.getName)
+          throw QueryCompilationErrors.outerScopeFailureForNewInstanceError(n.cls.getName)
         }
         n.copy(outerPointer = Some(outer))
     }
@@ -3186,7 +3187,7 @@ class Analyzer(
        case l: LambdaVariable => "array element"
        case e => e.sql
      }
-      throw CatalystErrors.upCastFailureError(fromStr, from, to, walkedTypePath)
+      throw QueryCompilationErrors.upCastFailureError(fromStr, from, to, walkedTypePath)
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
@@ -3197,7 +3198,7 @@ class Analyzer(
       case u @ UpCast(child, _, _) if !child.resolved => u
 
       case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
-        throw CatalystErrors.unsupportedAbstractDataTypeForUpCastError(target)
+        throw QueryCompilationErrors.unsupportedAbstractDataTypeForUpCastError(target)
 
       case UpCast(child, target, walkedTypePath) if target == DecimalType &&
         child.dataType.isInstanceOf[DecimalType] =>
@@ -3376,7 +3377,7 @@ class Analyzer(
             case Some(colName) =>
               ColumnPosition.after(colName)
            case None =>
-              throw CatalystErrors.referenceColNotFoundForAlterTableChangesError(after,
+              throw QueryCompilationErrors.referenceColNotFoundForAlterTableChangesError(after,
                 parentName)
           }
         case other => other

From 645d81bb4622c32119adab7c21c18ea3cce14fdb Mon Sep 17 00:00:00 2001
From: anchovYu
Date: Sun, 15 Nov 2020 23:53:02 -0800
Subject: [PATCH 6/6] update comment and function name

---
 .../scala/org/apache/spark/sql/QueryCompilationErrors.scala | 4 ++--
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
index 3c1dbc7dc9650..c680502cb328f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/QueryCompilationErrors.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{AbstractDataType, DataType, StructType}
 
 /**
- * Object for grouping all error messages in catalyst.
+ * Object for grouping all error messages of the query compilation.
  * Currently it includes all AnalysisExceptions created and thrown directly in
  * org.apache.spark.sql.catalyst.analysis.Analyzer.
  */
@@ -54,7 +54,7 @@ object QueryCompilationErrors {
     )
   }
 
-  def nonliteralPivotValError(pivotVal: Expression): Throwable = {
+  def nonLiteralPivotValError(pivotVal: Expression): Throwable = {
     new AnalysisException(
       s"Literal expressions required for pivot values, found '$pivotVal'")
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9a912b92ee5d7..20ec921e54740 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -715,7 +715,7 @@ class Analyzer(
             case _ => value.foldable
           }
           if (!foldable) {
-            throw QueryCompilationErrors.nonliteralPivotValError(value)
+            throw QueryCompilationErrors.nonLiteralPivotValError(value)
           }
           if (!Cast.canCast(value.dataType, pivotColumn.dataType)) {
             throw QueryCompilationErrors.pivotValDataTypeMismatchError(value, pivotColumn)
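
The net effect of the series: every AnalysisException that Analyzer.scala previously constructed inline now comes from a named factory method on a single errors object, and call sites throw the returned value. A minimal, self-contained sketch of that pattern in Scala follows; the AnalysisException stand-in, the Demo entry point, and the literal 64 are illustrative assumptions only (the real class lives in org.apache.spark.sql and requires the full Spark build).

// Stand-in for org.apache.spark.sql.AnalysisException, assumed here only so
// the sketch compiles without a Spark dependency.
class AnalysisException(message: String) extends Exception(message)

// The pattern the patches introduce: each error is a factory method that
// formats the message and returns the exception instead of throwing it.
object QueryCompilationErrors {
  def groupingSizeTooLargeError(sizeLimit: Int): Throwable =
    new AnalysisException(s"Grouping sets size cannot be greater than $sizeLimit")

  def unknownStaticPartitionColError(name: String): Throwable =
    new AnalysisException(s"Unknown static partition column: $name")
}

object Demo extends App {
  // Call sites keep the `throw`, mirroring the rewritten Analyzer code;
  // 64 stands in for GroupingID.dataType.defaultSize * 8 in the real code.
  try throw QueryCompilationErrors.groupingSizeTooLargeError(64)
  catch { case e: AnalysisException => println(e.getMessage) }
}

Returning Throwable rather than throwing inside the helper keeps the throw expression at the call site, where it types as Nothing and therefore fits any branch of a pattern match; it also leaves the helpers testable as pure message formatters.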