
Commit c7db274

concretevitamin authored and marmbrus committed
[SPARK-2393][SQL] Cost estimation optimization framework for Catalyst logical plans & sample usage.
The idea is that every Catalyst logical plan gets hold of a Statistics class, the usage of which provides useful estimations on various statistics. See the implementations of `MetastoreRelation`.

This patch also includes several usages of the estimation interface in the planner. For instance, we now use physical table sizes from the estimate interface to convert an equi-join to a broadcast join (when doing so is beneficial, as determined by a size threshold).

Finally, there are a couple minor accompanying changes including:
- Remove the not-in-use `BaseRelation`.
- Make SparkLogicalPlan take a `SQLContext` in the second param list.

Author: Zongheng Yang <[email protected]>

Closes #1238 from concretevitamin/estimates and squashes the following commits:

329071d [Zongheng Yang] Address review comments; turn config name from string to field in SQLConf.
8663e84 [Zongheng Yang] Use BigInt for stat; for logical leaves, by default throw an exception.
2f2fb89 [Zongheng Yang] Fix statistics for SparkLogicalPlan.
9951305 [Zongheng Yang] Remove childrenStats.
16fc60a [Zongheng Yang] Avoid calling statistics on plans if auto join conversion is disabled.
8bd2816 [Zongheng Yang] Add a note on performance of statistics.
6e594b8 [Zongheng Yang] Get size info from metastore for MetastoreRelation.
01b7a3e [Zongheng Yang] Update scaladoc for a field and move it to @param section.
549061c [Zongheng Yang] Remove numTuples in Statistics for now.
729a8e2 [Zongheng Yang] Update docs to be more explicit.
573e644 [Zongheng Yang] Remove singleton SQLConf and move back `settings` to the trait.
2d99eb5 [Zongheng Yang] {Cleanup, use synchronized in, enrich} StatisticsSuite.
ca5b825 [Zongheng Yang] Inject SQLContext into SparkLogicalPlan, removing SQLConf mixin from it.
43d38a6 [Zongheng Yang] Revert optimization for BroadcastNestedLoopJoin (this fixes tests).
0ef9e5b [Zongheng Yang] Use multiplication instead of sum for default estimates.
4ef0d26 [Zongheng Yang] Make Statistics a case class.
3ba8f3e [Zongheng Yang] Add comment.
e5bcf5b [Zongheng Yang] Fix optimization conditions & update scala docs to explain.
7d9216a [Zongheng Yang] Apply estimation to planning ShuffleHashJoin & BroadcastNestedLoopJoin.
73cde01 [Zongheng Yang] Move SQLConf back. Assign default sizeInBytes to SparkLogicalPlan.
73412be [Zongheng Yang] Move SQLConf to Catalyst & add default val for sizeInBytes.
7a60ab7 [Zongheng Yang] s/Estimates/Statistics, s/cardinality/numTuples.
de3ae13 [Zongheng Yang] Add parquetAfter() properly in test.
dcff9bd [Zongheng Yang] Cleanups.
84301a4 [Zongheng Yang] Refactors.
5bf5586 [Zongheng Yang] Typo.
56a8e6e [Zongheng Yang] Prototype impl of estimations for Catalyst logical plans.
1 parent dc96536 commit c7db274
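
To make the size-threshold check described in the commit message concrete, here is a minimal, self-contained Scala sketch. `Plan`, `TableScan`, and `canBroadcast` are illustrative stand-ins, not Catalyst or planner classes; the real decision is made inside the planner strategies using `LogicalPlan.statistics`.

object BroadcastDecisionSketch {

  final case class Statistics(sizeInBytes: BigInt)

  sealed trait Plan { def statistics: Statistics }

  final case class TableScan(name: String, sizeInBytes: BigInt) extends Plan {
    def statistics: Statistics = Statistics(sizeInBytes)
  }

  // Mirrors the intent of spark.sql.autoBroadcastJoinThreshold: a side of an equi-join
  // is broadcast only if its estimated size is at or below the threshold; a negative
  // threshold disables the conversion entirely.
  def canBroadcast(plan: Plan, autoBroadcastJoinThreshold: Long): Boolean =
    autoBroadcastJoinThreshold > 0 &&
      plan.statistics.sizeInBytes <= BigInt(autoBroadcastJoinThreshold)

  def main(args: Array[String]): Unit = {
    val smallDim = TableScan("dim_dates", sizeInBytes = BigInt(4096))
    val bigFact  = TableScan("fact_sales", sizeInBytes = BigInt("50000000000"))
    println(canBroadcast(smallDim, autoBroadcastJoinThreshold = 10000L)) // true
    println(canBroadcast(bigFact, autoBroadcastJoinThreshold = 10000L))  // false
    println(canBroadcast(smallDim, autoBroadcastJoinThreshold = -1L))    // false: disabled
  }
}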

File tree

18 files changed, +256 -124 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.{errors, trees}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 /**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
 case class UnresolvedRelation(
     databaseName: Option[String],
     tableName: String,
-    alias: Option[String] = None) extends BaseRelation {
+    alias: Option[String] = None) extends LeafNode {
   override def output = Nil
   override lazy val resolved = false
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala

Lines changed: 0 additions & 24 deletions
This file was deleted.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 22 additions & 0 deletions
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.trees
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
 
+  /**
+   * Estimates of various statistics. The default estimation logic simply lazily multiplies the
+   * corresponding statistic produced by the children. To override this behavior, override
+   * `statistics` and assign it an overriden version of `Statistics`.
+   *
+   * '''NOTE''': concrete and/or overriden versions of statistics fields should pay attention to the
+   * performance of the implementations. The reason is that estimations might get triggered in
+   * performance-critical processes, such as query plan planning.
+   *
+   * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
+   *                    defaults to the product of children's `sizeInBytes`.
+   */
+  case class Statistics(
+    sizeInBytes: BigInt
+  )
+  lazy val statistics: Statistics = Statistics(
+    sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
+  )
+
   /**
    * Returns the set of attributes that are referenced by this node
    * during evaluation.
@@ -92,6 +111,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
 abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   self: Product =>
 
+  override lazy val statistics: Statistics =
+    throw new UnsupportedOperationException("default leaf nodes don't have meaningful Statistics")
+
   // Leaf nodes by definition cannot reference any input attributes.
   override def references = Set.empty
 }
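
As an illustration of the default estimation logic added above (an inner node's `sizeInBytes` is the product of its children's, and a relation that knows its physical size overrides the default, much like `MetastoreRelation` does with metastore size information), here is a simplified, self-contained sketch. `Relation` and `Join` are hypothetical stand-ins, not Catalyst operators.

object StatisticsPropagationSketch {

  final case class Statistics(sizeInBytes: BigInt)

  sealed trait Node {
    def children: Seq[Node]
    // Default: multiply the children's estimates, as in LogicalPlan.statistics.
    def statistics: Statistics =
      Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
  }

  final case class Relation(physicalSizeInBytes: BigInt) extends Node {
    def children: Seq[Node] = Nil
    // Leaf operators should supply a real estimate; the generic default would be 1
    // (the empty product), and the patched LeafNode throws instead.
    override def statistics: Statistics = Statistics(physicalSizeInBytes)
  }

  final case class Join(left: Node, right: Node) extends Node {
    def children: Seq[Node] = Seq(left, right)
  }

  def main(args: Array[String]): Unit = {
    val plan = Join(Relation(BigInt(2000)), Relation(BigInt(300)))
    println(plan.statistics.sizeInBytes) // 600000 = 2000 * 300
  }
}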

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 35 additions & 26 deletions
@@ -21,17 +21,31 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+object SQLConf {
+  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
+  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+  }
+}
+
 /**
- * SQLConf holds mutable config parameters and hints. These can be set and
- * queried either by passing SET commands into Spark SQL's DSL
- * functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class.
+ * A trait that enables the setting and getting of mutable config parameters/hints.
+ *
+ * In the presence of a SQLContext, these can be set and queried by passing SET commands
+ * into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
+ * modify the hints by programmatically calling the setters and getters of this trait.
  *
- * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
+ * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
 trait SQLConf {
   import SQLConf._
 
+  @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, String]())
+
   /** ************************ Spark SQL Params/Hints ******************* */
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
 
@@ -40,28 +54,33 @@ trait SQLConf {
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
-   * a broadcast value during the physical executions of join operations. Setting this to 0
+   * a broadcast value during the physical executions of join operations. Setting this to -1
    * effectively disables auto conversion.
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size.
+   *
+   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
    */
-  private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
+  private[spark] def autoBroadcastJoinThreshold: Int =
+    get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
 
-  /** A comma-separated list of table names marked to be broadcasted during joins. */
-  private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
+  /**
+   * The default size in bytes to assign to a logical operator's estimation statistics. By default,
+   * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
+   * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
+   */
+  private[spark] def defaultSizeInBytes: Long =
+    getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
 
   /** ********************** SQLConf functionality methods ************ */
 
-  @transient
-  private val settings = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, String]())
-
   def set(props: Properties): Unit = {
-    props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
+    settings.synchronized {
+      props.asScala.foreach { case (k, v) => settings.put(k, v) }
+    }
  }
 
   def set(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
-    require(value != null, s"value cannot be null for $key")
+    require(value != null, s"value cannot be null for key: $key")
     settings.put(key, value)
   }
 
@@ -90,13 +109,3 @@ trait SQLConf {
   }
 
 }
-
-object SQLConf {
-  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
-
-  object Deprecated {
-    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
-  }
-}
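
For illustration, a hedged, self-contained sketch of the configuration pattern above: a synchronized settings map plus accessors with defaults, using the key names from the diff. `MiniConf` and `MiniConfDemo` are hypothetical stand-ins, not the SQLConf trait itself. It also demonstrates the deliberate relationship between the two settings: `defaultSizeInBytes` falls back to `autoBroadcastJoinThreshold + 1`, so an operator whose size is unknown never qualifies for auto broadcast.

import java.util.Properties
import scala.collection.JavaConverters._

// Hypothetical stand-in mirroring the SQLConf pattern above; not the real trait.
trait MiniConf {
  @transient protected val settings: java.util.Map[String, String] =
    java.util.Collections.synchronizedMap(new java.util.HashMap[String, String]())

  def set(key: String, value: String): Unit = {
    require(key != null, "key cannot be null")
    require(value != null, s"value cannot be null for key: $key")
    settings.put(key, value)
  }

  def set(props: Properties): Unit = settings.synchronized {
    props.asScala.foreach { case (k, v) => settings.put(k, v) }
  }

  def getOption(key: String): Option[String] = Option(settings.get(key))

  def autoBroadcastJoinThreshold: Int =
    getOption("spark.sql.autoBroadcastJoinThreshold").getOrElse("10000").toInt

  // Defaults to threshold + 1, so operators lacking a real size estimate never
  // qualify for the auto broadcast conversion.
  def defaultSizeInBytes: Long =
    getOption("spark.sql.defaultSizeInBytes").map(_.toLong)
      .getOrElse(autoBroadcastJoinThreshold + 1L)
}

object MiniConfDemo extends MiniConf {
  def main(args: Array[String]): Unit = {
    println(autoBroadcastJoinThreshold)                // 10000
    println(defaultSizeInBytes)                        // 10001
    set("spark.sql.autoBroadcastJoinThreshold", "-1")  // disable auto broadcast joins
    println(defaultSizeInBytes)                        // 0
  }
}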

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 8 additions & 12 deletions
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
-import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
@@ -86,7 +86,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
-    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
+    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
 
   /**
    * Loads a Parquet file, returning the result as a [[SchemaRDD]].
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
-    new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
+    new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))
 
   /**
    * :: Experimental ::
@@ -170,11 +170,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-    val name = tableName
-    val newPlan = rdd.logicalPlan transform {
-      case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
-    }
-    catalog.registerTable(None, tableName, newPlan)
+    catalog.registerTable(None, tableName, rdd.logicalPlan)
   }
 
   /**
@@ -212,7 +208,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
-        catalog.registerTable(None, tableName, SparkLogicalPlan(e))
+        catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
       case inMem: InMemoryRelation =>
        inMem.cachedColumnBuffers.unpersist()
        catalog.unregisterTable(None, tableName)
@@ -405,7 +401,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
         new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
       }
     }
-    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
   }
 
 }

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 2 additions & 1 deletion
@@ -418,7 +418,8 @@ class SchemaRDD(
    * @group schema
    */
   private def applySchema(rdd: RDD[Row]): SchemaRDD = {
-    new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd)))
+    new SchemaRDD(sqlContext,
+      SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
   }
 
   // =======================================================================

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike {
     // happen right away to let these side effects take place eagerly.
     case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
       queryExecution.toRdd
-      SparkLogicalPlan(queryExecution.executedPlan)
+      SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
     case _ =>
       baseLogicalPlan
   }

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 2 additions & 2 deletions
@@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
         new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
       }
     }
-    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
   }
 
   /**
@@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * @group userf
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
+    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))
 
   /**
    * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 12 additions & 6 deletions
@@ -19,12 +19,12 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{Logging, Row, SQLContext}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 
 /**
@@ -66,8 +66,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
  * linking.
  */
 @DeveloperApi
-case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "SparkLogicalPlan")
-  extends BaseRelation with MultiInstanceRelation {
+case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQLContext)
+  extends LogicalPlan with MultiInstanceRelation {
 
   def output = alreadyPlanned.output
   override def references = Set.empty
@@ -78,9 +78,15 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan, tableName: String = "Spar
       alreadyPlanned match {
         case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
         case _ => sys.error("Multiple instance of the same relation detected.")
-      }, tableName)
-      .asInstanceOf[this.type]
+      })(sqlContext).asInstanceOf[this.type]
   }
+
+  @transient override lazy val statistics = Statistics(
+    // TODO: Instead of returning a default value here, find a way to return a meaningful size
+    // estimate for RDDs. See PR 1238 for more discussions.
+    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+  )
+
 }
 
 private[sql] trait LeafNode extends SparkPlan with trees.LeafNode[SparkPlan] {