Commit 08ed9ad

concretevitamin authored and marmbrus committed
[SPARK-1508][SQL] Add SQLConf to SQLContext.
This PR (1) introduces a new class SQLConf that stores key-value properties for a SQLContext and (2) cleans up the semantics of the various forms of SET commands.

The SQLConf class unlocks user-controllable optimization opportunities; for example, a user can now override the number of partitions used during an Exchange. A SQLConf can be accessed and modified programmatically through its getters and setters. It can also be modified through SET commands executed by `sql()` or `hql()`. Note that users now have the ability to change a particular property for different queries inside the same Spark job, unlike settings configured in SparkConf.

For SET commands:
- "SET" returns all properties currently set in the SQLConf,
- "SET key" returns the key-value pair (if set) or an undefined message, and
- "SET key=value" calls the setter on SQLConf, and if a HiveContext is used, it is executed in Hive as well.

Author: Zongheng Yang <[email protected]>

Closes apache#956 from concretevitamin/sqlconf and squashes the following commits:

4968c11 [Zongheng Yang] Very minor cleanup.
d74dde5 [Zongheng Yang] Remove the redundant mkQueryExecution() method.
c129b86 [Zongheng Yang] Merge remote-tracking branch 'upstream/master' into sqlconf
26c40eb [Zongheng Yang] Make SQLConf a trait and have SQLContext mix it in.
dd19666 [Zongheng Yang] Update a comment.
baa5d29 [Zongheng Yang] Remove default param for shuffle partitions accessor.
5f7e6d8 [Zongheng Yang] Add default num partitions.
22d9ed7 [Zongheng Yang] Fix output() of Set physical. Add SQLConf param accessor method.
e9856c4 [Zongheng Yang] Use java.util.Collections.synchronizedMap on a Java HashMap.
88dd0c8 [Zongheng Yang] Remove redundant SET Keyword.
271f0b1 [Zongheng Yang] Minor change.
f8983d1 [Zongheng Yang] Minor changes per review comments.
1ce8a5e [Zongheng Yang] Invoke runSqlHive() in SQLConf#get for the HiveContext case.
b766af9 [Zongheng Yang] Remove a test.
d52e1bd [Zongheng Yang] De-hardcode number of shuffle partitions for BasicOperators (read from SQLConf).
555599c [Zongheng Yang] Bullet-proof (relatively) parsing SET per review comment.
c2067e8 [Zongheng Yang] Mark SQLContext transient and put it in a second param list.
2ea8cdc [Zongheng Yang] Wrap long line.
41d7f09 [Zongheng Yang] Fix imports.
13279e6 [Zongheng Yang] Refactor the logic of eagerly processing SET commands.
b14b83e [Zongheng Yang] In a HiveContext, make SQLConf a subset of HiveConf.
6983180 [Zongheng Yang] Move a SET test to SQLQuerySuite and make it complete.
5b67985 [Zongheng Yang] New line at EOF.
c651797 [Zongheng Yang] Add commands.scala.
efd82db [Zongheng Yang] Clean up semantics of several cases of SET.
c1017c2 [Zongheng Yang] WIP in changing SetCommand to take two Options (for different semantics of SETs).
0f00d86 [Zongheng Yang] Add a test for singleton set command in SQL.
41acd75 [Zongheng Yang] Add a test for hql() in HiveQuerySuite.
2276929 [Zongheng Yang] Fix default hive result for set commands in HiveComparisonTest.
3b0c71b [Zongheng Yang] Remove Parser for set commands. A few other fixes.
d0c4578 [Zongheng Yang] Tmux typo.
0ecea46 [Zongheng Yang] Changes for HiveQl and HiveContext.
ce22d80 [Zongheng Yang] Fix parsing issues.
cb722c1 [Zongheng Yang] Finish up SQLConf patch.
4ebf362 [Zongheng Yang] First cut at SQLConf inside SQLContext.
1 parent f971d6c commit 08ed9ad
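As a quick illustration of the SET semantics described in the commit message — a minimal sketch, not part of this diff; the application name, master URL, and property values are made up, while set()/get() come from the new SQLConf and sql() from the existing SQLContext API:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("SQLConfDemo").setMaster("local"))
    val sqlContext = new SQLContext(sc)

    // Programmatic access: SQLContext now mixes in SQLConf, so the setters and
    // getters are available directly on the context.
    sqlContext.set("spark.sql.shuffle.partitions", "10")
    sqlContext.get("spark.sql.shuffle.partitions")            // "10"

    // "SET key=value" issued through sql(); collect() forces evaluation, which applies the setting.
    sqlContext.sql("SET spark.sql.shuffle.partitions=42").collect()
    sqlContext.get("spark.sql.shuffle.partitions")            // "42"

    // "SET key" returns the bound value (or an "is undefined" message) as one row.
    sqlContext.sql("SET spark.sql.shuffle.partitions").collect().foreach(println)

    // "SET" alone lists every key=value pair currently held by the SQLConf.
    sqlContext.sql("SET").collect().foreach(println)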

14 files changed: 429 additions, 61 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala

Lines changed: 24 additions & 7 deletions
@@ -41,10 +41,25 @@ import org.apache.spark.sql.catalyst.types._
  * for a SQL like language should checkout the HiveQL support in the sql/hive sub-project.
  */
 class SqlParser extends StandardTokenParsers with PackratParsers {
+
   def apply(input: String): LogicalPlan = {
-    phrase(query)(new lexical.Scanner(input)) match {
-      case Success(r, x) => r
-      case x => sys.error(x.toString)
+    // Special-case out set commands since the value fields can be
+    // complex to handle without RegexParsers. Also this approach
+    // is clearer for the several possible cases of set commands.
+    if (input.trim.toLowerCase.startsWith("set")) {
+      input.trim.drop(3).split("=", 2).map(_.trim) match {
+        case Array("") => // "set"
+          SetCommand(None, None)
+        case Array(key) => // "set key"
+          SetCommand(Some(key), None)
+        case Array(key, value) => // "set key=value"
+          SetCommand(Some(key), Some(value))
+      }
+    } else {
+      phrase(query)(new lexical.Scanner(input)) match {
+        case Success(r, x) => r
+        case x => sys.error(x.toString)
+      }
     }
   }

@@ -169,11 +184,13 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
     }
   }

-  protected lazy val query: Parser[LogicalPlan] =
+  protected lazy val query: Parser[LogicalPlan] = (
     select * (
-      UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
-      UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
-    ) | insert
+        UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
+        UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
+      )
+    | insert
+  )

   protected lazy val select: Parser[LogicalPlan] =
     SELECT ~> opt(DISTINCT) ~ projections ~
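For reference, a short sketch (not part of the diff) of what the special-cased branch above produces when the parser is invoked directly; the property names are made up:

    import org.apache.spark.sql.catalyst.SqlParser
    import org.apache.spark.sql.catalyst.plans.logical.SetCommand

    val parser = new SqlParser

    parser("SET")                                    // SetCommand(None, None)
    parser("SET spark.sql.shuffle.partitions")       // SetCommand(Some("spark.sql.shuffle.partitions"), None)
    parser("SET spark.sql.shuffle.partitions=10")    // SetCommand(Some("spark.sql.shuffle.partitions"), Some("10"))
    // Anything not starting with "set" falls through to the usual phrase(query) path.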

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 11 additions & 1 deletion
@@ -102,7 +102,7 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  */
 abstract class Command extends LeafNode {
   self: Product =>
-  def output: Seq[Attribute] = Seq.empty
+  def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
 }

 /**
@@ -111,6 +111,16 @@ abstract class Command extends LeafNode {
  */
 case class NativeCommand(cmd: String) extends Command

+/**
+ * Commands of the form "SET (key) (= value)".
+ */
+case class SetCommand(key: Option[String], value: Option[String]) extends Command {
+  override def output = Seq(
+    AttributeReference("key", StringType, nullable = false)(),
+    AttributeReference("value", StringType, nullable = false)()
+  )
+}
+
 /**
  * Returned by a parser when the users only wants to see what query plan would be executed, without
  * actually performing the execution.
sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+/**
+ * SQLConf holds mutable config parameters and hints. These can be set and
+ * queried either by passing SET commands into Spark SQL's DSL
+ * functions (sql(), hql(), etc.), or by programmatically using setters and
+ * getters of this class. This class is thread-safe.
+ */
+trait SQLConf {
+
+  /** Number of partitions to use for shuffle operators. */
+  private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
+
+  @transient
+  private val settings = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, String]())
+
+  def set(props: Properties): Unit = {
+    props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
+  }
+
+  def set(key: String, value: String): Unit = {
+    require(key != null, "key cannot be null")
+    require(value != null, s"value cannot be null for ${key}")
+    settings.put(key, value)
+  }
+
+  def get(key: String): String = {
+    if (!settings.containsKey(key)) {
+      throw new NoSuchElementException(key)
+    }
+    settings.get(key)
+  }
+
+  def get(key: String, defaultValue: String): String = {
+    if (!settings.containsKey(key)) defaultValue else settings.get(key)
+  }
+
+  def getAll: Array[(String, String)] = settings.asScala.toArray
+
+  def getOption(key: String): Option[String] = {
+    if (!settings.containsKey(key)) None else Some(settings.get(key))
+  }
+
+  def contains(key: String): Boolean = settings.containsKey(key)
+
+  def toDebugString: String = {
+    settings.synchronized {
+      settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
+    }
+  }
+
+  private[spark] def clear() {
+    settings.clear()
+  }
+
+}
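Since SQLConf is a self-contained trait backed by a synchronized Java HashMap, its accessors can be sketched in isolation; the anonymous instance below is purely illustrative — in practice the trait is mixed into SQLContext (and HiveContext):

    import org.apache.spark.sql.SQLConf

    val conf = new SQLConf {}                        // anonymous instance, for illustration only

    conf.set("spark.sql.shuffle.partitions", "8")
    conf.get("spark.sql.shuffle.partitions")         // "8"
    conf.get("some.missing.key", "fallback")         // "fallback" (default-value overload)
    conf.getOption("some.missing.key")               // None
    conf.contains("spark.sql.shuffle.partitions")    // true
    conf.getAll                                      // Array(("spark.sql.shuffle.partitions", "8"))
    println(conf.toDebugString)                      // spark.sql.shuffle.partitions=8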

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 31 additions & 8 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
-import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{SetCommand, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor

 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
@@ -52,6 +52,7 @@ import org.apache.spark.sql.parquet.ParquetRelation
 @AlphaComponent
 class SQLContext(@transient val sparkContext: SparkContext)
   extends Logging
+  with SQLConf
   with dsl.ExpressionConversions
   with Serializable {

@@ -190,6 +191,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext = self.sparkContext

+    def numPartitions = self.numShufflePartitions
+
     val strategies: Seq[Strategy] =
       CommandStrategy(self) ::
       TakeOrdered ::
@@ -246,37 +249,57 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val planner = new SparkPlanner

+  @transient
+  protected[sql] lazy val emptyResult =
+    sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
+
   /**
    * Prepares a planned SparkPlan for execution by binding references to specific ordinals, and
    * inserting shuffle operations as needed.
    */
   @transient
   protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
     val batches =
-      Batch("Add exchange", Once, AddExchange) ::
+      Batch("Add exchange", Once, AddExchange(self)) ::
       Batch("Prepare Expressions", Once, new BindReferences[SparkPlan]) :: Nil
   }

-  // TODO: or should we make QueryExecution protected[sql]?
-  protected[sql] def mkQueryExecution(plan: LogicalPlan) = new QueryExecution {
-    val logical = plan
-  }
-
   /**
    * The primary workflow for executing relational queries using Spark. Designed to allow easy
    * access to the intermediate phases of query execution for developers.
    */
   protected abstract class QueryExecution {
     def logical: LogicalPlan

+    def eagerlyProcess(plan: LogicalPlan): RDD[Row] = plan match {
+      case SetCommand(key, value) =>
+        // Only this case needs to be executed eagerly. The other cases will
+        // be taken care of when the actual results are being extracted.
+        // In the case of HiveContext, sqlConf is overridden to also pass the
+        // pair into its HiveConf.
+        if (key.isDefined && value.isDefined) {
+          set(key.get, value.get)
+        }
+        // It doesn't matter what we return here, since this is only used
+        // to force the evaluation to happen eagerly. To query the results,
+        // one must use SchemaRDD operations to extract them.
+        emptyResult
+      case _ => executedPlan.execute()
+    }
+
     lazy val analyzed = analyzer(logical)
     lazy val optimizedPlan = optimizer(analyzed)
     // TODO: Don't just pick the first one...
     lazy val sparkPlan = planner(optimizedPlan).next()
     lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)

     /** Internal version of the RDD. Avoids copies and has no schema */
-    lazy val toRdd: RDD[Row] = executedPlan.execute()
+    lazy val toRdd: RDD[Row] = {
+      logical match {
+        case s: SetCommand => eagerlyProcess(s)
+        case _ => executedPlan.execute()
+      }
+    }

     protected def stringOrError[A](f: => A): String =
       try f.toString catch { case e: Throwable => e.toString }

sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala

Lines changed: 3 additions & 3 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.{HashPartitioner, RangePartitioner, SparkConf}
 import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{SQLConf, SQLContext, Row}
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions.{MutableProjection, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -86,9 +86,9 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  * [[catalyst.plans.physical.Distribution Distribution]] requirements for each operator by inserting
  * [[Exchange]] Operators where required.
  */
-private[sql] object AddExchange extends Rule[SparkPlan] {
+private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  val numPartitions = 150
+  def numPartitions = sqlContext.numShufflePartitions

   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>
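Because AddExchange (and BasicOperators below) now read the partition count from the context instead of hard-coding 150 and 200, the shuffle width can be tuned per SQLContext. A rough sketch reusing the sc and sqlContext from the earlier example; the table name, data, and the registerAsTable/createSchemaRDD calls are assumptions about the surrounding Spark 1.0 SchemaRDD API, not part of this diff:

    case class Record(key: Int, value: String)

    import sqlContext._   // implicit RDD-to-SchemaRDD conversion and sql()

    sc.parallelize((1 to 100).map(i => Record(i % 10, s"v$i"))).registerAsTable("records")

    // Override the shuffle width for subsequent queries on this context.
    sqlContext.set("spark.sql.shuffle.partitions", "4")

    // The Exchange inserted for this aggregation should now repartition into 4 partitions.
    val grouped = sql("SELECT key, COUNT(*) FROM records GROUP BY key")
    grouped.collect().foreach(println)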

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 6 additions & 5 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.execution

-import org.apache.spark.sql.{SQLContext, execution}
+import org.apache.spark.sql.{SQLConf, SQLContext, execution}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
@@ -193,8 +193,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {

   // Can we automate these 'pass through' operations?
   object BasicOperators extends Strategy {
-    // TODO: Set
-    val numPartitions = 200
+    def numPartitions = self.numPartitions
+
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.Distinct(child) =>
         execution.Aggregate(
@@ -234,11 +234,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }

-  // TODO: this should be merged with SPARK-1508's SetCommandStrategy
   case class CommandStrategy(context: SQLContext) extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.SetCommand(key, value) =>
+        Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
-        val qe = context.mkQueryExecution(child)
+        val qe = context.executePlan(child)
         Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
       case _ => Nil
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala

Lines changed: 35 additions & 0 deletions
@@ -17,10 +17,45 @@

 package org.apache.spark.sql.execution

+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute}

+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class SetCommandPhysical(key: Option[String], value: Option[String], output: Seq[Attribute])
+                             (@transient context: SQLContext) extends LeafNode {
+  def execute(): RDD[Row] = (key, value) match {
+    // Set value for key k; the action itself would
+    // have been performed in QueryExecution eagerly.
+    case (Some(k), Some(v)) => context.emptyResult
+    // Query the value bound to key k.
+    case (Some(k), None) =>
+      val resultString = context.getOption(k) match {
+        case Some(v) => s"$k=$v"
+        case None => s"$k is undefined"
+      }
+      context.sparkContext.parallelize(Seq(new GenericRow(Array[Any](resultString))), 1)
+    // Query all key-value pairs that are set in the SQLConf of the context.
+    case (None, None) =>
+      val pairs = context.getAll
+      val rows = pairs.map { case (k, v) =>
+        new GenericRow(Array[Any](s"$k=$v"))
+      }.toSeq
+      // Assume config parameters can fit into one split (machine) ;)
+      context.sparkContext.parallelize(rows, 1)
+    // The only other case is invalid semantics and is impossible.
+    case _ => context.emptyResult
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
 case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
                                  (@transient context: SQLContext) extends UnaryNode {
   def execute(): RDD[Row] = {
