Commit 73412be

Move SQLConf to Catalyst & add default val for sizeInBytes.
Conflicts: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala
1 parent: 7a60ab7

4 files changed (+56, -27 lines)


sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala renamed to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SQLConf.scala

Lines changed: 41 additions & 22 deletions
```diff
@@ -15,23 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.planning
 
 import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+private object SQLConf {
+  @transient protected[spark] val confSettings = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, String]())
+}
+
 /**
- * SQLConf holds mutable config parameters and hints. These can be set and
- * queried either by passing SET commands into Spark SQL's DSL
- * functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class.
+ * A trait that enables the setting and getting of mutable config parameters/hints. The central
+ * location for storing them is uniquely located in the same-name private companion object.
+ * Therefore, all classes that mix in this trait share all the hints.
+ *
+ * In the presence of a SQLContext, these can be set and queried either by passing SET commands
+ * into Spark SQL's DSL functions (sql(), hql(), etc.). Otherwise, users of this trait can
+ * modify the hints by programmatically calling the setters and getters of this trait.
  *
  * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
  */
 trait SQLConf {
   import SQLConf._
 
+  import SQLConf._
+  protected[spark] val settings = confSettings
+
   /** ************************ Spark SQL Params/Hints ******************* */
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
 
@@ -40,50 +51,58 @@ trait SQLConf {
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
-   * a broadcast value during the physical executions of join operations. Setting this to 0
+   * a broadcast value during the physical executions of join operations. Setting this to -1
    * effectively disables auto conversion.
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size.
+   *
+   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
    */
   private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
 
-  /** ********************** SQLConf functionality methods ************ */
+  /**
+   * The default size in bytes to assign to a logical operator's estimation statistics. By default,
+   * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
+   * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
+   */
+  private[spark] def statsDefaultSizeInBytes: Long =
+    getOption("spark.sql.catalyst.stats.sizeInBytes").map(_.toLong)
+      .getOrElse(autoConvertJoinSize + 1)
 
-  @transient
-  private val settings = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, String]())
+  /** ********************** SQLConf functionality methods ************ */
 
   def set(props: Properties): Unit = {
-    props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
+    confSettings.synchronized {
+      props.asScala.foreach { case (k, v) => confSettings.put(k, v) }
+    }
   }
 
   def set(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
-    require(value != null, s"value cannot be null for $key")
-    settings.put(key, value)
+    require(value != null, s"value cannot be null for key: $key")
+    confSettings.put(key, value)
   }
 
   def get(key: String): String = {
-    Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
+    Option(confSettings.get(key)).getOrElse(throw new NoSuchElementException(key))
   }
 
   def get(key: String, defaultValue: String): String = {
-    Option(settings.get(key)).getOrElse(defaultValue)
+    Option(confSettings.get(key)).getOrElse(defaultValue)
   }
 
-  def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray }
+  def getAll: Array[(String, String)] = confSettings.synchronized { confSettings.asScala.toArray }
 
-  def getOption(key: String): Option[String] = Option(settings.get(key))
+  def getOption(key: String): Option[String] = Option(confSettings.get(key))
 
-  def contains(key: String): Boolean = settings.containsKey(key)
+  def contains(key: String): Boolean = confSettings.containsKey(key)
 
   def toDebugString: String = {
-    settings.synchronized {
-      settings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
+    confSettings.synchronized {
+      confSettings.asScala.toArray.sorted.map{ case (k, v) => s"$k=$v" }.mkString("\n")
     }
   }
 
   private[spark] def clear() {
-    settings.clear()
+    confSettings.clear()
   }
 
 }
```
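Because `confSettings` now lives in the private companion object, every class mixing in `SQLConf` shares one JVM-wide settings map. Below is a minimal standalone sketch of that sharing behavior, with hypothetical names (`SharedConf`, `SharedConfDemo`, and the key used) standing in for the real classes:

```scala
object SharedConf {
  // One JVM-wide map, as in the private companion object in the diff above.
  val confSettings: java.util.Map[String, String] =
    java.util.Collections.synchronizedMap(new java.util.HashMap[String, String]())
}

trait SharedConf {
  import SharedConf._
  def set(key: String, value: String): Unit = confSettings.put(key, value)
  def get(key: String, default: String): String =
    Option(confSettings.get(key)).getOrElse(default)
}

object SharedConfDemo extends App {
  val a = new SharedConf {}
  val b = new SharedConf {}
  a.set("spark.sql.some.hint", "42")
  // b observes the hint set through a: the backing map is shared, not per-instance.
  assert(b.get("spark.sql.some.hint", "<unset>") == "42")
}
```

The apparent trade-off of this design: hints set anywhere become visible everywhere, so two independent SQLContexts in the same JVM would see each other's settings.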

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 9 additions & 2 deletions
```diff
@@ -19,17 +19,24 @@ package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.SQLConf
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
-abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
+abstract class LogicalPlan extends QueryPlan[LogicalPlan] with SQLConf {
   self: Product =>
 
+  // TODO: make a case class?
   protected class Statistics {
     lazy val childrenStats = children.map(_.statistics)
+
     lazy val numTuples: Long = childrenStats.map(_.numTuples).sum
-    lazy val sizeInBytes: Long = childrenStats.map(_.sizeInBytes).sum
+
+    lazy val sizeInBytes: Long = {
+      val sum = childrenStats.map(_.sizeInBytes).sum
+      if (sum == 0) statsDefaultSizeInBytes else sum
+    }
  }
 
  /**
```
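The point of `statsDefaultSizeInBytes = autoConvertJoinSize + 1` is that an operator tree with no real estimates sums its children to 0, and 0 would otherwise look like the smallest possible table and always win the broadcast decision. A self-contained sketch of the arithmetic using this commit's default values (plain Scala, not the real `Statistics` class):

```scala
object SizeInBytesSketch extends App {
  // This commit's defaults: threshold 10000, fallback 10001.
  val autoConvertJoinSize: Long = 10000L
  val statsDefaultSizeInBytes: Long = autoConvertJoinSize + 1

  // Mirrors the new lazy val: sum the children, fall back when nothing is known.
  def sizeInBytes(childrenSizes: Seq[Long]): Long = {
    val sum = childrenSizes.sum
    if (sum == 0) statsDefaultSizeInBytes else sum
  }

  assert(sizeInBytes(Nil) == 10001L)               // no estimate: stays above the threshold
  assert(sizeInBytes(Seq(4000L, 3000L)) == 7000L)  // real estimate: eligible for broadcast
}
```

One side effect worth noting: a genuinely empty relation also sums to 0, so it too reports the default and is not broadcast.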

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 5 additions & 3 deletions
```diff
@@ -17,21 +17,23 @@
 
 package org.apache.spark.sql
 
+
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
-import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.planning.SQLConf
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
```

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 1 addition & 0 deletions
```diff
@@ -82,6 +82,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           if left.statistics.sizeInBytes <= sqlContext.autoConvertJoinSize =>
         broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
+      // TODO: use optimization here as well
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
         val hashJoin =
           execution.ShuffledHashJoin(
```
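Putting the three files together, the planner's broadcast choice reduces to a single comparison against `autoConvertJoinSize`. A toy model of that decision under assumed names (`Stats`, `chooseJoin`); the real rule also extracts join keys and conditions via `ExtractEquiJoinKeys`:

```scala
object JoinChoiceSketch extends App {
  case class Stats(sizeInBytes: Long)

  // Broadcast the left (build) side only when it fits under the threshold.
  def chooseJoin(left: Stats, autoConvertJoinSize: Long): String =
    if (left.sizeInBytes <= autoConvertJoinSize) "BroadcastHashJoin(BuildLeft)"
    else "ShuffledHashJoin"

  assert(chooseJoin(Stats(9000L), 10000L) == "BroadcastHashJoin(BuildLeft)")
  // An unestimated plan reports 10001 (the default), so it is never broadcast.
  assert(chooseJoin(Stats(10001L), 10000L) == "ShuffledHashJoin")
}
```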
