
Commit 78a430e

zsxwing authored and rxin committed
[SPARK-7961][SQL]Refactor SQLConf to display better error message
1. Add `SQLConfEntry` to store the information about a configuration. For those configurations that cannot be found in `sql-programming-guide.md`, I left the doc as `<TODO>`.
2. Verify the value when setting a configuration if this is in SQLConf.
3. Use `SET -v` to display all public configurations.

Author: zsxwing <[email protected]>

Closes apache#6747 from zsxwing/sqlconf and squashes the following commits:

7d09bad [zsxwing] Use SQLConfEntry in HiveContext
49f6213 [zsxwing] Add getConf, setConf to SQLContext and HiveContext
e014f53 [zsxwing] Merge branch 'master' into sqlconf
93dad8e [zsxwing] Fix the unit tests
cf950c1 [zsxwing] Fix the code style and tests
3c5f03e [zsxwing] Add unsetConf(SQLConfEntry) and fix the code style
a2f4add [zsxwing] getConf will return the default value if a config is not set
037b1db [zsxwing] Add schema to SetCommand
0520c3c [zsxwing] Merge branch 'master' into sqlconf
7afb0ec [zsxwing] Fix the configurations about HiveThriftServer
7e728e3 [zsxwing] Add doc for SQLConfEntry and fix 'toString'
5e95b10 [zsxwing] Add enumConf
c6ba76d [zsxwing] setRawString => setConfString, getRawString => getConfString
4abd807 [zsxwing] Fix the test for 'set -v'
6e47e56 [zsxwing] Fix the compilation error
8973ced [zsxwing] Remove floatConf
1fc3a8b [zsxwing] Remove the 'conf' command and use 'set -v' instead
99c9c16 [zsxwing] Fix tests that use SQLConfEntry as a string
88a03cc [zsxwing] Add new lines between confs and return types
ce7c6c8 [zsxwing] Remove seqConf
f3c1b33 [zsxwing] Refactor SQLConf to display better error message
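To make the intent concrete, here is an illustrative sketch (not code from this patch) of the behavior the refactor targets: with a typed entry behind each key, an invalid value is rejected at SET time with a message that names the key, and `SET -v` reports every public configuration with its default and doc.

// Illustrative sketch only, assuming a local Spark 1.4-era setup; not code from this commit.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("sqlconf-demo"))
val sqlContext = new SQLContext(sc)

// Valid value: the entry backing this key parses it as an Int.
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

// Invalid value: with SQLConfEntry validation this now fails fast at set time, roughly
// "spark.sql.shuffle.partitions should be int, but was abc" (exact wording is an assumption).
// sqlContext.setConf("spark.sql.shuffle.partitions", "abc")

// `SET -v` lists all public configurations together with their default values and docs.
sqlContext.sql("SET -v").show()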
1 parent 9db73ec commit 78a430e

File tree

37 files changed: +861 −296 lines changed


docs/sql-programming-guide.md

Lines changed: 2 additions & 2 deletions
@@ -1220,7 +1220,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
     <td>false</td>
     <td>
       Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
-      not differentiate between binary data and strings when writing out the Parquet schema. This
+      not differentiate between binary data and strings when writing out the Parquet schema. This
       flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
     </td>
   </tr>
@@ -1237,7 +1237,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
   <td>true</td>
   <td>
-    Turns on caching of Parquet schema metadata. Can speed up querying of static data.
+    Turns on caching of Parquet schema metadata. Can speed up querying of static data.
   </td>
 </tr>
 <tr>

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 398 additions & 95 deletions
Large diffs are not rendered by default.
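Since this file's diff is collapsed here, the following is a minimal, self-contained sketch of the SQLConfEntry idea (my own approximation for illustration, not the code in this file): each entry bundles its key, default value, doc string and a value converter, so a malformed value is rejected with a message that names the key and the expected type.

// Minimal sketch of the idea only; names and signatures here are illustrative, not Spark's.
case class ConfEntry[T](key: String, defaultValue: T, doc: String, valueConverter: String => T)

object ConfEntry {
  def intConf(key: String, defaultValue: Int, doc: String = ""): ConfEntry[Int] =
    ConfEntry(key, defaultValue, doc, { s =>
      try s.toInt
      catch {
        case _: NumberFormatException =>
          throw new IllegalArgumentException(s"$key should be int, but was $s")
      }
    })
}

// Hypothetical entry mirroring spark.sql.shuffle.partitions:
val shufflePartitions =
  ConfEntry.intConf("spark.sql.shuffle.partitions", 200, "Number of partitions for joins and aggregations.")

shufflePartitions.valueConverter("10")     // 10
// shufflePartitions.valueConverter("ten") // IllegalArgumentException naming the key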

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 22 additions & 3 deletions
@@ -31,6 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.expressions._
@@ -79,21 +80,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def setConf(props: Properties): Unit = conf.setConf(props)
 
+  /** Set the given Spark SQL configuration property. */
+  private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value)
+
   /**
    * Set the given Spark SQL configuration property.
    *
    * @group config
    * @since 1.0.0
    */
-  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+  def setConf(key: String, value: String): Unit = conf.setConfString(key, value)
 
   /**
    * Return the value of Spark SQL configuration property for the given key.
    *
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String): String = conf.getConf(key)
+  def getConf(key: String): String = conf.getConfString(key)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue` in [[SQLConfEntry]].
+   */
+  private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+   * desired one.
+   */
+  private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+    conf.getConf(entry, defaultValue)
+  }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -102,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+  def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue)
 
   /**
    * Return all the configuration properties that have been set (i.e. not the default).
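A rough usage sketch of the overloads added above: the SQLConfEntry-typed variants are private[sql], so this mirrors what internal code can now do, while public callers keep the string overloads, which simply delegate to setConfString/getConfString. SQLConf.SHUFFLE_PARTITIONS is one of the Int entries defined in the (collapsed) SQLConf.scala diff.

// Sketch only; it compiles inside the org.apache.spark.sql package because the typed overloads are private[sql].
package org.apache.spark.sql

object ConfUsageSketch {
  def tune(ctx: SQLContext): Unit = {
    // Typed: the entry converts and validates the value.
    ctx.setConf(SQLConf.SHUFFLE_PARTITIONS, 50)
    val fromEntry: Int = ctx.getConf(SQLConf.SHUFFLE_PARTITIONS)      // 50
    val withDefault: Int = ctx.getConf(SQLConf.SHUFFLE_PARTITIONS, 5) // 50: already set, so the explicit default is ignored

    // String-based public API, now routed through setConfString/getConfString.
    ctx.setConf(SQLConf.SHUFFLE_PARTITIONS.key, "100")
    val asString: String = ctx.getConf(SQLConf.SHUFFLE_PARTITIONS.key) // "100"
  }
}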

sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala

Lines changed: 2 additions & 2 deletions
@@ -44,8 +44,8 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   private val pair: Parser[LogicalPlan] =
     (key ~ ("=".r ~> value).?).? ^^ {
-      case None => SetCommand(None, output)
-      case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+      case None => SetCommand(None)
+      case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)))
     }
 
   def apply(input: String): LogicalPlan = parseAll(pair, input) match {

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 72 additions & 26 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.NoSuchElementException
+
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
@@ -75,48 +77,92 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommand(
-    kv: Option[(String, Option[String])],
-    override val output: Seq[Attribute])
-  extends RunnableCommand with Logging {
+case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging {
+
+  private def keyValueOutput: Seq[Attribute] = {
+    val schema = StructType(
+      StructField("key", StringType, false) ::
+        StructField("value", StringType, false) :: Nil)
+    schema.toAttributes
+  }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = kv match {
+  private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      if (value.toInt < 1) {
-        val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
-          "determining the number of reducers is not supported."
-        throw new IllegalArgumentException(msg)
-      } else {
-        sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
-        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        if (value.toInt < 1) {
+          val msg =
+            s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
+              "determining the number of reducers is not supported."
+          throw new IllegalArgumentException(msg)
+        } else {
+          sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+        }
       }
+      (keyValueOutput, runFunc)
 
     // Configures a single property.
     case Some((key, Some(value))) =>
-      sqlContext.setConf(key, value)
-      Seq(Row(s"$key=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.setConf(key, value)
+        Seq(Row(key, value))
+      }
+      (keyValueOutput, runFunc)
 
-    // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
-    // Notice that different from Hive, here "SET -v" is an alias of "SET".
     // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
-    case Some(("-v", None)) | None =>
-      sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+    // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+    case None =>
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+      }
+      (keyValueOutput, runFunc)
+
+    // Queries all properties along with their default values and docs that are defined in the
+    // SQLConf of the sqlContext.
+    case Some(("-v", None)) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
+          Row(key, defaultValue, doc)
+        }
+      }
+      val schema = StructType(
+        StructField("key", StringType, false) ::
+          StructField("default", StringType, false) ::
+          StructField("meaning", StringType, false) :: Nil)
+      (schema.toAttributes, runFunc)
 
     // Queries the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+            s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString))
+      }
+      (keyValueOutput, runFunc)
 
     // Queries a single property.
     case Some((key, None)) =>
-      Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        val value =
+          try {
+            sqlContext.getConf(key)
+          } catch {
+            case _: NoSuchElementException => "<undefined>"
+          }
+        Seq(Row(key, value))
+      }
+      (keyValueOutput, runFunc)
   }
+
+  override val output: Seq[Attribute] = _output
+
+  override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext)
+
 }
 
 /**
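The user-visible effect of the SetCommand rewrite above, sketched with the sqlContext from the first snippet (column names come from the schemas defined in this diff): plain SET keeps a two-column key/value result, while SET -v now returns key, default and meaning for every defined configuration instead of being an alias for plain SET.

// Sketch, reusing the sqlContext from the first snippet.
sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
// => a single row with the key and value columns: [spark.sql.shuffle.partitions,10]

sqlContext.sql("SET").collect()
// => one (key, value) row per explicitly set configuration

sqlContext.sql("SET -v").printSchema()
// root
//  |-- key: string (nullable = false)
//  |-- default: string (nullable = false)
//  |-- meaning: string (nullable = false)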

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ package object debug {
    */
   implicit class DebugSQLContext(sqlContext: SQLContext) {
     def debug(): Unit = {
-      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
     }
   }

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 4 additions & 4 deletions
@@ -113,12 +113,12 @@ private[sql] case class ParquetTableScan(
       .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.set(
-      SQLConf.PARQUET_CACHE_METADATA,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+    conf.setBoolean(
+      SQLConf.PARQUET_CACHE_METADATA.key,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true))
 
     // Use task side metadata in parquet
-    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
+    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
 
     val baseRDD =
       new org.apache.spark.rdd.NewHadoopRDD(
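The pattern in this hunk, restated outside the diff: Hadoop's Configuration only stores strings and primitives, so it is addressed through entry.key, while the Spark SQL side reads a typed Boolean via the entry-based getConf (which is private[sql], so this mirrors the internal ParquetTableScan code rather than public API).

// Sketch mirroring the internal pattern above; assumes the sqlContext from the first snippet
// and access to the private[sql] SQLConf object.
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()
// Typed read through the entry:
val cacheMetadata: Boolean = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true)
// Hadoop Configuration is string-keyed, so the entry's key is passed explicitly:
hadoopConf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, cacheMetadata)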

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 3 additions & 3 deletions
@@ -220,7 +220,7 @@ private[sql] class ParquetRelation2(
     }
 
     conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS,
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
       classOf[ParquetOutputCommitter])
 
@@ -259,7 +259,7 @@ private[sql] class ParquetRelation2(
       filters: Array[Filter],
       inputFiles: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
@@ -498,7 +498,7 @@ private[sql] object ParquetRelation2 extends Logging {
       ParquetTypesConverter.convertToString(dataSchema.toAttributes))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
 
     /** This closure sets input paths at the driver side. */

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 1 addition & 1 deletion
@@ -323,7 +323,7 @@ private[sql] abstract class BaseWriterContainer(
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
       logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ class LocalSQLContext
   protected[sql] class SQLSession extends super.SQLSession {
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       /** Fewer partitions to speed up testing. */
-      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
     }
   }
