Commit efd82db

Clean up semantics of several cases of SET.

- Add planner & physical plan for SET so that the pipeline propagates it cleanly.
- For HiveContext: make SQLConf and HiveConf equal after initialization of the context.
1 parent: c1017c2

5 files changed: +55, -13 lines

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 8 additions & 0 deletions

@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
 import scala.collection.mutable
 
 /**
@@ -30,6 +32,12 @@ class SQLConf {
     settings.clear()
   }
 
+  def this(props: Properties) = {
+    this()
+    import scala.collection.JavaConversions._ // implicits for java.util.Properties
+    props.foreach { case (k, v) => this.settings(k) = v }
+  }
+
   def set(key: String, value: String): SQLConf = {
     if (key == null) {
       throw new NullPointerException("null key")
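
The new auxiliary constructor seeds a SQLConf from a java.util.Properties object; the HiveContext change below uses it with hiveconf.getAllProperties. A minimal usage sketch, with a made-up key and value:

import java.util.Properties

val props = new Properties()
props.setProperty("mykey", "myvalue") // hypothetical key/value

// Every entry of props becomes an initial setting of the new conf.
val conf = new SQLConf(props)
assert(conf.get("mykey", "<absent>") == "myvalue")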

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 11 additions & 5 deletions

@@ -58,7 +58,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   self =>
 
   @transient
-  val sqlConf: SQLConf = new SQLConf
+  lazy val sqlConf: SQLConf = new SQLConf
 
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog
@@ -193,9 +193,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext = self.sparkContext
 
-    def sqlConf = self.sqlConf
+    val sqlConf = self.sqlConf
 
     val strategies: Seq[Strategy] =
+      SetCommandStrategy(self) ::
      TakeOrdered ::
      PartialAggregation ::
      HashJoin ::
@@ -250,7 +251,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] val planner = new SparkPlanner
 
   @transient
-  protected lazy val emptyResult =
+  protected[sql] lazy val emptyResult =
     sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
 
   /**
@@ -281,8 +282,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
     lazy val toRdd: RDD[Row] = {
       logical match {
         case SetCommand(key, value) =>
-          sqlConf.set(key, value)
-          emptyResult // TODO: should this return something else? Single row consisting of one 0?
+          if (key.isDefined && value.isDefined) {
+            sqlConf.set(key.get, value.get)
+          }
+          // It doesn't matter what we return here, since toRdd is used
+          // to force the evaluation to happen eagerly. To query the results,
+          // one must use SchemaRDD operations to extract them.
+          emptyResult
         case _ => executedPlan.execute()
       }
     }
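
As the new comment in toRdd spells out, forcing evaluation of a SET yields no meaningful rows; results are read back through ordinary SchemaRDD operations, which is the pattern the updated SQLConfSuite below uses. For illustration (hypothetical key and value):

sql("SET mykey=myvalue") // records the pair in sqlConf; the returned rows carry nothing useful
val settings = sql("SET").collect().map(_.getString(0)) // entries like "mykey=myvalue"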

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 9 additions & 0 deletions

@@ -217,4 +217,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case _ => Nil
     }
   }
+
+  case class SetCommandStrategy(context: SQLContext) extends Strategy {
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+      case logical.SetCommand(key, value) =>
+        Seq(execution.SetCommandPhysical(key, value, context))
+      case _ => Nil
+    }
+  }
+
 }
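
The strategy hands SET off to execution.SetCommandPhysical, which is not part of this diff. A rough sketch of what such an operator could look like, assuming a LeafNode base class, an Attribute-based output schema, and direct access to sqlConf.settings, none of which this commit shows. The protected[sql] widening of emptyResult above is what would let this compile from the execution package:

case class SetCommandPhysical(key: Option[String], value: Option[String], context: SQLContext)
  extends LeafNode {

  // Placeholder schema; the real node presumably exposes a single string column.
  def output: Seq[Attribute] = Seq.empty

  def execute(): RDD[Row] = (key, value) match {
    // SET key=value: record the assignment; there is nothing meaningful to return.
    case (Some(k), Some(v)) =>
      context.sqlConf.set(k, v)
      context.emptyResult
    // SET key: produce the single "key=value" pair for that key.
    case (Some(k), None) =>
      val row: Row = new GenericRow(Array[Any](s"$k=${context.sqlConf.get(k, "<undefined>")}"))
      context.sparkContext.parallelize(Seq(row), 1)
    // Bare SET: one "key=value" row per variable currently defined.
    case (None, None) =>
      val rows = context.sqlConf.settings.toSeq.map { case (k, v) =>
        new GenericRow(Array[Any](s"$k=$v")): Row
      }
      context.sparkContext.parallelize(rows, 1)
    // SET =value is not a legal form; treat it as a no-op.
    case (None, Some(_)) =>
      context.emptyResult
  }
}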

sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala

Lines changed: 12 additions & 6 deletions

@@ -49,6 +49,8 @@ class SQLConfSuite extends QueryTest {
     conf.clear()
   }
 
+  // TODO: this is more suitable for a "SetCommandSuite"? Or put them inside the query suite?
+
   test("parse SQL set commands") {
     sql(s"set $testKey=$testVal")
     assert(conf.get(testKey, testVal + "_") == testVal)
@@ -64,22 +66,26 @@ class SQLConfSuite extends QueryTest {
     sql(s"set $key=$vs")
     assert(conf.get(key, "0") == vs)
 
-    sql(s"set$key=")
+    sql(s"set $key=")
     assert(conf.get(key, "0") == "")
 
     conf.clear()
   }
 
   test("set itself returns all config variables currently specified in Hive or SQLConf") {
-    assert(sql("set").collect() === Seq(""))
+    def fromRows(row: Array[Row]): Array[String] = row.map(_.getString(0))
+
+    assert(sql("set").collect().size == 0)
 
     sql(s"SET $testKey=$testVal")
-    assert(sql("set").collect() === Seq(s"$testKey=$testVal"))
+    assert(fromRows(sql("set").collect()) sameElements Array(s"$testKey=$testVal"))
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
-    // TODO: should this be 1-elem Seq or 2-elem?
-    assert(sql("set").collect() ===
-      Seq(s"$testKey=$testVal\n${testKey + testKey}=${testVal + testVal}"))
+    assert(fromRows(sql("set").collect()) sameElements
+      Array(
+        s"$testKey=$testVal",
+        s"${testKey + testKey}=${testVal + testVal}"
+      ))
   }
 
 }
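
The assertions also switch from === against a Seq to sameElements against an Array. In plain Scala an Array compares by reference, and an Array is never == to a Seq, whereas sameElements walks both collections and compares contents pairwise:

Array("k=v") == Array("k=v")            // false: arrays use reference equality
Array("k=v") sameElements Array("k=v")  // true: element-by-element comparison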

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 15 additions & 2 deletions

@@ -133,7 +133,13 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 
+  /**
+   * Contract: after initialization of this HiveContext, these two confs should
+   * contain exactly the same key-value pairs throughout the lifetime of `this`.
+   */
   @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])
+  @transient override lazy val sqlConf: SQLConf = new SQLConf(hiveconf.getAllProperties)
+
   @transient protected[hive] lazy val sessionState = new SessionState(hiveconf)
 
   sessionState.err = new PrintStream(outputBuffer, true, "UTF-8")
@@ -222,6 +228,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val hiveContext = self
 
     override val strategies: Seq[Strategy] = Seq(
+      SetCommandStrategy(self),
       TakeOrdered,
       ParquetOperations,
       HiveTableScans,
@@ -259,8 +266,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     analyzed match {
       case SetCommand(key, value) =>
         // Record the set command inside SQLConf, as well as have Hive execute it.
-        sqlConf.set(key, value)
-        processCmd(s"set $key=$value")
+        if (key.isDefined && value.isDefined) {
+          sqlConf.set(key.get, value.get)
+          processCmd(s"SET ${key.get}=${value.get}")
+        }
+        // Only the above case needs to be executed in Hive eagerly (i.e. now).
+        // The other cases will be taken care of when the actual results are
+        // being extracted.
+        emptyResult
       case NativeCommand(cmd) =>
         processCmd(cmd)
       case _ =>
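
Given the documented contract, a SET routed through the SQL pipeline should now be visible in both confs. A hypothetical spot-check, written as if it lived inside the org.apache.spark.sql.hive package (hiveconf is protected[hive], so user code cannot reach it directly); the key and value are made up:

// Contract check sketch: the assignment must land in SQLConf and HiveConf alike.
hiveContext.sql("SET mykey=myvalue") // hypothetical key/value
assert(hiveContext.sqlConf.get("mykey", "") == "myvalue")
assert(hiveContext.hiveconf.get("mykey") == "myvalue") // HiveConf extends Hadoop Configuration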
