
Commit 5819623

xingchaozh (GitHub Enterprise) authored and committed

[CARMEL-5986] Make Coalesce/Rebucketing effective when bucketing disabled by planner (#961)

* [CARMEL-5986] Make Coalesce/Rebucketing effective when bucketing disabled by planner
* fix ut
1 parent d6fd502 commit 5819623

7 files changed: +30 -15 lines


sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala

Lines changed: 1 addition & 1 deletion

@@ -329,12 +329,12 @@ object QueryExecution {
     PlanSubqueries(sparkSession),
     EliminateHintPlaceHolder,
     EnsureRequirements,
-    DisableUnnecessaryBucketedScan,
     // `RemoveRedundantSorts` needs to be added after `EnsureRequirements` to guarantee the same
     // number of partitions when instantiating PartitioningCollection.
     RemoveRedundantSorts,
     EnsureRepartitionForWriting,
     EliminateShuffleExec,
+    DisableUnnecessaryBucketedScan,
     ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
     CollapseCodegenStages(),
     ReuseExchange,
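Why the move matters: these preparation rules are applied strictly in sequence, each one rewriting the plan the previous one produced. A minimal sketch of that loop, assuming nothing beyond the rule names in the diff (the helper name `prepare` is illustrative, modeled on `QueryExecution.prepareForExecution`):

    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan

    // Rules run left to right; each sees the plan the previous rule produced.
    // Placing DisableUnnecessaryBucketedScan after EliminateShuffleExec means it
    // runs on a plan where eligible shuffles have already been replaced by
    // RebucketingExec/CoalesceExec, so it can account for them.
    def prepare(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan =
      rules.foldLeft(plan) { (current, rule) => rule.apply(current) }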

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 2 additions & 2 deletions

@@ -91,10 +91,10 @@ case class AdaptiveSparkPlanExec(
   private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
     eliminateHintPlaceHolder,
     ensureRequirements,
-    DisableUnnecessaryBucketedScan,
     removeRedundantSorts,
     EnsureRepartitionForWriting,
-    EliminateShuffleExec
+    EliminateShuffleExec,
+    DisableUnnecessaryBucketedScan
   ) ++ context.session.sessionState.queryStagePrepRules

   @transient private val initialPlan = context.session.withActive {
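The adaptive path mirrors the non-AQE change so both planners share one ordering invariant. The same list again, annotated (this is the new code from the hunk above with interpretive comments added; as written it belongs inside AdaptiveSparkPlanExec, where the lowercase rules are instance members):

    // Invariant both rule lists now satisfy: the shuffle-replacement rule runs
    // before the bucketed-scan pruning rule.
    private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
      eliminateHintPlaceHolder,
      ensureRequirements,             // inserts exchanges for required distributions
      removeRedundantSorts,
      EnsureRepartitionForWriting,
      EliminateShuffleExec,           // may swap an exchange for RebucketingExec/CoalesceExec
      DisableUnnecessaryBucketedScan  // runs after, so it sees those swaps
    ) ++ context.session.sessionState.queryStagePrepRules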

sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala

Lines changed: 7 additions & 3 deletions

@@ -19,10 +19,9 @@ package org.apache.spark.sql.execution.bucketing

 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashClusteredDistribution}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SortExec, SparkPlan}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
 import org.apache.spark.sql.execution.exchange.Exchange
-import org.apache.spark.sql.internal.SQLConf

 /**
  * Disable unnecessary bucketed table scan based on actual physical query plan.

@@ -120,7 +119,12 @@ object DisableUnnecessaryBucketedScan extends Rule[SparkPlan] {
   }

   private def hasInterestingPartition(plan: SparkPlan): Boolean = {
-    plan.requiredChildDistribution.exists {
+    val isReplacedShuffle = plan match {
+      case _: RebucketingExec | _: CoalesceExec => true
+      case _ => false
+    }
+
+    isReplacedShuffle || plan.requiredChildDistribution.exists {
       case _: ClusteredDistribution | _: HashClusteredDistribution | AllTuples => true
       case _ => false
     }
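This hunk is the substantive fix. `RebucketingExec` and `CoalesceExec` stand in for a shuffle between mismatched bucket counts but declare no clustered distribution requirement of their own, so the old predicate ignored them and the rule could downgrade the bucketed scans they rely on. The patched method, reassembled from the hunk with interpretive comments added:

    private def hasInterestingPartition(plan: SparkPlan): Boolean = {
      // RebucketingExec/CoalesceExec took the place of an exchange, so they
      // consume their child's bucketed layout directly even though they declare
      // no ClusteredDistribution/HashClusteredDistribution requirement.
      val isReplacedShuffle = plan match {
        case _: RebucketingExec | _: CoalesceExec => true
        case _ => false
      }

      // Either case pins the bucketing of the scans beneath this operator.
      isReplacedShuffle || plan.requiredChildDistribution.exists {
        case _: ClusteredDistribution | _: HashClusteredDistribution | AllTuples => true
        case _ => false
      }
    }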

sql/core/src/test/scala/org/apache/spark/sql/DynamicBloomFilterJoinPruningSuite.scala

Lines changed: 5 additions & 0 deletions

@@ -43,20 +43,25 @@ abstract class DynamicBloomFilterPruningSuiteBase
   private val tableFormat: String = "parquet"
   private var originalAutoBroadcastJoinThreshold: Long = _
   private var originalParquetCompressionCodec: String = _
+  private var originalAutoBucketedScan: Boolean = _


   override def beforeAll(): Unit = {
     super.beforeAll()
     originalAutoBroadcastJoinThreshold = conf.autoBroadcastJoinThreshold
     originalParquetCompressionCodec = conf.parquetCompressionCodec
+    originalAutoBucketedScan = conf.autoBucketedScanEnabled
     conf.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, -1L)
+    conf.setConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED, true)
   }

   override def afterAll(): Unit = {
     spark.sessionState.conf.unsetConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED)
     spark.sessionState.conf.unsetConf(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY)
+    spark.sessionState.conf.unsetConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED)
     conf.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, originalAutoBroadcastJoinThreshold)
     conf.setConf(SQLConf.PARQUET_COMPRESSION, originalParquetCompressionCodec)
+    conf.setConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED, originalAutoBucketedScan)
     super.afterAll()
   }
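This suite needs the flag for every test, hence the suite-wide save/set/restore; tests elsewhere in this commit scope the same override per block with `withSQLConf` instead. A minimal sketch (the query and expected rows are placeholders):

    withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
      // The override holds only inside this block and is rolled back on exit,
      // so no beforeAll/afterAll bookkeeping is needed.
      checkAnswer(sql(someQuery), expectedRows) // placeholder names
    }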

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -1381,7 +1381,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
       SQLConf.ENABLE_COALESCE.key -> "true",
       SQLConf.ENABLE_REBUCKETING.key -> "false",
-      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen) {
+      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen,
+      SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
       Seq("inner").foreach { joinType =>
         val df = spark.sql(s"select a.* from " +
           s"(select * from $tblName distribute by col2) a " +

sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala

Lines changed: 3 additions & 3 deletions

@@ -174,7 +174,7 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
       """
       SELECT /*+ broadcast(t1) merge(t3)*/ * FROM t1 JOIN t2 JOIN t3
       ON t1.i = t2.i AND t2.i = t3.i
-      """.stripMargin, 2, 3), // TODO 2->3 if ENABLE_REBUCKETING=false
+      """.stripMargin, 3, 3), // TODO 3->2 if ENABLE_REBUCKETING=false
     (
       """
       SELECT /*+ merge(t1) broadcast(t3)*/ * FROM t1 JOIN t2 JOIN t3

@@ -184,7 +184,7 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
       """
       SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
       ON t1.i = t2.i AND t2.i = t3.i
-      """.stripMargin, 2, 3), // TODO 2->3 if ENABLE_REBUCKETING=false
+      """.stripMargin, 3, 3), // TODO 3->2 if ENABLE_REBUCKETING=false
     // Multiple joins on non-bucketed columns
     (
       """

@@ -195,7 +195,7 @@ abstract class DisableUnnecessaryBucketedScanSuite extends QueryTest
       """
       SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
       ON t1.i = t2.j AND t2.j = t3.i
-      """.stripMargin, 1, 3), // TODO 1->2 if ENABLE_REBUCKETING=false
+      """.stripMargin, 2, 3), // TODO 2->1 if ENABLE_REBUCKETING=false
     (
       """
       SELECT /*+ merge(t1, t3)*/ * FROM t1 JOIN t2 JOIN t3
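For reading these tuples: in the upstream Apache Spark version of this suite, each entry feeds a check helper shaped as below (signature from upstream, assumed unchanged in this fork). The first count is the expected number of scans that stay bucketed when auto bucketed scan is enabled, so the 2 -> 3 and 1 -> 2 bumps say one more scan now remains bucketed because its consumer is a rebucketing/coalescing operator.

    // (query, scans expected with autoBucketedScan on, scans expected with it off);
    // signature assumed from upstream DisableUnnecessaryBucketedScanSuite.
    def checkDisableBucketedScan(
        query: String,
        expectedNumScanWithAutoScanEnabled: Int,
        expectedNumScanWithAutoScanDisabled: Int): Unit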

sql/core/src/test/scala/org/apache/spark/sql/sources/MultipleBucketJoinSuite.scala

Lines changed: 10 additions & 5 deletions

@@ -177,7 +177,8 @@ class MultipleBucketJoinSuite extends SharedSparkSession with AdaptiveSparkPlanH
   test("Join two bucketed tables with double bucket number of another") {
     val res = 0 until 2000 map(i => Row(i, i.toString, i, i.toString))
     withSQLConf(SQLConf.ENABLE_REBUCKETING.key -> "false",
-      SQLConf.ENABLE_COALESCE.key -> "true") {
+      SQLConf.ENABLE_COALESCE.key -> "true",
+      SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
       testBucketing(s"SELECT * FROM $table1 a JOIN $table2 b ON a.key = b.key",
         BucketedTableTestSpec(bucketSpec = bucketSpec1, expectSort = true),
         BucketedTableTestSpec(bucketSpec = bucketSpec2),

@@ -305,7 +306,8 @@ class MultipleBucketJoinSuite extends SharedSparkSession with AdaptiveSparkPlanH
       """.stripMargin
     val res = 0 until 2000 map(i => Row(i, i.toString, "part0", i, i.toString, "part1"))
     withSQLConf(SQLConf.ENABLE_REBUCKETING.key -> "false",
-      SQLConf.ENABLE_COALESCE.key -> "true") {
+      SQLConf.ENABLE_COALESCE.key -> "true",
+      SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
       testBucketing(query,
         BucketedTableTestSpec(bucketSpec = bucketSpec1, expectSort = true),
         BucketedTableTestSpec(bucketSpec = bucketSpec2),

@@ -338,7 +340,8 @@ class MultipleBucketJoinSuite extends SharedSparkSession with AdaptiveSparkPlanH
     val res1 = 0 until 2000 map(i => Row(i, i.toString, "part0", i, i.toString, "part0"))
     val res2 = 0 until 2000 map(i => Row(i, i.toString, "part1", i, i.toString, "part0"))
     withSQLConf(SQLConf.ENABLE_REBUCKETING.key -> "false",
-      SQLConf.ENABLE_COALESCE.key -> "true") {
+      SQLConf.ENABLE_COALESCE.key -> "true",
+      SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
       testBucketing(
         query,
         BucketedTableTestSpec(bucketSpec = bucketSpec1, expectSort = true),

@@ -377,7 +380,8 @@ class MultipleBucketJoinSuite extends SharedSparkSession with AdaptiveSparkPlanH
     val res1 = 0 until 2000 map(i => Row(i, i.toString, "part0", i, i.toString, "part0"))
     val res2 = 0 until 2000 map(i => Row(i, i.toString, "part0", i, i.toString, "part1"))
     withSQLConf(SQLConf.ENABLE_REBUCKETING.key -> "false",
-      SQLConf.ENABLE_COALESCE.key -> "true") {
+      SQLConf.ENABLE_COALESCE.key -> "true",
+      SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
       testBucketing(
         query,
         BucketedTableTestSpec(bucketSpec = bucketSpec1, expectSort = true),

@@ -415,7 +419,8 @@ class MultipleBucketJoinSuite extends SharedSparkSession with AdaptiveSparkPlanH
     val res4 = 0 until 2000 map(i => Row(i, i.toString, "part1", i, i.toString, "part1"))

     withSQLConf(SQLConf.ENABLE_REBUCKETING.key -> "false",
-      SQLConf.ENABLE_COALESCE.key -> "true") {
+      SQLConf.ENABLE_COALESCE.key -> "true",
+      SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
       testBucketing(
         query,
         BucketedTableTestSpec(bucketSpec = bucketSpec1, expectSort = true),
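A direct way to see what these tests now protect (a sketch, not code from this commit; `$table1`/`$table2` follow the suite's naming, and `bucketedScan` is the flag `FileSourceScanExec` exposes in open-source Spark):

    val plan = sql(s"SELECT * FROM $table1 a JOIN $table2 b ON a.key = b.key")
      .queryExecution.executedPlan
    // With ENABLE_COALESCE=true the shuffle between mismatched bucket counts is
    // replaced by CoalesceExec; after this fix DisableUnnecessaryBucketedScan
    // keeps both sides' scans bucketed instead of downgrading one to a plain scan.
    val bucketedScans = plan.collect {
      case scan: FileSourceScanExec if scan.bucketedScan => scan
    }
    assert(bucketedScans.length == 2)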
