
Commit 56a8e6e

Prototype implementation of estimates for Catalyst logical plans.
- Also adds simple size getters for ParquetRelation and MetastoreRelation.
- Adds a rule that auto-converts equi-joins to BroadcastHashJoin when one side's estimated size (from the getter above, currently for MetastoreRelation) falls below a configurable threshold.
1 parent 800ecff commit 56a8e6e

File tree: 6 files changed, +95 −16 lines
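
The planning change can be summarized as: when the estimated physical size of one side of an inner equi-join falls below a threshold, that side is built into a hash table and broadcast instead of shuffling both inputs. Below is a minimal sketch of that decision only; the helper name and the threshold parameter are illustrative, standing in for the sqlContext.autoConvertJoinSize setting referenced in SparkStrategies further down.

// Sketch only: mirrors the guards added to SparkStrategies in this commit.
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

def autoBroadcastSide(leftSizeBytes: Long, rightSizeBytes: Long, threshold: Long): Option[BuildSide] =
  if (rightSizeBytes <= threshold) Some(BuildRight)     // broadcast the right side
  else if (leftSizeBytes <= threshold) Some(BuildLeft)  // broadcast the left side
  else None                                             // fall back to a shuffled join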

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala

Lines changed: 12 additions & 0 deletions
@@ -26,6 +26,18 @@ import org.apache.spark.sql.catalyst.trees
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
   self: Product =>
 
+  protected class Estimates {
+    lazy val childrenEstimations = children.map(_.estimates)
+    lazy val cardinality: Long = childrenEstimations.map(_.cardinality).sum
+    lazy val numTuples: Long = childrenEstimations.map(_.numTuples).sum
+    lazy val size: Long = childrenEstimations.map(_.size).sum
+  }
+
+  /**
+   * Estimates of various statistics.
+   */
+  lazy val estimates: Estimates = new Estimates
+
   /**
    * Returns the set of attributes that are referenced by this node
    * during evaluation.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 5 additions & 3 deletions
@@ -71,8 +71,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             condition,
             left,
             right @ PhysicalOperation(_, _, b: BaseRelation))
-          if broadcastTables.contains(b.tableName) =>
-        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
+          if broadcastTables.contains(b.tableName)
+            || (right.estimates.size <= sqlContext.autoConvertJoinSize) =>
+        broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildRight)
 
       case ExtractEquiJoinKeys(
             Inner,
@@ -81,7 +82,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             condition,
             left @ PhysicalOperation(_, _, b: BaseRelation),
             right)
-          if broadcastTables.contains(b.tableName) =>
+          if broadcastTables.contains(b.tableName)
+            || (left.estimates.size <= sqlContext.autoConvertJoinSize) =>
         broadcastHashJoin(leftKeys, rightKeys, left, right, condition, BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
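
The new guards read sqlContext.autoConvertJoinSize, which is not defined in this diff. The following is a hypothetical sketch of how such a byte threshold might be exposed; the property key and default value are assumptions, not something this commit specifies.

// Hypothetical sketch: accessor, key, and default are assumed.
trait AutoBroadcastJoinConf {
  protected def getSetting(key: String, default: String): String  // assumed config lookup

  /** Join sides whose estimated size in bytes is at or below this value get broadcast. */
  def autoConvertJoinSize: Long =
    getSetting("spark.sql.auto.convert.join.size", "10485760").toLong  // assumed 10 MB default
}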

sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala

Lines changed: 13 additions & 0 deletions
@@ -22,11 +22,14 @@ import java.io.IOException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.mapreduce.Job
 
 import parquet.hadoop.ParquetOutputFormat
 import parquet.hadoop.metadata.CompressionCodecName
+import parquet.hadoop.util.ContextUtil
 import parquet.schema.MessageType
 
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
@@ -43,12 +46,22 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LeafNode}
  *
  * @param path The path to the Parquet file.
  */
+// TODO: make me a BaseRelation? For HashJoin strategy.
 private[sql] case class ParquetRelation(
     path: String,
     @transient conf: Option[Configuration] = None) extends LeafNode with MultiInstanceRelation {
 
   self: Product =>
 
+  @transient override lazy val estimates = new Estimates {
+    // TODO: investigate getting encoded column statistics in the parquet file?
+    override lazy val size: Long = {
+      val hdfsPath = new Path(path)
+      val fs = hdfsPath.getFileSystem(conf.getOrElse(ContextUtil.getConfiguration(new Job())))
+      fs.getContentSummary(hdfsPath).getLength // TODO: in bytes or system-dependent?
+    }
+  }
+
   /** Schema derived from ParquetFile */
   def parquetSchema: MessageType =
     ParquetTypesConverter
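
The TODO in the override above can be answered: FileSystem.getContentSummary reports the total length of all files under the path in bytes, independent of platform. A standalone sketch of the same lookup, useful for sanity-checking an estimate against a Parquet directory:

// Minimal sketch: total on-disk bytes under a path, the same quantity
// ParquetRelation.estimates.size computes above.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def directorySizeInBytes(dir: String, conf: Configuration = new Configuration()): Long = {
  val path = new Path(dir)
  path.getFileSystem(conf).getContentSummary(path).getLength  // bytes, summed over all files
}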

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 9 additions & 0 deletions
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.plans.{LeftOuter, RightOuter, FullOuter, Inner}
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext._
 
@@ -28,6 +29,14 @@ class JoinSuite extends QueryTest {
   // Ensures tables are loaded.
   TestData
 
+  test("parquet") {
+    val data = parquetFile("../../points.parquet") // local file!
+    val sizes = data.logicalPlan.collect { case j: ParquetRelation =>
+      j.newInstance.estimates.size // also works without .newInstance
+    }.toSeq
+    assert(sizes.size === 1 && sizes(0) > 0)
+  }
+
   test("equi-join is hash-join") {
     val x = testData2.as('x)
     val y = testData2.as('y)

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 30 additions & 12 deletions
@@ -19,11 +19,12 @@ package org.apache.spark.sql.hive
 
 import scala.util.parsing.combinator.RegexParsers
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo}
 import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
 import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.Deserializer
 
 import org.apache.spark.annotation.DeveloperApi
@@ -64,9 +65,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     // Since HiveQL is case insensitive for table names we make them all lowercase.
     MetastoreRelation(
-      databaseName,
-      tblName,
-      alias)(table.getTTable, partitions.map(part => part.getTPartition))
+      databaseName, tblName, alias)(
+      table.getTTable, partitions.map(part => part.getTPartition))(
+      hive.hiveconf, table.getPath)
   }
 
   def createTable(
@@ -251,7 +252,11 @@ object HiveMetastoreTypes extends RegexParsers {
 private[hive] case class MetastoreRelation
     (databaseName: String, tableName: String, alias: Option[String])
     (val table: TTable, val partitions: Seq[TPartition])
+    (@transient hiveConf: HiveConf, @transient path: Path)
   extends BaseRelation {
+
+  self: Product =>
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -264,6 +269,19 @@ private[hive] case class MetastoreRelation
       new Partition(hiveQlTable, p)
     }
 
+  // TODO: are there any stats in hiveQlTable.getSkewedInfo that we can use?
+  @transient override lazy val estimates = new Estimates {
+    // Size getters adapted from
+    // https://github.com/apache/hive/blob/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
+    override lazy val size: Long =
+      maybeGetSize(hiveConf, hiveQlTable.getProperty("totalSize"), path)
+
+    private[this] def maybeGetSize(conf: HiveConf, size: String, path: Path): Long = {
+      val res = try { Some(size.toLong) } catch { case _: Exception => None }
+      res.getOrElse { path.getFileSystem(conf).getContentSummary(path).getLength }
+    }
+  }
+
   val tableDesc = new TableDesc(
     Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]],
     hiveQlTable.getInputFormatClass,
@@ -275,14 +293,14 @@ private[hive] case class MetastoreRelation
     hiveQlTable.getMetadata
   )
 
-    implicit class SchemaAttribute(f: FieldSchema) {
-      def toAttribute = AttributeReference(
-        f.getName,
-        HiveMetastoreTypes.toDataType(f.getType),
-        // Since data can be dumped in randomly with no validation, everything is nullable.
-        nullable = true
-      )(qualifiers = tableName +: alias.toSeq)
-    }
+  implicit class SchemaAttribute(f: FieldSchema) {
+    def toAttribute = AttributeReference(
+      f.getName,
+      HiveMetastoreTypes.toDataType(f.getType),
+      // Since data can be dumped in randomly with no validation, everything is nullable.
+      nullable = true
+    )(qualifiers = tableName +: alias.toSeq)
+  }
 
   // Must be a stable value since new attributes are born here.
   val partitionKeys = hiveQlTable.getPartitionKeys.map(_.toAttribute)

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 26 additions & 1 deletion
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.spark.sql.execution.{BuildRight, BroadcastHashJoin}
+
 import scala.util.Try
 
+import org.apache.spark.sql.{SchemaRDD, Row}
+import org.apache.spark.sql.hive.MetastoreRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{SchemaRDD, Row}
 
 case class TestData(a: Int, b: String)
 
@@ -48,6 +51,28 @@ class HiveQuerySuite extends HiveComparisonTest {
       "Incorrect number of rows in created table")
   }
 
+  // TODO: put me in a separate EstimateSuite?
+  test("BHJ by size") {
+    hql("""SET spark.sql.join.broadcastTables=""") // reset broadcast tables
+    // TODO: use two different tables?
+    // assume src has small size
+    val rdd = hql("""SELECT * FROM src a JOIN src b ON a.key = b.key""")
+    val physical = rdd.queryExecution.sparkPlan
+    val bhj = physical.collect { case j: BroadcastHashJoin => j }
+    println(s"${rdd.queryExecution}")
+    assert(bhj.size === 1)
+  }
+
+  // TODO: put me in a separate EstimateSuite?
+  test("estimates the size of a MetastoreRelation") {
+    val rdd = hql("""SELECT * FROM src""")
+    println(s"${rdd.queryExecution}")
+    val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
+      mr.estimates.size
+    }.toSeq
+    assert(sizes.size === 1 && sizes(0) > 0)
+  }
+
   createQueryTest("between",
     "SELECT * FROM src WHERE key Between 1 and 2")
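
MetastoreRelation.estimates.size prefers the table's totalSize property and only falls back to a filesystem scan when that property is missing or unparsable. Hive fills in totalSize when table statistics are gathered; the snippet below is a hedged usage sketch in the style of the tests above, assuming the underlying Hive version accepts the ANALYZE command through hql.

// Assumed workflow, not part of this commit.
hql("ANALYZE TABLE src COMPUTE STATISTICS")  // populates totalSize in the metastore
val sizes = hql("SELECT * FROM src").queryExecution.analyzed.collect {
  case mr: MetastoreRelation => mr.estimates.size  // served from totalSize, no FS scan needed
}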
