Skip to content

Commit 4ebf362

Browse files
First cut at SQLConf inside SQLContext.
1 parent 894ecde commit 4ebf362

File tree

5 files changed

+126
-2
lines changed

5 files changed

+126
-2
lines changed
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import scala.collection.mutable
21+
22+
/**
 * SQLConf holds potentially query-dependent, mutable config parameters and hints.
 *
 * Keys and values are plain strings. `set` rejects nulls and returns `this` so
 * calls can be chained.
 */
class SQLConf {

  // Backing store for all configured key/value pairs.
  private val settings = new mutable.HashMap[String, String]()

  /**
   * Sets a config key to the given value and returns this SQLConf for chaining.
   * Throws NullPointerException if either the key or the value is null.
   */
  def set(key: String, value: String): SQLConf = {
    if (key == null) throw new NullPointerException("null key")
    if (value == null) throw new NullPointerException("null value")
    settings.put(key, value)
    this
  }

  /** Returns the value for `key`, throwing NoSuchElementException if it is unset. */
  def get(key: String): String = settings.get(key) match {
    case Some(value) => value
    case None => throw new NoSuchElementException(key)
  }

  /** Returns the value for `key`, or `defaultValue` if it is unset. */
  def get(key: String, defaultValue: String): String =
    settings.get(key).getOrElse(defaultValue)

  /** Returns a snapshot of all settings as key/value pairs. */
  def getAll: Array[(String, String)] = settings.iterator.toArray

  /** Returns the value for `key` as an Option, None if it is unset. */
  def getOption(key: String): Option[String] = settings.get(key)

  /** Returns true if `key` has been set. */
  def contains(key: String): Boolean = settings.contains(key)

  /** Renders all settings sorted by key, one `key=value` pair per line. */
  def toDebugString: String =
    settings.toSeq.sorted.map { case (k, v) => s"$k=$v" }.mkString("\n")

}

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import org.apache.spark.sql.execution._
4040
import org.apache.spark.sql.execution.SparkStrategies
4141

4242
import org.apache.spark.sql.parquet.ParquetRelation
43+
import org.apache.spark.sql.SQLConf
4344

4445
/**
4546
* :: AlphaComponent ::
@@ -57,6 +58,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
5758

5859
self =>
5960

61+
@transient
62+
val sqlConf: SQLConf = new SQLConf
63+
6064
@transient
6165
protected[sql] lazy val catalog: Catalog = new SimpleCatalog
6266
@transient
@@ -190,6 +194,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
190194
protected[sql] class SparkPlanner extends SparkStrategies {
191195
val sparkContext = self.sparkContext
192196

197+
def sqlConf = self.sqlConf
198+
193199
val strategies: Seq[Strategy] =
194200
TakeOrdered ::
195201
PartialAggregation ::

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
2929
self: SQLContext#SparkPlanner =>
3030

3131
object HashJoin extends Strategy with PredicateHelper {
32+
33+
// var broadCastTables =
34+
// sqlConf.get("spark.sql.hints.broadcastTables", "").split(",")
35+
3236
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
3337
// Find inner joins where at least some predicates can be evaluated by matching hash keys
3438
// using the HashFilteredJoin pattern.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql
19+
20+
import org.apache.spark.sql.test._
21+
22+
/* Implicits */
23+
//import TestSQLContext._
24+
//import TestData._
25+
26+
/**
 * Tests basic get/set behavior of the SQLConf held inside a SQLContext.
 */
class SQLConfSuite extends QueryTest {

  // Make sure the tables are loaded.
  // TestData

  test("basic setting and getting in SQLConf") {
    val conf = TestSQLContext.sqlConf
    val key = "test.key"
    val value = "test.val"

    // A fresh context starts with a non-null, empty conf.
    assert(conf != null)
    assert(conf.getOption(key).isEmpty)
    assert(conf.getAll.toSet === Set())

    conf.set(key, value)

    // Every accessor should observe the value just set.
    def checkVisible(c: SQLConf): Unit = {
      assert(c.get(key) == value)
      assert(c.get(key, value + "_") == value)
      assert(c.getOption(key) == Some(value))
      assert(c.contains(key))
    }
    checkVisible(conf)

    // Tests SQLConf as accessed from a SQLContext is mutable after
    // the latter is initialized, unlike SparkConf inside a SparkContext.
    checkVisible(TestSQLContext.sqlConf)
  }

}

sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ class PlannerSuite extends FunSuite {
3030
test("unions are collapsed") {
3131
val query = testData.unionAll(testData).unionAll(testData).logicalPlan
3232
val planned = BasicOperators(query).head
33-
val logicalUnions = query collect { case u: logical.Union => u}
34-
val physicalUnions = planned collect { case u: execution.Union => u}
33+
val logicalUnions = query collect { case u: logical.Union => u }
34+
val physicalUnions = planned collect { case u: execution.Union => u }
3535

3636
assert(logicalUnions.size === 2)
3737
assert(physicalUnions.size === 1)

0 commit comments

Comments (0)