
Commit 0ee38a3

rxin authored and cloud-fan committed
[SPARK-19944][SQL] Move SQLConf from sql/core to sql/catalyst
## What changes were proposed in this pull request?

This patch moves SQLConf from sql/core to sql/catalyst. To minimize the changes, the patch uses a type alias to keep CatalystConf (as a type alias for SQLConf) and SimpleCatalystConf (as a concrete class that extends SQLConf). The motivation for the change is that it is pretty weird to have SQLConf only in sql/core and then have to duplicate the config options that affect the optimizer/analyzer in sql/catalyst via CatalystConf.

## How was this patch tested?

N/A

Author: Reynold Xin <[email protected]>

Closes #17285 from rxin/SPARK-19944.
1 parent 4ce970d commit 0ee38a3
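
The key move is keeping the old CatalystConf name alive as an alias for the relocated SQLConf, so existing catalyst code keeps compiling. Below is a minimal, self-contained Scala sketch of that relocation pattern, with hypothetical names rather than the actual Spark packages:

// Sketch of the alias technique (hypothetical names): the concrete class lives in
// its new home, and the old home re-exposes it under the old name so callers that
// were written against the old name keep working without changes.
object NewHome {
  // Stand-in for SQLConf after the move to sql/catalyst.
  class MovedConf(val caseSensitiveAnalysis: Boolean = false)
}

object OldHome {
  // Stand-in for `type CatalystConf = SQLConf` in the catalyst package object.
  type OldConf = NewHome.MovedConf
}

object AliasDemo extends App {
  // Code that still refers to the old name compiles and runs unchanged.
  val conf: OldHome.OldConf = new NewHome.MovedConf(caseSensitiveAnalysis = true)
  println(conf.caseSensitiveAnalysis) // true
}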

5 files changed: +165 -173 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala

Lines changed: 0 additions & 93 deletions
This file was deleted.

Lines changed: 48 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst

import java.util.TimeZone

import org.apache.spark.sql.internal.SQLConf


/**
 * A SQLConf that can be used for local testing. This class is only here to minimize the change
 * for ticket SPARK-19944 (moves SQLConf from sql/core to sql/catalyst). This class should
 * eventually be removed (test cases should just create SQLConf and set values appropriately).
 */
case class SimpleCatalystConf(
    override val caseSensitiveAnalysis: Boolean,
    override val orderByOrdinal: Boolean = true,
    override val groupByOrdinal: Boolean = true,
    override val optimizerMaxIterations: Int = 100,
    override val optimizerInSetConversionThreshold: Int = 10,
    override val maxCaseBranchesForCodegen: Int = 20,
    override val tableRelationCacheSize: Int = 1000,
    override val runSQLonFile: Boolean = true,
    override val crossJoinEnabled: Boolean = false,
    override val cboEnabled: Boolean = false,
    override val joinReorderEnabled: Boolean = false,
    override val joinReorderDPThreshold: Int = 12,
    override val warehousePath: String = "/user/hive/warehouse",
    override val sessionLocalTimeZone: String = TimeZone.getDefault().getID)
  extends SQLConf {

  override def clone(): SimpleCatalystConf = this.copy()
}
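
As the scaladoc above notes, SimpleCatalystConf exists so that tests keep a convenient constructor. A minimal sketch of how a test might construct one (hypothetical snippet; assumes the sql/catalyst module from this patch is on the classpath):

import org.apache.spark.sql.catalyst.SimpleCatalystConf

object SimpleCatalystConfExample extends App {
  // Build a conf for local testing, overriding only case sensitivity and
  // relying on the case-class defaults for everything else.
  val conf = SimpleCatalystConf(caseSensitiveAnalysis = true)
  assert(conf.caseSensitiveAnalysis)
  assert(conf.optimizerMaxIterations == 100)

  // Being a case class, copy() derives a tweaked conf without mutating the original.
  val withCrossJoins = conf.copy(crossJoinEnabled = true)
  assert(withCrossJoins.crossJoinEnabled)
}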

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala

Lines changed: 7 additions & 0 deletions
@@ -17,6 +17,8 @@

 package org.apache.spark.sql

+import org.apache.spark.sql.internal.SQLConf
+
 /**
  * Catalyst is a library for manipulating relational query plans. All classes in catalyst are
  * considered an internal API to Spark SQL and are subject to change between minor releases.
@@ -29,4 +31,9 @@ package object catalyst {
    */
   protected[sql] object ScalaReflectionLock

+  /**
+   * This class is only here to minimize the change for ticket SPARK-19944
+   * (moves SQLConf from sql/core to sql/catalyst). This class should eventually be removed.
+   */
+  type CatalystConf = SQLConf
 }

sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 26 additions & 80 deletions
@@ -24,15 +24,11 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable

 import org.apache.hadoop.fs.Path
-import org.apache.parquet.hadoop.ParquetOutputCommitter

 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
-import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.catalyst.analysis.Resolver

 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -251,7 +247,7 @@ object SQLConf {
       "of org.apache.parquet.hadoop.ParquetOutputCommitter.")
     .internal()
     .stringConf
-    .createWithDefault(classOf[ParquetOutputCommitter].getName)
+    .createWithDefault("org.apache.parquet.hadoop.ParquetOutputCommitter")

   val PARQUET_VECTORIZED_READER_ENABLED =
     buildConf("spark.sql.parquet.enableVectorizedReader")
@@ -417,7 +413,8 @@
     buildConf("spark.sql.sources.commitProtocolClass")
       .internal()
       .stringConf
-      .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName)
+      .createWithDefault(
+        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")

   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     buildConf("spark.sql.sources.parallelPartitionDiscovery.threshold")
@@ -578,7 +575,7 @@
     buildConf("spark.sql.streaming.commitProtocolClass")
       .internal()
      .stringConf
-      .createWithDefault(classOf[ManifestFileCommitProtocol].getName)
+      .createWithDefault("org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol")

   val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
     buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
@@ -723,7 +720,7 @@
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
+class SQLConf extends Serializable with Logging {
   import SQLConf._

   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -833,6 +830,18 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

+  /**
+   * Returns the [[Resolver]] for the current configuration, which can be used to determine if two
+   * identifiers are equal.
+   */
+  def resolver: Resolver = {
+    if (caseSensitiveAnalysis) {
+      org.apache.spark.sql.catalyst.analysis.caseSensitiveResolution
+    } else {
+      org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    }
+  }
+
   def subexpressionEliminationEnabled: Boolean =
     getConf(SUBEXPRESSION_ELIMINATION_ENABLED)

@@ -890,7 +899,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

   def dataFramePivotMaxValues: Int = getConf(DATAFRAME_PIVOT_MAX_VALUES)

-  override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
+  def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)

   def enableTwoLevelAggMap: Boolean = getConf(ENABLE_TWOLEVEL_AGG_MAP)

@@ -907,21 +916,21 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
   def hiveThriftServerSingleSession: Boolean =
     getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION)

-  override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
+  def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

-  override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
+  def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)

-  override def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)
+  def crossJoinEnabled: Boolean = getConf(SQLConf.CROSS_JOINS_ENABLED)

-  override def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
+  def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)

   def ndvMaxError: Double = getConf(NDV_MAX_ERROR)

-  override def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)
+  def cboEnabled: Boolean = getConf(SQLConf.CBO_ENABLED)

-  override def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
+  def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)

-  override def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
+  def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)

   /** ********************** SQLConf functionality methods ************ */

@@ -1050,66 +1059,3 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
     result
   }
 }
-
-/**
- * Static SQL configuration is a cross-session, immutable Spark configuration. External users can
- * see the static sql configs via `SparkSession.conf`, but can NOT set/unset them.
- */
-object StaticSQLConf {
-
-  import SQLConf.buildStaticConf
-
-  val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir")
-    .doc("The default location for managed databases and tables.")
-    .stringConf
-    .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
-
-  val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
-    .internal()
-    .stringConf
-    .checkValues(Set("hive", "in-memory"))
-    .createWithDefault("in-memory")
-
-  val GLOBAL_TEMP_DATABASE = buildStaticConf("spark.sql.globalTempDatabase")
-    .internal()
-    .stringConf
-    .createWithDefault("global_temp")
-
-  // This is used to control when we will split a schema's JSON string to multiple pieces
-  // in order to fit the JSON string in metastore's table property (by default, the value has
-  // a length restriction of 4000 characters, so do not use a value larger than 4000 as the default
-  // value of this property). We will split the JSON string of a schema to its length exceeds the
-  // threshold. Note that, this conf is only read in HiveExternalCatalog which is cross-session,
-  // that's why this conf has to be a static SQL conf.
-  val SCHEMA_STRING_LENGTH_THRESHOLD =
-    buildStaticConf("spark.sql.sources.schemaStringLengthThreshold")
-      .doc("The maximum length allowed in a single cell when " +
-        "storing additional schema information in Hive's metastore.")
-      .internal()
-      .intConf
-      .createWithDefault(4000)
-
-  val FILESOURCE_TABLE_RELATION_CACHE_SIZE =
-    buildStaticConf("spark.sql.filesourceTableRelationCacheSize")
-      .internal()
-      .doc("The maximum size of the cache that maps qualified table names to table relation plans.")
-      .intConf
-      .checkValue(cacheSize => cacheSize >= 0, "The maximum size of the cache must not be negative")
-      .createWithDefault(1000)
-
-  // When enabling the debug, Spark SQL internal table properties are not filtered out; however,
-  // some related DDL commands (e.g., ANALYZE TABLE and CREATE TABLE LIKE) might not work properly.
-  val DEBUG_MODE = buildStaticConf("spark.sql.debug")
-    .internal()
-    .doc("Only used for internal debugging. Not all functions are supported when it is enabled.")
-    .booleanConf
-    .createWithDefault(false)
-
-  val HIVE_THRIFT_SERVER_SINGLESESSION =
-    buildStaticConf("spark.sql.hive.thriftServer.singleSession")
-      .doc("When set to true, Hive Thrift server is running in a single session mode. " +
-        "All the JDBC/ODBC connections share the temporary views, function registries, " +
-        "SQL configuration and the current database.")
-      .booleanConf
-      .createWithDefault(false)
-}
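
The resolver method added to SQLConf in the diff above exposes the identifier-equality function implied by spark.sql.caseSensitive. A small usage sketch (hypothetical identifiers; assumes the sql/catalyst module from this patch is on the classpath):

import org.apache.spark.sql.internal.SQLConf

object ResolverExample extends App {
  val conf = new SQLConf

  // Analysis is case-insensitive by default, so differently-cased
  // identifiers are treated as the same name.
  assert(conf.resolver("myColumn", "MYCOLUMN"))

  // Flipping spark.sql.caseSensitive makes the resolver strict.
  conf.setConfString("spark.sql.caseSensitive", "true")
  assert(!conf.resolver("myColumn", "MYCOLUMN"))
}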

0 commit comments
