Commit f3c1b33

Refactor SQLConf to display better error message
1. Add `SQLConfEntry` to store the information about a configuration. For those configurations that cannot be found in `sql-programming-guide.md`, I left the doc as `<TODO>`.
2. Verify the value when setting a configuration if it is defined in `SQLConf`.
3. Add a `conf` command to display all public configurations.
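A minimal illustration of what point 2 is meant to buy users, assuming an Int-valued entry such as `spark.sql.shuffle.partitions`. Whether the check fires through the public string setter or only through the typed internals is not visible in the rendered diffs, so this is a sketch of the intent, not of the committed behavior:

import org.apache.spark.sql.SQLContext
import scala.util.Try

// Hypothetical sketch: with typed, validated entries, a non-integer value for an
// Int-valued setting is expected to be rejected when it is set, with an error
// naming the key and the expected type, instead of surfacing later as a bare
// NumberFormatException at read time.
def sketchValidation(sqlContext: SQLContext): Unit = {
  sqlContext.setConf("spark.sql.shuffle.partitions", "10")                  // a valid Int value
  val bad = Try(sqlContext.setConf("spark.sql.shuffle.partitions", "abc"))  // expected to fail fast
  println(s"non-integer value accepted? ${bad.isSuccess}")
}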
1 parent 7658eb2 commit f3c1b33

32 files changed: +640 −202 lines

docs/sql-programming-guide.md

Lines changed: 2 additions & 2 deletions
@@ -1220,7 +1220,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
   <td>false</td>
   <td>
     Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
-    not differentiate between binary data and strings when writing out the Parquet schema. This
+    not differentiate between binary data and strings when writing out the Parquet schema. This
     flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
   </td>
 </tr>
@@ -1237,7 +1237,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
   <td>true</td>
   <td>
-    Turns on caching of Parquet schema metadata. Can speed up querying of static data.
+    Turns on caching of Parquet schema metadata. Can speed up querying of static data.
   </td>
 </tr>
 <tr>

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 332 additions & 89 deletions
Large diffs are not rendered by default.
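The bulk of the change lives in this file, but the diff is not rendered. As a rough sketch only, with every name and signature below inferred from the call sites in the other files (the `.key` accessor, a typed default, a doc string surfaced as "meaning") rather than from the actual `SQLConfEntry`, a typed entry might look roughly like this:

// Rough sketch of a typed configuration entry; names and shape are assumptions
// inferred from call sites elsewhere in this commit, not the real SQLConfEntry.
class ConfEntrySketch[T](
    val key: String,                   // e.g. "spark.sql.shuffle.partitions"
    val defaultValue: Option[T],       // typed default, e.g. Some(200)
    val valueConverter: String => T,   // parses and validates a raw string value
    val doc: String)                   // surfaced by the CONF command as "meaning"

object ConfEntrySketch {
  // Helper for Int-valued settings: rejects non-integer strings when the value is set.
  def intConf(key: String, default: Int, doc: String): ConfEntrySketch[Int] =
    new ConfEntrySketch[Int](key, Some(default), { v =>
      try v.toInt catch {
        case _: NumberFormatException =>
          throw new IllegalArgumentException(s"$key should be an int, but was $v")
      }
    }, doc)

  // Illustrative entry; the doc text here is a paraphrase, not the committed wording.
  val SHUFFLE_PARTITIONS_SKETCH: ConfEntrySketch[Int] =
    intConf("spark.sql.shuffle.partitions", 200, "Number of partitions to use when shuffling data.")
}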

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 4 additions & 4 deletions
@@ -86,15 +86,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group config
    * @since 1.0.0
    */
-  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+  def setConf(key: String, value: String): Unit = conf.setRawConf(key, value)
 
   /**
    * Return the value of Spark SQL configuration property for the given key.
    *
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String): String = conf.getConf(key)
+  def getConf(key: String): String = conf.getRawConf(key)
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -103,7 +103,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+  def getConf(key: String, defaultValue: String): String = conf.getRawConf(key, defaultValue)
 
   /**
    * Return all the configuration properties that have been set (i.e. not the default).
@@ -154,7 +154,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
         // SET spark.sql.dialect=sql; Let's reset as default dialect automatically.
         val dialect = conf.dialect
         // reset the sql dialect
-        conf.unsetConf(SQLConf.DIALECT)
+        conf.unsetConf(SQLConf.DIALECT.key)
         // throw out the exception, and the default sql dialect will take effect for next query.
         throw new DialectException(
           s"""Instantiating dialect '$dialect' failed.

sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala

Lines changed: 6 additions & 1 deletion
@@ -57,6 +57,7 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
   protected val AS = Keyword("AS")
   protected val CACHE = Keyword("CACHE")
   protected val CLEAR = Keyword("CLEAR")
+  protected val CONF = Keyword("CONF")
   protected val IN = Keyword("IN")
   protected val LAZY = Keyword("LAZY")
   protected val SET = Keyword("SET")
@@ -65,7 +66,8 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
   protected val TABLES = Keyword("TABLES")
   protected val UNCACHE = Keyword("UNCACHE")
 
-  override protected lazy val start: Parser[LogicalPlan] = cache | uncache | set | show | others
+  override protected lazy val start: Parser[LogicalPlan] =
+    cache | uncache | set | show | conf | others
 
   private lazy val cache: Parser[LogicalPlan] =
     CACHE ~> LAZY.? ~ (TABLE ~> ident) ~ (AS ~> restInput).? ^^ {
@@ -90,6 +92,9 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
       case _ ~ dbName => ShowTablesCommand(dbName)
     }
 
+  private lazy val conf: Parser[LogicalPlan] =
+    CONF ^^^ ConfCommand
+
   private lazy val others: Parser[LogicalPlan] =
     wholeInput ^^ {
       case input => fallback(input)

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 24 additions & 1 deletion
@@ -91,7 +91,7 @@ case class SetCommand(
             "determining the number of reducers is not supported."
           throw new IllegalArgumentException(msg)
         } else {
-          sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
+          sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
           Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
         }
 
@@ -254,3 +254,26 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma
     rows
   }
 }
+
+/**
+ * TODO
+ *
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case object ConfCommand extends RunnableCommand {
+
+  // The result of SHOW TABLES has two columns, tableName and isTemporary.
+  override val output: Seq[Attribute] = {
+    val schema = StructType(
+      StructField("key", StringType, false) ::
+      StructField("default", StringType, false) ::
+      StructField("meaning", StringType, false) :: Nil)
+
+    schema.toAttributes
+  }
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.conf.getAllDefinedConfs.map { case (k, d, v) => Row(k, d, v) }
+  }
+}
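The scaladoc placeholder and the comment copied from SHOW TABLES above look like leftovers; the schema makes the actual output clear: one row per defined configuration, carrying its key, default value, and meaning. A minimal usage sketch, assuming the `CONF` keyword added to SparkSQLParser above routes to this command (the output formatting below is illustrative):

import org.apache.spark.sql.SQLContext

// Illustrative use of the new CONF command: list every public configuration
// together with its default value and its documentation ("meaning").
def printConfs(sqlContext: SQLContext): Unit = {
  sqlContext.sql("CONF").collect().foreach { row =>
    println(s"${row.getString(0)}  default=${row.getString(1)}  ${row.getString(2)}")
  }
}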

sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ package object debug {
    */
   implicit class DebugSQLContext(sqlContext: SQLContext) {
     def debug(): Unit = {
-      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS.key, "false")
     }
   }
 
sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 3 additions & 3 deletions
@@ -114,8 +114,8 @@ private[sql] case class ParquetTableScan(
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.set(
-      SQLConf.PARQUET_CACHE_METADATA,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+      SQLConf.PARQUET_CACHE_METADATA.key,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA.key, "true"))
 
     val baseRDD =
       new org.apache.spark.rdd.NewHadoopRDD(
@@ -536,7 +536,7 @@ private[parquet] class FilteringParquetRowInputFormat
 
     import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.blockLocationCache
 
-    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
+    val cacheMetadata = configuration.getBoolean(SQLConf.PARQUET_CACHE_METADATA.key, true)
 
     val splits = mutable.ArrayBuffer.empty[ParquetInputSplit]
     val filter: Filter = ParquetInputFormat.getFilter(configuration)

sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala

Lines changed: 3 additions & 3 deletions
@@ -220,7 +220,7 @@ private[sql] class ParquetRelation2(
     }
 
     conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS,
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
       classOf[ParquetOutputCommitter])
 
@@ -259,7 +259,7 @@ private[sql] class ParquetRelation2(
       filters: Array[Filter],
       inputFiles: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA.key, "true").toBoolean
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
@@ -498,7 +498,7 @@ private[sql] object ParquetRelation2 extends Logging {
       ParquetTypesConverter.convertToString(dataSchema.toAttributes))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+    conf.set(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache.toString)
   }
 
   /** This closure sets input paths at the driver side. */

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 1 addition & 1 deletion
@@ -332,7 +332,7 @@ private[sql] abstract class BaseWriterContainer(
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
       logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala

Lines changed: 1 addition & 1 deletion
@@ -38,7 +38,7 @@ class LocalSQLContext
   protected[sql] class SQLSession extends super.SQLSession {
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       /** Fewer partitions to speed up testing. */
-      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
     }
   }
 