@@ -20,7 +20,8 @@ package org.apache.spark.sql.hive
 import scala.reflect.ClassTag
 
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.execution.BroadcastHashJoin
+import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.parquet.{ParquetRelation, ParquetTestData}
 import org.apache.spark.util.Utils
@@ -36,7 +37,7 @@ class StatisticsSuite extends QueryTest {
     }
     assert(sizes.size === 1)
     assert(sizes(0)._1 == sizes(0)._2, "after .newInstance, estimates are different from before")
-    assert(sizes(0)._1 > 0)
+    assert(sizes(0)._1 > 1, "1 is the default, indicating the absence of a meaningful estimate")
 
     Utils.deleteRecursively(ParquetTestData.testDir)
   }
@@ -46,7 +47,8 @@ class StatisticsSuite extends QueryTest {
     val sizes = rdd.queryExecution.analyzed.collect { case mr: MetastoreRelation =>
       mr.statistics.sizeInBytes
     }
-    assert(sizes.size === 1 && sizes(0) > 0)
+    assert(sizes.size === 1)
+    assert(sizes(0) > 1, "1 is the default, indicating the absence of a meaningful estimate")
   }
 
   test("auto converts to broadcast hash join, by size estimate of a relation") {
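For context on the two hunks above: a relation whose size cannot be estimated reports the default `sizeInBytes` of 1, so the assertions now check for strictly greater than 1 rather than greater than 0. A minimal sketch of that check pulled into a reusable helper (the helper name is hypothetical and not part of this patch; `hql`, `MetastoreRelation`, and `statistics.sizeInBytes` are used exactly as in the test above):

```scala
import org.apache.spark.sql.hive.test.TestHive._

// Hypothetical helper (not in this patch): collect the size estimates of all
// MetastoreRelations in a query's analyzed plan and require that each one is a
// real estimate, i.e. strictly greater than the default placeholder value of 1.
def assertHasMeaningfulSizeEstimates(query: String): Unit = {
  val sizes = hql(query).queryExecution.analyzed.collect {
    case mr: MetastoreRelation => mr.statistics.sizeInBytes
  }
  assert(sizes.nonEmpty, s"no MetastoreRelation found in the plan of: $query")
  sizes.foreach { size =>
    assert(size > 1, "1 is the default, indicating the absence of a meaningful estimate")
  }
}
```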
@@ -62,8 +64,7 @@ class StatisticsSuite extends QueryTest {
 
     // Assert src has a size smaller than the threshold.
     val sizes = rdd.queryExecution.analyzed.collect {
-      case r if ct.runtimeClass.isAssignableFrom(r.getClass) =>
-        r.statistics.sizeInBytes
+      case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
     }
     assert(sizes.size === 2 && sizes(0) <= autoConvertJoinSize,
       s"query should contain two relations, each of which has size smaller than autoConvertSize")
@@ -74,16 +75,22 @@ class StatisticsSuite extends QueryTest {
     assert(bhj.size === 1,
       s"actual query plans do not contain broadcast join: ${rdd.queryExecution}")
 
-    checkAnswer(rdd, expectedAnswer)
+    checkAnswer(rdd, expectedAnswer) // check correctness of output
+
+    TestHive.settings.synchronized {
+      val tmp = autoConvertJoinSize
 
-    // TODO(zongheng): synchronize on TestHive.settings, or use Sequential/Stepwise.
-    val tmp = autoConvertJoinSize
-    hql("""SET spark.sql.auto.convert.join.size=0""")
-    rdd = hql(query)
-    bhj = rdd.queryExecution.sparkPlan.collect { case j : BroadcastHashJoin => j }
-    assert(bhj.isEmpty)
+      hql("""SET spark.sql.auto.convert.join.size=-1""")
+      rdd = hql(query)
+      bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
+      assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
 
-    hql(s"""SET spark.sql.auto.convert.join.size=$tmp""")
+      val shj = rdd.queryExecution.sparkPlan.collect { case j: ShuffledHashJoin => j }
+      assert(shj.size === 1,
+        "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
+
+      hql(s"""SET spark.sql.auto.convert.join.size=$tmp""")
+    }
 
     after()
   }
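The rewritten tail of this test saves the current `spark.sql.auto.convert.join.size`, forces it to -1 so no relation qualifies for a broadcast join, verifies that a ShuffledHashJoin is planned instead, and then restores the old value, all inside `TestHive.settings.synchronized` so concurrent suites sharing TestHive cannot interleave their SET commands. The same save/run/restore pattern could be factored into a small loan-style helper; the sketch below is only an illustration of that idea (the helper name and signature are assumptions; only the SET syntax and the lock on `TestHive.settings` come from this patch):

```scala
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

// Hypothetical loan-pattern helper (not in this patch): temporarily override the
// auto-broadcast join threshold, run the body, and restore the previous value,
// holding the TestHive.settings lock so parallel tests cannot interleave SETs.
def withAutoConvertJoinSize[A](size: Long, previous: Long)(body: => A): A =
  TestHive.settings.synchronized {
    hql(s"""SET spark.sql.auto.convert.join.size=$size""")
    try body
    finally hql(s"""SET spark.sql.auto.convert.join.size=$previous""")
  }
```

With such a helper, the synchronized block above would reduce to `withAutoConvertJoinSize(-1, tmp) { ... }`, with the plan inspected for BroadcastHashJoin and ShuffledHashJoin operators inside the body.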