@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.internal.SQLConf

/**
* Trait for getting the active SQLConf.
*/
trait SQLConfHelper {

/**
* The active config object within the current scope.
* See [[SQLConf.get]] for more information.
*/
def conf: SQLConf = SQLConf.get
}
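
For orientation, a minimal sketch (not part of this diff) of how a class picks up the active config by mixing in the new trait instead of holding its own SQLConf field; the class and method below are hypothetical:

import org.apache.spark.sql.catalyst.SQLConfHelper

// Hypothetical helper: `conf` delegates to SQLConf.get, i.e. the config of the
// active session (or a thread-local override installed during query execution).
class NameMatcher extends SQLConfHelper {
  def matches(a: String, b: String): Boolean =
    if (conf.caseSensitiveAnalysis) a == b else a.equalsIgnoreCase(b)
}
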
@@ -58,15 +58,14 @@ import org.apache.spark.util.Utils
*/
object SimpleAnalyzer extends Analyzer(
new CatalogManager(
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true),
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
EmptyFunctionRegistry) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
}),
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
})) {
override def resolver: Resolver = caseSensitiveResolution
}

object FakeV2SessionCatalog extends TableCatalog {
private def fail() = throw new UnsupportedOperationException
@@ -130,10 +129,8 @@ object AnalysisContext {
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
*/
class Analyzer(
override val catalogManager: CatalogManager,
conf: SQLConf)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
class Analyzer(override val catalogManager: CatalogManager)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog with SQLConfHelper {

private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog

@@ -144,10 +141,8 @@ class Analyzer(
override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts)

// Only for tests.
def this(catalog: SessionCatalog, conf: SQLConf) = {
this(
new CatalogManager(conf, FakeV2SessionCatalog, catalog),
conf)
def this(catalog: SessionCatalog) = {
this(new CatalogManager(FakeV2SessionCatalog, catalog))
}

def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
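
A rough before/after sketch of what this means at Analyzer call sites (variable names are illustrative; the test-only constructor is shown):

// Before: a SQLConf had to be threaded through explicitly.
val analyzer = new Analyzer(sessionCatalog, sqlConf)

// After: only the catalog is needed; Analyzer mixes in SQLConfHelper, so its
// rules read SQLConf.get and always see the config of the session running them.
val analyzer = new Analyzer(sessionCatalog)
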
@@ -16,10 +16,10 @@
*/
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ListQuery, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType

/**
@@ -47,10 +47,7 @@ object ResolveTimeZone extends Rule[LogicalPlan] {
* Mix-in trait for constructing valid [[Cast]] expressions.
*/
trait CastSupport {
/**
* Configuration used to create a valid cast expression.
*/
def conf: SQLConf
self: SQLConfHelper =>

/**
* Create a Cast expression with the session local time zone.
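
For illustration (not in this diff): since Rule now extends SQLConfHelper, a rule can mix in CastSupport and satisfy the new self-type for free, and `cast` builds a Cast carrying the session time zone read from `conf`. The rule below is hypothetical:

import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions.EqualTo
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{StringType, TimestampType}

// Hypothetical rule: cast the string side of a string-to-timestamp comparison
// using the session-local time zone picked up through SQLConfHelper.
object CastStringTimestampComparison extends Rule[LogicalPlan] with CastSupport {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case EqualTo(left, right) if left.dataType == StringType && right.dataType == TimestampType =>
      EqualTo(cast(left, TimestampType), right)
  }
}
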
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

/**
* This file defines view types and analysis rules related to views.
@@ -54,8 +53,6 @@ import org.apache.spark.sql.internal.SQLConf
* completely resolved during the batch of Resolution.
*/
object EliminateView extends Rule[LogicalPlan] with CastSupport {
override def conf: SQLConf = SQLConf.get

override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// The child has the different output attributes with the View operator. Adds a Project over
// the child of the view.
@@ -61,10 +61,11 @@ class SessionCatalog(
externalCatalogBuilder: () => ExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader) extends Logging {
functionResourceLoader: FunctionResourceLoader,
cacheSize: Int = SQLConf.get.tableRelationCacheSize,
cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec

@@ -77,18 +78,21 @@
() => externalCatalog,
() => new GlobalTempViewManager(conf.getConf(GLOBAL_TEMP_DATABASE)),
functionRegistry,
conf,
new Configuration(),
new CatalystSqlParser(),
DummyFunctionResourceLoader)
DummyFunctionResourceLoader,
conf.tableRelationCacheSize,
conf.metadataCacheTTL)
}

// For testing only.
def this(externalCatalog: ExternalCatalog, functionRegistry: FunctionRegistry) = {
this(externalCatalog, functionRegistry, SQLConf.get)
}

// For testing only.
def this(externalCatalog: ExternalCatalog) = {
this(
externalCatalog,
new SimpleFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
this(externalCatalog, new SimpleFunctionRegistry)
}

lazy val externalCatalog = externalCatalogBuilder()
@@ -136,9 +140,6 @@ class SessionCatalog(
}

private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
val cacheSize = conf.tableRelationCacheSize
val cacheTTL = conf.metadataCacheTTL

var builder = CacheBuilder.newBuilder()
.maximumSize(cacheSize)

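
One practical consequence worth noting: the cache parameters default to values read from SQLConf.get once, at catalog construction time, so changing the config afterwards does not resize an existing catalog's relation cache. A hypothetical call site that overrides them explicitly, mirroring the secondary constructor above (values illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{DummyFunctionResourceLoader, GlobalTempViewManager, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val catalog = new SessionCatalog(
  () => new InMemoryCatalog,
  () => new GlobalTempViewManager("global_temp"),
  new SimpleFunctionRegistry,
  new Configuration(),
  new CatalystSqlParser(),
  DummyFunctionResourceLoader,
  cacheSize = 1000,  // would otherwise default to SQLConf.get.tableRelationCacheSize
  cacheTTL = 0L)     // would otherwise default to SQLConf.get.metadataCacheTTL
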
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -177,8 +177,7 @@ case class CatalogTablePartition(
case class BucketSpec(
numBuckets: Int,
bucketColumnNames: Seq[String],
sortColumnNames: Seq[String]) {
def conf: SQLConf = SQLConf.get
sortColumnNames: Seq[String]) extends SQLConfHelper {

if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
throw new AnalysisException(
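
Illustrative only: BucketSpec now validates numBuckets against the active session's bucketing limit at construction time, so the same spec can be accepted or rejected depending on the session config:

import org.apache.spark.sql.catalyst.catalog.BucketSpec

// Fine with default settings; an AnalysisException is thrown if numBuckets is
// non-positive or exceeds conf.bucketingMaxBuckets for the current session.
val spec = BucketSpec(numBuckets = 8, bucketColumnNames = Seq("id"), sortColumnNames = Seq("ts"))
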
@@ -377,9 +377,8 @@ object SimpleTestOptimizer extends SimpleTestOptimizer

class SimpleTestOptimizer extends Optimizer(
new CatalogManager(
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true),
FakeV2SessionCatalog,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf())))
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry)))

/**
* Remove redundant aliases from a query plan. A redundant alias is an alias that does not change
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf

/**
* Collapse plans consisting empty local relations generated by [[PruneFilters]].
@@ -47,8 +46,6 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }

override def conf: SQLConf = SQLConf.get

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p: Union if p.children.exists(isEmptyLocalRelation) =>
val newChildren = p.children.filterNot(isEmptyLocalRelation)
@@ -19,18 +19,16 @@ package org.apache.spark.sql.catalyst.optimizer

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf

/**
* Encapsulates star-schema detection logic.
*/
object StarSchemaDetection extends PredicateHelper {

private def conf = SQLConf.get
object StarSchemaDetection extends PredicateHelper with SQLConfHelper {

/**
* Star schema consists of one or more fact tables referencing a number of dimension
@@ -28,7 +28,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
@@ -51,11 +51,9 @@ import org.apache.spark.util.random.RandomSampler
* The AstBuilder converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or
* TableIdentifier.
*/
class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logging {
import ParserUtils._

protected def conf: SQLConf = SQLConf.get

protected def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
}
@@ -23,19 +23,16 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}

/**
* Base SQL parsing infrastructure.
*/
abstract class AbstractSqlParser extends ParserInterface with Logging {

protected def conf: SQLConf = SQLConf.get
abstract class AbstractSqlParser extends ParserInterface with SQLConfHelper with Logging {

/** Creates/Resolves DataType for a given SQL string. */
override def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
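
For context (not part of the diff), the parser entry points are unchanged; they simply source parsing-related settings from the active SQLConf instead of a constructor argument. A small usage sketch with an arbitrary query and type string:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

val parser = new CatalystSqlParser()
val plan = parser.parsePlan("SELECT a, b FROM t WHERE a > 1")           // LogicalPlan
val dataType = parser.parseDataType("struct<a: int, b: array<string>>") // DataType
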
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag}
import org.apache.spark.sql.internal.SQLConf
@@ -35,15 +36,10 @@ import org.apache.spark.sql.types.{DataType, StructType}
* The tree traverse APIs like `transform`, `foreach`, `collect`, etc. that are
* inherited from `TreeNode`, do not traverse into query plans inside subqueries.
*/
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
extends TreeNode[PlanType] with SQLConfHelper {
self: PlanType =>

/**
* The active config object within the current scope.
* See [[SQLConf.get]] for more information.
*/
def conf: SQLConf = SQLConf.get

def output: Seq[Attribute]

/**
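
A sketch of the caveat in the class doc above (the helper name and approach are mine, not Spark's): a plain transform rewrites the outer plan only, so code that needs to reach plans nested inside subquery expressions has to recurse into them explicitly, e.g. via SubqueryExpression.withNewPlan:

import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object PlanTraversalSketch {
  // Hypothetical helper: apply `f` to the outer plan and to every plan nested
  // in a subquery expression, since `transform` stops at subquery boundaries.
  def transformIncludingSubqueries(plan: LogicalPlan)(f: PartialFunction[LogicalPlan, LogicalPlan]): LogicalPlan =
    plan.transform(f).transformAllExpressions {
      case s: SubqueryExpression => s.withNewPlan(transformIncludingSubqueries(s.plan)(f))
    }
}
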
@@ -18,10 +18,10 @@
package org.apache.spark.sql.catalyst.rules

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.internal.SQLConf

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
abstract class Rule[TreeType <: TreeNode[_]] extends SQLConfHelper with Logging {

/** Name for this rule, automatically inferred based on class name. */
val ruleName: String = {
@@ -30,6 +30,4 @@ abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
}

def apply(plan: TreeType): TreeType

def conf: SQLConf = SQLConf.get
}
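
Illustrative consequence (the rule and its logic are hypothetical, not part of the PR): custom rules no longer need the `def conf: SQLConf = SQLConf.get` escape hatch that rules like PropagateEmptyRelation and EliminateView carried, because the base class now provides `conf`:

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: drops a Project that merely re-lists the child's own output,
// comparing names with conf.resolver (case handling inherited via SQLConfHelper).
object RemoveNoopProject extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case p @ Project(_, child)
        if p.output.length == child.output.length &&
          p.output.zip(child.output).forall { case (a, b) =>
            a.exprId == b.exprId && conf.resolver(a.name, b.name)
          } =>
      child
  }
}
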
@@ -20,6 +20,7 @@ package org.apache.spark.sql.connector.catalog
import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.internal.SQLConf
@@ -37,9 +38,8 @@ import org.apache.spark.sql.internal.SQLConf
// need to track current database at all.
private[sql]
class CatalogManager(
conf: SQLConf,
defaultSessionCatalog: CatalogPlugin,
val v1SessionCatalog: SessionCatalog) extends Logging {
val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._

@@ -27,13 +27,11 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat, CatalogTable, CatalogTableType, ExternalCatalog, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers {
private def getAnalyzer(externCatalog: ExternalCatalog, databasePath: File): Analyzer = {
val conf = new SQLConf()
val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin, conf)
val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin)
catalog.createDatabase(
CatalogDatabase("default", "", databasePath.toURI, Map.empty),
ignoreIfExists = false)
@@ -44,7 +42,7 @@ class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers {
CatalogStorageFormat.empty,
StructType(Seq(StructField("a", IntegerType, nullable = true)))),
ignoreIfExists = false)
new Analyzer(catalog, conf)
new Analyzer(catalog)
}

test("query builtin functions don't call the external catalog") {
@@ -66,7 +64,7 @@ class AnalysisExternalCatalogSuite extends AnalysisTest with Matchers {
withTempDir { tempDir =>
val inMemoryCatalog = new InMemoryCatalog
val externCatalog = spy(inMemoryCatalog)
val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin, conf)
val catalog = new SessionCatalog(externCatalog, FunctionRegistry.builtin)
catalog.createDatabase(
CatalogDatabase("default", "", new URI(tempDir.toString), Map.empty),
ignoreIfExists = false)