@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import java.util.TimeZone

import org.apache.spark.sql.internal.SQLConf


/**
* A SQLConf that can be used for local testing. This class is only here to minimize the change
* for ticket SPARK-19944 (moves SQLConf from sql/core to sql/catalyst). This class should
* eventually be removed (test cases should just create SQLConf and set values appropriately).
*/
case class SimpleCatalystConf(
@hvanhovell (Contributor) commented on Mar 14, 2017:
Does SQLConf contain these defaults?

Contributor:
It does.

Contributor:
I think this is just to keep the code change minimal.

Contributor:
Yeah, I figured as much.

Contributor (Author):
Yup, the purpose is to minimize the change. Otherwise I will change 50+ files ...

    override val caseSensitiveAnalysis: Boolean,
    override val orderByOrdinal: Boolean = true,
    override val groupByOrdinal: Boolean = true,
    override val optimizerMaxIterations: Int = 100,
    override val optimizerInSetConversionThreshold: Int = 10,
    override val maxCaseBranchesForCodegen: Int = 20,
    override val tableRelationCacheSize: Int = 1000,
    override val runSQLonFile: Boolean = true,
    override val crossJoinEnabled: Boolean = false,
    override val cboEnabled: Boolean = false,
    override val joinReorderEnabled: Boolean = false,
    override val joinReorderDPThreshold: Int = 12,
    override val warehousePath: String = "/user/hive/warehouse",
    override val sessionLocalTimeZone: String = TimeZone.getDefault().getID)
  extends SQLConf {

override def clone(): SimpleCatalystConf = this.copy()
}
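
A minimal sketch of the migration the class comment asks for: today a test constructs the shim with constructor arguments, and once the shim is removed the same test would create a SQLConf and set values through its setter API. The setConf calls and config entries below are the existing SQLConf ones as I understand them; the object name is made up, and the snippet is illustrative, not part of this PR.

import org.apache.spark.sql.catalyst.SimpleCatalystConf
import org.apache.spark.sql.internal.SQLConf

object SimpleCatalystConfMigrationSketch {
  // Today: tests build the shim with plain constructor arguments.
  val shimConf = SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false)

  // Eventually: the same test creates a SQLConf and sets the values it needs.
  val conf = new SQLConf
  conf.setConf(SQLConf.CASE_SENSITIVE, true)
  conf.setConf(SQLConf.ORDER_BY_ORDINAL, false)
}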
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.internal.SQLConf

/**
* Catalyst is a library for manipulating relational query plans. All classes in catalyst are
* considered an internal API to Spark SQL and are subject to change between minor releases.
@@ -29,4 +31,9 @@ package object catalyst {
*/
protected[sql] object ScalaReflectionLock

/**
* This class is only here to minimize the change for ticket SPARK-19944
* (moves SQLConf from sql/core to sql/catalyst). This class should eventually be removed.
*/
type CatalystConf = SQLConf
}
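
Because CatalystConf is now just a type alias for SQLConf, catalyst code written against CatalystConf keeps compiling without touching its signatures. A small, hypothetical illustration (the object and method names are invented, not part of this diff):

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.internal.SQLConf

object CatalystConfAliasExample {
  // An existing catalyst-side signature that mentions CatalystConf still compiles,
  // and callers can simply pass a SQLConf because the two types are now identical.
  def maxIterations(conf: CatalystConf): Int = conf.optimizerMaxIterations

  val iters: Int = maxIterations(new SQLConf)
}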
@@ -24,15 +24,11 @@ import scala.collection.JavaConverters._
import scala.collection.immutable

import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputCommitter

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.analysis.Resolver

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
@@ -251,7 +247,7 @@ object SQLConf {
"of org.apache.parquet.hadoop.ParquetOutputCommitter.")
.internal()
.stringConf
.createWithDefault(classOf[ParquetOutputCommitter].getName)
.createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")
Contributor:
It is a bit weird to me that we refer to sql/core configs in catalyst. Shouldn't we just move the dynamic configuration and the catalyst-only configurations to CatalystConf and make SQLConf extend this?

Contributor:
Yea, good idea.
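
For readers following this thread, a rough sketch of the layering being suggested here, with hypothetical class names (the PR as written keeps a single SQLConf in catalyst and aliases CatalystConf to it, so this split is not what the diff does):

// Hypothetical layering: a catalyst-side base class exposes only the options the
// analyzer/optimizer read, and a sql/core subclass adds the execution-side ones.
abstract class CatalystSideConf extends Serializable {
  def caseSensitiveAnalysis: Boolean
  def optimizerMaxIterations: Int
}

class CoreSideConf extends CatalystSideConf {
  // sql/core would back these with its usual ConfigEntry lookups; the fixed
  // values here only keep the sketch self-contained.
  override def caseSensitiveAnalysis: Boolean = false
  override def optimizerMaxIterations: Int = 100
}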


val PARQUET_VECTORIZED_READER_ENABLED =
buildConf("spark.sql.parquet.enableVectorizedReader")
@@ -417,7 +413,8 @@
buildConf("spark.sql.sources.commitProtocolClass")
.internal()
.stringConf
.createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName)
.createWithDefault(
"org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")

val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
@@ -578,7 +575,7 @@
buildConf("spark.sql.streaming.commitProtocolClass")
.internal()
.stringConf
.createWithDefault(classOf[ManifestFileCommitProtocol].getName)
.createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")

val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
@@ -723,7 +720,7 @@
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
class SQLConf extends Serializable with Logging {
import SQLConf._

/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -833,6 +830,18 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
*/
def resolver: Resolver = {
if (caseSensitiveAnalysis) {
org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
} else {
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
}
}
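
As a point of reference (these are not lines from the diff): Resolver is a (String, String) => Boolean function type in catalyst's analysis package, so callers compare identifiers through whatever case sensitivity the session is configured with. A small usage sketch, assuming the default spark.sql.caseSensitive=false:

val conf = new SQLConf
// With case-insensitive analysis (the default), differently-cased column
// names are treated as the same identifier.
assert(conf.resolver("userId", "USERID"))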

def subexpressionEliminationEnabled: Boolean =
getConf(SUBEXPRESSION_ELIMINATION_ENABLED)

@@ -890,7 +899,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)

override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)

def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

@@ -907,21 +916,21 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def hiveThriftServerSingleSession: Boolean =
getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION)

override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)

override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)

def ndvMaxError: Double = getConf(NDV_MAX_ERROR)

override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)

override def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)

override def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)

/** ********************** SQLConf functionality methods ************ */

@@ -1050,66 +1059,3 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
result
}
}

/**
* Static SQL configuration is a cross-session, immutable Spark configuration. External users can
* see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
*/
object StaticSQLConf {

import SQLConf.buildStaticConf

val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir")
.doc("The default location for managed databases and tables.")
.stringConf
.createWithDefault(Utils.resolveURI("spark-warehouse").toString)

val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")

val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
.internal()
.stringConf
.createWithDefault("global_temp")

// This is used to control when we will split a schema's JSON string to multiple pieces
// in order to fit the JSON string in metastore's table property (by default, the value has
// a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
// value of this property). We will split the JSON string of a schema to its length exceeds the
// threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
// that's why this conf has to be a static SQL conf.
val SCHEMA_STRING_LENGTH_THRESHOLD =
buildStaticConf("spark.sql.sources.schemaStringLengthThreshold")
.doc("The maximum length allowed in a single cell when " +
"storing additional schema information in Hive's metastore.")
.internal()
.intConf
.createWithDefault(4000)

val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
.internal()
.doc("The maximum size of the cache that maps qualified table names to table relation plans.")
.intConf
.checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
.createWithDefault(1000)

// When enabling the debug, Spark SQL internal table properties are not filtered out; however,
// some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
val DEBUG_MODE = buildStaticConf("spark.sql.debug")
.internal()
.doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
.booleanConf
.createWithDefault(false)

val HIVE_THRIFT_SERVER_SINGLESESSION =
buildStaticConf("spark.sql.hive.thriftServer.singleSession")
.doc("When set to true, Hive Thrift server is running in a single session mode. " +
"All the JDBC/ODBC connections share the temporary views, function registries, " +
"SQL configuration and the current database.")
.booleanConf
.createWithDefault(false)
}