diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index c10b796285174..5b8fc51b7a654 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.parser.ParserUtils import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} -import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -34,8 +33,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully * resolved. */ -class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) - extends TreeNodeException(tree, s"Invalid call to $function on unresolved object", null) +class UnresolvedException(function: String) + extends AnalysisException(s"Invalid call to $function on unresolved object") /** * Holds the name of a relation that has yet to be looked up in a catalog. 
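With the `tree` parameter gone, `UnresolvedException` carries only the name of the member that was accessed on an unresolved node, and it now participates in the normal `AnalysisException` hierarchy instead of the removed `TreeNodeException`. A minimal sketch of the new behavior (illustrative, not part of the patch):

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedException

// The single-argument constructor only needs the name of the invalid call;
// because the class now extends AnalysisException, callers that already handle
// analysis failures pick these errors up without a separate TreeNodeException case.
val ex: AnalysisException = new UnresolvedException("dataType")
assert(ex.getMessage.contains("Invalid call to dataType on unresolved object"))
```

The remaining hunks in this file, and the later ones in higherOrderFunctions.scala, windowExpressions.scala and v2Commands.scala, are mechanical updates of every call site to this one-argument form.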
@@ -145,10 +144,10 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un def name: String = nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") override lazy val resolved = false override def newInstance(): UnresolvedAttribute = this @@ -235,10 +234,10 @@ object UnresolvedAttribute { case class UnresolvedGenerator(name: FunctionIdentifier, children: Seq[Expression]) extends Generator { - override def elementSchema: StructType = throw new UnresolvedException(this, "elementTypes") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def foldable: Boolean = throw new UnresolvedException(this, "foldable") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def elementSchema: StructType = throw new UnresolvedException("elementTypes") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def foldable: Boolean = throw new UnresolvedException("foldable") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false override def prettyName: String = name.unquotedString @@ -264,8 +263,8 @@ case class UnresolvedFunction( override def children: Seq[Expression] = arguments ++ filter.toSeq - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false override def prettyName: String = name.unquotedString @@ -287,13 +286,13 @@ object UnresolvedFunction { */ abstract class Star extends LeafExpression with NamedExpression { - override def name: String = throw new UnresolvedException(this, "name") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def name: String = throw new UnresolvedException("name") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") + 
override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] @@ -412,19 +411,19 @@ case class UnresolvedRegex(regexPattern: String, table: Option[String], caseSens case class MultiAlias(child: Expression, names: Seq[String]) extends UnaryExpression with NamedExpression with Unevaluable { - override def name: String = throw new UnresolvedException(this, "name") + override def name: String = throw new UnresolvedException("name") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") + override def exprId: ExprId = throw new UnresolvedException("exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") + override def dataType: DataType = throw new UnresolvedException("dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def nullable: Boolean = throw new UnresolvedException("nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false @@ -439,7 +438,7 @@ case class MultiAlias(child: Expression, names: Seq[String]) * @param expressions Expressions to expand. */ case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star with Unevaluable { - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = expressions override def toString: String = expressions.mkString("ResolvedStar(", ", ", ")") } @@ -458,8 +457,8 @@ case class UnresolvedExtractValue(child: Expression, extraction: Expression) override def left: Expression = child override def right: Expression = extraction - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false override def toString: String = s"$child[$extraction]" @@ -479,13 +478,13 @@ case class UnresolvedAlias( aliasFunc: Option[Expression => String] = None) extends UnaryExpression with NamedExpression with Unevaluable { - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def name: String = throw new UnresolvedException(this, "name") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, 
"newInstance") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def name: String = throw new UnresolvedException("name") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false } @@ -527,14 +526,14 @@ case class UnresolvedDeserializer(deserializer: Expression, inputAttributes: Seq require(inputAttributes.forall(_.resolved), "Input attributes must all be resolved.") override def child: Expression = deserializer - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } case class GetColumnByOrdinal(ordinal: Int, dataType: DataType) extends LeafExpression with Unevaluable with NonSQLExpression { - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } @@ -550,8 +549,8 @@ case class GetColumnByOrdinal(ordinal: Int, dataType: DataType) extends LeafExpr */ case class UnresolvedOrdinal(ordinal: Int) extends LeafExpression with Unevaluable with NonSQLExpression { - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } @@ -571,7 +570,7 @@ case class UnresolvedHaving( * A place holder expression used in random functions, will be replaced after analyze. */ case object UnresolvedSeed extends LeafExpression with Unevaluable { - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") override lazy val resolved = false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala deleted file mode 100644 index 09a6b769da3f4..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst - -import scala.util.control.NonFatal - -import org.apache.spark.SparkException -import org.apache.spark.sql.catalyst.trees.TreeNode - -/** - * Functions for attaching and retrieving trees that are associated with errors. - */ -package object errors { - - class TreeNodeException[TreeType <: TreeNode[_]]( - @transient val tree: TreeType, - msg: String, - cause: Throwable) - extends Exception(msg, cause) { - - val treeString = tree.toString - - // Yes, this is the same as a default parameter, but... those don't seem to work with SBT - // external project dependencies for some reason. - def this(tree: TreeType, msg: String) = this(tree, msg, null) - - override def getMessage: String = { - s"${super.getMessage}, tree:${if (treeString contains "\n") "\n" else " "}$tree" - } - } - - /** - * Wraps any exceptions that are thrown while executing `f` in a - * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`. - */ - def attachTree[TreeType <: TreeNode[_], A](tree: TreeType, msg: String = "")(f: => A): A = { - try f catch { - // SPARK-16748: We do not want SparkExceptions from job failures in the planning phase - // to create TreeNodeException. Hence, wrap exception only if it is not SparkException. - case NonFatal(e) if !e.isInstanceOf[SparkException] => - throw new TreeNodeException(tree, msg, e) - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 7ae5924b20faf..4d303aa95b78a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, JavaCode} import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.types._ @@ -72,17 +71,16 @@ object BindReferences extends Logging { input: AttributeSeq, allowFailures: Boolean = false): A = { expression.transform { case a: AttributeReference => - attachTree(a, "Binding attribute") { - val ordinal = input.indexOf(a.exprId) - if (ordinal == -1) { - if (allowFailures) { - a - } else { - sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}") - } + val ordinal = input.indexOf(a.exprId) + if (ordinal == -1) { + if (allowFailures) { + a } else { - BoundReference(ordinal, a.dataType, input(ordinal).nullable) + throw new IllegalStateException( + s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}") } + } else { + BoundReference(ordinal, a.dataType, input(ordinal).nullable) } }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible. 
} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index 3d80004be677b..0c39c83821d6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -40,12 +40,12 @@ case class UnresolvedNamedLambdaVariable(nameParts: Seq[String]) override def name: String = nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false override def toString: String = s"lambda '$name" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index a4a011767d188..b889a19147d32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -272,8 +272,8 @@ case class UnresolvedWindowExpression( child: Expression, windowSpec: WindowSpecReference) extends UnaryExpression with Unevaluable { - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 6dab23c99f79b..52d89c51cec31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -390,7 +390,7 @@ case class MergeIntoTable( sealed abstract class MergeAction extends Expression with Unevaluable { def condition: Option[Expression] override def nullable: Boolean = false - override def dataType: DataType = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("nullable") override def children: Seq[Expression] = condition.toSeq } @@ -410,7 +410,7 @@ 
case class InsertAction( case class Assignment(key: Expression, value: Expression) extends Expression with Unevaluable { override def nullable: Boolean = false - override def dataType: DataType = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("nullable") override def children: Seq[Expression] = key :: value :: Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index d5b0884f6ff13..118ed85624d89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.rules import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.QueryPlanningTracker -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.catalyst.util.sideBySide @@ -169,7 +168,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { |Once strategy's idempotence is broken for batch ${batch.name} |${sideBySide(plan.treeString, reOptimized.treeString).mkString("\n")} """.stripMargin - throw new TreeNodeException(reOptimized, message, null) + throw new RuntimeException(message) } } @@ -199,7 +198,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { if (!isPlanIntegral(plan)) { val message = "The structural integrity of the input plan is broken in " + s"${this.getClass.getName.stripSuffix("$")}." - throw new TreeNodeException(plan, message, null) + throw new RuntimeException(message) } batches.foreach { batch => @@ -232,7 +231,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { if (effective && !isPlanIntegral(result)) { val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " + "the structural integrity of the plan is broken." 
- throw new TreeNodeException(result, message, null) + throw new RuntimeException(message) } result @@ -249,7 +248,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" + s"$endingMsg" if (Utils.isTesting || batch.strategy.errorOnExceed) { - throw new TreeNodeException(curPlan, message, null) + throw new RuntimeException(message) } else { logWarning(message) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 75a1f712671ee..00cd4e9077109 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -30,7 +30,6 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.sql.catalyst.{AliasIdentifier, IdentifierWithDatabase} import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource} -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} @@ -465,7 +464,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { */ private def makeCopy( newArgs: Array[AnyRef], - allowEmptyArgs: Boolean): BaseType = attachTree(this, "makeCopy") { + allowEmptyArgs: Boolean): BaseType = { val allCtors = getClass.getConstructors if (newArgs.isEmpty && allCtors.isEmpty) { // This is a singleton object which doesn't have any constructor. Just return `this` as we @@ -504,8 +503,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } } catch { case e: java.lang.IllegalArgumentException => - throw new TreeNodeException( - this, + throw new IllegalStateException( s""" |Failed to copy node. |Is otherCopyArgs specified correctly for $nodeName. 
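With `attachTree` and `TreeNodeException` deleted, rule-execution failures now surface as plain `RuntimeException`s (and attribute-binding or `makeCopy` failures as `IllegalStateException`s) whose messages no longer have the offending tree appended. A hedged sketch of how a caller that used to match on `TreeNodeException` can migrate; here `executor` and `plan` are placeholders for whatever the caller owns:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Previously a non-converging batch threw TreeNodeException, which carried the
// offending tree and rendered it into getMessage. Now only the message is
// available, so a caller that wants the plan in its logs has to add it itself.
def runBatches(executor: RuleExecutor[LogicalPlan], plan: LogicalPlan): LogicalPlan =
  try {
    executor.execute(plan)
  } catch {
    case e: RuntimeException if e.getMessage.contains("Max iterations") =>
      println(s"Rule batch did not converge:\n${e.getMessage}\nInput plan:\n${plan.treeString}")
      plan
  }
```

The test changes that follow adjust the expected exception types accordingly, for example `intercept[TreeNodeException[LogicalPlan]]` becoming `intercept[RuntimeException]`.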
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index a4f6a5598007e..0e0142eb76894 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan @@ -818,7 +817,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { $"a" / $"d" as "div4", $"e" / $"e" as "div5") - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[RuntimeException] { testAnalyzer.execute(plan) }.getMessage assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + @@ -980,7 +979,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { $"a" / $"d" as "div4", $"e" / $"e" as "div5") - val message1 = intercept[TreeNodeException[LogicalPlan]] { + val message1 = intercept[RuntimeException] { testAnalyzer.execute(plan) }.getMessage assert(message1.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + @@ -990,13 +989,13 @@ class AnalysisSuite extends AnalysisTest with Matchers { try { testAnalyzer.execute(plan) } catch { - case ex: TreeNodeException[_] + case ex: AnalysisException if ex.getMessage.contains(SQLConf.ANALYZER_MAX_ITERATIONS.key) => fail("analyzer.execute should not reach max iterations.") } } - val message2 = intercept[TreeNodeException[LogicalPlan]] { + val message2 = intercept[RuntimeException] { testAnalyzer.execute(plan) }.getMessage assert(message2.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 42ab43242a16b..f4a52180373c0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2Sess import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, LogicalPlan, OneRowRelation, Project} @@ -53,7 +52,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { test("check for invalid plan after execution of rule - unresolved attribute") { val analyzed = 
Project(Alias(Literal(10), "attr")() :: Nil, OneRowRelation()).analyze assert(analyzed.resolved) - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[RuntimeException] { Optimize.execute(analyzed) }.getMessage val ruleName = OptimizeRuleBreakSI.ruleName @@ -68,7 +67,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { assert(analyzed.resolved) // Should fail verification with the OptimizeRuleBreakSI rule - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[RuntimeException] { Optimize.execute(analyzed) }.getMessage val ruleName = OptimizeRuleBreakSI.ruleName @@ -86,7 +85,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { val invalidPlan = OptimizeRuleBreakSI.apply(analyzed) // Should fail verification right at the beginning - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[RuntimeException] { Optimize.execute(invalidPlan) }.getMessage assert(message.contains("The structural integrity of the input plan is broken")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala index 48c62fe2990e9..6b63f860b7da9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Alias, IntegerLiteral, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project} @@ -48,7 +48,7 @@ class OptimizerSuite extends PlanTest { DecrementLiterals) :: Nil } - val message1 = intercept[TreeNodeException[LogicalPlan]] { + val message1 = intercept[RuntimeException] { optimizer.execute(analyzed) }.getMessage assert(message1.startsWith(s"Max iterations ($maxIterationsNotEnough) reached for batch " + @@ -58,13 +58,13 @@ class OptimizerSuite extends PlanTest { try { optimizer.execute(analyzed) } catch { - case ex: TreeNodeException[_] + case ex: AnalysisException if ex.getMessage.contains(SQLConf.OPTIMIZER_MAX_ITERATIONS.key) => fail("optimizer.execute should not reach max iterations.") } } - val message2 = intercept[TreeNodeException[LogicalPlan]] { + val message2 = intercept[RuntimeException] { optimizer.execute(analyzed) }.getMessage assert(message2.startsWith(s"Max iterations ($maxIterationsNotEnough) reached for batch " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index 60dd9a1fe8a12..25352e2d242a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -18,9 +18,7 @@ package org.apache.spark.sql.catalyst.trees import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import 
org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} class RuleExecutorSuite extends SparkFunSuite { @@ -67,7 +65,7 @@ class RuleExecutorSuite extends SparkFunSuite { val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil } - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[RuntimeException] { ToFixedPoint.execute(Literal(100)) }.getMessage assert(message.contains("Max iterations (10) reached for batch fixedPoint")) @@ -84,7 +82,7 @@ class RuleExecutorSuite extends SparkFunSuite { assert(WithSIChecker.execute(Literal(10)) === Literal(9)) - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[RuntimeException] { // The input is already invalid as determined by WithSIChecker.isPlanIntegral WithSIChecker.execute(Literal(10.1)) }.getMessage @@ -102,7 +100,7 @@ class RuleExecutorSuite extends SparkFunSuite { assert(WithSICheckerForPositiveLiteral.execute(Literal(2)) === Literal(1)) - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[RuntimeException] { WithSICheckerForPositiveLiteral.execute(Literal(1)) }.getMessage assert(message.contains("the structural integrity of the plan is broken")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index 0615324b84305..6f5bf15d82638 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ @@ -54,7 +53,7 @@ case class ExpandExec( private[this] val projection = (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitions { iter => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index f9c696c3342c5..9deeca09ef9ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -26,7 +26,6 @@ import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning @@ -159,9 +158,7 @@ case class ShuffleQueryStageExec( throw new IllegalStateException("wrong plan for shuffle stage:\n " + plan.treeString) } - override def doMaterialize(): Future[Any] = attachTree(this, "execute") { - shuffle.mapOutputStatisticsFuture - } + override def doMaterialize(): Future[Any] = shuffle.mapOutputStatisticsFuture override def 
newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]): QueryStageExec = { val reuse = ShuffleQueryStageExec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index cdad9de00620b..7d45638146a71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -25,7 +25,6 @@ import org.apache.spark.TaskContext import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -81,7 +80,7 @@ case class HashAggregateExec( } } - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val peakMemory = longMetric("peakMemory") val spillSize = longMetric("spillSize") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 9bb196a3311ef..e5f59e0d4e9bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.util.truncatedString @@ -80,7 +79,7 @@ case class ObjectHashAggregateExec( "number of tasks fall-backed to sort-based aggregation") ) - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val aggTime = longMetric("aggTime") val spillSize = longMetric("spillSize") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index aa1559e2ea3b6..2400ceef544d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.util.truncatedString @@ -51,7 +50,7 @@ case class SortAggregateExec( groupingExpressions.map(SortOrder(_, Ascending)) } - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val 
numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) => // Because the constructor of an aggregation iterator will read at least the first row, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 70f20cd8b7c06..57df4e4614fd7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution.command import scala.collection.JavaConverters._ +import scala.util.control.NonFatal import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} @@ -157,7 +157,7 @@ case class ExplainCommand( override def run(sparkSession: SparkSession): Seq[Row] = try { val outputString = sparkSession.sessionState.executePlan(logicalPlan).explainString(mode) Seq(Row(outputString)) - } catch { case cause: TreeNodeException[_] => + } catch { case NonFatal(cause) => ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } @@ -179,7 +179,7 @@ case class StreamingExplainCommand( queryExecution.simpleString } Seq(Row(outputString)) - } catch { case cause: TreeNodeException[_] => + } catch { case NonFatal(cause) => ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index affa92de693af..2a7b12f7f515a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -29,7 +29,6 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.logical.Statistics @@ -160,7 +159,7 @@ case class ShuffleExchangeExec( */ private var cachedShuffleRDD: ShuffledRowRDD = null - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { // Returns the same ShuffleRowRDD if this plan is used by multiple plans. 
if (cachedShuffleRDD == null) { cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 1449d937982e8..e52f2a17b659d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark @@ -59,7 +58,7 @@ case class StatefulOperatorStateInfo( trait StatefulOperator extends SparkPlan { def stateInfo: Option[StatefulOperatorStateInfo] - protected def getStateInfo: StatefulOperatorStateInfo = attachTree(this) { + protected def getStateInfo: StatefulOperatorStateInfo = { stateInfo.getOrElse { throw new IllegalStateException("State location not present for execution") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala index 25b4464823e5f..a31e2382940e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions.{Attribute, RowOrdering, SortOrder} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.util.CompletionIterator @@ -39,7 +38,7 @@ case class ReferenceSort( override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { + protected override def doExecute(): RDD[InternalRow] = { child.execute().mapPartitions( { iterator => val ordering = RowOrdering.create(sortOrder, child.output) val sorter = new ExternalSorter[InternalRow, Null, InternalRow]( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 71eaed269e6c2..c073df2017a53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -445,7 +445,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession "GROUP BY k").foreach { query => val e = intercept[Exception] { sql(query).collect - }.getCause + } assert(e.isInstanceOf[IllegalStateException]) assert(e.getMessage.contains(expectedErrMsg)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 6138323dfb0f6..c6e8b7ec8c3cd 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -760,7 +760,7 @@ class AdaptiveQueryExecSuite val error = intercept[Exception] { aggregated.count() } - assert(error.getCause().toString contains "Invalid bucket file") + assert(error.toString contains "Invalid bucket file") assert(error.getSuppressed.size === 0) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index d0f569cf675f3..d90c8732ea287 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -845,7 +845,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti aggregated.count() } - assert(error.getCause().toString contains "Invalid bucket file") + assert(error.toString contains "Invalid bucket file") } }
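The last suites show the flip side of removing `attachTree`: execution-time errors such as the corrupt-bucket `SparkException` or the codegen `IllegalStateException` are no longer wrapped, so assertions move from `error.getCause()` to the intercepted exception itself. A sketch of the updated assertion shape, where `aggregated` stands in for the bucketed-join DataFrame each suite builds (placeholder, not a name from the patch):

```scala
// Before the change the interesting error was one level down:
//   assert(error.getCause().toString contains "Invalid bucket file")
// After it, the intercepted exception is the error itself:
val error = intercept[Exception] {
  aggregated.count()  // `aggregated`: the suite's bucketed DataFrame (placeholder)
}
assert(error.toString.contains("Invalid bucket file"))
assert(error.getSuppressed.isEmpty)
```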