From 6f143bf54cbd24db64ceb7c42b75438cbae47105 Mon Sep 17 00:00:00 2001 From: luluorta Date: Fri, 6 Nov 2020 11:34:07 +0800 Subject: [PATCH 1/2] [SPARK-33389][SQL] Make internal classes of SparkSession always using active SQLConf --- .../spark/sql/catalyst/SQLConfHelper.scala | 32 +++++ .../sql/catalyst/analysis/Analyzer.scala | 21 ++-- .../catalyst/analysis/timeZoneAnalysis.scala | 7 +- .../spark/sql/catalyst/analysis/view.scala | 3 - .../sql/catalyst/catalog/SessionCatalog.scala | 23 ++-- .../sql/catalyst/catalog/interface.scala | 5 +- .../sql/catalyst/optimizer/Optimizer.scala | 3 +- .../optimizer/PropagateEmptyRelation.scala | 3 - .../optimizer/StarSchemaDetection.scala | 6 +- .../sql/catalyst/parser/AstBuilder.scala | 6 +- .../sql/catalyst/parser/ParseDriver.scala | 7 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 10 +- .../spark/sql/catalyst/rules/Rule.scala | 6 +- .../connector/catalog/CatalogManager.scala | 4 +- .../AnalysisExternalCatalogSuite.scala | 8 +- .../sql/catalyst/analysis/AnalysisSuite.scala | 38 +++--- .../sql/catalyst/analysis/AnalysisTest.scala | 4 +- .../analysis/DataSourceV2AnalysisSuite.scala | 4 +- .../analysis/DecimalPrecisionSuite.scala | 5 +- .../analysis/LookupFunctionsSuite.scala | 11 +- .../analysis/TableLookupCacheSuite.scala | 6 +- .../catalog/SessionCatalogSuite.scala | 38 +++--- .../BooleanSimplificationSuite.scala | 15 +-- ...EliminateSortsBeforeRepartitionSuite.scala | 4 +- ...mizerStructuralIntegrityCheckerSuite.scala | 4 +- .../RewriteDistinctAggregatesSuite.scala | 8 -- .../spark/sql/catalyst/plans/PlanTest.scala | 6 +- .../catalog/CatalogManagerSuite.scala | 119 +++++++++--------- .../spark/sql/execution/SparkPlanner.scala | 11 +- .../sql/execution/command/CommandCheck.scala | 4 +- .../datasources/DataSourceStrategy.scala | 11 +- .../datasources/v2/V2SessionCatalog.scala | 7 +- .../streaming/IncrementalExecution.scala | 8 +- .../internal/BaseSessionStateBuilder.scala | 11 +- .../sql/internal/VariableSubstitution.scala | 5 +- .../command/PlanResolutionSuite.scala | 2 +- .../v2/V2SessionCatalogSuite.scala | 4 +- .../spark/sql/hive/HiveSessionCatalog.scala | 3 - .../sql/hive/HiveSessionStateBuilder.scala | 7 +- .../apache/spark/sql/hive/TableReader.scala | 4 +- .../hive/execution/HiveTableScanExec.scala | 4 +- 41 files changed, 228 insertions(+), 259 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala new file mode 100644 index 0000000000000..cee35cdb8d840 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SQLConfHelper.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst + +import org.apache.spark.sql.internal.SQLConf + +/** + * Trait for getting the active SQLConf. + */ +trait SQLConfHelper { + + /** + * The active config object within the current scope. + * See [[SQLConf.get]] for more information. + */ + def conf: SQLConf = SQLConf.get +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 690d66bec890d..14b50f481f387 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -58,15 +58,14 @@ import org.apache.spark.util.Utils */ object SimpleAnalyzer extends Analyzer( new CatalogManager( - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true), FakeV2SessionCatalog, new SessionCatalog( new InMemoryCatalog, - EmptyFunctionRegistry, - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) { + EmptyFunctionRegistry) { override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {} - }), - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) + })) { + override def resolver: Resolver = caseSensitiveResolution +} object FakeV2SessionCatalog extends TableCatalog { private def fail() = throw new UnsupportedOperationException @@ -130,10 +129,8 @@ object AnalysisContext { * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and * [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]]. */ -class Analyzer( - override val catalogManager: CatalogManager, - conf: SQLConf) - extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog { +class Analyzer(override val catalogManager: CatalogManager) + extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog with SQLConfHelper { private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog @@ -144,10 +141,8 @@ class Analyzer( override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts) // Only for tests. 
- def this(catalog: SessionCatalog, conf: SQLConf) = { - this( - new CatalogManager(conf, FakeV2SessionCatalog, catalog), - conf) + def this(catalog: SessionCatalog) = { + this(new CatalogManager(FakeV2SessionCatalog, catalog)) } def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala index d8062744a4264..9234b58eb9f6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala @@ -16,10 +16,10 @@ */ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ListQuery, TimeZoneAwareExpression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType /** @@ -47,10 +47,7 @@ object ResolveTimeZone extends Rule[LogicalPlan] { * Mix-in trait for constructing valid [[Cast]] expressions. */ trait CastSupport { - /** - * Configuration used to create a valid cast expression. - */ - def conf: SQLConf + self: SQLConfHelper => /** * Create a Cast expression with the session local time zone. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala index 65601640fa044..06de023098a1c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.internal.SQLConf /** * This file defines view types and analysis rules related to views. @@ -54,8 +53,6 @@ import org.apache.spark.sql.internal.SQLConf * completely resolved during the batch of Resolution. */ object EliminateView extends Rule[LogicalPlan] with CastSupport { - override def conf: SQLConf = SQLConf.get - override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { // The child has the different output attributes with the View operator. Adds a Project over // the child of the view. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index c00d51dc3df1f..17ab6664df75c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -61,10 +61,11 @@ class SessionCatalog( externalCatalogBuilder: () => ExternalCatalog, globalTempViewManagerBuilder: () => GlobalTempViewManager, functionRegistry: FunctionRegistry, - conf: SQLConf, hadoopConf: Configuration, parser: ParserInterface, - functionResourceLoader: FunctionResourceLoader) extends Logging { + functionResourceLoader: FunctionResourceLoader, + cacheSize: Int = SQLConf.get.tableRelationCacheSize, + cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging { import SessionCatalog._ import CatalogTypes.TablePartitionSpec @@ -77,18 +78,21 @@ class SessionCatalog( () => externalCatalog, () => new GlobalTempViewManager(conf.getConf(GLOBAL_TEMP_DATABASE)), functionRegistry, - conf, new Configuration(), new CatalystSqlParser(), - DummyFunctionResourceLoader) + DummyFunctionResourceLoader, + conf.tableRelationCacheSize, + conf.metadataCacheTTL) + } + + // For testing only. + def this(externalCatalog: ExternalCatalog, functionRegistry: FunctionRegistry) = { + this(externalCatalog, functionRegistry, SQLConf.get) } // For testing only. def this(externalCatalog: ExternalCatalog) = { - this( - externalCatalog, - new SimpleFunctionRegistry, - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) + this(externalCatalog, new SimpleFunctionRegistry) } lazy val externalCatalog = externalCatalogBuilder() @@ -136,9 +140,6 @@ class SessionCatalog( } private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = { - val cacheSize = conf.tableRelationCacheSize - val cacheTTL = conf.metadataCacheTTL - var builder = CacheBuilder.newBuilder() .maximumSize(cacheSize) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 9c93691ca3b41..ee7216e93ebb5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} import org.apache.spark.sql.catalyst.plans.logical._ @@ -177,8 +177,7 @@ case class CatalogTablePartition( case class BucketSpec( numBuckets: Int, bucketColumnNames: Seq[String], - sortColumnNames: Seq[String]) { - def conf: SQLConf = SQLConf.get + sortColumnNames: Seq[String]) extends SQLConfHelper { if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) { throw new AnalysisException( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 
e492d01650097..86c46e072c887 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -377,9 +377,8 @@ object SimpleTestOptimizer extends SimpleTestOptimizer class SimpleTestOptimizer extends Optimizer( new CatalogManager( - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true), FakeV2SessionCatalog, - new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf()))) + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry))) /** * Remove redundant aliases from a query plan. A redundant alias is an alias that does not change diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala index 2627202c09c45..15d4561b47a23 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.internal.SQLConf /** * Collapse plans consisting empty local relations generated by [[PruneFilters]]. @@ -47,8 +46,6 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] = plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) } - override def conf: SQLConf = SQLConf.get - def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case p: Union if p.children.exists(isEmptyLocalRelation) => val newChildren = p.children.filterNot(isEmptyLocalRelation) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala index 2aa762e2595ad..b65fc7f7e2bde 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala @@ -19,18 +19,16 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.internal.SQLConf /** * Encapsulates star-schema detection logic. 
*/ -object StarSchemaDetection extends PredicateHelper { - - private def conf = SQLConf.get +object StarSchemaDetection extends PredicateHelper with SQLConfHelper { /** * Star schema consists of one or more fact tables referencing a number of dimension diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a5b8c118d6c54..c3855fe088db6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -28,7 +28,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode} import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, FunctionResource, FunctionResourceType} import org.apache.spark.sql.catalyst.expressions._ @@ -51,11 +51,9 @@ import org.apache.spark.util.random.RandomSampler * The AstBuilder converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or * TableIdentifier. */ -class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { +class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging { import ParserUtils._ - protected def conf: SQLConf = SQLConf.get - protected def typedVisit[T](ctx: ParseTree): T = { ctx.accept(this).asInstanceOf[T] } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index 73a58f79ff132..ac3fbbf6b0512 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -23,19 +23,16 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.Origin -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} /** * Base SQL parsing infrastructure. */ -abstract class AbstractSqlParser extends ParserInterface with Logging { - - protected def conf: SQLConf = SQLConf.get +abstract class AbstractSqlParser extends ParserInterface with SQLConfHelper with Logging { /** Creates/Resolves DataType for a given SQL string. 
*/ override def parseDataType(sqlText: String): DataType = parse(sqlText) { parser => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index b1884eac27f73..864ca4f57483d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans import scala.collection.mutable import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag} import org.apache.spark.sql.internal.SQLConf @@ -35,15 +36,10 @@ import org.apache.spark.sql.types.{DataType, StructType} * The tree traverse APIs like `transform`, `foreach`, `collect`, etc. that are * inherited from `TreeNode`, do not traverse into query plans inside subqueries. */ -abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] { +abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] + extends TreeNode[PlanType] with SQLConfHelper { self: PlanType => - /** - * The active config object within the current scope. - * See [[SQLConf.get]] for more information. - */ - def conf: SQLConf = SQLConf.get - def output: Seq[Attribute] /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala index a774217ecc832..4ef71bbc7c098 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala @@ -18,10 +18,10 @@ package org.apache.spark.sql.catalyst.rules import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.trees.TreeNode -import org.apache.spark.sql.internal.SQLConf -abstract class Rule[TreeType <: TreeNode[_]] extends Logging { +abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging { /** Name for this rule, automatically inferred based on class name. */ val ruleName: String = { @@ -30,6 +30,4 @@ abstract class Rule[TreeType <: TreeNode[_]] extends Logging { } def apply(plan: TreeType): TreeType - - def conf: SQLConf = SQLConf.get } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala index fc2ab99a3da8c..0779bf53fe446 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.connector.catalog import scala.collection.mutable import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.internal.SQLConf @@ -37,9 +38,8 @@ import org.apache.spark.sql.internal.SQLConf // need to track current database at all. 
private[sql] class CatalogManager( - conf: SQLConf, defaultSessionCatalog: CatalogPlugin, - val v1SessionCatalog: SessionCatalog) extends Logging { + val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging { import CatalogManager.SESSION_CATALOG_NAME import CatalogV2Util._ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExternalCatalogSuite.scala index 3dd38091051d8..df99cd851cc3e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisExternalCatalogSuite.scala @@ -27,13 +27,11 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers { private def getAnalyzer(externCatalog: ExternalCatalog, databasePath: File): Analyzer = { - val conf = new SQLConf() - val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin, conf) + val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin) catalog.createDatabase( CatalogDatabase("default", "", databasePath.toURI, Map.empty), ignoreIfExists = false) @@ -44,7 +42,7 @@ class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers { CatalogStorageFormat.empty, StructType(Seq(StructField("a", IntegerType, nullable = true)))), ignoreIfExists = false) - new Analyzer(catalog, conf) + new Analyzer(catalog) } test("query builtin functions don't call the external catalog") { @@ -66,7 +64,7 @@ class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers { withTempDir { tempDir => val inMemoryCatalog = new InMemoryCatalog val externCatalog = spy(inMemoryCatalog) - val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin, conf) + val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin) catalog.createDatabase( CatalogDatabase("default", "", new URI(tempDir.toString), Map.empty), ignoreIfExists = false) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 37dcee1e59ee8..f0a24d4a56048 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.analysis -import java.util.{Locale, TimeZone} +import java.util.TimeZone import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -771,22 +771,23 @@ class AnalysisSuite extends AnalysisTest with Matchers { // RuleExecutor only throw exception or log warning when the rule is supposed to run // more than once. 
val maxIterations = 2 - val conf = new SQLConf().copy(SQLConf.ANALYZER_MAX_ITERATIONS -> maxIterations) - val testAnalyzer = new Analyzer( - new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf) + withSQLConf(SQLConf.ANALYZER_MAX_ITERATIONS.key -> maxIterations.toString) { + val testAnalyzer = new Analyzer( + new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)) - val plan = testRelation2.select( - $"a" / Literal(2) as "div1", - $"a" / $"b" as "div2", - $"a" / $"c" as "div3", - $"a" / $"d" as "div4", - $"e" / $"e" as "div5") + val plan = testRelation2.select( + $"a" / Literal(2) as "div1", + $"a" / $"b" as "div2", + $"a" / $"c" as "div3", + $"a" / $"d" as "div4", + $"e" / $"e" as "div5") - val message = intercept[TreeNodeException[LogicalPlan]] { - testAnalyzer.execute(plan) - }.getMessage - assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + - s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value.")) + val message = intercept[TreeNodeException[LogicalPlan]] { + testAnalyzer.execute(plan) + }.getMessage + assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " + + s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value.")) + } } test("SPARK-30886 Deprecate two-parameter TRIM/LTRIM/RTRIM") { @@ -802,7 +803,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { withLogAppender(logAppender) { val testAnalyzer1 = new Analyzer( - new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf) + new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)) val plan1 = testRelation2.select( UnresolvedFunction(f, $"a" :: Nil, isDistinct = false)) @@ -824,7 +825,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { // New analyzer from new SessionState val testAnalyzer2 = new Analyzer( - new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf) + new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)) val plan4 = testRelation2.select( UnresolvedFunction(f, $"c" :: $"d" :: Nil, isDistinct = false)) testAnalyzer2.execute(plan4) @@ -933,9 +934,8 @@ class AnalysisSuite extends AnalysisTest with Matchers { val maxIterations = 2 val maxIterationsEnough = 5 withSQLConf(SQLConf.ANALYZER_MAX_ITERATIONS.key -> maxIterations.toString) { - val conf = SQLConf.get val testAnalyzer = new Analyzer( - new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf) + new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)) val plan = testRelation2.select( $"a" / Literal(2) as "div1", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 8c14ffffa17a5..37db4be502a83 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -34,7 +34,7 @@ trait AnalysisTest extends PlanTest { protected def extendedAnalysisRules: Seq[Rule[LogicalPlan]] = Nil protected def getAnalyzer: Analyzer = { - val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf) + val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin) catalog.createDatabase( CatalogDatabase("default", "", new URI("loc"), Map.empty), ignoreIfExists = false) @@ -43,7 +43,7 @@ trait AnalysisTest extends PlanTest { 
catalog.createTempView("TaBlE3", TestRelations.testRelation3, overrideIfExists = true) catalog.createGlobalTempView("TaBlE4", TestRelations.testRelation4, overrideIfExists = true) catalog.createGlobalTempView("TaBlE5", TestRelations.testRelation5, overrideIfExists = true) - new Analyzer(catalog, conf) { + new Analyzer(catalog) { override val extendedResolutionRules = EliminateSubqueryAliases +: extendedAnalysisRules } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index 349237c2aa893..67bafbd4a8122 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -223,11 +223,11 @@ abstract class DataSourceV2StrictAnalysisSuite extends DataSourceV2AnalysisBaseS abstract class DataSourceV2AnalysisBaseSuite extends AnalysisTest { override def getAnalyzer: Analyzer = { - val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf) + val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin) catalog.createDatabase( CatalogDatabase("default", "", new URI("loc"), Map.empty), ignoreIfExists = false) - new Analyzer(catalog, conf) { + new Analyzer(catalog) { override val extendedResolutionRules = EliminateSubqueryAliases :: Nil } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index d5991ff10ce6c..9892e62a9ce19 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -24,15 +24,14 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral} import org.apache.spark.sql.catalyst.expressions.aggregate._ -import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project, Union} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class DecimalPrecisionSuite extends AnalysisTest with BeforeAndAfter { - private val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf) - private val analyzer = new Analyzer(catalog, conf) + private val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry) + private val analyzer = new Analyzer(catalog) private val relation = LocalRelation( AttributeReference("i", IntegerType)(), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala index cea0f2a9cbc97..e0f3c9a835b6e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala @@ -24,19 +24,17 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.plans.PlanTest import 
org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.internal.SQLConf class LookupFunctionsSuite extends PlanTest { test("SPARK-23486: the functionExists for the Persistent function check") { val externalCatalog = new CustomInMemoryCatalog - val conf = new SQLConf() - val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin, conf) + val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin) val analyzer = { catalog.createDatabase( CatalogDatabase("default", "", new URI("loc"), Map.empty), ignoreIfExists = false) - new Analyzer(catalog, conf) + new Analyzer(catalog) } def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref)) @@ -56,14 +54,13 @@ class LookupFunctionsSuite extends PlanTest { test("SPARK-23486: the functionExists for the Registered function check") { val externalCatalog = new InMemoryCatalog - val conf = new SQLConf() val customerFunctionReg = new CustomerFunctionRegistry - val catalog = new SessionCatalog(externalCatalog, customerFunctionReg, conf) + val catalog = new SessionCatalog(externalCatalog, customerFunctionReg) val analyzer = { catalog.createDatabase( CatalogDatabase("default", "", new URI("loc"), Map.empty), ignoreIfExists = false) - new Analyzer(catalog, conf) + new Analyzer(catalog) } def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala index 06ea531833a43..3e9a8b71a8fb6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TableLookupCacheSuite.scala @@ -29,13 +29,11 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.connector.InMemoryTableCatalog import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogNotFoundException, Identifier, Table, V1Table} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class TableLookupCacheSuite extends AnalysisTest with Matchers { private def getAnalyzer(externalCatalog: ExternalCatalog, databasePath: File): Analyzer = { - val conf = new SQLConf() - val v1Catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin, conf) + val v1Catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin) v1Catalog.createDatabase( CatalogDatabase("default", "", databasePath.toURI, Map.empty), ignoreIfExists = false) @@ -64,7 +62,7 @@ class TableLookupCacheSuite extends AnalysisTest with Matchers { when(catalogManager.currentCatalog).thenReturn(v2Catalog) when(catalogManager.currentNamespace).thenReturn(Array("default")) - new Analyzer(catalogManager, conf) + new Analyzer(catalogManager) } test("table lookups to external catalog are cached") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index ad40cc010361c..f30ae70dceffa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -1618,26 +1618,28 @@ abstract class SessionCatalogSuite extends 
AnalysisTest with Eventually { import org.apache.spark.sql.catalyst.dsl.plans._ Seq(true, false) foreach { caseSensitive => - val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive) - val catalog = new SessionCatalog(newBasicCatalog(), new SimpleFunctionRegistry, conf) - catalog.setCurrentDatabase("db1") - try { - val analyzer = new Analyzer(catalog, conf) - - // The analyzer should report the undefined function rather than the undefined table first. - val cause = intercept[AnalysisException] { - analyzer.execute( - UnresolvedRelation(TableIdentifier("undefined_table")).select( - UnresolvedFunction("undefined_fn", Nil, isDistinct = false) + withSQLConf(SQLConf.CASE_SENSITIVE.key -> caseSensitive.toString) { + val catalog = new SessionCatalog(newBasicCatalog(), new SimpleFunctionRegistry) + catalog.setCurrentDatabase("db1") + try { + val analyzer = new Analyzer(catalog) + + // The analyzer should report the undefined function + // rather than the undefined table first. + val cause = intercept[AnalysisException] { + analyzer.execute( + UnresolvedRelation(TableIdentifier("undefined_table")).select( + UnresolvedFunction("undefined_fn", Nil, isDistinct = false) + ) ) - ) - } + } - assert(cause.getMessage.contains("Undefined function: 'undefined_fn'")) - // SPARK-21318: the error message should contains the current database name - assert(cause.getMessage.contains("db1")) - } finally { - catalog.reset() + assert(cause.getMessage.contains("Undefined function: 'undefined_fn'")) + // SPARK-21318: the error message should contains the current database name + assert(cause.getMessage.contains("db1")) + } finally { + catalog.reset() + } } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 03d75340e31e9..04dcf50e0c3c5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.BooleanType class BooleanSimplificationSuite extends PlanTest with ExpressionEvalHelper with PredicateHelper { @@ -188,25 +187,23 @@ class BooleanSimplificationSuite extends PlanTest with ExpressionEvalHelper with checkCondition(!(('e || 'f) && ('g || 'h)), (!'e && !'f) || (!'g && !'h)) } - private val caseInsensitiveConf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> false) - private val caseInsensitiveAnalyzer = new Analyzer( - new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, caseInsensitiveConf), - caseInsensitiveConf) + private val analyzer = new Analyzer( + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry)) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { - val plan = caseInsensitiveAnalyzer.execute( + val plan = analyzer.execute( testRelation.where(('a > 2 && 'b > 3) || ('A > 2 && 'b < 5))) val actual = Optimize.execute(plan) - val expected = caseInsensitiveAnalyzer.execute( + val expected = analyzer.execute( testRelation.where('a > 2 && ('b > 3 || 'b < 5))) comparePlans(actual, expected) } test("(a || b) && (a || c) => a || (b && c) when 
case insensitive") { - val plan = caseInsensitiveAnalyzer.execute( + val plan = analyzer.execute( testRelation.where(('a > 2 || 'b > 3) && ('A > 2 || 'b < 5))) val actual = Optimize.execute(plan) - val expected = caseInsensitiveAnalyzer.execute( + val expected = analyzer.execute( testRelation.where('a > 2 || ('b > 3 && 'b < 5))) comparePlans(actual, expected) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsBeforeRepartitionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsBeforeRepartitionSuite.scala index 9f031358611b1..82db174ad41b0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsBeforeRepartitionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsBeforeRepartitionSuite.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor class EliminateSortsBeforeRepartitionSuite extends PlanTest { - val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf) - val analyzer = new Analyzer(catalog, conf) + val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry) + val analyzer = new Analyzer(catalog) val testRelation = LocalRelation('a.int, 'b.int, 'c.int) val anotherTestRelation = LocalRelation('d.int, 'e.int) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala index 5998437f11f4d..42ab43242a16b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LocalRelation, LogicalPlan, OneRowRelation, Project} import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.connector.catalog.CatalogManager -import org.apache.spark.sql.internal.SQLConf class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { @@ -45,9 +44,8 @@ class OptimizerStructuralIntegrityCheckerSuite extends PlanTest { object Optimize extends Optimizer( new CatalogManager( - new SQLConf(), FakeV2SessionCatalog, - new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf()))) { + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry))) { val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI) override def defaultBatches: Seq[Batch] = Seq(newBatch) ++ super.defaultBatches } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala index 8cb939e010c68..5d6abf516f288 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregatesSuite.scala @@ -16,23 +16,15 @@ */ package org.apache.spark.sql.catalyst.optimizer -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import 
org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.catalyst.expressions.aggregate.CollectSet import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Expand, LocalRelation, LogicalPlan} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, GROUP_BY_ORDINAL} import org.apache.spark.sql.types.{IntegerType, StringType} class RewriteDistinctAggregatesSuite extends PlanTest { - override val conf = new SQLConf().copy(CASE_SENSITIVE -> false, GROUP_BY_ORDINAL -> false) - val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf) - val analyzer = new Analyzer(catalog, conf) - val nullInt = Literal(null, IntegerType) val nullString = Literal(null, StringType) val testRelation = LocalRelation('a.string, 'b.string, 'c.string, 'd.string, 'e.int) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index 6ad132cdfe449..7c70ab98e4183 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -22,6 +22,7 @@ import org.scalatest.Suite import org.scalatest.Tag import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode @@ -56,10 +57,7 @@ trait CodegenInterpretedPlanTest extends PlanTest { * Provides helper methods for comparing plans, but without the overhead of * mandating a FunSuite. 
*/ -trait PlanTestBase extends PredicateHelper with SQLHelper { self: Suite => - - // TODO(gatorsmile): remove this from PlanTest and all the analyzer rules - protected def conf = SQLConf.get +trait PlanTestBase extends PredicateHelper with SQLHelper with SQLConfHelper { self: Suite => /** * Since attribute references are given globally unique ids during analysis, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala index 7dd0753fcf777..aec361b9799cc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala @@ -24,76 +24,77 @@ import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, FakeV2SessionCatalog, NoSuchNamespaceException} import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.plans.SQLHelper import org.apache.spark.sql.connector.InMemoryTableCatalog import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.CaseInsensitiveStringMap -class CatalogManagerSuite extends SparkFunSuite { +class CatalogManagerSuite extends SparkFunSuite with SQLHelper { - private def createSessionCatalog(conf: SQLConf): SessionCatalog = { + private def createSessionCatalog(): SessionCatalog = { val catalog = new InMemoryCatalog() catalog.createDatabase( CatalogDatabase(SessionCatalog.DEFAULT_DATABASE, "", new URI("fake"), Map.empty), ignoreIfExists = true) - new SessionCatalog(catalog, EmptyFunctionRegistry, conf) + new SessionCatalog(catalog, EmptyFunctionRegistry) } test("CatalogManager should reflect the changes of default catalog") { - val conf = new SQLConf - val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog, createSessionCatalog(conf)) + val catalogManager = new CatalogManager(FakeV2SessionCatalog, createSessionCatalog()) assert(catalogManager.currentCatalog.name() == CatalogManager.SESSION_CATALOG_NAME) assert(catalogManager.currentNamespace.sameElements(Array("default"))) - conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) - conf.setConfString(SQLConf.DEFAULT_CATALOG.key, "dummy") - - // The current catalog should be changed if the default catalog is set. - assert(catalogManager.currentCatalog.name() == "dummy") - assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) + withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName, + SQLConf.DEFAULT_CATALOG.key -> "dummy") { + // The current catalog should be changed if the default catalog is set. 
+ assert(catalogManager.currentCatalog.name() == "dummy") + assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) + } } test("CatalogManager should keep the current catalog once set") { - val conf = new SQLConf - val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog, createSessionCatalog(conf)) + val catalogManager = new CatalogManager(FakeV2SessionCatalog, createSessionCatalog()) assert(catalogManager.currentCatalog.name() == CatalogManager.SESSION_CATALOG_NAME) - conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) - catalogManager.setCurrentCatalog("dummy") - assert(catalogManager.currentCatalog.name() == "dummy") - assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) - - conf.setConfString("spark.sql.catalog.dummy2", classOf[DummyCatalog].getName) - conf.setConfString(SQLConf.DEFAULT_CATALOG.key, "dummy2") - // The current catalog shouldn't be changed if it's set before. - assert(catalogManager.currentCatalog.name() == "dummy") + withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName) { + catalogManager.setCurrentCatalog("dummy") + assert(catalogManager.currentCatalog.name() == "dummy") + assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) + + withSQLConf("spark.sql.catalog.dummy2" -> classOf[DummyCatalog].getName, + SQLConf.DEFAULT_CATALOG.key -> "dummy2") { + // The current catalog shouldn't be changed if it's set before. + assert(catalogManager.currentCatalog.name() == "dummy") + } + } } test("current namespace should be updated when switching current catalog") { - val conf = new SQLConf - val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog, createSessionCatalog(conf)) - conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) - catalogManager.setCurrentCatalog("dummy") - assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) - catalogManager.setCurrentNamespace(Array("a")) - assert(catalogManager.currentNamespace.sameElements(Array("a"))) - - // If we set current catalog to the same catalog, current namespace should stay the same. - catalogManager.setCurrentCatalog("dummy") - assert(catalogManager.currentNamespace.sameElements(Array("a"))) - - // If we switch to a different catalog, current namespace should be reset. - conf.setConfString("spark.sql.catalog.dummy2", classOf[DummyCatalog].getName) - catalogManager.setCurrentCatalog("dummy2") - assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) + val catalogManager = new CatalogManager(FakeV2SessionCatalog, createSessionCatalog()) + withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName) { + catalogManager.setCurrentCatalog("dummy") + assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) + catalogManager.setCurrentNamespace(Array("a")) + assert(catalogManager.currentNamespace.sameElements(Array("a"))) + + // If we set current catalog to the same catalog, current namespace should stay the same. + catalogManager.setCurrentCatalog("dummy") + assert(catalogManager.currentNamespace.sameElements(Array("a"))) + + // If we switch to a different catalog, current namespace should be reset. 
+ withSQLConf("spark.sql.catalog.dummy2" -> classOf[DummyCatalog].getName) { + catalogManager.setCurrentCatalog("dummy2") + assert(catalogManager.currentNamespace.sameElements(Array("a", "b"))) + } + } } test("set current namespace") { - val conf = new SQLConf - val v1SessionCatalog = createSessionCatalog(conf) + val v1SessionCatalog = createSessionCatalog() v1SessionCatalog.createDatabase( CatalogDatabase( "test", "", v1SessionCatalog.getDefaultDBPath("test"), Map.empty), ignoreIfExists = false) - val catalogManager = new CatalogManager(conf, FakeV2SessionCatalog, v1SessionCatalog) + val catalogManager = new CatalogManager(FakeV2SessionCatalog, v1SessionCatalog) // If the current catalog is session catalog, setting current namespace actually sets // `SessionCatalog.currentDb`. @@ -106,23 +107,25 @@ class CatalogManagerSuite extends SparkFunSuite { } // when switching current catalog, `SessionCatalog.currentDb` should be reset. - conf.setConfString("spark.sql.catalog.dummy", classOf[DummyCatalog].getName) - catalogManager.setCurrentCatalog("dummy") - assert(v1SessionCatalog.getCurrentDatabase == "default") - catalogManager.setCurrentNamespace(Array("test2")) - assert(v1SessionCatalog.getCurrentDatabase == "default") - - // Check namespace existence if currentCatalog implements SupportsNamespaces. - conf.setConfString("spark.sql.catalog.testCatalog", classOf[InMemoryTableCatalog].getName) - catalogManager.setCurrentCatalog("testCatalog") - catalogManager.currentCatalog.asInstanceOf[InMemoryTableCatalog] - .createNamespace(Array("test3"), Map.empty[String, String].asJava) - assert(v1SessionCatalog.getCurrentDatabase == "default") - catalogManager.setCurrentNamespace(Array("test3")) - assert(v1SessionCatalog.getCurrentDatabase == "default") - - intercept[NoSuchNamespaceException] { - catalogManager.setCurrentNamespace(Array("ns1", "ns2")) + withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName) { + catalogManager.setCurrentCatalog("dummy") + assert(v1SessionCatalog.getCurrentDatabase == "default") + catalogManager.setCurrentNamespace(Array("test2")) + assert(v1SessionCatalog.getCurrentDatabase == "default") + + // Check namespace existence if currentCatalog implements SupportsNamespaces. 
+ withSQLConf("spark.sql.catalog.testCatalog" -> classOf[InMemoryTableCatalog].getName) { + catalogManager.setCurrentCatalog("testCatalog") + catalogManager.currentCatalog.asInstanceOf[InMemoryTableCatalog] + .createNamespace(Array("test3"), Map.empty[String, String].asJava) + assert(v1SessionCatalog.getCurrentDatabase == "default") + catalogManager.setCurrentNamespace(Array("test3")) + assert(v1SessionCatalog.getCurrentDatabase == "default") + + intercept[NoSuchNamespaceException] { + catalogManager.setCurrentNamespace(Array("ns1", "ns2")) + } + } } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 895eeedd86b8b..c88fcecc9983b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -18,18 +18,15 @@ package org.apache.spark.sql.execution import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.adaptive.LogicalQueryStageStrategy import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy -import org.apache.spark.sql.internal.SQLConf -class SparkPlanner( - val session: SparkSession, - val conf: SQLConf, - val experimentalMethods: ExperimentalMethods) - extends SparkStrategies { +class SparkPlanner(val session: SparkSession, val experimentalMethods: ExperimentalMethods) + extends SparkStrategies with SQLConfHelper { def numPartitions: Int = conf.numShufflePartitions @@ -40,7 +37,7 @@ class SparkPlanner( PythonEvals :: new DataSourceV2Strategy(session) :: FileSourceStrategy :: - DataSourceStrategy(conf) :: + DataSourceStrategy :: SpecialLimits :: Aggregation :: Window :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala index dedace4af4d14..216636c7ea14f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandCheck.scala @@ -17,14 +17,14 @@ package org.apache.spark.sql.execution.command +import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.util.SchemaUtils /** * Checks legitimization of various execution commands. 
*/ -case class CommandCheck(conf: SQLConf) extends (LogicalPlan => Unit) { +object CommandCheck extends (LogicalPlan => Unit) with SQLConfHelper { override def apply(plan: LogicalPlan): Unit = { plan.foreach { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 822bdbdad8f00..361d1fab03421 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale -import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -27,7 +26,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QualifiedTableName, SQLConfHelper} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ @@ -42,9 +41,7 @@ import org.apache.spark.sql.connector.catalog.SupportsRead import org.apache.spark.sql.connector.catalog.TableCapability._ import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.FileSourceStrategy.{extractPredicatesWithinOutputSet, logInfo} import org.apache.spark.sql.execution.streaming.StreamingRelation -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -314,8 +311,8 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] /** * A Strategy for planning scans over data sources defined using the sources API. */ -case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with CastSupport { - import DataSourceStrategy._ +object DataSourceStrategy + extends Strategy with Logging with CastSupport with PredicateHelper with SQLConfHelper { def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { case ScanOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _, _)) => @@ -466,9 +463,7 @@ case class DataSourceStrategy(conf: SQLConf) extends Strategy with Logging with private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = { toCatalystRDD(relation, relation.output, rdd) } -} -object DataSourceStrategy extends PredicateHelper { /** * The attribute name may differ from the one in the schema if the query analyzer * is case insensitive. 
We should change attribute names to match the ones in the schema, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 6dda1d4aaf37e..9ee145580ce6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -23,22 +23,21 @@ import java.util import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{SQLConfHelper, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A [[TableCatalog]] that translates calls to the v1 SessionCatalog. */ -class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf) - extends TableCatalog with SupportsNamespaces { +class V2SessionCatalog(catalog: SessionCatalog) + extends TableCatalog with SupportsNamespaces with SQLConfHelper { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper import V2SessionCatalog._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index bfa60cf7dfd78..b871874f52967 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -21,14 +21,13 @@ import java.util.UUID import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} +import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.QueryPlanningTracker import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, ExpressionWithRandomSeed} import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{LeafExecNode, LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} -import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike} +import org.apache.spark.sql.execution.{LocalLimitExec, QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.streaming.OutputMode import org.apache.spark.util.Utils @@ -51,7 +50,6 @@ class IncrementalExecution( // Modified planner with stateful operations. override val planner: SparkPlanner = new SparkPlanner( sparkSession, - sparkSession.sessionState.conf, sparkSession.sessionState.experimentalMethods) { override def strategies: Seq[Strategy] = extraPlanningStrategies ++ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 33c15ec76654d..538a5408723bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -150,7 +150,6 @@ abstract class BaseSessionStateBuilder( () => session.sharedState.externalCatalog, () => session.sharedState.globalTempViewManager, functionRegistry, - conf, SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), sqlParser, resourceLoader) @@ -158,9 +157,9 @@ abstract class BaseSessionStateBuilder( catalog } - protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf) + protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog) - protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog) + protected lazy val catalogManager = new CatalogManager(v2SessionCatalog, catalog) /** * Interface exposed to the user for registering user-defined functions. @@ -175,7 +174,7 @@ abstract class BaseSessionStateBuilder( * * Note: this depends on the `conf` and `catalog` fields. */ - protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) { + protected def analyzer: Analyzer = new Analyzer(catalogManager) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new FindDataSourceTable(session) +: new ResolveSQLOnFile(session) +: @@ -197,7 +196,7 @@ abstract class BaseSessionStateBuilder( PreReadCheck +: HiveOnlyCheck +: TableCapabilityCheck +: - CommandCheck(conf) +: + CommandCheck +: customCheckRules } @@ -270,7 +269,7 @@ abstract class BaseSessionStateBuilder( * Note: this depends on the `conf` and `experimentalMethods` fields. */ protected def planner: SparkPlanner = { - new SparkPlanner(session, conf, experimentalMethods) { + new SparkPlanner(session, experimentalMethods) { override def extraPlanningStrategies: Seq[Strategy] = super.extraPlanningStrategies ++ customPlanningStrategies } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala index 2b9c574aaaf0c..248dfa107bc4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.internal import org.apache.spark.internal.config._ +import org.apache.spark.sql.catalyst.SQLConfHelper /** * A helper class that enables substitution using syntax like @@ -25,9 +26,7 @@ import org.apache.spark.internal.config._ * * Variable substitution is controlled by `SQLConf.variableSubstituteEnabled`. 
*/ -class VariableSubstitution { - - private def conf = SQLConf.get +class VariableSubstitution extends SQLConfHelper { private val provider = new ConfigProvider { override def get(key: String): Option[String] = Option(conf.getConfString(key, "")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index d5820b016736a..f5809ebbb836e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -151,7 +151,7 @@ class PlanResolutionSuite extends AnalysisTest { } else { catalogManagerWithoutDefault } - val analyzer = new Analyzer(catalogManager, conf) + val analyzer = new Analyzer(catalogManager) // TODO: run the analyzer directly. val rules = Seq( CTESubstitution, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index c3bcf86c1ed27..1a4f08418f8d3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -29,7 +29,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser -import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange, V1Table} +import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, NamespaceChange, TableCatalog, TableChange, V1Table} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap @@ -46,7 +46,7 @@ abstract class V2SessionCatalogBaseSuite extends SharedSparkSession with BeforeA val testIdent: Identifier = Identifier.of(testNs, "test_table") def newCatalog(): V2SessionCatalog = { - val newCatalog = new V2SessionCatalog(spark.sessionState.catalog, spark.sessionState.conf) + val newCatalog = new V2SessionCatalog(spark.sessionState.catalog) newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) newCatalog } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 8a248a251820f..f60bad180a710 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Cast, Expression} import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DecimalType, DoubleType} import org.apache.spark.util.Utils @@ -44,7 +43,6 @@ private[sql] class HiveSessionCatalog( globalTempViewManagerBuilder: () => 
GlobalTempViewManager, val metastoreCatalog: HiveMetastoreCatalog, functionRegistry: FunctionRegistry, - conf: SQLConf, hadoopConf: Configuration, parser: ParserInterface, functionResourceLoader: FunctionResourceLoader) @@ -52,7 +50,6 @@ private[sql] class HiveSessionCatalog( externalCatalogBuilder, globalTempViewManagerBuilder, functionRegistry, - conf, hadoopConf, parser, functionResourceLoader) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index f79aaa464de81..b30492802495f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -62,7 +62,6 @@ class HiveSessionStateBuilder( () => session.sharedState.globalTempViewManager, new HiveMetastoreCatalog(session), functionRegistry, - conf, SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), sqlParser, resourceLoader) @@ -73,7 +72,7 @@ class HiveSessionStateBuilder( /** * A logical query plan `Analyzer` with rules specific to Hive. */ - override protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) { + override protected def analyzer: Analyzer = new Analyzer(catalogManager) { override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = new ResolveHiveSerdeTable(session) +: new FindDataSourceTable(session) +: @@ -98,7 +97,7 @@ class HiveSessionStateBuilder( PreWriteCheck +: PreReadCheck +: TableCapabilityCheck +: - CommandCheck(conf) +: + CommandCheck +: customCheckRules } @@ -109,7 +108,7 @@ class HiveSessionStateBuilder( * Planner that takes into account Hive-specific strategies. */ override protected def planner: SparkPlanner = { - new SparkPlanner(session, conf, experimentalMethods) with HiveStrategies { + new SparkPlanner(session, experimentalMethods) with HiveStrategies { override val sparkSession: SparkSession = session override def extraPlanningStrategies: Seq[Strategy] = diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index 3e0d44160c8a1..eb9ce877fc8d2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -39,7 +39,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.analysis.CastSupport import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.DateTimeUtils @@ -68,7 +68,7 @@ class HadoopTableReader( @transient private val tableDesc: TableDesc, @transient private val sparkSession: SparkSession, hadoopConf: Configuration) - extends TableReader with CastSupport with Logging { + extends TableReader with CastSupport with SQLConfHelper with Logging { // Hadoop honors "mapreduce.job.maps" as hint, // but will ignore when mapreduce.jobtracker.address is "local". 
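The HadoopTableReader change above follows the same pattern as the rest of the patch: instead of threading a SQLConf through a constructor, the class mixes in SQLConfHelper and reads the active conf on demand. A minimal illustrative sketch of that pattern, not part of the diff (the class name ActivePruningCheck is invented for the example):

import org.apache.spark.sql.catalyst.SQLConfHelper

// Nothing is captured at construction time; `conf` comes from SQLConf.get,
// i.e. the SQLConf of whichever SparkSession is active in the current scope.
class ActivePruningCheck extends SQLConfHelper {
  def metastorePruningEnabled: Boolean = conf.metastorePartitionPruning
}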
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 41820b0135f4a..5db353e4f1fe1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -172,7 +172,7 @@ case class HiveTableScanExec( prunePartitions(hivePartitions) } } else { - if (sparkSession.sessionState.conf.metastorePartitionPruning && + if (conf.metastorePartitionPruning && partitionPruningPred.nonEmpty) { rawPartitions } else { @@ -184,7 +184,7 @@ case class HiveTableScanExec( // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = { val prunedPartitions = - if (sparkSession.sessionState.conf.metastorePartitionPruning && + if (conf.metastorePartitionPruning && partitionPruningPred.nonEmpty) { // Retrieve the original attributes based on expression ID so that capitalization matches. val normalizedFilters = partitionPruningPred.map(_.transform { From 99619e3f0d2596f2777df6859f2f61a3c90d1d3c Mon Sep 17 00:00:00 2001 From: luluorta Date: Mon, 16 Nov 2020 16:02:33 +0800 Subject: [PATCH 2/2] revert unnecessary changes --- .../apache/spark/sql/hive/execution/HiveTableScanExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 5db353e4f1fe1..41820b0135f4a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -172,7 +172,7 @@ case class HiveTableScanExec( prunePartitions(hivePartitions) } } else { - if (conf.metastorePartitionPruning && + if (sparkSession.sessionState.conf.metastorePartitionPruning && partitionPruningPred.nonEmpty) { rawPartitions } else { @@ -184,7 +184,7 @@ case class HiveTableScanExec( // exposed for tests @transient lazy val rawPartitions: Seq[HivePartition] = { val prunedPartitions = - if (conf.metastorePartitionPruning && + if (sparkSession.sessionState.conf.metastorePartitionPruning && partitionPruningPred.nonEmpty) { // Retrieve the original attributes based on expression ID so that capitalization matches. val normalizedFilters = partitionPruningPred.map(_.transform {
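Taken together, the two commits leave call sites looking like the updated suites: the reworked constructors and check rules no longer accept a SQLConf. Below is a hedged sketch of post-patch usage, assuming (as in the test suites) that the code lives under the org.apache.spark.sql package tree so that the private[sql] SessionState is reachable; the object and method names are invented for illustration, and the catalog name "test" mirrors V2SessionCatalogSuite.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.CommandCheck
import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
import org.apache.spark.sql.util.CaseInsensitiveStringMap

object PostPatchWiringSketch {
  def wireUp(spark: SparkSession, plan: LogicalPlan): Unit = {
    // The v2 session catalog now takes only the v1 SessionCatalog; any config
    // lookups go through SQLConfHelper, so they track the active session.
    val v2Catalog = new V2SessionCatalog(spark.sessionState.catalog)
    v2Catalog.initialize("test", CaseInsensitiveStringMap.empty())

    // CommandCheck is an object of type LogicalPlan => Unit, so it is applied
    // (and added to the analyzer's check rules) without a conf argument.
    CommandCheck(plan)
  }
}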