diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 89c41f31ff234..c68d7ccab4d10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2638,21 +2638,24 @@ object SQLConf { .booleanConf .createWithDefault(true) - val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED = - buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.enabled") + val COALESCE_BUCKETS_IN_JOIN_ENABLED = + buildConf("spark.sql.bucketing.coalesceBucketsInJoin.enabled") .doc("When true, if two bucketed tables with the different number of buckets are joined, " + "the side with a bigger number of buckets will be coalesced to have the same number " + - "of buckets as the other side. Bucket coalescing is applied only to sort-merge joins " + - "and only when the bigger number of buckets is divisible by the smaller number of buckets.") + "of buckets as the other side. Bigger number of buckets is divisible by the smaller " + + "number of buckets. Bucket coalescing is applied to sort-merge joins and " + + "shuffled hash join. Note: Coalescing bucketed table can avoid unnecessary shuffling " + + "in join, but it also reduces parallelism and could possibly cause OOM for " + + "shuffled hash join.") .version("3.1.0") .booleanConf .createWithDefault(false) - val COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO = - buildConf("spark.sql.bucketing.coalesceBucketsInSortMergeJoin.maxBucketRatio") + val COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO = + buildConf("spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio") .doc("The ratio of the number of two buckets being coalesced should be less than or " + "equal to this value for bucket coalescing to be applied. 
This configuration only " + - s"has an effect when '${COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key}' is set to true.") + s"has an effect when '${COALESCE_BUCKETS_IN_JOIN_ENABLED.key}' is set to true.") .version("3.1.0") .intConf .checkValue(_ > 0, "The difference must be positive.") @@ -3269,6 +3272,11 @@ class SQLConf extends Serializable with Logging { def metadataCacheTTL: Long = getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) + def coalesceBucketsInJoinEnabled: Boolean = getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED) + + def coalesceBucketsInJoinMaxBucketRatio: Int = + getConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 791e432269632..e4b9322016cf0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.adaptive.{AdaptiveExecutionContext, InsertAdaptiveSparkPlan} -import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInSortMergeJoin +import org.apache.spark.sql.execution.bucketing.CoalesceBucketsInJoin import org.apache.spark.sql.execution.dynamicpruning.PlanDynamicPruningFilters import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.execution.streaming.{IncrementalExecution, OffsetSeqMetadata} @@ -332,7 +332,7 @@ object QueryExecution { // as the original plan is hidden behind `AdaptiveSparkPlanExec`. 
adaptiveExecutionRule.toSeq ++ Seq( - CoalesceBucketsInSortMergeJoin(sparkSession.sessionState.conf), + CoalesceBucketsInJoin(sparkSession.sessionState.conf), PlanDynamicPruningFilters(sparkSession), PlanSubqueries(sparkSession), EnsureRequirements(sparkSession.sessionState.conf), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala new file mode 100644 index 0000000000000..22f308f331449 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.bucketing + +import scala.annotation.tailrec + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.joins.{BaseJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule coalesces one side of the `SortMergeJoin` and `ShuffledHashJoin` + * if the following conditions are met: + * - Two bucketed tables are joined. + * - Join keys match with output partition expressions on their respective sides. + * - The larger bucket number is divisible by the smaller bucket number. + * - COALESCE_BUCKETS_IN_JOIN_ENABLED is set to true. + * - The ratio of the number of buckets is less than or equal to the value set in + * COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.
+ */ +case class CoalesceBucketsInJoin(conf: SQLConf) extends Rule[SparkPlan] { + private def updateNumCoalescedBucketsInScan( + plan: SparkPlan, + numCoalescedBuckets: Int): SparkPlan = { + plan transformUp { + case f: FileSourceScanExec => + f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) + } + } + + private def updateNumCoalescedBuckets( + join: BaseJoinExec, + numLeftBuckets: Int, + numRightBucket: Int, + numCoalescedBuckets: Int): BaseJoinExec = { + if (numCoalescedBuckets != numLeftBuckets) { + val leftCoalescedChild = + updateNumCoalescedBucketsInScan(join.left, numCoalescedBuckets) + join match { + case j: SortMergeJoinExec => j.copy(left = leftCoalescedChild) + case j: ShuffledHashJoinExec => j.copy(left = leftCoalescedChild) + } + } else { + val rightCoalescedChild = + updateNumCoalescedBucketsInScan(join.right, numCoalescedBuckets) + join match { + case j: SortMergeJoinExec => j.copy(right = rightCoalescedChild) + case j: ShuffledHashJoinExec => j.copy(right = rightCoalescedChild) + } + } + } + + private def isCoalesceSHJStreamSide( + join: ShuffledHashJoinExec, + numLeftBuckets: Int, + numRightBucket: Int, + numCoalescedBuckets: Int): Boolean = { + if (numCoalescedBuckets == numLeftBuckets) { + join.buildSide != BuildRight + } else { + join.buildSide != BuildLeft + } + } + + def apply(plan: SparkPlan): SparkPlan = { + if (!conf.coalesceBucketsInJoinEnabled) { + return plan + } + + plan transform { + case ExtractJoinWithBuckets(join, numLeftBuckets, numRightBuckets) + if math.max(numLeftBuckets, numRightBuckets) / math.min(numLeftBuckets, numRightBuckets) <= + conf.coalesceBucketsInJoinMaxBucketRatio => + val numCoalescedBuckets = math.min(numLeftBuckets, numRightBuckets) + join match { + case j: SortMergeJoinExec => + updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) + case j: ShuffledHashJoinExec + // Only coalesce the buckets for shuffled hash join stream side, + // to avoid OOM for build side. 
+ if isCoalesceSHJStreamSide(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) => + updateNumCoalescedBuckets(j, numLeftBuckets, numRightBuckets, numCoalescedBuckets) + case other => other + } + case other => other + } + } +} + +/** + * An extractor that extracts `SortMergeJoinExec` and `ShuffledHashJoinExec`, + * where both sides of the join have bucketed tables, + * consist of only the scan operation, + * and have numbers of buckets that are not equal but divisible. + */ +object ExtractJoinWithBuckets { + @tailrec + private def hasScanOperation(plan: SparkPlan): Boolean = plan match { + case f: FilterExec => hasScanOperation(f.child) + case p: ProjectExec => hasScanOperation(p.child) + case _: FileSourceScanExec => true + case _ => false + } + + private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = { + plan.collectFirst { + case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty && + f.optionalNumCoalescedBuckets.isEmpty => + f.relation.bucketSpec.get + } + } + + /** + * The join keys should match with expressions for output partitioning. Note that + * the ordering does not matter because it will be handled in `EnsureRequirements`.
+ */ + private def satisfiesOutputPartitioning( + keys: Seq[Expression], + partitioning: Partitioning): Boolean = { + partitioning match { + case HashPartitioning(exprs, _) if exprs.length == keys.length => + exprs.forall(e => keys.exists(_.semanticEquals(e))) + case _ => false + } + } + + private def isApplicable(j: BaseJoinExec): Boolean = { + (j.isInstanceOf[SortMergeJoinExec] || + j.isInstanceOf[ShuffledHashJoinExec]) && + hasScanOperation(j.left) && + hasScanOperation(j.right) && + satisfiesOutputPartitioning(j.leftKeys, j.left.outputPartitioning) && + satisfiesOutputPartitioning(j.rightKeys, j.right.outputPartitioning) + } + + private def isDivisible(numBuckets1: Int, numBuckets2: Int): Boolean = { + val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) + // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller + // number of buckets because bucket id is calculated by modding the total number of buckets. + numBuckets1 != numBuckets2 && large % small == 0 + } + + def unapply(plan: SparkPlan): Option[(BaseJoinExec, Int, Int)] = { + plan match { + case j: BaseJoinExec if isApplicable(j) => + val leftBucket = getBucketSpec(j.left) + val rightBucket = getBucketSpec(j.right) + if (leftBucket.isDefined && rightBucket.isDefined && + isDivisible(leftBucket.get.numBuckets, rightBucket.get.numBuckets)) { + Some(j, leftBucket.get.numBuckets, rightBucket.get.numBuckets) + } else { + None + } + case _ => None + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala deleted file mode 100644 index 3bb0597ecd87c..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoin.scala +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one 
or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.bucketing - -import org.apache.spark.sql.catalyst.catalog.BucketSpec -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, ProjectExec, SparkPlan} -import org.apache.spark.sql.execution.joins.SortMergeJoinExec -import org.apache.spark.sql.internal.SQLConf - -/** - * This rule coalesces one side of the `SortMergeJoin` if the following conditions are met: - * - Two bucketed tables are joined. - * - Join keys match with output partition expressions on their respective sides. - * - The larger bucket number is divisible by the smaller bucket number. - * - COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED is set to true. - * - The ratio of the number of buckets is less than the value set in - * COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO. 
- */ -case class CoalesceBucketsInSortMergeJoin(conf: SQLConf) extends Rule[SparkPlan] { - private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): Option[Int] = { - assert(numBuckets1 != numBuckets2) - val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) - // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller - // number of buckets because bucket id is calculated by modding the total number of buckets. - if (large % small == 0 && - large / small <= conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO)) { - Some(small) - } else { - None - } - } - - private def updateNumCoalescedBuckets(plan: SparkPlan, numCoalescedBuckets: Int): SparkPlan = { - plan.transformUp { - case f: FileSourceScanExec => - f.copy(optionalNumCoalescedBuckets = Some(numCoalescedBuckets)) - } - } - - def apply(plan: SparkPlan): SparkPlan = { - if (!conf.getConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED)) { - return plan - } - - plan transform { - case ExtractSortMergeJoinWithBuckets(smj, numLeftBuckets, numRightBuckets) - if numLeftBuckets != numRightBuckets => - mayCoalesce(numLeftBuckets, numRightBuckets, conf).map { numCoalescedBuckets => - if (numCoalescedBuckets != numLeftBuckets) { - smj.copy(left = updateNumCoalescedBuckets(smj.left, numCoalescedBuckets)) - } else { - smj.copy(right = updateNumCoalescedBuckets(smj.right, numCoalescedBuckets)) - } - }.getOrElse(smj) - case other => other - } - } -} - -/** - * An extractor that extracts `SortMergeJoinExec` where both sides of the join have the bucketed - * tables and are consisted of only the scan operation. 
- */ -object ExtractSortMergeJoinWithBuckets { - private def isScanOperation(plan: SparkPlan): Boolean = plan match { - case f: FilterExec => isScanOperation(f.child) - case p: ProjectExec => isScanOperation(p.child) - case _: FileSourceScanExec => true - case _ => false - } - - private def getBucketSpec(plan: SparkPlan): Option[BucketSpec] = { - plan.collectFirst { - case f: FileSourceScanExec if f.relation.bucketSpec.nonEmpty && - f.optionalNumCoalescedBuckets.isEmpty => - f.relation.bucketSpec.get - } - } - - /** - * The join keys should match with expressions for output partitioning. Note that - * the ordering does not matter because it will be handled in `EnsureRequirements`. - */ - private def satisfiesOutputPartitioning( - keys: Seq[Expression], - partitioning: Partitioning): Boolean = { - partitioning match { - case HashPartitioning(exprs, _) if exprs.length == keys.length => - exprs.forall(e => keys.exists(_.semanticEquals(e))) - case _ => false - } - } - - private def isApplicable(s: SortMergeJoinExec): Boolean = { - isScanOperation(s.left) && - isScanOperation(s.right) && - satisfiesOutputPartitioning(s.leftKeys, s.left.outputPartitioning) && - satisfiesOutputPartitioning(s.rightKeys, s.right.outputPartitioning) - } - - def unapply(plan: SparkPlan): Option[(SortMergeJoinExec, Int, Int)] = { - plan match { - case s: SortMergeJoinExec if isApplicable(s) => - val leftBucket = getBucketSpec(s.left) - val rightBucket = getBucketSpec(s.right) - if (leftBucket.isDefined && rightBucket.isDefined) { - Some(s, leftBucket.get.numBuckets, rightBucket.get.numBuckets) - } else { - None - } - case _ => None - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala index 70303792fdf1a..e43a4147ceb63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala @@ -347,7 +347,7 @@ class 
ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite test("Coalesced bucket info should be a part of explain string") { withTable("t1", "t2") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { Seq(1, 2).toDF("i").write.bucketBy(8, "i").saveAsTable("t1") Seq(2, 3).toDF("i").write.bucketBy(4, "i").saveAsTable("t2") val df1 = spark.table("t1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala similarity index 55% rename from sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala index 6a70045c55e64..89aee37a4246f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInSortMergeJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoinSuite.scala @@ -19,17 +19,21 @@ package org.apache.spark.sql.execution.bucketing import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.optimizer.BuildLeft +import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide} import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.execution.{BinaryExecNode, FileSourceScanExec, SparkPlan} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InMemoryFileIndex, PartitionSpec} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat -import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec} +import 
org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} import org.apache.spark.sql.types.{IntegerType, StructType} -class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkSession { +class CoalesceBucketsInJoinSuite extends SQLTestUtils with SharedSparkSession { + private val SORT_MERGE_JOIN = "sortMergeJoin" + private val SHUFFLED_HASH_JOIN = "shuffledHashJoin" + private val BROADCAST_HASH_JOIN = "broadcastHashJoin" + case class RelationSetting( cols: Seq[Attribute], numBuckets: Int, @@ -47,11 +51,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys: Seq[Attribute], leftRelation: RelationSetting, rightRelation: RelationSetting, - isSortMergeJoin: Boolean) + joinOperator: String, + shjBuildSide: Option[BuildSide]) object JoinSetting { - def apply(l: RelationSetting, r: RelationSetting, isSortMergeJoin: Boolean): JoinSetting = { - JoinSetting(l.cols, r.cols, l, r, isSortMergeJoin) + def apply( + l: RelationSetting, + r: RelationSetting, + joinOperator: String = SORT_MERGE_JOIN, + shjBuildSide: Option[BuildSide] = None): JoinSetting = { + JoinSetting(l.cols, r.cols, l, r, joinOperator, shjBuildSide) } } @@ -73,17 +82,24 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS leftRelation = setting.rightRelation, rightRelation = setting.leftRelation) - Seq(setting, swappedSetting).foreach { case s => + val settings = if (setting.joinOperator != SHUFFLED_HASH_JOIN) { + Seq(setting, swappedSetting) + } else { + Seq(setting) + } + settings.foreach { s => val lScan = newFileSourceScanExec(s.leftRelation) val rScan = newFileSourceScanExec(s.rightRelation) - val join = if (s.isSortMergeJoin) { + val join = if (s.joinOperator == SORT_MERGE_JOIN) { SortMergeJoinExec(s.leftKeys, s.rightKeys, Inner, None, lScan, rScan) + } else 
if (s.joinOperator == SHUFFLED_HASH_JOIN) { + ShuffledHashJoinExec(s.leftKeys, s.rightKeys, Inner, s.shjBuildSide.get, None, lScan, rScan) } else { BroadcastHashJoinExec( s.leftKeys, s.rightKeys, Inner, BuildLeft, None, lScan, rScan) } - val plan = CoalesceBucketsInSortMergeJoin(spark.sessionState.conf)(join) + val plan = CoalesceBucketsInJoin(spark.sessionState.conf)(join) def verify(expected: Option[Int], subPlan: SparkPlan): Unit = { val coalesced = subPlan.collect { @@ -91,7 +107,7 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS f.optionalNumCoalescedBuckets.get } if (expected.isDefined) { - assert(coalesced.size == 1 && coalesced(0) == expected.get) + assert(coalesced.size == 1 && coalesced.head == expected.get) } else { assert(coalesced.isEmpty) } @@ -103,46 +119,73 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS } test("bucket coalescing - basic") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SORT_MERGE_JOIN)) run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, Some(4)), isSortMergeJoin = true)) + RelationSetting(4, None), RelationSetting(8, Some(4)), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") { - run(JoinSetting(RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = true)) + + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) } } - test("bucket coalescing should work only for sort merge 
join") { + test("bucket coalescing should work only for sort merge join and shuffled hash join") { Seq(true, false).foreach { enabled => - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> enabled.toString) { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> enabled.toString) { run(JoinSetting( - RelationSetting(4, None), RelationSetting(8, None), isSortMergeJoin = false)) + RelationSetting(4, None), RelationSetting(8, None), joinOperator = BROADCAST_HASH_JOIN)) } } } + test("bucket coalescing shouldn't be applied to shuffled hash join build side") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildRight))) + } + } + test("bucket coalescing shouldn't be applied when the number of buckets are the same") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { - run(JoinSetting(RelationSetting(8, None), RelationSetting(8, None), isSortMergeJoin = true)) + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(8, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) } } test("number of bucket is not divisible by other number of bucket") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { - run(JoinSetting(RelationSetting(3, None), RelationSetting(8, None), isSortMergeJoin = true)) + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(3, None), RelationSetting(8, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) } } test("the ratio of 
the number of buckets is greater than max allowed") { - withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { - run(JoinSetting(RelationSetting(4, None), RelationSetting(16, None), isSortMergeJoin = true)) + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SORT_MERGE_JOIN)) + run(JoinSetting( + RelationSetting(4, None), RelationSetting(16, None), joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) } } test("join keys should match with output partitioning") { - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { val lCols = Seq( AttributeReference("l1", IntegerType)(), AttributeReference("l2", IntegerType)()) @@ -160,7 +203,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys = Seq(rCols.head), leftRelation = lRel, rightRelation = rRel, - isSortMergeJoin = true)) + joinOperator = SORT_MERGE_JOIN, + shjBuildSide = None)) + + run(JoinSetting( + leftKeys = Seq(lCols.head), + rightKeys = Seq(rCols.head), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) // The following should not be coalesced because join keys do not match with output // partitioning (more expressions). 
@@ -169,7 +221,16 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), leftRelation = lRel, rightRelation = rRel, - isSortMergeJoin = true)) + joinOperator = SORT_MERGE_JOIN, + shjBuildSide = None)) + + run(JoinSetting( + leftKeys = lCols :+ AttributeReference("l3", IntegerType)(), + rightKeys = rCols :+ AttributeReference("r3", IntegerType)(), + leftRelation = lRel, + rightRelation = rRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) // The following will be coalesced since ordering should not matter because it will be // adjusted in `EnsureRequirements`. @@ -178,7 +239,24 @@ class CoalesceBucketsInSortMergeJoinSuite extends SQLTestUtils with SharedSparkS rightKeys = rCols.reverse, leftRelation = lRel, rightRelation = RelationSetting(rCols, 8, Some(4)), - isSortMergeJoin = true)) + joinOperator = SORT_MERGE_JOIN, + shjBuildSide = None)) + + run(JoinSetting( + leftKeys = lCols.reverse, + rightKeys = rCols.reverse, + leftRelation = lRel, + rightRelation = RelationSetting(rCols, 8, Some(4)), + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildLeft))) + + run(JoinSetting( + leftKeys = rCols.reverse, + rightKeys = lCols.reverse, + leftRelation = RelationSetting(rCols, 8, Some(4)), + rightRelation = lRel, + joinOperator = SHUFFLED_HASH_JOIN, + shjBuildSide = Some(BuildRight))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index b6767eb3132ea..98886d271e977 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -876,7 +876,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } test("bucket coalescing eliminates shuffle") { - 
withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { // The side with bucketedTableTestSpec1 will be coalesced to have 4 output partitions. // Currently, sort will be introduced for the side that is coalesced. val testSpec1 = BucketedTableTestSpec( @@ -911,7 +911,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "false") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "false") { // Coalescing buckets is disabled by a config. run( BucketedTableTestSpec( @@ -921,8 +921,8 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } withSQLConf( - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_MAX_BUCKET_RATIO.key -> "2") { + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true", + SQLConf.COALESCE_BUCKETS_IN_JOIN_MAX_BUCKET_RATIO.key -> "2") { // Coalescing buckets is not applied because the ratio of the number of buckets (3) // is greater than max allowed (2). run( @@ -932,7 +932,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { Some(BucketSpec(4, Seq("i", "j"), Seq("i", "j"))), expectedShuffle = true)) } - withSQLConf(SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + withSQLConf(SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { run( // Coalescing buckets is not applied because the bigger number of buckets (8) is not // divisible by the smaller number of buckets (7). 
@@ -950,7 +950,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { withSQLConf( SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0", - SQLConf.COALESCE_BUCKETS_IN_SORT_MERGE_JOIN_ENABLED.key -> "true") { + SQLConf.COALESCE_BUCKETS_IN_JOIN_ENABLED.key -> "true") { def verify( query: String, expectedNumShuffles: Int, @@ -964,7 +964,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } if (expectedCoalescedNumBuckets.isDefined) { assert(scans.length == 1) - assert(scans(0).optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) + assert(scans.head.optionalNumCoalescedBuckets == expectedCoalescedNumBuckets) } else { assert(scans.isEmpty) }