Commit 037b1db

Add schema to SetCommand

1 parent 0520c3c commit 037b1db
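
SetCommand previously took its output attributes from the parser and rendered every result as a single key=value string. With this change it derives its own schema and returns structured rows: (key, value) columns for the ordinary SET variants, and (key, default, meaning) columns for SET -v. The parser, the Thrift server's pattern match, and the Hive test suite are updated for the new single-argument constructor and row shape.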

5 files changed: 65 additions & 35 deletions


sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala

Lines changed: 2 additions & 2 deletions
@@ -44,8 +44,8 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
   private val pair: Parser[LogicalPlan] =
     (key ~ ("=".r ~> value).?).? ^^ {
-      case None => SetCommand(None, output)
-      case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+      case None => SetCommand(None)
+      case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)))
     }
 
   def apply(input: String): LogicalPlan = parseAll(pair, input) match {
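
Since SetCommand now builds its own output attributes (see commands.scala below), the parser no longer threads its output schema into the constructor.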

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 56 additions & 23 deletions
@@ -76,53 +76,86 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
  */
 @DeveloperApi
 case class SetCommand(
-    kv: Option[(String, Option[String])],
-    override val output: Seq[Attribute])
+    kv: Option[(String, Option[String])])
   extends RunnableCommand with Logging {
 
-  override def run(sqlContext: SQLContext): Seq[Row] = kv match {
+  private def keyValueOutput: Seq[Attribute] = {
+    val schema = StructType(
+      StructField("key", StringType, false) ::
+        StructField("value", StringType, false) :: Nil)
+    schema.toAttributes
+  }
+
+  private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
-      if (value.toInt < 1) {
-        val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
-          "determining the number of reducers is not supported."
-        throw new IllegalArgumentException(msg)
-      } else {
-        sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
-        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS.key}=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        if (value.toInt < 1) {
+          val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
+            "determining the number of reducers is not supported."
+          throw new IllegalArgumentException(msg)
+        } else {
+          sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+        }
       }
+      (keyValueOutput, runFunc)
 
     // Configures a single property.
     case Some((key, Some(value))) =>
-      sqlContext.setConf(key, value)
-      Seq(Row(s"$key=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.setConf(key, value)
+        Seq(Row(key, value))
+      }
+      (keyValueOutput, runFunc)
 
     // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
     // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
     case None =>
-      sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+      }
+      (keyValueOutput, runFunc)
 
     // Queries all properties along with their default values and docs that are defined in the
    // SQLConf of the sqlContext.
     case Some(("-v", None)) =>
-      sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
-        Row(s"$key\tdefault: $defaultValue\t$doc")
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
+          Row(key, defaultValue, doc)
+        }
       }
+      val schema = StructType(
+        StructField("key", StringType, false) ::
+          StructField("default", StringType, false) ::
+          StructField("meaning", StringType, false) :: Nil)
+      (schema.toAttributes, runFunc)
 
     // Queries the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
-      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS.key}=${sqlContext.conf.numShufflePartitions}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+            s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString))
+      }
+      (keyValueOutput, runFunc)
 
     // Queries a single property.
     case Some((key, None)) =>
-      Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        Seq(Row(key, sqlContext.getConf(key, "<undefined>")))
+      }
+      (keyValueOutput, runFunc)
   }
+
+  override val output: Seq[Attribute] = _output
+
+  override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext)
+
 }
 
 /**
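
The heart of the change is the (_output, runFunc) pair: a single pattern match over kv picks both the result schema and a deferred execution closure, so the command's output is known at planning time while side effects still happen only in run. The following is a minimal, self-contained sketch of that pattern with hypothetical names and simplified types; it is not code from the patch:

case class DemoSetCommand(kv: Option[(String, Option[String])]) {
  // One match decides both the schema and what to do later.
  private val (_output, runFunc): (Seq[String], Map[String, String] => Seq[Seq[String]]) =
    kv match {
      // "SET key=value": two-column result; the conf write would live
      // inside the closure, i.e. it runs only when run() is called.
      case Some((key, Some(value))) =>
        (Seq("key", "value"), _ => Seq(Seq(key, value)))
      // "SET key": look up one key, with a placeholder for missing entries.
      case Some((key, None)) =>
        (Seq("key", "value"), conf => Seq(Seq(key, conf.getOrElse(key, "<undefined>"))))
      // "SET": dump every (key, value) pair currently set.
      case None =>
        (Seq("key", "value"), conf => conf.toSeq.map { case (k, v) => Seq(k, v) })
    }

  val output: Seq[String] = _output  // schema, available without executing
  def run(conf: Map[String, String]): Seq[Seq[String]] = runFunc(conf)
}

val cmd = DemoSetCommand(Some("spark.sql.shuffle.partitions" -> Some("10")))
cmd.output          // Seq("key", "value"), known before run
cmd.run(Map.empty)  // Seq(Seq("spark.sql.shuffle.partitions", "10"))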

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala

Lines changed: 2 additions & 2 deletions
@@ -114,8 +114,8 @@ private[sql] case class ParquetTableScan(
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
     conf.set(
-      SQLConf.PARQUET_CACHE_METADATA,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+      SQLConf.PARQUET_CACHE_METADATA.key,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA.key, "true"))
 
     // Use task side metadata in parquet
     conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
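
PARQUET_CACHE_METADATA appears to have become a typed conf entry rather than a bare string constant, so the string-based conf.set and getConf calls now have to reference its .key explicitly.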

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala

Lines changed: 1 addition & 1 deletion
@@ -219,7 +219,7 @@ private[hive] class SparkExecuteStatementOperation(
       result = hiveContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
-        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value))), _) =>
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
           sessionToActivePool(parentSession.getSessionHandle) = value
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
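
The trailing wildcard drops out of the extractor because SetCommand's constructor no longer carries the output attributes.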

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 4 additions & 7 deletions
@@ -1084,18 +1084,15 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     val testKey = "spark.sql.key.usedfortestonly"
     val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
-    val KV = "([^=]+)=([^=]*)".r
-    def collectResults(df: DataFrame): Set[(String, String)] =
+    def collectResults(df: DataFrame): Set[Any] =
       df.collect().map {
         case Row(key: String, value: String) => key -> value
-        case Row(KV(key, value)) => key -> value
+        case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc)
       }.toSet
     conf.clear()
 
-    val expectedConfs = conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
-      s"$key\tdefault: $defaultValue\t$doc"
-    }
-    assertResult(expectedConfs)(sql("SET -v").collect().map(_(0)))
+    val expectedConfs = conf.getAllDefinedConfs.toSet
+    assertResult(expectedConfs)(collectResults(sql("SET -v")))
 
     // "SET" itself returns all config variables currently specified in SQLConf.
     // TODO: Should we be listing the default here always? probably...
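
With a real schema on the result, callers can read SET output by column instead of regex-parsing key=value strings, which is exactly what the rewritten collectResults does. An illustrative fragment reusing the test's names (not part of the patch):

// Ordinary SET variants yield (key, value) rows; SET -v yields
// (key, default, meaning) rows, matched by arity above.
sql(s"SET $testKey=$testVal").collect().map {
  case Row(k: String, v: String) => k -> v
}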
