
Commit cb722c1

Finish up SQLConf patch.
1 parent: 4ebf362

6 files changed: 56 additions, 21 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 16 additions & 5 deletions
@@ -131,6 +131,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val OUTER = Keyword("OUTER")
   protected val RIGHT = Keyword("RIGHT")
   protected val SELECT = Keyword("SELECT")
+  protected val SET = Keyword("SET")
   protected val STRING = Keyword("STRING")
   protected val SUM = Keyword("SUM")
   protected val TRUE = Keyword("TRUE")

@@ -168,11 +169,21 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     }
   }

-  protected lazy val query: Parser[LogicalPlan] =
-    select * (
-      UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
-      UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
-    ) | insert
+  protected lazy val query: Parser[LogicalPlan] = (
+    setCommand
+    | select * (
+        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
+      )
+    | insert
+  )
+
+  protected lazy val setCommand: Parser[LogicalPlan] = {
+    val keyVal = ident | numericLit | stringLit
+    (SET ~> keyVal <~ "=") ~ keyVal <~ opt(";") ^^ {
+      case key ~ value => SetCommand(key, value)
+    }
+  }

   protected lazy val select: Parser[LogicalPlan] =
     SELECT ~> opt(DISTINCT) ~ projections ~
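With this change, query tries setCommand first and falls back to select/insert, so a statement like "SET key=value" parses into a SetCommand node instead of being rejected. A minimal sketch of the round trip, assuming SqlParser's usual apply(input: String): LogicalPlan entry point on this branch:

    val parser = new SqlParser
    // Dispatches to setCommand: SetCommand(mapred.reduce.tasks, 20).
    parser("SET mapred.reduce.tasks=20")
    // opt(";") tolerates a trailing semicolon, as from a SQL shell.
    parser("SET mapred.reduce.tasks=20;")
    // Anything else still flows through the select/insert productions.
    parser("SELECT key FROM src UNION ALL SELECT key FROM src")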

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 5 additions & 0 deletions
@@ -110,6 +110,11 @@ abstract class Command extends LeafNode {
  */
 case class NativeCommand(cmd: String) extends Command

+/**
+ * Commands of the form "set key=value".
+ */
+case class SetCommand(key: String, value: String) extends Command
+
 /**
  * Returned by a parser when the user only wants to see what query plan would be executed, without
  * actually performing the execution.

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 4 additions & 0 deletions
@@ -26,6 +26,10 @@ class SQLConf {

   private val settings = new mutable.HashMap[String, String]()

+  private[spark] def clear() {
+    settings.clear()
+  }
+
   def set(key: String, value: String): SQLConf = {
     if (key == null) {
       throw new NullPointerException("null key")
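clear() is deliberately private[spark]: it lets tests reset shared settings between cases, as SQLConfSuite does below. A hedged sketch of the programmatic API, limited to methods the suite itself exercises:

    val conf = new SQLConf
    conf.set("test.key.0", "test.val.0")               // set(...) returns the SQLConf
    assert(conf.get("test.key.0", "default") == "test.val.0")
    assert(conf.getOption("test.key.0") == Some("test.val.0"))
    assert(conf.contains("test.key.0"))
    conf.clear()                                       // visible only within spark packages
    assert(conf.getOption("test.key.0").isEmpty)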

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 13 additions & 3 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, Subquery, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor

 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan

@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies

 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.SQLConf

 /**
  * :: AlphaComponent ::

@@ -250,6 +249,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val planner = new SparkPlanner

+  @transient
+  protected lazy val emptyResult =
+    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+
   /**
    * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
    * inserting shuffle operations as needed.

@@ -275,7 +278,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = executedPlan.execute()
+    lazy val toRdd: RDD[Row] = {
+      logical match {
+        case SetCommand(key, value) =>
+          sqlConf.set(key, value)
+          emptyResult
+        case _ => executedPlan.execute()
+      }
+    }

     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }
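toRdd now inspects the parsed logical plan: a SetCommand writes the key/value pair into sqlConf and returns the shared single-row emptyResult, while every other plan executes its SparkPlan as before. The assertions in SQLConfSuite immediately after sql(...) rely on toRdd being forced eagerly for such commands. A minimal end-to-end sketch (SetCommandDemo is hypothetical; it sits in package org.apache.spark.sql so that sqlConf is accessible, just as the suite relies on):

    package org.apache.spark.sql

    import org.apache.spark.sql.test.TestSQLContext

    object SetCommandDemo extends App {
      // Intercepted in toRdd: no SparkPlan runs, sqlConf is mutated instead.
      TestSQLContext.sql("SET mapred.reduce.tasks=20")
      assert(TestSQLContext.sqlConf.get("mapred.reduce.tasks", "0") == "20")
    }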

sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala

Lines changed: 18 additions & 9 deletions
@@ -20,19 +20,15 @@ package org.apache.spark.sql
 import org.apache.spark.sql.test._

 /* Implicits */
-//import TestSQLContext._
-//import TestData._
+import TestSQLContext._

 class SQLConfSuite extends QueryTest {

-  // Make sure the tables are loaded.
-  // TestData
-
-  test("basic setting and getting in SQLConf") {
-    val conf = TestSQLContext.sqlConf
-    val testKey = "test.key"
-    val testVal = "test.val"
+  val conf = TestSQLContext.sqlConf
+  val testKey = "test.key.0"
+  val testVal = "test.val.0"

+  test("programmatic ways of basic setting and getting") {
     assert(conf != null)
     assert(conf.getOption(testKey).isEmpty)
     assert(conf.getAll.toSet === Set())

@@ -49,6 +45,19 @@ class SQLConfSuite extends QueryTest {
     assert(TestSQLContext.sqlConf.get(testKey, testVal + "_") == testVal)
     assert(TestSQLContext.sqlConf.getOption(testKey) == Some(testVal))
     assert(TestSQLContext.sqlConf.contains(testKey))
+
+    conf.clear()
+  }
+
+  test("SQLConf picks up SQL set commands") {
+    sql(s"set $testKey=$testVal")
+    assert(conf.get(testKey, testVal + "_") == testVal)
+    assert(TestSQLContext.sqlConf.get(testKey, testVal + "_") == testVal)
+
+    sql("set mapred.reduce.tasks=20")
+    assert(conf.get("mapred.reduce.tasks", "0") == "20")
+    sql("set mapred.reduce.tasks = 40")
+    assert(conf.get("mapred.reduce.tasks", "0") == "40")
   }

 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 0 additions & 4 deletions
@@ -234,10 +234,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   @transient
   override protected[sql] val planner = hivePlanner

-  @transient
-  protected lazy val emptyResult =
-    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
-
   /** Extends QueryExecution with hive specific features. */
   protected[sql] abstract class QueryExecution extends super.QueryExecution {
     // TODO: Create mixin for the analyzer instead of overriding things here.
