Commit f34122d

Merge remote-tracking branch 'apache/master' into newCodeGen
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
	sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
	sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
	sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
2 parents: 67b1c48 + c7db274

22 files changed, +290 -147 lines


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

Lines changed: 2 additions & 2 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.{errors, trees}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 /**
@@ -36,7 +36,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
 case class UnresolvedRelation(
     databaseName: Option[String],
     tableName: String,
-    alias: Option[String] = None) extends BaseRelation {
+    alias: Option[String] = None) extends LeafNode {
   override def output = Nil
   override lazy val resolved = false
 }

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/BaseRelation.scala

Lines changed: 0 additions & 24 deletions
This file was deleted.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 22 additions & 0 deletions
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.trees
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
 
+  /**
+   * Estimates of various statistics. The default estimation logic simply lazily multiplies the
+   * corresponding statistic produced by the children. To override this behavior, override
+   * `statistics` and assign it an overridden version of `Statistics`.
+   *
+   * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
+   * performance of the implementations. The reason is that estimations might get triggered in
+   * performance-critical processes, such as query plan planning.
+   *
+   * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
+   *                    defaults to the product of children's `sizeInBytes`.
+   */
+  case class Statistics(
+    sizeInBytes: BigInt
+  )
+  lazy val statistics: Statistics = Statistics(
+    sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product
+  )
+
   /**
    * Returns the set of attributes that are referenced by this node
    * during evaluation.
@@ -92,6 +111,9 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
 abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   self: Product =>
 
+  override lazy val statistics: Statistics =
+    throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
+
   // Leaf nodes by definition cannot reference any input attributes.
   override def references = Set.empty
 }
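
The estimation scheme introduced above is bottom-up: every logical operator carries a lazily computed Statistics value, inner nodes default to the product of their children's sizeInBytes, and leaf operators are expected to supply a concrete figure (otherwise the LeafNode override throws). A minimal standalone sketch of that scheme, using simplified hypothetical classes (Plan, ExampleRelation, ExampleJoin) rather than the actual Catalyst types:

// Simplified analogue of the statistics framework added in this commit; the names below are
// illustrative, not the real Catalyst API.
abstract class Plan {
  def children: Seq[Plan]

  case class Statistics(sizeInBytes: BigInt)

  // Default estimate: multiply the children's sizes (a pessimistic upper bound).
  lazy val statistics: Statistics =
    Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
}

// A hypothetical leaf relation reports the physical size of its backing data.
case class ExampleRelation(bytesOnDisk: BigInt) extends Plan {
  def children: Seq[Plan] = Nil
  override lazy val statistics: Statistics = Statistics(sizeInBytes = bytesOnDisk)
}

// A hypothetical binary operator (think: join) keeps the multiplicative default.
case class ExampleJoin(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
}

object StatisticsDemo extends App {
  val joined = ExampleJoin(ExampleRelation(1024), ExampleRelation(2048))
  println(joined.statistics.sizeInBytes) // 2097152 = 1024 * 2048
}

Multiplying children's sizes deliberately overestimates, so a plan containing an operator without a real estimate never looks small enough to be broadcast by mistake, which is presumably why the LeafNode default throws instead of silently returning 1.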

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 38 additions & 27 deletions
@@ -21,17 +21,34 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+object SQLConf {
+  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
+  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
+  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
+  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
+  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
+  val CODEGEN_ENABLED = "spark.sql.codegen"
+
+  object Deprecated {
+    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+  }
+}
+
 /**
- * SQLConf holds mutable config parameters and hints. These can be set and
- * queried either by passing SET commands into Spark SQL's DSL
- * functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class.
+ * A trait that enables the setting and getting of mutable config parameters/hints.
+ *
+ * In the presence of a SQLContext, these can be set and queried by passing SET commands
+ * into Spark SQL's query functions (sql(), hql(), etc.). Otherwise, users of this trait can
+ * modify the hints by programmatically calling the setters and getters of this trait.
  *
- * SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
+ * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
 trait SQLConf {
   import SQLConf._
 
+  @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, String]())
+
   /** ************************ Spark SQL Params/Hints ******************* */
   // TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
 
@@ -52,28 +69,33 @@ trait SQLConf {
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
-   * a broadcast value during the physical executions of join operations. Setting this to 0
+   * a broadcast value during the physical executions of join operations. Setting this to -1
    * effectively disables auto conversion.
-   * Hive setting: hive.auto.convert.join.noconditionaltask.size.
+   *
+   * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is also 10000.
    */
-  private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
+  private[spark] def autoBroadcastJoinThreshold: Int =
+    get(AUTO_BROADCASTJOIN_THRESHOLD, "10000").toInt
 
-  /** A comma-separated list of table names marked to be broadcasted during joins. */
-  private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
+  /**
+   * The default size in bytes to assign to a logical operator's estimation statistics. By default,
+   * it is set to a larger value than `autoConvertJoinSize`, hence any logical operator without a
+   * properly implemented estimation of this statistic will not be incorrectly broadcasted in joins.
+   */
+  private[spark] def defaultSizeInBytes: Long =
+    getOption(DEFAULT_SIZE_IN_BYTES).map(_.toLong).getOrElse(autoBroadcastJoinThreshold + 1)
 
   /** ********************** SQLConf functionality methods ************ */
 
-  @transient
-  private val settings = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, String]())
-
   def set(props: Properties): Unit = {
-    props.asScala.foreach { case (k, v) => this.settings.put(k, v) }
+    settings.synchronized {
+      props.asScala.foreach { case (k, v) => settings.put(k, v) }
+    }
   }
 
   def set(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
-    require(value != null, s"value cannot be null for $key")
+    require(value != null, s"value cannot be null for key: $key")
     settings.put(key, value)
   }
 
@@ -100,16 +122,5 @@ trait SQLConf {
   private[spark] def clear() {
     settings.clear()
   }
-
 }
 
-object SQLConf {
-  val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
-  val CODEGEN_ENABLED = "spark.sql.codegen"
-
-  object Deprecated {
-    val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
-  }
-}
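
Read together, the new keys let users cap the broadcast-join threshold and control the fallback size assigned to operators that provide no estimate of their own. A rough usage sketch, assuming a Spark 1.x-style SQLContext (which mixes in this SQLConf trait); the master, app name, and threshold value are illustrative only:

// Hypothetical standalone driver showing the two ways the doc comment above describes for
// adjusting these settings: programmatic setters and SQL-level SET commands.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLConf, SQLContext}

object SQLConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("SQLConfExample"))
    val sqlContext = new SQLContext(sc)

    // Programmatic setter from the SQLConf trait (key names as defined in object SQLConf).
    sqlContext.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, "10485760") // ~10 MB

    // Equivalent SET command routed through the SQL query functions.
    sqlContext.sql("SET spark.sql.shuffle.partitions=8")

    // Unless spark.sql.defaultSizeInBytes is set explicitly, defaultSizeInBytes falls back to
    // autoBroadcastJoinThreshold + 1, so unsized operators are never broadcast by accident.
    sc.stop()
  }
}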

sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala

Lines changed: 12 additions & 15 deletions
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.ScalaReflection
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
-import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
@@ -86,15 +86,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   implicit def createSchemaRDD[A <: Product: TypeTag](rdd: RDD[A]) =
-    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))
+    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
 
   /**
    * Loads a Parquet file, returning the result as a [[SchemaRDD]].
    *
    * @group userf
    */
   def parquetFile(path: String): SchemaRDD =
-    new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration)))
+    new SchemaRDD(this, parquet.ParquetRelation(path, Some(sparkContext.hadoopConfiguration), this))
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[SchemaRDD]].
@@ -127,7 +127,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
-    new SchemaRDD(this, JsonRDD.inferSchema(json, samplingRatio))
+    new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))
 
   /**
    * :: Experimental ::
@@ -160,7 +160,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
       conf: Configuration = new Configuration()): SchemaRDD = {
     new SchemaRDD(
       this,
-      ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf))
+      ParquetRelation.createEmpty(
+        path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
   }
 
   /**
@@ -170,11 +171,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = {
-    val name = tableName
-    val newPlan = rdd.logicalPlan transform {
-      case s @ SparkLogicalPlan(ExistingRdd(_, _), _) => s.copy(tableName = name)
-    }
-    catalog.registerTable(None, tableName, newPlan)
+    catalog.registerTable(None, tableName, rdd.logicalPlan)
   }
 
   /**
@@ -212,7 +209,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
       case inMem @ InMemoryRelation(_, _, e: ExistingRdd) =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
-        catalog.registerTable(None, tableName, SparkLogicalPlan(e))
+        catalog.registerTable(None, tableName, SparkLogicalPlan(e)(self))
       case inMem: InMemoryRelation =>
         inMem.cachedColumnBuffers.unpersist()
         catalog.unregisterTable(None, tableName)
@@ -413,7 +410,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
         new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
       }
     }
-    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
   }
 
 }
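
The change that repeats throughout this file is that SparkLogicalPlan (and the Parquet/JSON relation constructors) now receive the SQLContext in a second parameter list, so plan nodes can consult session configuration such as defaultSizeInBytes when estimating statistics, while case-class equality, hashing, and pattern matching still consider only the first parameter list. A standalone sketch of that Scala pattern, with hypothetical stand-in names (Settings, ExamplePlan) rather than the real classes:

// Only the first parameter list of a case class participates in equals/hashCode/unapply;
// the second list is plain threaded context, here marked @transient as in the diff.
case class Settings(defaultSizeInBytes: Long)

case class ExamplePlan(name: String)(@transient val settings: Settings) {
  // Context from the second list is still available to members such as a size estimate.
  lazy val sizeInBytes: Long = settings.defaultSizeInBytes
}

object CurriedCaseClassDemo extends App {
  val a = ExamplePlan("rdd")(Settings(10001L))
  val b = ExamplePlan("rdd")(Settings(99999L))
  println(a == b)         // true: equality ignores the second parameter list
  println(a.sizeInBytes)  // 10001
}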

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala

Lines changed: 2 additions & 1 deletion
@@ -418,7 +418,8 @@ class SchemaRDD(
    * @group schema
    */
   private def applySchema(rdd: RDD[Row]): SchemaRDD = {
-    new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd)))
+    new SchemaRDD(sqlContext,
+      SparkLogicalPlan(ExistingRdd(queryExecution.analyzed.output, rdd))(sqlContext))
   }
 
   // =======================================================================

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 1 addition & 1 deletion
@@ -56,7 +56,7 @@ private[sql] trait SchemaRDDLike {
     // happen right away to let these side effects take place eagerly.
     case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
       queryExecution.toRdd
-      SparkLogicalPlan(queryExecution.executedPlan)
+      SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
     case _ =>
       baseLogicalPlan
   }

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 4 additions & 4 deletions
@@ -72,7 +72,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
       conf: Configuration = new Configuration()): JavaSchemaRDD = {
     new JavaSchemaRDD(
       sqlContext,
-      ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf))
+      ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf, sqlContext))
   }
 
   /**
@@ -92,7 +92,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
         new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
       }
     }
-    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(sqlContext))
   }
 
   /**
@@ -101,7 +101,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
   def parquetFile(path: String): JavaSchemaRDD =
     new JavaSchemaRDD(
       sqlContext,
-      ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration)))
+      ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext))
 
   /**
    * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
@@ -120,7 +120,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
    * @group userf
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(json, 1.0))
+    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))
 
   /**
    * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only

sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala

Lines changed: 4 additions & 4 deletions
@@ -130,26 +130,26 @@ case class GeneratedAggregate(
     // Builds a new custom class for holding the results of aggregation for a group.
     val initialValues = computeFunctions.flatMap(_.initialValues)
     val newAggregationBuffer = newProjection(initialValues, child.output)
-    logger.info(s"Initial values: ${initialValues.mkString(",")}")
+    log.info(s"Initial values: ${initialValues.mkString(",")}")
 
     // A projection that computes the group given an input tuple.
     val groupProjection = newProjection(groupingExpressions, child.output)
-    logger.info(s"Grouping Projection: ${groupingExpressions.mkString(",")}")
+    log.info(s"Grouping Projection: ${groupingExpressions.mkString(",")}")
 
     // A projection that is used to update the aggregate values for a group given a new tuple.
     // This projection should be targeted at the current values for the group and then applied
     // to a joined row of the current values with the new input row.
     val updateExpressions = computeFunctions.flatMap(_.update)
     val updateSchema = computeFunctions.flatMap(_.schema) ++ child.output
     val updateProjection = newMutableProjection(updateExpressions, updateSchema)()
-    logger.info(s"Update Expressions: ${updateExpressions.mkString(",")}")
+    log.info(s"Update Expressions: ${updateExpressions.mkString(",")}")
 
     // A projection that produces the final result, given a computation.
     val resultProjectionBuilder =
       newMutableProjection(
         resultExpressions,
         (namedGroups.map(_._2.toAttribute) ++ computationSchema).toSeq)
-    logger.info(s"Result Projection: ${resultExpressions.mkString(",")}")
+    log.info(s"Result Projection: ${resultExpressions.mkString(",")}")
 
     val joinedRow = new JoinedRow
 