From 5e5609799d580b2972ae3f971ae781696546f3e5 Mon Sep 17 00:00:00 2001
From: beliefer
Date: Tue, 26 Jan 2021 11:35:48 +0800
Subject: [PATCH 01/12] Remove TreeNodeException that does not work

---
 .../sql/catalyst/analysis/unresolved.scala    |  4 +--
 .../spark/sql/catalyst/errors/package.scala   | 34 -------------------
 .../catalyst/expressions/BoundAttribute.scala | 17 ++++------
 .../sql/catalyst/rules/RuleExecutor.scala     | 10 +++---
 .../spark/sql/catalyst/trees/TreeNode.scala   |  7 ++--
 .../sql/catalyst/analysis/AnalysisSuite.scala |  9 +++--
 ...mizerStructuralIntegrityCheckerSuite.scala |  8 ++---
 .../catalyst/optimizer/OptimizerSuite.scala   |  8 ++---
 .../catalyst/trees/RuleExecutorSuite.scala    |  9 +++--
 .../spark/sql/execution/ExpandExec.scala      |  3 +-
 .../execution/adaptive/QueryStageExec.scala   |  5 +--
 .../aggregate/HashAggregateExec.scala         |  3 +-
 .../aggregate/ObjectHashAggregateExec.scala   |  3 +-
 .../aggregate/SortAggregateExec.scala         |  3 +-
 .../sql/execution/command/commands.scala      |  7 ++--
 .../exchange/ShuffleExchangeExec.scala        |  3 +-
 .../streaming/statefulOperators.scala         |  3 +-
 .../spark/sql/execution/ReferenceSort.scala   |  3 +-
 18 files changed, 44 insertions(+), 95 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 1b83cbfa03056..1c5ebd34d0546 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.parser.ParserUtils
@@ -35,7 +35,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  * resolved.
  */
 class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String)
-  extends TreeNodeException(tree, s"Invalid call to $function on unresolved object", null)
+  extends AnalysisException(s"Invalid call to $function on unresolved object")
 
 /**
  * Holds the name of a relation that has yet to be looked up in a catalog.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
index 09a6b769da3f4..045128bd1afb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
@@ -17,43 +17,9 @@
 package org.apache.spark.sql.catalyst
 
-import scala.util.control.NonFatal
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.trees.TreeNode
-
 /**
  * Functions for attaching and retrieving trees that are associated with errors.
  */
 package object errors {
-  class TreeNodeException[TreeType <: TreeNode[_]](
-      @transient val tree: TreeType,
-      msg: String,
-      cause: Throwable)
-    extends Exception(msg, cause) {
-
-    val treeString = tree.toString
-
-    // Yes, this is the same as a default parameter, but... those don't seem to work with SBT
-    // external project dependencies for some reason.
- def this(tree: TreeType, msg: String) = this(tree, msg, null) - - override def getMessage: String = { - s"${super.getMessage}, tree:${if (treeString contains "\n") "\n" else " "}$tree" - } - } - - /** - * Wraps any exceptions that are thrown while executing `f` in a - * [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`. - */ - def attachTree[TreeType <: TreeNode[_], A](tree: TreeType, msg: String = "")(f: => A): A = { - try f catch { - // SPARK-16748: We do not want SparkExceptions from job failures in the planning phase - // to create TreeNodeException. Hence, wrap exception only if it is not SparkException. - case NonFatal(e) if !e.isInstanceOf[SparkException] => - throw new TreeNodeException(tree, msg, e) - } - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 7ae5924b20faf..3fa273f2e2d4d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, JavaCode} import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.types._ @@ -72,17 +71,15 @@ object BindReferences extends Logging { input: AttributeSeq, allowFailures: Boolean = false): A = { expression.transform { case a: AttributeReference => - attachTree(a, "Binding attribute") { - val ordinal = input.indexOf(a.exprId) - if (ordinal == -1) { - if (allowFailures) { - a - } else { - sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}") - } + val ordinal = input.indexOf(a.exprId) + if (ordinal == -1) { + if (allowFailures) { + a } else { - BoundReference(ordinal, a.dataType, input(ordinal).nullable) + sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}") } + } else { + BoundReference(ordinal, a.dataType, input(ordinal).nullable) } }.asInstanceOf[A] // Kind of a hack, but safe. TODO: Tighten return type when possible. 
} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index d5b0884f6ff13..52699caff7b05 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.catalyst.rules import org.apache.spark.internal.Logging +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.QueryPlanningTracker -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND import org.apache.spark.sql.catalyst.util.sideBySide @@ -169,7 +169,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { |Once strategy's idempotence is broken for batch ${batch.name} |${sideBySide(plan.treeString, reOptimized.treeString).mkString("\n")} """.stripMargin - throw new TreeNodeException(reOptimized, message, null) + throw new AnalysisException(message) } } @@ -199,7 +199,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { if (!isPlanIntegral(plan)) { val message = "The structural integrity of the input plan is broken in " + s"${this.getClass.getName.stripSuffix("$")}." - throw new TreeNodeException(plan, message, null) + throw new AnalysisException(message) } batches.foreach { batch => @@ -232,7 +232,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { if (effective && !isPlanIntegral(result)) { val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " + "the structural integrity of the plan is broken." 
- throw new TreeNodeException(result, message, null) + throw new AnalysisException(message) } result @@ -249,7 +249,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging { val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" + s"$endingMsg" if (Utils.isTesting || batch.strategy.errorOnExceed) { - throw new TreeNodeException(curPlan, message, null) + throw new AnalysisException(message) } else { logWarning(message) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 75a1f712671ee..7671365a4f1ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -27,10 +27,10 @@ import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{AliasIdentifier, IdentifierWithDatabase} import org.apache.spark.sql.catalyst.ScalaReflection._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource} -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} @@ -465,7 +465,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { */ private def makeCopy( newArgs: Array[AnyRef], - allowEmptyArgs: Boolean): BaseType = attachTree(this, "makeCopy") { + allowEmptyArgs: Boolean): BaseType = { val allCtors = getClass.getConstructors if (newArgs.isEmpty && allCtors.isEmpty) { // This is a singleton object which doesn't have any constructor. Just return `this` as we @@ -504,8 +504,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } } catch { case e: java.lang.IllegalArgumentException => - throw new TreeNodeException( - this, + throw new AnalysisException( s""" |Failed to copy node. |Is otherCopyArgs specified correctly for $nodeName. 
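The user-visible effect of this first commit is a change in message shape: the removed TreeNodeException appended the offending tree to getMessage, while AnalysisException (and the other standard exceptions used later in this series) carry only the plain text. The self-contained Scala sketch below illustrates that difference; MiniNode, OldTreeException and PlainMessageException are hypothetical stand-ins written for this note, not Spark classes.

// Hypothetical stand-in for TreeNode, for illustration only.
case class MiniNode(name: String, children: Seq[MiniNode] = Nil) {
  def treeString: String =
    (name +: children.map(c => "  " + c.treeString)).mkString("\n")
}

// Message shape of the removed wrapper: the tree is embedded in getMessage.
class OldTreeException(tree: MiniNode, msg: String) extends Exception(msg) {
  override def getMessage: String =
    s"${super.getMessage}, tree:\n${tree.treeString}"
}

// Message shape after this patch: a plain exception carrying only the text.
class PlainMessageException(msg: String) extends Exception(msg)

object MessageShapeDemo extends App {
  val tree = MiniNode("Project", Seq(MiniNode("Filter")))
  println(new OldTreeException(tree, "Invalid call to dataType").getMessage)
  // Invalid call to dataType, tree:
  // Project
  //   Filter
  println(new PlainMessageException("Invalid call to dataType").getMessage)
  // Invalid call to dataType
}

The tradeoff the series accepts is losing the embedded tree dump in exchange for standard exception types (AnalysisException, RuntimeException, IllegalStateException) that callers and tests already know how to intercept.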
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index f66871ee75ecc..3bf54fbfec360 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan @@ -799,7 +798,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { $"a" / $"d" as "div4", $"e" / $"e" as "div5") - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[AnalysisException] { testAnalyzer.execute(plan) }.getMessage assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + @@ -961,7 +960,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { $"a" / $"d" as "div4", $"e" / $"e" as "div5") - val message1 = intercept[TreeNodeException[LogicalPlan]] { + val message1 = intercept[AnalysisException] { testAnalyzer.execute(plan) }.getMessage assert(message1.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + @@ -971,13 +970,13 @@ class AnalysisSuite extends AnalysisTest with Matchers { try { testAnalyzer.execute(plan) } catch { - case ex: TreeNodeException[_] + case ex: AnalysisException if ex.getMessage.contains(SQLConf.ANALYZER_MAX_ITERATIONS.key) => fail("analyzer.execute should not reach max iterations.") } } - val message2 = intercept[TreeNodeException[LogicalPlan]] { + val message2 = intercept[AnalysisException] { testAnalyzer.execute(plan) }.getMessage assert(message2.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 42ab43242a16b..a4ec3e02526b0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, LogicalPlan, OneRowRelation, Project} @@ -53,7 +53,7 @@ class 
OptimizerStructuralIntegrityCheckerSuite extends PlanTest { test("check for invalid plan after execution of rule - unresolved attribute") { val analyzed = Project(Alias(Literal(10), "attr")() :: Nil, OneRowRelation()).analyze assert(analyzed.resolved) - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[AnalysisException] { Optimize.execute(analyzed) }.getMessage val ruleName = OptimizeRuleBreakSI.ruleName @@ -68,7 +68,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { assert(analyzed.resolved) // Should fail verification with the OptimizeRuleBreakSI rule - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[AnalysisException] { Optimize.execute(analyzed) }.getMessage val ruleName = OptimizeRuleBreakSI.ruleName @@ -86,7 +86,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { val invalidPlan = OptimizeRuleBreakSI.apply(analyzed) // Should fail verification right at the beginning - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[AnalysisException] { Optimize.execute(invalidPlan) }.getMessage assert(message.contains("The structural integrity of the input plan is broken")) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala index 48c62fe2990e9..b9757e3e929d9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.catalyst.optimizer +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Alias, IntegerLiteral, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project} @@ -48,7 +48,7 @@ class OptimizerSuite extends PlanTest { DecrementLiterals) :: Nil } - val message1 = intercept[TreeNodeException[LogicalPlan]] { + val message1 = intercept[AnalysisException] { optimizer.execute(analyzed) }.getMessage assert(message1.startsWith(s"Max iterations ($maxIterationsNotEnough) reached for batch " + @@ -58,13 +58,13 @@ class OptimizerSuite extends PlanTest { try { optimizer.execute(analyzed) } catch { - case ex: TreeNodeException[_] + case ex: AnalysisException if ex.getMessage.contains(SQLConf.OPTIMIZER_MAX_ITERATIONS.key) => fail("optimizer.execute should not reach max iterations.") } } - val message2 = intercept[TreeNodeException[LogicalPlan]] { + val message2 = intercept[AnalysisException] { optimizer.execute(analyzed) }.getMessage assert(message2.startsWith(s"Max iterations ($maxIterationsNotEnough) reached for batch " + diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala index 60dd9a1fe8a12..111ce5b1f553b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala @@ -18,9 +18,8 @@ package org.apache.spark.sql.catalyst.trees import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.errors.TreeNodeException 
+import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor} class RuleExecutorSuite extends SparkFunSuite { @@ -67,7 +66,7 @@ class RuleExecutorSuite extends SparkFunSuite { val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil } - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[AnalysisException] { ToFixedPoint.execute(Literal(100)) }.getMessage assert(message.contains("Max iterations (10) reached for batch fixedPoint")) @@ -84,7 +83,7 @@ class RuleExecutorSuite extends SparkFunSuite { assert(WithSIChecker.execute(Literal(10)) === Literal(9)) - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[AnalysisException] { // The input is already invalid as determined by WithSIChecker.isPlanIntegral WithSIChecker.execute(Literal(10.1)) }.getMessage @@ -102,7 +101,7 @@ class RuleExecutorSuite extends SparkFunSuite { assert(WithSICheckerForPositiveLiteral.execute(Literal(2)) === Literal(1)) - val message = intercept[TreeNodeException[LogicalPlan]] { + val message = intercept[AnalysisException] { WithSICheckerForPositiveLiteral.execute(Literal(1)) }.getMessage assert(message.contains("the structural integrity of the plan is broken")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index 0615324b84305..6f5bf15d82638 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ @@ -54,7 +53,7 @@ case class ExpandExec( private[this] val projection = (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output) - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitions { iter => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala index f9c696c3342c5..9deeca09ef9ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala @@ -26,7 +26,6 @@ import org.apache.spark.{FutureAction, MapOutputStatistics, SparkException} import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning @@ -159,9 +158,7 @@ case class ShuffleQueryStageExec( throw new IllegalStateException("wrong plan for shuffle stage:\n " + plan.treeString) } - override def 
doMaterialize(): Future[Any] = attachTree(this, "execute") { - shuffle.mapOutputStatisticsFuture - } + override def doMaterialize(): Future[Any] = shuffle.mapOutputStatisticsFuture override def newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]): QueryStageExec = { val reuse = ShuffleQueryStageExec( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index cdad9de00620b..7d45638146a71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -25,7 +25,6 @@ import org.apache.spark.TaskContext import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -81,7 +80,7 @@ case class HashAggregateExec( } } - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val peakMemory = longMetric("peakMemory") val spillSize = longMetric("spillSize") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 02f666383d9f0..9977e3a5772e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -21,7 +21,6 @@ import java.util.concurrent.TimeUnit._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.util.truncatedString @@ -77,7 +76,7 @@ case class ObjectHashAggregateExec( "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build") ) - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") val aggTime = longMetric("aggTime") val fallbackCountThreshold = sqlContext.conf.objectAggSortBasedFallbackThreshold diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index aa1559e2ea3b6..2400ceef544d6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import 
org.apache.spark.sql.catalyst.util.truncatedString @@ -51,7 +50,7 @@ case class SortAggregateExec( groupingExpressions.map(SortOrder(_, Ascending)) } - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") child.execute().mapPartitionsWithIndexInternal { (partIndex, iter) => // Because the constructor of an aggregation iterator will read at least the first row, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 70f20cd8b7c06..cdc6e43481864 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.execution.command import scala.collection.JavaConverters._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} @@ -157,7 +156,7 @@ case class ExplainCommand( override def run(sparkSession: SparkSession): Seq[Row] = try { val outputString = sparkSession.sessionState.executePlan(logicalPlan).explainString(mode) Seq(Row(outputString)) - } catch { case cause: TreeNodeException[_] => + } catch { case cause: AnalysisException => ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } @@ -179,7 +178,7 @@ case class StreamingExplainCommand( queryExecution.simpleString } Seq(Row(outputString)) - } catch { case cause: TreeNodeException[_] => + } catch { case cause: AnalysisException => ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index affa92de693af..2a7b12f7f515a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -29,7 +29,6 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.logical.Statistics @@ -160,7 +159,7 @@ case class ShuffleExchangeExec( */ private var cachedShuffleRDD: ShuffledRowRDD = null - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { + protected override def doExecute(): RDD[InternalRow] = { // Returns the same ShuffleRowRDD if this plan is used by multiple plans. 
if (cachedShuffleRDD == null) { cachedShuffleRDD = new ShuffledRowRDD(shuffleDependency, readMetrics) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 1449d937982e8..e52f2a17b659d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -24,7 +24,6 @@ import scala.collection.JavaConverters._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark @@ -59,7 +58,7 @@ case class StatefulOperatorStateInfo( trait StatefulOperator extends SparkPlan { def stateInfo: Option[StatefulOperatorStateInfo] - protected def getStateInfo: StatefulOperatorStateInfo = attachTree(this) { + protected def getStateInfo: StatefulOperatorStateInfo = { stateInfo.getOrElse { throw new IllegalStateException("State location not present for execution") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala index 25b4464823e5f..a31e2382940e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions.{Attribute, RowOrdering, SortOrder} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.util.CompletionIterator @@ -39,7 +38,7 @@ case class ReferenceSort( override def requiredChildDistribution: Seq[Distribution] = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil - protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") { + protected override def doExecute(): RDD[InternalRow] = { child.execute().mapPartitions( { iterator => val ordering = RowOrdering.create(sortOrder, child.output) val sorter = new ExternalSorter[InternalRow, Null, InternalRow]( From a746160c94e73db1362f9677d2c6d407f508cf20 Mon Sep 17 00:00:00 2001 From: beliefer Date: Tue, 26 Jan 2021 19:39:52 +0800 Subject: [PATCH 02/12] Update code --- .../org/apache/spark/sql/execution/WholeStageCodegenSuite.scala | 2 +- .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 +- .../scala/org/apache/spark/sql/sources/BucketedReadSuite.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 71eaed269e6c2..c073df2017a53 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -445,7 +445,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession "GROUP BY k").foreach { query => val e = 
intercept[Exception] { sql(query).collect - }.getCause + } assert(e.isInstanceOf[IllegalStateException]) assert(e.getMessage.contains(expectedErrMsg)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index 6138323dfb0f6..c6e8b7ec8c3cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -760,7 +760,7 @@ class AdaptiveQueryExecSuite val error = intercept[Exception] { aggregated.count() } - assert(error.getCause().toString contains "Invalid bucket file") + assert(error.toString contains "Invalid bucket file") assert(error.getSuppressed.size === 0) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 9dcc0cfda93f1..482cd4b89cfff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -813,7 +813,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils with Adapti aggregated.count() } - assert(error.getCause().toString contains "Invalid bucket file") + assert(error.toString contains "Invalid bucket file") } } From 2b289c321b4be348a5d4932d41755bababef3090 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Sun, 7 Feb 2021 11:52:30 +0800 Subject: [PATCH 03/12] Update code --- .../sql/catalyst/analysis/unresolved.scala | 84 +++++++++---------- .../catalyst/expressions/BoundAttribute.scala | 5 +- .../expressions/higherOrderFunctions.scala | 12 +-- .../expressions/windowExpressions.scala | 4 +- .../catalyst/plans/logical/v2Commands.scala | 4 +- .../sql/errors/QueryExecutionErrors.scala | 7 +- 6 files changed, 61 insertions(+), 55 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 1c5ebd34d0546..ce15222859719 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully * resolved. 
*/ -class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: String) +class UnresolvedException[TreeType <: TreeNode[_]](function: String) extends AnalysisException(s"Invalid call to $function on unresolved object") /** @@ -145,10 +145,10 @@ case class UnresolvedAttribute(nameParts: Seq[String]) extends Attribute with Un def name: String = nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") override lazy val resolved = false override def newInstance(): UnresolvedAttribute = this @@ -235,10 +235,10 @@ object UnresolvedAttribute { case class UnresolvedGenerator(name: FunctionIdentifier, children: Seq[Expression]) extends Generator { - override def elementSchema: StructType = throw new UnresolvedException(this, "elementTypes") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def foldable: Boolean = throw new UnresolvedException(this, "foldable") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def elementSchema: StructType = throw new UnresolvedException("elementTypes") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def foldable: Boolean = throw new UnresolvedException("foldable") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false override def prettyName: String = name.unquotedString @@ -264,8 +264,8 @@ case class UnresolvedFunction( override def children: Seq[Expression] = arguments ++ filter.toSeq - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false override def prettyName: String = name.unquotedString @@ -287,13 +287,13 @@ object UnresolvedFunction { */ abstract class Star extends LeafExpression with NamedExpression { - override def name: String = throw new UnresolvedException(this, "name") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def name: String = throw new UnresolvedException("name") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def 
nullable: Boolean = throw new UnresolvedException("nullable") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] @@ -408,19 +408,19 @@ case class UnresolvedRegex(regexPattern: String, table: Option[String], caseSens case class MultiAlias(child: Expression, names: Seq[String]) extends UnaryExpression with NamedExpression with Unevaluable { - override def name: String = throw new UnresolvedException(this, "name") + override def name: String = throw new UnresolvedException("name") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") + override def exprId: ExprId = throw new UnresolvedException("exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") + override def dataType: DataType = throw new UnresolvedException("dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def nullable: Boolean = throw new UnresolvedException("nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false @@ -435,7 +435,7 @@ case class MultiAlias(child: Expression, names: Seq[String]) * @param expressions Expressions to expand. 
*/ case class ResolvedStar(expressions: Seq[NamedExpression]) extends Star with Unevaluable { - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override def expand(input: LogicalPlan, resolver: Resolver): Seq[NamedExpression] = expressions override def toString: String = expressions.mkString("ResolvedStar(", ", ", ")") } @@ -454,8 +454,8 @@ case class UnresolvedExtractValue(child: Expression, extraction: Expression) override def left: Expression = child override def right: Expression = extraction - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false override def toString: String = s"$child[$extraction]" @@ -475,13 +475,13 @@ case class UnresolvedAlias( aliasFunc: Option[Expression => String] = None) extends UnaryExpression with NamedExpression with Unevaluable { - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def name: String = throw new UnresolvedException(this, "name") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def name: String = throw new UnresolvedException("name") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false } @@ -523,14 +523,14 @@ case class UnresolvedDeserializer(deserializer: Expression, inputAttributes: Seq require(inputAttributes.forall(_.resolved), "Input attributes must all be resolved.") override def child: Expression = deserializer - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } case class GetColumnByOrdinal(ordinal: Int, dataType: DataType) extends LeafExpression with Unevaluable with NonSQLExpression { - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } @@ -546,8 +546,8 @@ case class GetColumnByOrdinal(ordinal: Int, dataType: DataType) extends LeafExpr */ case class UnresolvedOrdinal(ordinal: Int) extends LeafExpression with Unevaluable with NonSQLExpression { - override def 
dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } @@ -567,7 +567,7 @@ case class UnresolvedHaving( * A place holder expression used in random functions, will be replaced after analyze. */ case object UnresolvedSeed extends LeafExpression with Unevaluable { - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") override lazy val resolved = false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index 3fa273f2e2d4d..a5469f3d182e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, JavaCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode, FalseLiteral, JavaCode} import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ /** @@ -76,7 +77,7 @@ object BindReferences extends Logging { if (allowFailures) { a } else { - sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}") + throw QueryExecutionErrors.cannotFindExpressionInInputAttributesError(a, input) } } else { BoundReference(ordinal, a.dataType, input(ordinal).nullable) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index 3d80004be677b..0c39c83821d6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -40,12 +40,12 @@ case class UnresolvedNamedLambdaVariable(nameParts: Seq[String]) override def name: String = nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".") - override def exprId: ExprId = throw new UnresolvedException(this, "exprId") - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") - override def qualifier: Seq[String] = throw new UnresolvedException(this, "qualifier") - override def toAttribute: Attribute = throw new UnresolvedException(this, "toAttribute") - override def newInstance(): NamedExpression = throw new UnresolvedException(this, "newInstance") + override def exprId: ExprId = throw new UnresolvedException("exprId") + override def dataType: DataType = throw new UnresolvedException("dataType") + override 
def nullable: Boolean = throw new UnresolvedException("nullable") + override def qualifier: Seq[String] = throw new UnresolvedException("qualifier") + override def toAttribute: Attribute = throw new UnresolvedException("toAttribute") + override def newInstance(): NamedExpression = throw new UnresolvedException("newInstance") override lazy val resolved = false override def toString: String = s"lambda '$name" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index a4a011767d188..b889a19147d32 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -272,8 +272,8 @@ case class UnresolvedWindowExpression( child: Expression, windowSpec: WindowSpecReference) extends UnaryExpression with Unevaluable { - override def dataType: DataType = throw new UnresolvedException(this, "dataType") - override def nullable: Boolean = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("dataType") + override def nullable: Boolean = throw new UnresolvedException("nullable") override lazy val resolved = false } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala index 65b08980423ff..f2305db8a43e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala @@ -388,7 +388,7 @@ case class MergeIntoTable( sealed abstract class MergeAction extends Expression with Unevaluable { def condition: Option[Expression] override def nullable: Boolean = false - override def dataType: DataType = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("nullable") override def children: Seq[Expression] = condition.toSeq } @@ -408,7 +408,7 @@ case class InsertAction( case class Assignment(key: Expression, value: Expression) extends Expression with Unevaluable { override def nullable: Boolean = false - override def dataType: DataType = throw new UnresolvedException(this, "nullable") + override def dataType: DataType = throw new UnresolvedException("nullable") override def children: Seq[Expression] = key :: value :: Nil } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 847ec4a7fd96c..642a12388a3df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -28,7 +28,7 @@ import org.codehaus.janino.InternalCompilerException import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator import org.apache.spark.sql.catalyst.catalog.CatalogDatabase -import org.apache.spark.sql.catalyst.expressions.{Expression, UnevaluableAggregate} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, UnevaluableAggregate} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} import 
org.apache.spark.sql.types.{DataType, Decimal} import org.apache.spark.unsafe.array.ByteArrayMethods @@ -328,4 +328,9 @@ object QueryExecutionErrors { def compilerError(e: CompileException): Throwable = { new CompileException(failedToCompileMsg(e), e.getLocation) } + + def cannotFindExpressionInInputAttributesError( + a: AttributeReference, input: AttributeSeq): Throwable = { + new IllegalStateException(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}") + } } From 8ac05ab0815a0b6368be14ecb96909a1660a61b9 Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Sun, 7 Feb 2021 12:17:47 +0800 Subject: [PATCH 04/12] Fix scala style --- .../apache/spark/sql/catalyst/expressions/BoundAttribute.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala index a5469f3d182e9..93eca546142a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{CodeGenerator, CodegenContext, ExprCode, FalseLiteral, JavaCode} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, JavaCode} import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ From 5f4e266ec58637a6b80661f0987aaedb21baba5a Mon Sep 17 00:00:00 2001 From: gengjiaan Date: Sun, 7 Feb 2021 14:47:10 +0800 Subject: [PATCH 05/12] Update code --- .../org/apache/spark/sql/catalyst/analysis/unresolved.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index b245194affbd6..5b8fc51b7a654 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.parser.ParserUtils import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode} -import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.catalyst.util.quoteIdentifier import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} @@ -34,7 +33,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * Thrown when an invalid attempt is made to access a property of a tree that has yet to be fully * resolved. 
  */
-class UnresolvedException[TreeType <: TreeNode[_]](function: String)
+class UnresolvedException(function: String)
   extends AnalysisException(s"Invalid call to $function on unresolved object")
 
 /**

From ce6242408edb5a0219a4473734432bb415a7e6dd Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Mon, 8 Feb 2021 17:11:22 +0800
Subject: [PATCH 06/12] Optimize code

---
 .../apache/spark/sql/catalyst/rules/RuleExecutor.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 52699caff7b05..118ed85624d89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.rules
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
@@ -169,7 +168,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
            |Once strategy's idempotence is broken for batch ${batch.name}
            |${sideBySide(plan.treeString, reOptimized.treeString).mkString("\n")}
          """.stripMargin
-      throw new AnalysisException(message)
+      throw new RuntimeException(message)
     }
   }
 
@@ -199,7 +198,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
     if (!isPlanIntegral(plan)) {
       val message = "The structural integrity of the input plan is broken in " +
         s"${this.getClass.getName.stripSuffix("$")}."
-      throw new AnalysisException(message)
+      throw new RuntimeException(message)
     }
 
     batches.foreach { batch =>
@@ -232,7 +231,7 @@
             if (effective && !isPlanIntegral(result)) {
               val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
                 "the structural integrity of the plan is broken."
-              throw new AnalysisException(message)
+              throw new RuntimeException(message)
             }
 
             result
@@ -249,7 +248,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
           val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
             s"$endingMsg"
           if (Utils.isTesting || batch.strategy.errorOnExceed) {
-            throw new AnalysisException(message)
+            throw new RuntimeException(message)
           } else {
             logWarning(message)
           }

From b9e87bc198b3512820940886bc19699bc9ec18bf Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Tue, 9 Feb 2021 10:36:14 +0800
Subject: [PATCH 07/12] Update code

---
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala     | 2 +-
 .../apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 6 +++---
 .../org/apache/spark/sql/execution/command/commands.scala  | 5 +++--
 3 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 7671365a4f1ac..2ca834f57d450 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -504,7 +504,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       }
     } catch {
       case e: java.lang.IllegalArgumentException =>
-        throw new AnalysisException(
+        throw new IllegalStateException(
           s"""
              |Failed to copy node.
              |Is otherCopyArgs specified correctly for $nodeName.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 7c371d91583fa..0e0142eb76894 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -817,7 +817,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       $"a" / $"d" as "div4",
       $"e" / $"e" as "div5")
 
-    val message = intercept[AnalysisException] {
+    val message = intercept[RuntimeException] {
       testAnalyzer.execute(plan)
     }.getMessage
     assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
@@ -979,7 +979,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       $"a" / $"d" as "div4",
       $"e" / $"e" as "div5")
 
-    val message1 = intercept[AnalysisException] {
+    val message1 = intercept[RuntimeException] {
       testAnalyzer.execute(plan)
     }.getMessage
     assert(message1.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
@@ -995,7 +995,7 @@
       }
     }
 
-    val message2 = intercept[AnalysisException] {
+    val message2 = intercept[RuntimeException] {
       testAnalyzer.execute(plan)
     }.getMessage
     assert(message2.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index cdc6e43481864..594016055fab1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -156,7 +157,7 @@ case class ExplainCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = try {
     val outputString = sparkSession.sessionState.executePlan(logicalPlan).explainString(mode)
     Seq(Row(outputString))
-  } catch { case cause: AnalysisException =>
+  } catch { case NonFatal(cause) =>
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 }
@@ -178,7 +179,7 @@ case class StreamingExplainCommand(
       queryExecution.simpleString
     }
     Seq(Row(outputString))
-  } catch { case cause: AnalysisException =>
+  } catch { case NonFatal(cause) =>
     ("Error occurred during query planning: \n" + cause.getMessage).split("\n").map(Row(_))
   }
 }

From 13bc978db5aaf944036989ef04dfa0732e75f077 Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Tue, 9 Feb 2021 12:10:00 +0800
Subject: [PATCH 08/12] Update code

---
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 2ca834f57d450..00cd4e9077109 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -27,7 +27,6 @@ import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{AliasIdentifier, IdentifierWithDatabase}
 import org.apache.spark.sql.catalyst.ScalaReflection._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, FunctionResource}

From 59f98d59d17f123dbe6d7024c34c3d0a4f36bf5b Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Tue, 9 Feb 2021 13:01:13 +0800
Subject: [PATCH 09/12] Update code

---
 .../scala/org/apache/spark/sql/execution/command/commands.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index 594016055fab1..57df4e4614fd7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.QueryPlan

From 131094121c912b6907e23cdafba1fabf5f0d003f Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Tue, 9 Feb 2021 15:09:32 +0800
Subject: [PATCH 10/12] Update test cases

---
 .../optimizer/OptimizerStructuralIntegrityCheckerSuite.scala | 7 +++----
 .../spark/sql/catalyst/optimizer/OptimizerSuite.scala        | 4 ++--
 .../spark/sql/catalyst/trees/RuleExecutorSuite.scala         | 7 +++----
 3 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
index a4ec3e02526b0..f4a52180373c0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -53,7 +52,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest {
   test("check for invalid plan after execution of rule - unresolved attribute") {
     val analyzed = Project(Alias(Literal(10), "attr")() :: Nil, OneRowRelation()).analyze
     assert(analyzed.resolved)
-    val message = intercept[AnalysisException] {
+    val message = intercept[RuntimeException] {
       Optimize.execute(analyzed)
     }.getMessage
     val ruleName = OptimizeRuleBreakSI.ruleName
@@ -68,7 +67,7 @@
     assert(analyzed.resolved)
 
     // Should fail verification with the OptimizeRuleBreakSI rule
-    val message = intercept[AnalysisException] {
+    val message = intercept[RuntimeException] {
       Optimize.execute(analyzed)
     }.getMessage
     val ruleName = OptimizeRuleBreakSI.ruleName
@@ -86,7 +85,7 @@
     val invalidPlan = OptimizeRuleBreakSI.apply(analyzed)
 
     // Should fail verification right at the beginning
-    val message = intercept[AnalysisException] {
+    val message = intercept[RuntimeException] {
       Optimize.execute(invalidPlan)
     }.getMessage
     assert(message.contains("The structural integrity of the input plan is broken"))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala
index b9757e3e929d9..6b63f860b7da9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerSuite.scala
@@ -48,7 +48,7 @@ class OptimizerSuite extends PlanTest {
         DecrementLiterals) :: Nil
     }
 
-    val message1 = intercept[AnalysisException] {
+    val message1 = intercept[RuntimeException] {
      optimizer.execute(analyzed)
     }.getMessage
     assert(message1.startsWith(s"Max iterations ($maxIterationsNotEnough) reached for batch " +
@@ -64,7 +64,7 @@
       }
     }
 
-    val message2 = intercept[AnalysisException] {
+    val message2 = intercept[RuntimeException] {
       optimizer.execute(analyzed)
     }.getMessage
     assert(message2.startsWith(s"Max iterations ($maxIterationsNotEnough) reached for batch " +
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
index 111ce5b1f553b..25352e2d242a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/RuleExecutorSuite.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.trees
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 
@@ -66,7 +65,7 @@ class RuleExecutorSuite extends SparkFunSuite {
       val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
     }
 
-    val message = intercept[AnalysisException] {
+    val message = intercept[RuntimeException] {
       ToFixedPoint.execute(Literal(100))
     }.getMessage
     assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
@@ -83,7 +82,7 @@
 
     assert(WithSIChecker.execute(Literal(10)) === Literal(9))
 
-    val message = intercept[AnalysisException] {
+    val message = intercept[RuntimeException] {
       // The input is already invalid as determined by WithSIChecker.isPlanIntegral
       WithSIChecker.execute(Literal(10.1))
     }.getMessage
@@ -101,7 +100,7 @@
 
     assert(WithSICheckerForPositiveLiteral.execute(Literal(2)) === Literal(1))
 
-    val message = intercept[AnalysisException] {
+    val message = intercept[RuntimeException] {
       WithSICheckerForPositiveLiteral.execute(Literal(1))
     }.getMessage
     assert(message.contains("the structural integrity of the plan is broken"))

From c6fb5a8a696af79c5ccacd71b8c3a711093b415d Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Wed, 10 Feb 2021 11:06:24 +0800
Subject: [PATCH 11/12] Remove errors/package

---
 .../spark/sql/catalyst/errors/package.scala | 25 -------------------
 1 file changed, 25 deletions(-)
 delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
deleted file mode 100644
index 045128bd1afb0..0000000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-/**
- * Functions for attaching and retrieving trees that are associated with errors.
- */
-package object errors {
-
-}

From ebc738b64b75ccbd0fe1fd406f12b7676816877a Mon Sep 17 00:00:00 2001
From: gengjiaan
Date: Wed, 10 Feb 2021 11:31:24 +0800
Subject: [PATCH 12/12] Update code

---
 .../spark/sql/catalyst/expressions/BoundAttribute.scala    | 4 ++--
 .../org/apache/spark/sql/errors/QueryExecutionErrors.scala | 7 +------
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index 93eca546142a1..4d303aa95b78a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -21,7 +21,6 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode, FalseLiteral, JavaCode}
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
-import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 
 /**
@@ -77,7 +76,8 @@ object BindReferences extends Logging {
         if (allowFailures) {
           a
         } else {
-          throw QueryExecutionErrors.cannotFindExpressionInInputAttributesError(a, input)
+          throw new IllegalStateException(
+            s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
         }
       } else {
         BoundReference(ordinal, a.dataType, input(ordinal).nullable)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 7b2629b2d2a71..b3b22b551f828 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -28,7 +28,7 @@ import org.codehaus.janino.InternalCompilerException
 import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
 import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSeq, Expression, UnevaluableAggregate}
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnevaluableAggregate}
 import org.apache.spark.sql.types.{DataType, Decimal}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.UTF8String
@@ -322,9 +322,4 @@ object QueryExecutionErrors {
   def compilerError(e: CompileException): Throwable = {
     new CompileException(failedToCompileMsg(e), e.getLocation)
   }
-
-  def cannotFindExpressionInInputAttributesError(
-      a: AttributeReference, input: AttributeSeq): Throwable = {
-    new IllegalStateException(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
-  }
 }
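Note on the observable behavior after this series (an illustrative sketch, not part of the patches): planner-internal invariant violations (max-iteration overruns, structural-integrity failures, bad makeCopy arguments, unresolvable bound references) now surface as RuntimeException or IllegalStateException, while AnalysisException stays reserved for user-facing analysis errors. The snippet below mirrors the RuleExecutorSuite test updated in PATCH 10; the suite name and the body of DecrementLiterals are reconstructed here for illustration and may differ from the real suite:

    import org.apache.spark.SparkFunSuite
    import org.apache.spark.sql.catalyst.expressions.{Expression, IntegerLiteral, Literal}
    import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}

    class MaxIterationsBehaviorSuite extends SparkFunSuite {
      // Decrements a positive integer literal once per application, so a
      // FixedPoint(10) budget can never fully reduce Literal(100) and the
      // executor has to give up after ten iterations.
      object DecrementLiterals extends Rule[Expression] {
        def apply(expr: Expression): Expression = expr transform {
          case IntegerLiteral(i) if i > 0 => Literal(i - 1)
        }
      }

      object ToFixedPoint extends RuleExecutor[Expression] {
        val batches = Batch("fixedPoint", FixedPoint(10), DecrementLiterals) :: Nil
      }

      test("max-iterations overrun surfaces as RuntimeException") {
        // Before this series the same overrun was reported as AnalysisException.
        val message = intercept[RuntimeException] {
          ToFixedPoint.execute(Literal(100))
        }.getMessage
        assert(message.contains("Max iterations (10) reached for batch fixedPoint"))
      }
    }

Catch sites that previously matched AnalysisException (for example, ExplainCommand in PATCH 07) switch to scala.util.control.NonFatal, so planning failures of either kind are still rendered as "Error occurred during query planning:" rows instead of escaping the command.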